In [1]:
Dataset_url = "https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia"

In [2]:
# Upload Kaggle json file
!mkdir ~/.kaggle/
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [3]:
!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia

Dataset URL: https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia
License(s): other
Downloading chest-xray-pneumonia.zip to /content
100% 2.29G/2.29G [01:33<00:00, 27.5MB/s]
100% 2.29G/2.29G [01:33<00:00, 26.4MB/s]


In [4]:
!unzip -q chest-xray-pneumonia.zip

In [5]:
# Import
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import datasets, transforms, models
from PIL import Image
from tqdm.auto import tqdm
import time

In [6]:
# Get cpu, gpu or mps device for training.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
device

'cuda'

In [7]:
train_transform = transforms.Compose([transforms.Resize((255, 255)),
                                      transforms.AutoAugment(),
                                      transforms.ToTensor(),
                                      transforms.RandomRotation(degrees=10),
                                      transforms.RandomHorizontalFlip(p=0.5),
                                      transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                           std=[0.229, 0.224, 0.225]),
                                     ])

transform = transforms.Compose([transforms.Resize((255, 255)),
                                transforms.ToTensor(),
                                transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                     std=[0.229, 0.224, 0.225])])

In [8]:
class CustomDataset(Dataset):
    def __init__(self, root_dir, split='train', transform=None):
        self.root_dir = root_dir
        self.split = split  # 'train', 'test', or 'val'
        self.transform = transform
        self.classes = sorted(os.listdir(os.path.join(root_dir, 'train')))
        self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(self.classes)}
        self.samples = self._load_samples()

    def _load_samples(self):
        samples = []
        split_path = os.path.join(self.root_dir, self.split)
        for cls_name in self.classes:
            cls_path = os.path.join(split_path, cls_name)
            class_idx = self.class_to_idx[cls_name]
            for file_name in os.listdir(cls_path):
                if file_name.endswith(('.jpg', '.jpeg', '.png')):
                    file_path = os.path.join(cls_path, file_name)
                    samples.append((file_path, class_idx))
        return samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index):
        image_path, label = self.samples[index]
        image = Image.open(image_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label

In [9]:
root_dir = '/content/chest_xray/'

In [10]:
# Example of creating instances of your custom dataset
train_dataset = CustomDataset(root_dir=root_dir, split='train', transform=train_transform)
test_dataset = CustomDataset(root_dir=root_dir, split='test', transform=transform)
val_dataset = CustomDataset(root_dir=root_dir, split='val', transform=transform)

In [11]:
len(train_dataset),len(test_dataset),len(val_dataset)

(5216, 624, 16)

In [12]:
# Define batch size for DataLoader
batch_size = 16

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last = True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [13]:
import torch.nn.functional as F
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, input_shape, hidden_units, output_shape):
        super(ResNet, self).__init__()
        self.initial_conv = nn.Conv2d(in_channels=input_shape, out_channels=hidden_units, kernel_size=3, stride=1, padding=1)
        self.initial_bn = nn.BatchNorm2d(hidden_units)
        self.initial_relu = nn.ReLU()
        self.initial_pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.layer1 = self._make_layer(hidden_units, hidden_units, stride=1)
        self.layer2 = self._make_layer(hidden_units, hidden_units, stride=2)

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(204800, hidden_units * 10),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_units * 10),
            nn.Linear(hidden_units * 10, hidden_units * 5),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_units * 5),
            nn.Linear(hidden_units * 5, hidden_units),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_units),
            nn.Linear(hidden_units, output_shape),
            nn.Softmax(dim=1)
        )

    def _make_layer(self, in_channels, out_channels, stride):
        layer = ResidualBlock(in_channels, out_channels, stride)
        return layer

    def forward(self, x):
        x = self.initial_conv(x)
        x = self.initial_bn(x)
        x = self.initial_relu(x)
        x = self.initial_pool(x)

        x = self.layer1(x)
        x = self.layer2(x)

        x = self.classifier(x)
        return x

torch.manual_seed(23)
model = ResNet(input_shape=3, hidden_units=50, output_shape=2)
model

