In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import glob
from PIL import Image
import numpy as np
import random
from sklearn.model_selection import train_test_split
from google.colab import files
import torch.nn.functional as F
from sklearn.metrics import classification_report
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, WeightedRandomSampler

# Download the dataset from Kaggle

In [None]:
# Upload the kaggle.json API token.
# files.upload() returns the uploaded files' contents; binding the result to a
# name (instead of leaving the call as the cell's last expression) keeps the
# Kaggle credentials from being echoed into the notebook's saved output.
uploaded = files.upload()

In [None]:
# Create ~/.kaggle if it does not exist yet and move the uploaded kaggle.json
# into it; the next cell restricts its permissions and runs the kaggle CLI.
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/

In [None]:
!chmod 600 ~/.kaggle/kaggle.json

# Replace 'tolgadincer/labeled-chest-xray-images' with the dataset's Kaggle API link
!kaggle datasets download -d tolgadincer/labeled-chest-xray-images

# Unzip the downloaded dataset
!unzip -q labeled-chest-xray-images.zip -d chest_xray_dataset

In [None]:
# Root folder created by unzipping the Kaggle archive.
XRAY_ROOT = '/content/chest_xray_dataset/chest_xray'


def list_xray_images(split, label, root=XRAY_ROOT):
    """Return the sorted file paths found in ``<root>/<split>/<label>/``.

    glob.glob returns matches in arbitrary, filesystem-dependent order. Sorting
    gives every run the same path order, which the seeded train/validation
    split needs in order to be reproducible.

    Args:
        split: Sub-folder of the dataset, e.g. 'train' or 'test'.
        label: Class folder inside the split, e.g. 'NORMAL' or 'PNEUMONIA'.
        root: Dataset root folder; defaults to XRAY_ROOT.

    Returns:
        A sorted list of path strings (empty if the folder has no entries).
    """
    return sorted(glob.glob(f'{root}/{split}/{label}/*'))


train_normal = list_xray_images('train', 'NORMAL')
train_pneumonia = list_xray_images('train', 'PNEUMONIA')

test_normal = list_xray_images('test', 'NORMAL')
test_pneumonia = list_xray_images('test', 'PNEUMONIA')

In [None]:
# Label convention used throughout: 0 = NORMAL, 1 = PNEUMONIA.
train_paths = [*train_normal, *train_pneumonia]
train_labels = [0 for _ in train_normal] + [1 for _ in train_pneumonia]

test_paths = [*test_normal, *test_pneumonia]
test_labels = [0 for _ in test_normal] + [1 for _ in test_pneumonia]

# Hold out a stratified 10% of the training images for validation.
train_paths_split, val_paths, train_labels_split, val_labels = train_test_split(
    train_paths,
    train_labels,
    test_size=0.1,
    stratify=train_labels,
    random_state=42,
)

# Create data loaders

