<a href="https://colab.research.google.com/github/HusseinKasim/Brain-Tumor-MRI-Binary-Classification-Study/blob/main/Brain_Tumor_MRI_Binary_Classification_Study_Hussein_Kasim.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import numpy as np
import torchvision.transforms as tt
from torchvision.utils import make_grid
from torchvision.datasets import ImageFolder
import matplotlib.pyplot as plt
from google.colab import drive
import zipfile
import sklearn.metrics

In [None]:
# Mount Google Drive at /content/drive so the dataset archive stored there can be read.
drive.mount('/content/drive')

# Extract the dataset archive into the current working directory.
# NOTE(review): the archive path is relative, so this presumably relies on the
# working directory being /content (the parent of the mount point) — confirm.
with zipfile.ZipFile("drive/MyDrive/Brain Tumor Split (0-1).zip", "r") as zip_ref:
  zip_ref.extractall()


  # 80-20 dataset code (alternative archive; use instead of the extraction above)
  # with zipfile.ZipFile("drive/MyDrive/Brain Tumor Split (0-1) 20%.zip", "r") as zip_ref:
  #  zip_ref.extractall()

Mounted at /content/drive


In [None]:
# Root folder of the extracted dataset
data_dir = 'Brain Tumor Split (0-1)'

# 80-20 dataset code
# data_dir = 'Brain Tumor Split (0-1) 20%'


# Show the top-level layout of the dataset
print("Directories in the dataset: ")
print(os.listdir(data_dir))

# Show the class folders under /train (0 = non-tumor images, 1 = tumor images)
print("\ntrain directories: ")
classes = os.listdir(os.path.join(data_dir, "train"))
print(classes)

# Show the class folders under /validation (0 = non-tumor images, 1 = tumor images)
print("\nvalidation directories: ")
classes = os.listdir(os.path.join(data_dir, "validation"))
print(classes)

Directories in the dataset: 
['train', 'validation']

train directories: 
['0', '1']

validation directories: 
['0', '1']


In [None]:
# Batch size for the statistics pass; 16 or 32 give the same mean and
# standard deviation because the dataset is the same.
batch_size = 16

# At this stage the images are only converted from PIL images to tensors
train_tfms = tt.Compose([tt.ToTensor()])
valid_tfms = tt.Compose([tt.ToTensor()])


# Datasets read from the class sub-folders, with the transforms applied on access
train_ds = ImageFolder(root=data_dir + '/train', transform=train_tfms)
valid_ds = ImageFolder(root=data_dir + '/validation', transform=valid_tfms)


# PyTorch data loaders (only the training loader is shuffled)
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
valid_dl = DataLoader(valid_ds, batch_size=batch_size, num_workers=2, pin_memory=True)

In [None]:
# Calculate the mean and standard deviation (which are used in data normalization)

def batch_mean_and_sd(loader):
  """Compute the per-channel mean and standard deviation over all images of a loader.

  The statistics are accumulated batch by batch as running first and second
  moments, so the whole dataset never has to be held in memory at once.

  Args:
    loader: iterable yielding ``(images, labels)`` pairs where ``images`` is a
      4-D tensor unpacked as (batch, channels, height, width). The labels are ignored.

  Returns:
    Tuple ``(mean, std)`` of 1-D tensors with one entry per channel.

  Raises:
    ValueError: if the loader yields no batches.
  """
  cnt = 0            # number of pixels per channel accumulated so far
  fst_moment = None  # running E[x] per channel
  snd_moment = None  # running E[x^2] per channel

  for images, _ in loader:
    b, c, h, w = images.shape
    nb_pixels = b * h * w
    if fst_moment is None:
      # Start from zeros (not uninitialized memory) so that the first update,
      # which multiplies the accumulator by cnt == 0, cannot produce NaN/inf.
      # Sized from the data so any channel count works.
      fst_moment = torch.zeros(c, device=images.device)
      snd_moment = torch.zeros(c, device=images.device)
    sum_ = torch.sum(images, dim=[0, 2, 3])
    sum_of_squares = torch.sum(images ** 2, dim=[0, 2, 3])
    fst_moment = (cnt * fst_moment + sum_) / (cnt + nb_pixels)
    snd_moment = (cnt * snd_moment + sum_of_squares) / (cnt + nb_pixels)
    cnt += nb_pixels

  if fst_moment is None:
    raise ValueError("loader yielded no batches; cannot compute mean and standard deviation")

  # Var[x] = E[x^2] - E[x]^2; clamp at zero so rounding error cannot yield a NaN std
  variance = torch.clamp(snd_moment - fst_moment ** 2, min=0)
  mean, std = fst_moment, torch.sqrt(variance)
  return mean,std
