In [1]:
!pip install torch torchvision torchmetrics kaggle kagglehub

Collecting torchmetrics
  Downloading torchmetrics-1.6.1-py3-none-any.whl.metadata (21 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Col

In [2]:
import kagglehub

path = kagglehub.dataset_download("paultimothymooney/chest-xray-pneumonia")
path

Downloading from https://www.kaggle.com/api/v1/datasets/download/paultimothymooney/chest-xray-pneumonia?dataset_version_number=2...


100%|██████████| 2.29G/2.29G [00:19<00:00, 124MB/s] 

Extracting files...





'/root/.cache/kagglehub/datasets/paultimothymooney/chest-xray-pneumonia/versions/2'

In [3]:
import os

base = f"{path}/chest_xray/"
train_base = f"{base}/train"
val_base = f"{base}/val"

In [4]:
import torch

class XRayDataset(torch.utils.data.Dataset):
  def __init__(self, path):
    super(XRayDataset, self).__init__()

    self.imgs = [f"{path}/NORMAL/{f}" for f in os.listdir(f"{path}/NORMAL")] + [f"{path}/PNEUMONIA/{f}" for f in os.listdir(f"{path}/PNEUMONIA")]
    self.labels = [0]*len(os.listdir(f"{path}/NORMAL")) + [1]*len(os.listdir(f"{path}/PNEUMONIA"))

  def __getitem__(self, index):
    from PIL import Image
    import numpy as np

    img = Image.open(self.imgs[index])
    img = img.resize((224,224))

    if len(img.size) != 3:
      rgb = Image.new("RGB", img.size)
      rgb.paste(img)
      img = rgb

    img = np.array(img)
    img = img / 255.
    img = torch.from_numpy(img).float()
    # WxHxC -> CxHxW
    img = img.permute(2,1,0)
    return img, self.labels[index]

  def __len__(self):
    return len(self.imgs)

In [5]:
import torch.nn as nn
class VGG16(nn.Module):
  def __init__(self):
    super().__init__()

    self.layers = nn.ModuleList()
    self.layers.append(nn.Sequential(
        nn.Conv2d(3, 64, 3, 1, 1),
        nn.BatchNorm2d(64),
        nn.ReLU()
    ))
    self.layers.append(nn.Sequential(
        nn.Conv2d(64, 64, 3, 1, 1),
        nn.BatchNorm2d(64),
        nn.ReLU(),
        nn.MaxPool2d(2, 2)
    ))

    self.layers.append(nn.Sequential(
        nn.Conv2d(64, 128, 3, 1, 1),
        nn.BatchNorm2d(128),
        nn.ReLU()
    ))
    self.layers.append(nn.Sequential(
        nn.Conv2d(128, 128, 3, 1, 1),
        nn.BatchNorm2d(128),
        nn.ReLU(),
        nn.MaxPool2d(2, 2)
    ))

    self.layers.append(nn.Sequential(
        nn.Conv2d(128, 256, 3, 1, 1),
        nn.BatchNorm2d(256),
        nn.ReLU()
    ))
    self.layers.append(nn.Sequential(
        nn.Conv2d(256, 256, 3, 1, 1),
        nn.BatchNorm2d(256),
        nn.ReLU()
    ))
    self.layers.append(nn.Sequential(
        nn.Conv2d(256, 256, 3, 1, 1),
        nn.BatchNorm2d(256),
        nn.ReLU(),
        nn.MaxPool2d(2, 2)
    ))

    self.layers.append(nn.Sequential(
        nn.Conv2d(256, 512, 3, 1, 1),
        nn.BatchNorm2d(512),
        nn.ReLU()
    ))
    self.layers.append(nn.Sequential(
        nn.Conv2d(512, 512, 3, 1, 1),
        nn.BatchNorm2d(512),
        nn.ReLU()
    ))
    self.layers.append(nn.Sequential(
        nn.Conv2d(512, 512, 3, 1, 1),
        nn.BatchNorm2d(512),
        nn.ReLU(),
        nn.MaxPool2d(2, 2)
    ))

    self.layers.append(nn.Sequential(
        nn.Conv2d(512, 512, 3, 1, 1),
        nn.BatchNorm2d(512),
        nn.ReLU()
    ))
    self.layers.append(nn.Sequential(
        nn.Conv2d(512, 512, 3, 1, 1),
        nn.BatchNorm2d(512),
        nn.ReLU()
    ))
    self.layers.append(nn.Sequential(
        nn.Conv2d(512, 512, 3, 1, 1),
        nn.BatchNorm2d(512),
        nn.ReLU(),
        nn.MaxPool2d(2, 2)
    ))

    # Fully connected layers
    self.fc = nn.ModuleList()
    self.fc.append(nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(7*7*512, 4096),
        nn.ReLU()
    ))
    self.fc.append(nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(4096, 4096),
        nn.ReLU()
    ))
    self.fc.append(nn.Sequential(
        nn.Linear(4096, 2)
    ))

  def forward(self, x):
    out = x
    for layer in self.layers:
      out = layer(out)

    out = out.reshape(out.size(0), -1)
    for fc in self.fc:
      out = fc(out)

    return out

