In [1]:
# Mount Google Drive inside the Colab runtime at /content/drive so that the
# dataset archive stored there can be read by the following cell.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!unzip /content/drive/MyDrive/unibuc-brain-ad.zip

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: data/data/017153.png    
  inflating: data/data/017154.png    
  inflating: data/data/017155.png    
  inflating: data/data/017156.png    
  inflating: data/data/017157.png    
  inflating: data/data/017158.png    
  inflating: data/data/017159.png    
  inflating: data/data/017160.png    
  inflating: data/data/017161.png    
  inflating: data/data/017162.png    
  inflating: data/data/017163.png    
  inflating: data/data/017164.png    
  inflating: data/data/017165.png    
  inflating: data/data/017166.png    
  inflating: data/data/017167.png    
  inflating: data/data/017168.png    
  inflating: data/data/017169.png    
  inflating: data/data/017170.png    
  inflating: data/data/017171.png    
  inflating: data/data/017172.png    
  inflating: data/data/017173.png    
  inflating: data/data/017174.png    
  inflating: data/data/017175.png    
  inflating: data/data/017176.png    
  inflating: data/data/

In [58]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

from sklearn.utils.class_weight import compute_class_weight

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import torchvision as tv
from torchvision import transforms
from torchvision.transforms import ToTensor
from torchvision.transforms.functional import adjust_contrast
import torchvision.transforms.v2 as T
from torchvision.io import read_image, ImageReadMode


from skimage import io
from sklearn.metrics import classification_report
from torchvision.transforms import v2

In [112]:
class CTImages(Dataset):
    """Dataset of ``<id>.png`` images described by a CSV label file.

    The label file is parsed with pandas using an ``id`` column (read as
    string) and a ``class`` column (read as int8). Rows are then accessed
    positionally: element 0 is the image id, element 1 is the label.

    Args:
        filename: Path of the CSV label file.
        root_dir: Directory that contains the ``<id>.png`` files.
        eqn: When True, rows whose label is 1 are oversampled: the resulting
            table holds all other rows first, followed by the label-1 rows
            twice.
        train: When True, a random horizontal flip (p=0.5) is applied to each
            image returned by ``__getitem__``; otherwise no transform is used.
    """

    def __init__(self, filename, root_dir, eqn=True, train=True):
        frame = pd.read_csv(
            filename,
            skipinitialspace=True,
            dtype={'id': 'string', 'class': 'int8'},
        )
        rows = frame.to_numpy()

        if eqn == True:
            # Per-row mask of the label-1 samples.
            is_positive = np.array([row[1] == 1 for row in rows], dtype=bool)
            others = rows[~is_positive]
            positives = rows[is_positive]
            # Other rows first, then every label-1 row duplicated.
            rows = np.concatenate([others, positives, positives])

        self.annotations = rows
        self.root_dir = root_dir
        # Augmentation is only enabled for the training split.
        self.transform = (
            torch.nn.Sequential(T.RandomHorizontalFlip(p=0.5)) if train else None
        )

    def __len__(self):
        """Return the number of rows in the (possibly oversampled) table."""
        return self.annotations.shape[0]

    def __getitem__(self, index):
        """Load sample ``index`` and return ``(image, label)``.

        The image is read from disk, scaled to [0, 1] as float32 and its last
        axis is moved to the front; the label is returned as an integer tensor.
        """
        row = self.annotations[index]
        png_path = os.path.join(self.root_dir, f'{row[0]}.png')

        pixels = io.imread(png_path)
        image = torch.tensor(pixels, dtype=torch.float32) / 255.0
        # Move the trailing axis (channels, for an H x W x C image) to the front.
        image = image.permute(2, 0, 1)

        y_label = torch.tensor(int(row[1]))

        if self.transform:
            image = self.transform(image)

        return image, y_label


# Training split: augmentation enabled, no oversampling of label-1 rows.
train_set = CTImages(
    filename='/content/data/train_labels.txt',
    root_dir='/content/data/data',
    eqn=False,
    train=True,
)
# Validation split: no augmentation, no oversampling.
test_set = CTImages(
    filename='/content/data/validation_labels.txt',
    root_dir='/content/data/data',
    eqn=False,
    train=False,
)