# Measure the per-channel statistics of both splits using the loaders defined above
train_mean, train_std = batch_mean_and_sd(train_dl)
valid_mean, valid_std = batch_mean_and_sd(valid_dl)
print("Training dataset mean and standard deviation: ")
print(train_mean, train_std)

print("\nValidation dataset mean and standard deviation: ")
print(valid_mean, valid_std)

Training dataset mean and standard deviation: 
tensor([0.1235, 0.1235, 0.1235]) tensor([0.2630, 0.2630, 0.2630])

Validation dataset mean and standard deviation: 
tensor([0.1235, 0.1235, 0.1235]) tensor([0.2649, 0.2649, 0.2649])


In [None]:
# For 90-10 split:
# Training dataset mean and standard deviation: 
# tensor([0.1235, 0.1235, 0.1235]) tensor([0.2630, 0.2630, 0.2630])

# Validation dataset mean and standard deviation: 
# tensor([0.1235, 0.1235, 0.1235]) tensor([0.2649, 0.2649, 0.2649])


# For 80-20 split:
# Training dataset mean and standard deviation: 
# tensor([0.1232, 0.1232, 0.1232]) tensor([0.2627, 0.2627, 0.2627]) 

# Validation dataset mean and standard deviation: 
# tensor([0.1246, 0.1246, 0.1246]) tensor([0.2651, 0.2651, 0.2651])

In [None]:
# Data transforms (normalization & data augmentation)
# Per-channel (mean, std) pairs measured on each split with batch_mean_and_sd.
train_stats = ((0.1235, 0.1235, 0.1235), (0.2630, 0.2630, 0.2630))
# Kept for reference only: validation inputs are normalized with the training
# statistics so the model sees the same input scaling it was trained on.
valid_stats = ((0.1235, 0.1235, 0.1235), (0.2649, 0.2649, 0.2649))


# Training pipeline: random crop (after 4px reflect padding) and random
# horizontal flip as augmentation, then tensor conversion and normalization.
train_tfms = tt.Compose([tt.RandomCrop(240, padding=4, padding_mode='reflect'), tt.RandomHorizontalFlip(),
                         tt.ToTensor(), tt.Normalize(*train_stats,inplace=True)])
# Validation pipeline: deterministic preprocessing only. Random augmentation
# here would make the reported metrics vary from one evaluation to the next.
# CenterCrop keeps the 240x240 input size that the training crop produces.
valid_tfms = tt.Compose([tt.CenterCrop(240),
                         tt.ToTensor(), tt.Normalize(*train_stats)])

# Load datasets (with new transforms)
train_ds = ImageFolder(data_dir+'/train', train_tfms)
valid_ds = ImageFolder(data_dir+'/validation', valid_tfms)

In [None]:
# Batch size shared by the training and validation loaders
# (16 or 32 depending on the experiment)
batch_size = 16

# PyTorch data loaders over the normalized datasets; only training is shuffled
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
valid_dl = DataLoader(valid_ds, batch_size=batch_size, num_workers=2, pin_memory=True)