In [None]:
class CustomXrayDataset(Dataset):
    """Dataset of X-ray image files paired with integer class labels.

    Args:
        labels: Sequence of integer class labels, one per image.
        paths: Sequence of image file paths, aligned index-by-index with ``labels``.
        transform: Optional callable applied to the loaded grayscale PIL image.
        target_transform: Optional callable applied to the label tensor.

    Attributes:
        class_counts: 1-D integer tensor; entry ``i`` is the number of samples
            whose label is ``i`` (empty when there are no samples).

    Raises:
        ValueError: If ``labels`` and ``paths`` have different lengths.
    """

    def __init__(self, labels, paths, transform=None, target_transform=None):
        if len(labels) != len(paths):
            raise ValueError(
                f"labels and paths must have the same length, "
                f"got {len(labels)} labels and {len(paths)} paths"
            )
        self.img_labels = labels
        self.img_paths = paths
        self.transform = transform
        self.target_transform = target_transform

        # Per-class sample counts. The explicit integer dtype keeps bincount
        # working for an empty label list, which would otherwise become a
        # float tensor that bincount rejects.
        self.class_counts = torch.bincount(torch.as_tensor(labels, dtype=torch.long))

    def __len__(self):
        """Return the number of samples."""
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is opened from disk and converted to single-channel
        grayscale ('L'); the label is wrapped in a tensor. The optional
        transforms are applied afterwards.
        """
        # convert() returns an independent in-memory image, so the underlying
        # file can be closed as soon as the conversion is done.
        with Image.open(self.img_paths[idx]) as img_file:
            image = img_file.convert('L')
        label = torch.tensor(self.img_labels[idx])
        if self.transform is not None:
            image = self.transform(image)
        if self.target_transform is not None:
            label = self.target_transform(label)
        return image, label


## Train

In [None]:
# Requires CustomXrayDataset and the train_labels_split / train_paths_split
# lists produced by the cells above.
batch_size = 64

# Preprocessing: resize to 64x64 and normalize the single grayscale channel.
resize_transform = transforms.Resize((64, 64))
normalize = transforms.Normalize(mean=[0.5], std=[0.5])

# Training-time pipeline: preprocessing plus light augmentation.
# The rotation range is 10 degrees, as the original comment intended; the
# previous value of 180 produced upside-down X-rays that never occur in the
# (un-augmented) validation and test images.
train_transform = transforms.Compose([
    resize_transform,
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),  # Rotate images randomly by up to 10 degrees
    transforms.ToTensor(),
    normalize
])

# Create train_dataset with the specified transformations
train_dataset = CustomXrayDataset(labels=train_labels_split, paths=train_paths_split, transform=train_transform)

# Inverse-frequency class weights (the epsilon guards against an empty class),
# computed in one vectorized step instead of a Python list of 0-d tensors.
class_weights_tensor = 1.0 / (train_dataset.class_counts.float() + 1e-6)
class_weights = class_weights_tensor.tolist()

# One weight per training sample, looked up from its class label.
label_index = torch.as_tensor(train_dataset.img_labels, dtype=torch.long)
sample_weights = class_weights_tensor[label_index].tolist()

# Use WeightedRandomSampler to handle class imbalance: samples are drawn with
# replacement, with probability proportional to their weight.
sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(train_dataset), replacement=True)
# Create DataLoader with the specified sampler
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)

## Validation and test

In [None]:
# Augmentation-free preprocessing for validation and test images:
# resize to 64x64, convert to a tensor, normalize the single grayscale channel.
normalize = transforms.Normalize(std=[0.5], mean=[0.5])
resize_transform = transforms.Resize(size=(64, 64))

transform = transforms.Compose(
    [resize_transform, transforms.ToTensor(), normalize]
)

In [None]:
# Validation and test datasets both use the augmentation-free `transform`.
val_dataset, test_dataset = (
    CustomXrayDataset(labels=split_labels, paths=split_paths, transform=transform)
    for split_labels, split_paths in ((val_labels, val_paths), (test_labels, test_paths))
)

In [None]:
# Evaluation loaders keep the dataset order: shuffling brings no benefit when
# no gradient steps are taken, and a fixed order keeps the collected
# predictions aligned with val_paths / test_paths from run to run.
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# CNN

In [None]:
class Net(nn.Module):
    """Small CNN for binary classification of 1-channel 64x64 images.

    Three conv -> batch-norm -> ReLU -> 2x2 max-pool stages (64 -> 32 -> 16 -> 8
    pixels per side, ending with 64 channels) feed four fully connected layers.
    The last layer is followed by a sigmoid, so ``forward`` returns one
    probability per sample with shape ``(batch, 1)``.
    """

    def __init__(self):
        super(Net, self).__init__()

        # Convolutional layers (3x3, stride 1, padding 1 keep the spatial size)
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(64)

        # Max pooling layer (halves height and width each time it is applied)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        # Fully connected layers
        self.fc1 = nn.Linear(64 * 8 * 8, 512)  # 64 channels x 8 x 8 after three max-pooling layers
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, 32)
        self.fc4 = nn.Linear(32, 1)

        # Dropout layer to prevent overfitting
        self.dropout = nn.Dropout(p=0.25)

        # Activations: ReLU for hidden layers, sigmoid for the output probability
        self.relu = nn.ReLU()
        self.prob_func = nn.Sigmoid()

    def weight_init(self, mean, std):
        """Re-initialize the convolution layers in place.

        Weights of every direct Conv2d / ConvTranspose2d child are drawn from a
        normal distribution with the given ``mean`` and ``std``; biases are
        zeroed. Other layers are left untouched.
        """
        for module in self.children():
            if isinstance(module, (nn.ConvTranspose2d, nn.Conv2d)):
                module.weight.data.normal_(mean, std)
                # A convolution built with bias=False has no bias tensor.
                if module.bias is not None:
                    module.bias.data.zero_()

    def forward(self, x):
        """Map a ``(batch, 1, 64, 64)`` tensor to ``(batch, 1)`` probabilities."""
        x = self.pool(self.relu(self.bn1(self.conv1(x))))
        x = self.pool(self.relu(self.bn2(self.conv2(x))))
        x = self.pool(self.relu(self.bn3(self.conv3(x))))
        # Flatten everything except the batch dimension. Unlike
        # view(-1, 64 * 8 * 8), this never folds a wrongly sized input into
        # extra batch rows; fc1 raises a shape error instead.
        x = torch.flatten(x, 1)

        x = self.dropout(self.relu(self.fc1(x)))
        x = self.dropout(self.relu(self.fc2(x)))
        x = self.dropout(self.relu(self.fc3(x)))
        x = self.prob_func(self.fc4(x))

        return x

# Instantiate the model and put it in training mode
# (dropout active, batch-norm layers using per-batch statistics).
net = Net()
net.train(mode=True)


# Training loop


In [None]:
# Re-draw the convolution weights from a normal distribution (mean 0, std 0.02)
# and zero their biases.
net.weight_init(mean=0.0, std=0.02)

# Binary cross-entropy on the sigmoid probabilities returned by the network.
criterion = nn.BCELoss()

# SGD with momentum over all model parameters.
optimizer = optim.SGD(
    params=net.parameters(),
    momentum=0.9,
    lr=0.01,
)

# Multiply the learning rate by 0.1 every time scheduler.step() is called.
scheduler = StepLR(optimizer=optimizer, gamma=0.1, step_size=1)

In [None]:
# Create tables to plot learning curves
train_losses = []
val_losses = []

In [None]:
# NOTE(review): this cell repeats the setup cell that precedes the loss-history
# cell. Running it again re-randomizes the convolution weights and discards the
# optimizer/scheduler state, so only run it to restart training from scratch.

# Loss: binary cross-entropy on the network's sigmoid output.
criterion = nn.BCELoss()

# Optimizer: SGD with momentum over every parameter of the model.
sgd_settings = dict(lr=0.01, momentum=0.9)
optimizer = optim.SGD(net.parameters(), **sgd_settings)
del sgd_settings

# Weights: convolution weights drawn with mean 0 and std 0.02, biases zeroed.
net.weight_init(0.0, 0.02)

# Scheduler: learning rate multiplied by 0.1 at every scheduler.step().
scheduler = StepLR(optimizer, 1, gamma=0.1)

# Evaluation

In [None]:
# Evaluate the model on the test set and print per-class precision/recall/F1.

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# The model has to live on the same device as the batches it receives;
# Module.to moves the parameters in place.
net.to(device)

# Evaluation mode: dropout disabled, batch-norm uses its running statistics.
net.eval()

# Collected predictions and true labels for the whole test set
all_predictions = []
all_targets = []

# Iterate over the test dataloader without tracking gradients
with torch.no_grad():
    for inputs, targets in test_dataloader:
        # Only the inputs are needed on the model's device
        inputs = inputs.to(device)

        # Forward pass: one probability per sample, shape (batch, 1)
        outputs = net(inputs)

        # Round probabilities to 0/1 class ids. reshape(-1) keeps the result
        # 1-D even for a batch of one (squeeze() would yield a 0-d array that
        # extend() cannot iterate); long() matches the integer targets.
        predicted_classes = torch.round(outputs).reshape(-1).long().cpu().numpy()

        # Append predictions and targets to the lists
        all_predictions.extend(predicted_classes)
        all_targets.extend(targets.reshape(-1).cpu().numpy())

# Convert lists to NumPy arrays
all_predictions = np.array(all_predictions)
all_targets = np.array(all_targets)

# Generate the classification report
report = classification_report(all_targets, all_predictions)

# Print the classification report
print(report)

# Switch back to training mode in case training continues after this cell
net.train()