ResNet(
  (initial_conv): Conv2d(3, 50, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (initial_bn): BatchNorm2d(50, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (initial_relu): ReLU()
  (initial_pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (layer1): ResidualBlock(
    (conv1): Conv2d(50, 50, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn1): BatchNorm2d(50, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(50, 50, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn2): BatchNorm2d(50, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (shortcut): Sequential()
  )
  (layer2): ResidualBlock(
    (conv1): Conv2d(50, 50, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (bn1): BatchNorm2d(50, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(50, 50, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn2): BatchNor

In [14]:
from torch.optim.lr_scheduler import StepLR
# Setup loss and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(),
                             lr=1e-4)
scheduler = StepLR(optimizer, step_size=7, gamma=0.1)

In [18]:
def train(train_dataloader, val_dataloader, model, loss_fn, optimizer, scheduler, device):
    model.to(device)
    model.train()
    train_loss = 0.0
    correct = 0
    for batch_idx, (X_train, y_train) in enumerate(train_dataloader):
        X_train, y_train = X_train.to(device), y_train.to(device)

        # Forward pass
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = loss_fn(outputs, y_train)
        train_loss += loss.item()

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Track the accuracy
        _, predicted = torch.max(outputs.data, 1)
        correct += (predicted == y_train).sum().item()

        # Print training progress
        if batch_idx % 100 == 0:
            print(f'Train Epoch: {epoch+1} [{batch_idx * len(X_train)}/{len(train_dataloader.dataset)} '
                  f'({100. * batch_idx / len(train_dataloader):.0f}%)]\tLoss: {loss.item():.6f}')

    train_loss /= len(train_dataloader.dataset)
    accuracy = 100. * correct / len(train_dataloader.dataset)
    print(f'Train Epoch: {epoch+1}\t Average Loss: {train_loss:.6f}, Accuracy: {accuracy:.2f}%')

    # Validation phase
    model.eval()
    val_loss = 0.0
    correct = 0
    with torch.no_grad():
        for X_val, y_val in val_dataloader:
            X_val, y_val = X_val.to(device), y_val.to(device)
            outputs = model(X_val)
            loss = loss_fn(outputs, y_val)
            val_loss += loss.item()

            # Track the accuracy
            _, predicted = torch.max(outputs.data, 1)
            correct += (predicted == y_val).sum().item()

    val_loss /= len(val_dataloader.dataset)
    accuracy = 100. * correct / len(val_dataloader.dataset)
    print(f'Validation Epoch: {epoch+1}\t Average Loss: {val_loss:.6f}, Accuracy: {accuracy:.2f}%')

    # Adjust learning rate
    scheduler.step()


In [19]:
def test(dataloader, model, loss_fn, device):
    size = len(dataloader.dataset)
    model.to(device)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [20]:
from tqdm.auto import tqdm
import time
start_time = time.time()


# Train and test model
epochs = 10
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n---------")
    train(train_loader,val_dataloader, model, loss_fn, optimizer, scheduler, device)
    test(test_dataloader, model, loss_fn,device)

current_time = time.time()
total = current_time - start_time
print("Done!")
print(f'Training Took: {total/60} minutes!')

  0%|          | 0/10 [00:00<?, ?it/s]

Epoch: 0
---------
Train Epoch: 1	 Average Loss: 0.031791, Accuracy: 83.07%
Validation Epoch: 1	 Average Loss: 0.032537, Accuracy: 81.25%
Test Error: 
 Accuracy: 81.9%, Avg loss: 0.518218 

Epoch: 1
---------
Train Epoch: 2	 Average Loss: 0.031095, Accuracy: 84.68%
Validation Epoch: 2	 Average Loss: 0.032227, Accuracy: 81.25%
Test Error: 
 Accuracy: 81.4%, Avg loss: 0.509005 

Epoch: 2
---------
Train Epoch: 3	 Average Loss: 0.030721, Accuracy: 85.01%
Validation Epoch: 3	 Average Loss: 0.032807, Accuracy: 87.50%
Test Error: 
 Accuracy: 81.2%, Avg loss: 0.505370 

Epoch: 3
---------
Train Epoch: 4	 Average Loss: 0.029921, Accuracy: 87.33%
Validation Epoch: 4	 Average Loss: 0.032332, Accuracy: 81.25%
Test Error: 
 Accuracy: 81.9%, Avg loss: 0.511029 

Epoch: 4
---------
Train Epoch: 5	 Average Loss: 0.029915, Accuracy: 86.83%
Validation Epoch: 5	 Average Loss: 0.032462, Accuracy: 81.25%
Test Error: 
 Accuracy: 80.3%, Avg loss: 0.511243 

Epoch: 5
---------
Train Epoch: 6	 Average Loss: 0