def get_default_device():
    """Return the CUDA device when a GPU is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)
    
def to_device(data, device):
    """Move a tensor, or a (nested) list/tuple of tensors, to ``device``.

    Lists and tuples are processed element by element and always come back as
    a list; anything else is moved with a non-blocking ``.to(device)`` call.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Wrap a data loader so that each batch is moved to a device when yielded."""

    def __init__(self, dl, device):
        # Underlying loader and the device every batch is transferred to
        self.dl, self.device = dl, device

    def __len__(self):
        """Number of batches in the wrapped loader."""
        return len(self.dl)

    def __iter__(self):
        """Yield the wrapped loader's batches one by one after moving them to the device."""
        for batch in self.dl:
            yield to_device(batch, self.device)

# Device used for the model and for every batch (GPU when available, else CPU)
device = get_default_device()

# Wrap the loaders so that batches are moved to `device` as they are iterated
train_dl = DeviceDataLoader(train_dl, device)
valid_dl = DeviceDataLoader(valid_dl, device)

In [None]:
# List of predictions: filled batch by batch in validation_step and emptied
# again at the end of validation_epoch_end
preds_list = []   

# List of labels (targets): filled and emptied alongside preds_list
actual_list = []


# Turns model outputs into class predictions held in a NumPy array on the CPU
def predictions(outputs):
   """Return, for each row of ``outputs``, the index of its largest value.

   The maximum values themselves are discarded; only their indices along
   dim 1 are kept, moved to the CPU and converted to a NumPy array.
   """
   best = torch.max(outputs, dim=1)
   return best.indices.cpu().numpy()


class ImageClassificationBase(nn.Module):
    """Base class bundling the loss, metric and reporting logic of a classifier.

    Subclasses only have to provide ``forward``. Validation predictions and
    labels are accumulated in the module-level lists ``preds_list`` and
    ``actual_list`` and consumed (then cleared) by ``validation_epoch_end``.
    """

    def training_step(self, batch):
        """Return the cross-entropy loss of one ``(images, labels)`` training batch."""
        images, labels = batch
        out = self(images)                  # Generate Predictions out = [pred_0_val, pred_1_val]
        loss = F.cross_entropy(out, labels) # Calculate Loss
        return loss

    def validation_step(self, batch):
        """Evaluate one ``(images, labels)`` validation batch.

        Appends the batch's predicted and actual classes to the module-level
        accumulators and returns ``{'val_loss': <detached loss tensor>}``.
        """
        images, labels = batch
        out = self(images)                    # Generate Predictions out = [pred_0_val, pred_1_val]
        loss = F.cross_entropy(out, labels)   # Calculate Loss
        # Store plain Python ints (not 0-dim tensors / NumPy scalars) so the
        # accumulators stay cheap to grow and to convert at the end of the epoch
        preds_list.extend(predictions(out).tolist())
        actual_list.extend(labels.cpu().tolist())
        return {'val_loss': loss.detach()}

    def validation_epoch_end(self, outputs):
        """Combine the per-batch results of one validation pass into epoch metrics.

        Args:
            outputs: list of the dicts returned by ``validation_step``.

        Returns:
            Dict with the mean validation loss ('val_loss'), accuracy ('acc'),
            precision ('ps'), recall ('rs'), error rate ('er') and confusion
            matrix ('cm'). The accumulators are cleared before returning.
        """
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()                       # Combine losses
        actual_listNP = np.array(actual_list)
        preds_listNP = np.array(preds_list)
        acc = sklearn.metrics.accuracy_score(actual_listNP, preds_listNP)       # Calculate Epoch Accuracy Score
        er = 1-acc                                                              # Calculate the Error Rate Score
        # zero_division=1: an undefined precision/recall is reported as 1
        ps = sklearn.metrics.precision_score(actual_listNP, preds_listNP, zero_division=1)       # Calculate Epoch Precision Score
        rs = sklearn.metrics.recall_score(actual_listNP, preds_listNP, zero_division=1)          # Calculate Epoch Recall Score
        cm = sklearn.metrics.confusion_matrix(actual_listNP, preds_listNP)      # Calculate Epoch Confusion Matrix
        # Reset the accumulators so the next validation pass starts empty
        preds_list.clear()
        actual_list.clear()
        return {'val_loss': epoch_loss.item(), 'acc': acc, 'ps': ps, 'rs': rs, 'er': er, 'cm': cm}

    def epoch_end(self, epoch, result):
        """Print the metrics of one finished epoch.

        ``result`` must hold the keys produced by ``validation_epoch_end`` plus
        'train_loss' and 'lrs' (the list of learning rates used in the epoch).
        """
        print("Epoch [{}]".format(epoch))
        print("Accuracy: {:.3f}, Error Rate: {:.3f}, Precision: {:.3f}, Recall: {:.3f}".format(result['acc'], result['er'], result['ps'], result['rs'] ))
        # Six decimals: three would print the small learning rates of the
        # schedule's warm-up and final phases as 0.000
        print("Last Learning Rate: {:.6f}, Training Loss: {:.3f}, Validation Loss: {:.3f}".format(result['lrs'][-1], result['train_loss'], result['val_loss']))
        print("Confusion Matrix: \n {} \n\n".format(result['cm']))

