In [1]:
import pandas as pd
import cv2
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import os

import pytorch_lightning as pl
import torch.nn.functional as F
from torch import nn
from torch.optim import Adam
from torchvision.models import resnet50, ResNet50_Weights
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor
import torch


from PIL import Image
import multiprocessing

num_cpu_cores = multiprocessing.cpu_count()
num_cpu_cores



6

In [2]:
# to read .csv files from another directory
location = "detect-pneumonia-spring-2024/" # "/<path>"

df = pd.read_csv(location + "labels_train.csv")

class_id
1     762
0    1773
2    1793
Name: count, dtype: int64


In [None]:
if torch.cuda.is_available():
    epochs = 100, 
    accelerator = 'cuda',
    target_images_per_class = 10000
else:
    epochs = 10
    accelerator = 'cpu',
    target_images_per_class = 5000

# Get the count of images in each class
class_counts = df['class_id'].value_counts()
# Calculate the number of additional images needed for each class
additional_images_needed = target_images_per_class - class_counts
print(additional_images_needed)


In [4]:
# Define the augmentation function using OpenCV
def augment_image(image):
    # Random rotation
    angle = np.random.uniform(-10, 10)
    M = cv2.getRotationMatrix2D((image.shape[1] // 2, image.shape[0] // 2), angle, 1)
    image = cv2.warpAffine(image, M, (image.shape[1], image.shape[0]))

    # Random width and height shift
    tx = np.random.uniform(-0.1, 0.1) * image.shape[1]
    ty = np.random.uniform(-0.1, 0.1) * image.shape[0]
    M = np.float32([[1, 0, tx], [0, 1, ty]])
    image = cv2.warpAffine(image, M, (image.shape[1], image.shape[0]))

    # Random shear
    shear_factor = np.random.uniform(-0.2, 0.2)
    M = np.float32([[1, shear_factor, 0], [0, 1, 0]])
    image = cv2.warpAffine(image, M, (image.shape[1], image.shape[0]))

    # Random zoom
    zx, zy = np.random.uniform(0.9, 1.1, 2)
    image = cv2.resize(image, None, fx=zx, fy=zy, interpolation=cv2.INTER_LINEAR)

    # Random horizontal flip
    if np.random.rand() < 0.5:
        image = cv2.flip(image, 1)
    
    return image

# Define the custom dataset
class PneumoniaDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None, target_images_per_class=10000):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
        self.target_images_per_class = target_images_per_class
        
        # Calculate the number of images needed for each class
        self.class_counts = self.annotations['class_id'].value_counts()
        self.additional_images_needed = target_images_per_class - self.class_counts

        # Create lists of images per class
        self.image_paths_by_class = {class_id: [] for class_id in self.additional_images_needed.index}
        for idx, row in self.annotations.iterrows():
            self.image_paths_by_class[row['class_id']].append(row['file_name'])
        
    def __len__(self):
        return self.target_images_per_class * len(self.additional_images_needed)

    def __getitem__(self, idx):
        # Determine the class and the specific image index within that class
        class_id = idx // self.target_images_per_class
        img_idx = idx % self.target_images_per_class
        
        if img_idx < self.class_counts[class_id]:
            # Original image
            img_name = self.image_paths_by_class[class_id][img_idx]
            img_path = os.path.join(self.root_dir, img_name)
            image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        else:
            # Augmented image
            img_name = np.random.choice(self.image_paths_by_class[class_id])
            img_path = os.path.join(self.root_dir, img_name)
            image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
            image = augment_image(image)
        
        if self.transform:
            image = self.transform(image)
        
        label = class_id
        
        return image, label

# Define transformations
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# Create the dataset and dataloader
dataset = PneumoniaDataset(
    csv_file= location + 'labels_train.csv',
    root_dir= location + 'train_images/train_images',
    transform=transform,
    target_images_per_class= target_images_per_class
)

dataloader = DataLoader(dataset, batch_size=100, shuffle=False, num_workers= num_cpu_cores, pin_memory=True, persistent_workers=True)

# Example of iterating through the dataloader
for images, labels in dataloader:
    print(images.shape, labels.shape)
    break

torch.Size([100, 1, 224, 224]) torch.Size([100])


In [5]:
class PneumoniaModel(pl.LightningModule):
    def __init__(self, num_classes=3):
        super(PneumoniaModel, self).__init__()
        self.model = resnet50(weights=ResNet50_Weights.DEFAULT)
        self.model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        images, labels = batch
        outputs = self(images)
        loss = F.cross_entropy(outputs, labels)
        self.log('train_loss', loss)
        return loss

    def configure_optimizers(self):
        return Adam(self.parameters(), lr=1e-4)
    
# Define callbacks
early_stopping_callback = EarlyStopping(monitor='train_loss', patience=3, verbose=True, mode='min')
lr_monitor = LearningRateMonitor(logging_interval='epoch')


In [6]:
model = PneumoniaModel()

trainer = pl.Trainer(max_epochs=epochs, accelerator=accelerator)
trainer.fit(model, dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
2024-05-29 13:03:40.428941: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-29 13:03:40.526036: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-29 13:03:40.950271: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.

  | Name  | Type   | Params
---------------------------------
0 | model | ResNet | 23.5 M
---------------------------------
23.5 M    Trainable params
0         Non-trainable params
23.5 M    Total params
94.032    Total estimated model params size (MB

Epoch 9: 100%|██████████| 90/90 [29:37<00:00,  0.05it/s, v_num=5]

`Trainer.fit` stopped: `max_epochs=10` reached.


Epoch 9: 100%|██████████| 90/90 [29:39<00:00,  0.05it/s, v_num=5]


* 10 epochs completed in 260 minutes and 38.2 seconds with intel i5 8400 with an accuract = 0.79109.

In [7]:
# Specify the directory and file name to save the model
model_save_path = location + "models/resnet50_v1_cpu.pth"

# Save the trained model
torch.save(model.state_dict(), model_save_path)


In [8]:
# Define the transform to preprocess the images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Define a function to predict the class for a single image
def predict_image(model, image_path, transform):
    image = Image.open(image_path).convert('L')  # Ensure the image is grayscale
    image = transform(image).unsqueeze(0)  # Add batch dimension
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():  # Disable gradient tracking during inference
        outputs = model(image)  # Forward pass
        _, predicted = torch.max(outputs, 1)  # Get predicted label
    return predicted.item()

# Path to the folder containing testing images
test_folder = location + 'test_images/test_images'

# Initialize model
num_classes = 3  # Update with the number of classes in your dataset
model = PneumoniaModel(num_classes=num_classes)

# Try to load the model state dictionary
try:
    state_dict = torch.load(model_save_path)
    model.load_state_dict(state_dict)
    print("Loaded model state dictionary successfully.")
except Exception as e:
    print(f"Error loading the model state dictionary: {e}")

# Iterate over testing images and make predictions
predictions = []
file_names = []
for file_name in os.listdir(test_folder):
    image_path = os.path.join(test_folder, file_name)
    predicted_label = predict_image(model, image_path, transform)
    predictions.append(predicted_label)
    file_names.append(file_name)

# Create a DataFrame with file names and predicted labels
results_df = pd.DataFrame({'file_name': file_names, 'class_id': predictions})

# Save the DataFrame to a CSV file
results_df.to_csv(location + 'results/labels_test_cpu_v1.csv', index=False)

Loaded model state dictionary successfully.