# Both loaders use batches of 50; only the training loader shuffles.
train_loader = DataLoader(train_set, batch_size=50, shuffle=True)
test_loader = DataLoader(test_set, batch_size=50, shuffle=False)

In [101]:
from torchvision.models import mobilenet_v3_large

# MobileNetV3-Large built without pretrained weights (intended input: 224 x 224 x 3).
# The last classifier layer is swapped for a 2-output linear head; every other
# classifier layer is kept as-is.
# NOTE: a later cell rebinds `model` to the small CNN.
model = mobilenet_v3_large(weights=None)
model.classifier = nn.Sequential(
    *list(model.classifier)[:-1],
    nn.Linear(1280, 2),
)
print(model)

MobileNetV3(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): Hardswish()
    )
    (1): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
          (2): ReLU(inplace=True)
        )
        (1): Conv2dNormActivation(
          (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(16, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
        )
      )
    )
    (2): InvertedResidual(
      (block): Sequential(
        (0): Conv2dNormActivation(
          (0): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1), bi

In [102]:
class CNN(nn.Module):
    """Small two-layer convolutional classifier producing 2 class logits.

    Layout: conv(3->16, 5x5) -> ReLU -> maxpool(2) -> conv(16->8, 5x5) -> ReLU
    -> maxpool(2) -> flatten -> fc(32) -> ReLU -> dropout(0.5) -> fc(2).

    ``fc1`` is sized for 53 x 53 x 8 features, which is what the two
    conv/pool stages produce for a 3 x 224 x 224 input
    (224 -> 220 -> 110 -> 106 -> 53).
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(16, 8, 5)
        self.fc1 = nn.Linear(53 * 53 * 8, 32)
        self.fc2 = nn.Linear(32, 2)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return raw (unnormalised) class logits of shape ``(batch, 2)``.

        Args:
            x: Image batch of shape ``(batch, 3, 224, 224)``.
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        # No activation on the output layer: CrossEntropyLoss expects raw
        # logits. A ReLU here would clamp both logits to >= 0 and block all
        # gradient whenever both pre-activations are negative.
        return self.fc2(x)

# Instantiate the small CNN. This rebinds the global `model`, replacing
# whatever network an earlier cell assigned to that name.
model = CNN()

def init_weights(m):
    """Initialise one module in place; intended for use with ``Module.apply``.

    ``nn.Linear`` and ``nn.Conv2d`` layers get Xavier-uniform weights and a
    constant bias of 0.01. Any other module type is left untouched.

    Args:
        m: The module to initialise.
    """
    if isinstance(m, (nn.Linear, nn.Conv2d)):
        # In-place variant; the non-underscore `xavier_uniform` is deprecated.
        torch.nn.init.xavier_uniform_(m.weight)
        # Layers created with bias=False have `bias is None`.
        if m.bias is not None:
            torch.nn.init.constant_(m.bias, 0.01)

# Run init_weights on the model and its submodules. As the last expression of
# the cell, the module returned by apply() is displayed as the cell output.
model.apply(init_weights)

  torch.nn.init.xavier_uniform(m.weight)


CNN(
  (conv1): Conv2d(3, 16, kernel_size=(5, 5), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(16, 8, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=22472, out_features=32, bias=True)
  (fc2): Linear(in_features=32, out_features=2, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)

In [118]:
# Optimisation setup for the current global `model`.
# SGD with momentum at a learning rate of 1e-4.
optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9)
# Alternative tried previously: torch.optim.Adam(model.parameters(), lr=3e-5)

# Weighted cross entropy: class 1 counts four times as much as class 0.
loss_fn = nn.CrossEntropyLoss(
    weight=torch.tensor([1, 4], dtype=torch.float32),
)

# Multiply the learning rate by 0.1 after every 10 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer, gamma=0.1, step_size=10)

def train_one_epoch(epoch_index, loader):
    """Train the global ``model`` for one full pass over ``loader``.

    Uses the module-level ``model``, ``optimizer``, ``loss_fn`` and
    ``scheduler``. The mean loss is printed every 1000 batches and once more
    for any trailing batches, so epochs shorter than 1000 batches still report
    a meaningful value.

    Args:
        epoch_index: Index of the current epoch (currently unused; kept for
            the caller's convenience).
        loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        Mean per-batch loss of the most recent reporting window, or 0.0 if
        ``loader`` yielded no batches.
    """
    # test() switches the model to eval mode; switch back so dropout (and any
    # batch-norm statistics) behave as in training.
    model.train()

    report_every = 1000
    running_loss = 0.
    window_batches = 0
    batches_seen = 0
    last_loss = 0.

    for i, (inputs, labels) in enumerate(loader):
        # Gradients accumulate by default, so clear them for every batch.
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

        batches_seen = i + 1
        running_loss += loss.item()
        window_batches += 1
        if window_batches == report_every:
            last_loss = running_loss / report_every  # loss per batch
            print('  batch {} loss: {}'.format(batches_seen, last_loss))
            running_loss = 0.
            window_batches = 0

    # Report the trailing partial window; without this, an epoch with fewer
    # than `report_every` batches would always return 0.0.
    if window_batches:
        last_loss = running_loss / window_batches
        print('  batch {} loss: {}'.format(batches_seen, last_loss))

    # Advance the LR schedule once per epoch, after the optimizer has stepped.
    # Stepping it per batch would make StepLR(step_size=10) shrink the learning
    # rate tenfold every 10 batches, driving it to ~0 within the first epoch.
    if batches_seen:
        scheduler.step()

    return last_loss

def test(loader):
    """Evaluate the global ``model`` on ``loader`` and print a report.

    Uses the module-level ``model`` and ``loss_fn``. Prints scikit-learn's
    classification report plus the mean per-batch loss and the accuracy.

    Args:
        loader: Iterable yielding ``(data, target)`` batches.

    Returns:
        Tuple ``(true_positives, false_positives, false_negatives)`` counted
        with class 1 as the positive class.
    """
    model.eval()

    test_loss = 0.
    correct = 0
    true_positives, false_positives, false_negatives = 0, 0, 0
    num_batches = 0

    all_pred = []
    all_target = []

    with torch.no_grad():
        for data, target in loader:
            output = model(data)

            test_loss += loss_fn(output, target).item()
            num_batches += 1

            # Keep predictions 1-D and give the labels the same shape. Comparing
            # a (B, 1) prediction with a (B,) target would broadcast to (B, B)
            # and inflate every count below.
            pred = output.argmax(dim=1)
            labels = target.view_as(pred)

            # Plain Python ints are what classification_report expects.
            all_target.extend(labels.tolist())
            all_pred.extend(pred.tolist())

            correct += (pred == labels).sum().item()
            true_positives += ((pred == 1) & (labels == 1)).sum().item()
            false_positives += ((pred == 1) & (labels != 1)).sum().item()
            false_negatives += ((pred != 1) & (labels == 1)).sum().item()

    num_samples = len(all_target)
    if num_samples == 0:
        # Nothing to report; avoid dividing by zero below.
        print('\nTest set: no samples to evaluate\n')
        return true_positives, false_positives, false_negatives

    print("F1-score: ", classification_report(all_target, all_pred))
    # loss_fn already averages within a batch, so average over batches here.
    test_loss /= num_batches
    accuracy_pct = 100. * correct / num_samples
    print(
        f'\nTest set: Average loss: {test_loss:.4f}, Accuracy {correct}/{num_samples} ({accuracy_pct:.0f}%)\n')
    return true_positives, false_positives, false_negatives

In [None]:
# Number of full passes over the training data.
EPOCH = 30

# Alternate one training pass with one evaluation on the validation loader.
for e in range(EPOCH):
    train_one_epoch(epoch_index=e, loader=train_loader)
    test(loader=test_loader)