In [None]:
def conv_block(in_channels, out_channels, pool=False):
    """Build a 3x3 convolution -> batch norm -> ReLU block.

    With ``pool=True`` a 2x2 max-pooling layer is appended to the block.
    Returns the layers wrapped in an ``nn.Sequential``.
    """
    stages = (
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True),
    )
    if pool:
        stages = stages + (nn.MaxPool2d(2),)
    return nn.Sequential(*stages)

class ResNet(ImageClassificationBase):
    """Small residual CNN: four conv blocks, two residual stages and a linear classifier."""

    def __init__(self, in_channels, num_classes):
        super().__init__()

        # Stem: widen to 128 channels and halve the spatial size once
        self.conv1 = conv_block(in_channels, 64)
        self.conv2 = conv_block(64, 128, pool=True)
        # First residual stage (its output is added back to its input in forward)
        self.res1 = nn.Sequential(
            conv_block(128, 128),
            conv_block(128, 128),
        )

        # Widen to 512 channels, halving the spatial size twice more
        self.conv3 = conv_block(128, 256, pool=True)
        self.conv4 = conv_block(256, 512, pool=True)
        # Second residual stage
        self.res2 = nn.Sequential(
            conv_block(512, 512),
            conv_block(512, 512),
        )

        # Classifier head. 25088 = 512 * 7 * 7, the flattened feature size that
        # results from 240x240 inputs after the three 2x poolings and this 4x pooling.
        self.classifier = nn.Sequential(
            nn.MaxPool2d(4),
            nn.Flatten(),
            nn.Dropout(0.2),
            nn.Linear(25088, num_classes),
        )

    def forward(self, xb):
        """Return the class scores for a batch of images ``xb``."""
        feats = self.conv2(self.conv1(xb))
        feats = self.res1(feats) + feats     # residual connection
        feats = self.conv4(self.conv3(feats))
        feats = self.res2(feats) + feats     # residual connection
        return self.classifier(feats)


# Build the network for 3 input channels and 2 output classes and move it to `device`
model = to_device(ResNet(3, 2), device)
# Display the layer structure as the cell output
model

In [None]:
@torch.no_grad()
def evaluate(model, val_loader):
    """Run one validation pass and return the model's aggregated epoch result.

    The model is switched to evaluation mode, ``validation_step`` is called on
    every batch of ``val_loader`` and the collected results are handed to
    ``validation_epoch_end``. Gradient tracking is disabled throughout.
    """
    model.eval()
    step_results = []
    for batch in val_loader:
        step_results.append(model.validation_step(batch))
    return model.validation_epoch_end(step_results)