In [6]:
train_ds = XRayDataset(train_base)
val_ds = XRayDataset(val_base)

len(train_ds), len(val_ds)

(5216, 16)

In [7]:
# verify
for data in train_ds:
  print(data[0].shape, data[1])
  print(data[0].min(), data[0].max())
  break

torch.Size([3, 224, 224]) 0
tensor(0.) tensor(1.)


In [8]:
# verify
for data in val_ds:
  print(data[0].shape, data[1])
  print(data[0].min(), data[0].max())
  break

torch.Size([3, 224, 224]) 0
tensor(0.) tensor(1.)


In [9]:
model = VGG16()

In [10]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

In [11]:
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True, collate_fn=lambda x: x)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=16, shuffle=True, collate_fn=lambda x: x)

In [21]:
len(train_dl), len(val_dl)

(326, 1)

In [38]:
from time import time

opt = torch.optim.Adam(model.parameters(), lr=1e-2)
loss_fn = torch.nn.CrossEntropyLoss()

model.to(device)

n_epochs = 3
for epoch in range(1, n_epochs+1):
  print(f"Epoch {epoch}/{n_epochs}", end=' ')

  start = time()
  loss_per_epoch = 0
  for batch in train_dl:
    X = []
    y = []
    for data in batch:
      X.append(data[0])
      y.append(data[1])

    X = torch.stack(X)
    y = torch.Tensor(y)

    if device == 'cuda':
      X = X.cuda()
      y = y.cuda()

    # 16 -> 16x2
    y = torch.nn.functional.one_hot(y.to(torch.long))

    preds = model(X)
    loss = loss_fn(preds, y.float())

    loss_per_epoch += loss

    opt.zero_grad()
    loss.backward()
    opt.step()

  end = time()
  print(f"| Loss {loss_per_epoch / len(train_dl)} [Took {end - start}s]", end='\n')

Epoch 1/3 | Loss 0.5704289674758911 [Took 92.6587507724762s]
Epoch 2/3 | Loss 0.5706160068511963 [Took 92.0543737411499s]
Epoch 3/3 | Loss 0.5706800818443298 [Took 91.36654448509216s]


In [39]:
torch.save(model.state_dict(), './trained-vgg16.pth')

In [40]:
model.load_state_dict(torch.load('./trained-vgg16.pth', weights_only=True))
model.eval()

VGG16(
  (layers): ModuleList(
    (0): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (1): Sequential(
      (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (2): Sequential(
      (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (3): Sequential(
      (0): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): MaxPool2d(kernel_size=

In [41]:
model.to(device)

X = []
y = []
preds = None
for batch in val_dl:
  for data in batch:
    X.append(data[0])
    y.append(data[1])

  X = torch.stack(X)
  y = torch.Tensor(y)

  if device == 'cuda':
    X = X.cuda()
    y = y.cuda()

  y = torch.nn.functional.one_hot(y.to(torch.long))
  preds = model(X)
  break

In [42]:
from torchmetrics import Accuracy, Precision, Recall, F1Score

proba = torch.nn.functional.softmax(preds, dim=-1)
proba = proba > 0.5

acc = Accuracy(task='multiclass', num_classes=2).to(device)
prec = Precision(task='multiclass', num_classes=2).to(device)
rec = Recall(task='multiclass', num_classes=2).to(device)
f1 = F1Score(task='multiclass', num_classes=2).to(device)

acc(proba, y), prec(proba, y), rec(proba, y), f1(proba, y)

(tensor(0.5000, device='cuda:0'),
 tensor(0.5000, device='cuda:0'),
 tensor(0.5000, device='cuda:0'),
 tensor(0.5000, device='cuda:0'))