def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Returns None when the optimizer has no parameter groups.
    """
    first_group = next(iter(optimizer.param_groups), None)
    if first_group is None:
        return None
    return first_group['lr']



# Training and Validation
def fit_one_cycle(epochs, max_lr, model, train_loader, val_loader, 
                  weight_decay=0, grad_clip=None, opt_func=torch.optim.SGD):
    """Train ``model`` with a one-cycle learning-rate schedule, validating after each epoch.

    Args:
        epochs: number of passes over ``train_loader``.
        max_lr: peak learning rate of the one-cycle schedule (also passed to
            the optimizer as its initial learning rate).
        model: module providing ``training_step`` and ``epoch_end``.
        train_loader: iterable of training batches; must support ``len()``.
        val_loader: iterable of validation batches, passed to ``evaluate``.
        weight_decay: weight decay handed to the optimizer.
        grad_clip: if truthy, gradients are clipped element-wise to
            [-grad_clip, grad_clip] before every optimizer step.
        opt_func: optimizer class, called as
            ``opt_func(params, max_lr, weight_decay=...)``.

    Returns:
        List with one result dict per epoch: the output of ``evaluate``
        extended with 'train_loss' (mean training loss of the epoch) and
        'lrs' (the learning rate used for each batch).
    """
    torch.cuda.empty_cache()
    history = []   
    # Optimizer with weight decay
    optimizer = opt_func(model.parameters(), max_lr, weight_decay=weight_decay)
    # One-cycle learning rate scheduler
    sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs, 
                                                steps_per_epoch=len(train_loader)) 
    for epoch in range(epochs):
        # Training Phase 
        model.train()
        train_losses = []
        lrs = []

        for batch in train_loader:
            # Training
            loss = model.training_step(batch)
            # Keep only the detached value: storing the graph-attached loss
            # for every batch holds on to autograd state for the whole epoch
            train_losses.append(loss.detach())
            loss.backward() # Gradient calculation

            # Gradient clipping
            if grad_clip: 
                nn.utils.clip_grad_value_(model.parameters(), grad_clip)
            
            optimizer.step() # Model's parameters updated
            optimizer.zero_grad() # Gradients set to zero
            
            # Record & update learning rate
            lrs.append(get_lr(optimizer))
            sched.step()
        
        # Validation phase
        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        result['lrs'] = lrs
        model.epoch_end(epoch, result)
        history.append(result)
    return history

In [None]:
# Baseline: evaluate the model on the validation loader before any training;
# this becomes the first entry of the history
history = [evaluate(model, valid_dl)]
# Display the baseline metrics as the cell output
history

In [None]:
# Hyperparameters passed to fit_one_cycle
epochs = 10 # or 20 or 30
max_lr = 0.01 # or 0.05 
grad_clip = 0.1        # gradients are clipped element-wise to [-0.1, 0.1]
weight_decay = 1e-4    # weight decay handed to the optimizer
opt_func = torch.optim.Adam  # optimizer class used instead of the default SGD

In [None]:
# Train the model and append the per-epoch results after the baseline entry.
# NOTE: re-running this cell continues training the same model and extends
# `history` further.
history += fit_one_cycle(epochs, max_lr, model, train_dl, valid_dl, 
                             grad_clip=grad_clip, 
                             weight_decay=weight_decay, 
                             opt_func=opt_func)

In [None]:
def plot_accuracy(history):
    """Plot the 'acc' value of every entry in ``history`` against its position."""
    per_epoch_acc = []
    for record in history:
        per_epoch_acc.append(record['acc'])
    plt.plot(per_epoch_acc, '-x')
    plt.title('Accuracy vs. No. of Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')


plot_accuracy(history)

In [None]:
def plot_error_rate(history):
    """Plot the 'er' value of every entry in ``history`` against its position."""
    per_epoch_er = []
    for record in history:
        per_epoch_er.append(record['er'])
    plt.plot(per_epoch_er, '-x')
    plt.title('Error Rate vs. No. of Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Error Rate')


plot_error_rate(history)

In [None]:
def plot_precision(history):
    """Plot the 'ps' value of every entry in ``history`` against its position."""
    per_epoch_ps = []
    for record in history:
        per_epoch_ps.append(record['ps'])
    plt.plot(per_epoch_ps, '-x')
    plt.title('Precision vs. No. of Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Precision')


plot_precision(history)

In [None]:
def plot_recall(history):
    """Plot the 'rs' value of every entry in ``history`` against its position."""
    per_epoch_rs = []
    for record in history:
        per_epoch_rs.append(record['rs'])
    plt.plot(per_epoch_rs, '-x')
    plt.title('Recall vs. No. of Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Recall')


plot_recall(history)


In [None]:
def plot_losses(history):
    """Plot training and validation loss for every entry in ``history``.

    Entries without a 'train_loss' key contribute None to the training curve;
    'val_loss' is required in every entry.
    """
    training_curve = [record.get('train_loss') for record in history]
    validation_curve = [record['val_loss'] for record in history]
    plt.plot(training_curve, '-bx')    # training loss: blue crosses
    plt.plot(validation_curve, '-rx')  # validation loss: red crosses
    plt.legend(['Training', 'Validation'])
    plt.title('Loss vs. No. of Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')


plot_losses(history)

In [None]:
def plot_lrs(history):
    """Plot the learning rate of every batch, concatenated over all entries in ``history``.

    Entries without an 'lrs' key contribute no points.
    """
    per_entry_lrs = []
    for record in history:
        per_entry_lrs.append(record.get('lrs', []))
    lrs = np.concatenate(per_entry_lrs)
    plt.plot(lrs)
    plt.title('Learning Rate vs. Batch No.')
    plt.xlabel('Batch No.')
    plt.ylabel('Learning Rate')


plot_lrs(history)

In [None]:
def predict_image(img, model):
    """Predict the class name of a single image tensor.

    Args:
        img: image tensor without a batch dimension; a batch dimension of
            size 1 is added before it is passed to the model.
        model: classifier returning one score per class for each image.

    Returns:
        The entry of ``train_ds.classes`` whose index has the highest score.

    Note:
        The model is switched to evaluation mode (and left in it) so that the
        prediction does not depend on the mode a previous cell left it in.
    """
    xb = to_device(img.unsqueeze(0), device)  # Convert to a batch of 1
    model.eval()  # inference behaviour for dropout / batch norm
    with torch.no_grad():  # no autograd graph is needed for a prediction
        yb = model(xb)
    _, preds  = torch.max(yb, dim=1)
    return train_ds.classes[preds[0].item()]

# Non-Tumor Image Example
img, label = valid_ds[25]
# Reorder the dims to (1, 2, 0) for imshow (presumably CHW -> HWC — confirm) and
# clamp to [0, 1]; the shown image is the normalized tensor, not the original pixels
plt.imshow(img.permute(1, 2, 0).clamp(0, 1))
print("No Tumor: 0 \t\t Tumor: 1")
# Compare the dataset label with the model's prediction for the same image
print('Actual Value:', train_ds.classes[label], ', Predicted Value:', predict_image(img, model))

In [None]:
# Tumor Image Example 
img, label = valid_ds[250]
# Reorder the dims to (1, 2, 0) for imshow (presumably CHW -> HWC — confirm) and
# clamp to [0, 1]; the shown image is the normalized tensor, not the original pixels
plt.imshow(img.permute(1, 2, 0).clamp(0, 1))
print("No Tumor: 0 \t\t Tumor: 1")
# Compare the dataset label with the model's prediction for the same image
print('Actual Value:', train_ds.classes[label], ', Predicted Value:', predict_image(img, model))