In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
for root_dir, _, file_names in os.walk('/kaggle/input'):
    for file_name in file_names:
        full_path = os.path.join(root_dir, file_name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install torch-summary

In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as T

# DATA PREPARATION

In [None]:
# Preprocessing / augmentation pipeline applied to each image when it is loaded.
# The steps run in the order listed.
transforms = T.Compose(
    [
        T.Resize(size=(224, 224)),             # fixed spatial size expected by the network
        T.RandomHorizontalFlip(p=0.5),         # left-right flip with 50% probability
        T.RandomVerticalFlip(p=0.5),           # up-down flip with 50% probability
        T.RandomRotation(degrees=(-10, 10)),   # random rotation within +/- 10 degrees
        T.ToTensor(),                          # PIL image -> float tensor
    ]
)

In [None]:
# Build the dataset from an image directory: ImageFolder treats each sub-directory of `root`
# as one class and applies `transforms` to every image it loads.
train_data = torchvision.datasets.ImageFolder(root='/kaggle/input/brain-mri-images-for-brain-tumor-detection/brain_tumor_dataset/', transform=transforms)
# Bare expression so the notebook displays the dataset summary.
train_data

In [None]:
# Maps the stringified integer label produced by the dataset to a readable class name.
# NOTE(review): assumes ImageFolder assigned index 0 to 'no' and 1 to 'yes' — confirm against train_data.class_to_idx
encoded_labels = {'0': 'no', '1': 'yes'}

### SAMPLE IMAGE

Let's take a look at a sample image and its label

In [None]:
# Fetch one (image, label) pair by index; the transform pipeline is applied on access.
sample_image_tensor, sample_label = train_data[97]

# permute(1, 2, 0) moves the channel axis last, which is the layout imshow expects.
sample_image_hwc = sample_image_tensor.permute(1, 2, 0)
plt.imshow(sample_image_hwc)
plt.title(encoded_labels[str(sample_label)])
plt.show()

### TRAIN_VAL SPLIT

In [None]:
# 75% of the samples go to training, the remainder to validation.
split = int(np.floor(0.75 * len(train_data)))
# A seeded generator makes the split identical on every Restart & Run All.
# NOTE: both subsets wrap the same underlying dataset, so the random augmentations in
# `transforms` are applied to validation images as well.
train_set, val_set = torch.utils.data.random_split(
    train_data,
    [split, len(train_data) - split],
    generator=torch.Generator().manual_seed(42),
)

In [None]:
len(train_set), len(val_set)

In [None]:
from torch.utils.data import DataLoader

# Mini-batch size shared by the training and validation loaders.
batch_size = 16

# Both loaders reshuffle their subset each time they are iterated.
train_dataloader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(dataset=val_set, batch_size=batch_size, shuffle=True)

In [None]:
from torchvision.utils import make_grid

# Pull a single mini-batch from the training loader for a visual sanity check.
train_features, train_labels = next(iter(train_dataloader))

# Tile the batch into one image, then move channels last for imshow.
grid = make_grid(train_features)
plt.imshow(grid.permute(1, 2, 0))

In [None]:
train_features.shape

In [None]:
# Pull a single mini-batch from the validation loader for a visual sanity check.
val_features, val_labels = next(iter(val_dataloader))

# Tile the batch into one image, then move channels last for imshow.
grid = make_grid(val_features)
plt.imshow(grid.permute(1, 2, 0))

# DENSENET ARCHITECTURE

### TRANSITION LAYERS:

Dense blocks are connected to each other by transition layers, which apply convolution and pooling to downsample the feature maps. Each transition layer consists of batch normalization (followed by ReLU), a 1x1 convolution, and a 2x2 average-pooling layer.

In [None]:
class Transition_Block(nn.Module):
    """Transition between dense blocks: BatchNorm -> ReLU -> 1x1 conv -> 2x2 average pool.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels produced by the 1x1 convolution.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.bnorm = nn.BatchNorm2d(num_features=in_channels)
        self.relu = nn.ReLU()
        # 1x1 convolution changes only the channel count.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        # Kernel 2 / stride 2 pooling halves height and width.
        self.avgpool = nn.AvgPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Return the channel-reduced, spatially halved feature map for ``x``."""
        activated = self.relu(self.bnorm(x))
        return self.avgpool(self.conv1(activated))

In [None]:
class BottleNeck_Layer(nn.Module):
    """One dense layer: a 1x1 bottleneck convolution followed by a 3x3 convolution.

    The ``growth_rate`` new feature maps are concatenated onto the input along the
    channel axis, so the output has ``in_channels + growth_rate`` channels.

    Args:
        in_channels: number of channels of the incoming feature map.
        growth_rate: number of new feature maps this layer contributes.
    """

    def __init__(self, in_channels, growth_rate):
        super().__init__()

        # The bottleneck is four times as wide as the growth rate.
        bottleneck_width = growth_rate * 4

        # BN -> ReLU -> 1x1 conv
        self.conv1 = nn.Sequential(
            nn.BatchNorm2d(in_channels),
            nn.ReLU(),
            nn.Conv2d(in_channels, bottleneck_width, kernel_size=1),
        )

        # BN -> ReLU -> 3x3 conv; padding=1 keeps the spatial size unchanged.
        self.conv2 = nn.Sequential(
            nn.BatchNorm2d(bottleneck_width),
            nn.ReLU(),
            nn.Conv2d(bottleneck_width, growth_rate, kernel_size=3, padding=1),
        )

    def forward(self, x):
        """Return ``x`` with the newly computed feature maps appended on dim 1."""
        new_features = self.conv2(self.conv1(x))
        return torch.cat([x, new_features], dim=1)

In [None]:
class DenseNet_Model_Torch(nn.Module):
    """DenseNet-style image classifier.

    Layout: 7x7 stem convolution + max-pool, four dense blocks separated by three
    transition blocks, then BatchNorm -> ReLU -> global average pool -> linear head.

    Args:
        block: layer class used inside every dense block. It is called as
            ``block(in_channels, growth_rate)`` and must output
            ``in_channels + growth_rate`` channels.
        num_layers: sequence of four ints, the number of layers in each dense block.
        growth_rate: number of channels each dense layer adds.
        num_classes: number of outputs of the final linear layer.
    """

    def __init__(self, block, num_layers, growth_rate, num_classes):
        super(DenseNet_Model_Torch, self).__init__()

        # Stem: stride-2 7x7 convolution on a 3-channel image, then a stride-2 max-pool.
        self.conv1 = nn.Sequential(
                        nn.Conv2d(in_channels=3, out_channels=64, kernel_size=7, stride=2, padding=3),
                        nn.BatchNorm2d(64),
                        nn.ReLU())

        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        in_channels = 64

        # The channel-count comments below assume num_layers=[6, 12, 24, 16] and growth_rate=32.
        # `block` (not a hard-coded class) is used so the constructor argument is honoured.
        self.dense1 = self.make_layer(block, in_channels=in_channels, growth_rate=growth_rate, num_layers=num_layers[0])
        in_channels += num_layers[0]*growth_rate                                               ### in_channels => 64+(6x32) = 256
        out_channels = in_channels//2                                                          ### out_channels => 256//2 = 128
        self.trans1 = Transition_Block(in_channels=in_channels, out_channels=out_channels)

        in_channels = out_channels                                                             ### in_channels = 128

        self.dense2 = self.make_layer(block, in_channels=in_channels, growth_rate=growth_rate, num_layers=num_layers[1])
        in_channels += num_layers[1]*growth_rate                                               ### in_channels => 128+(12x32) = 512
        out_channels = in_channels//2                                                          ### out_channels => 512//2 = 256
        self.trans2 = Transition_Block(in_channels=in_channels, out_channels=out_channels)

        in_channels = out_channels                                                             ### in_channels = 256

        self.dense3 = self.make_layer(block, in_channels=in_channels, growth_rate=growth_rate, num_layers=num_layers[2])
        in_channels += num_layers[2]*growth_rate                                               ### in_channels => 256+(24x32) = 1024
        out_channels = in_channels//2                                                          ### out_channels => 1024//2 = 512
        self.trans3 = Transition_Block(in_channels=in_channels, out_channels=out_channels)

        in_channels = out_channels                                                             ### in_channels = 512

        # The last dense block is not followed by a transition.
        self.dense4 = self.make_layer(block, in_channels=in_channels, growth_rate=growth_rate, num_layers=num_layers[3])
        in_channels += num_layers[3]*growth_rate                                                ### in_channels => 512+(16x32) = 1024

        self.bnorm = nn.BatchNorm2d(in_channels)
        # Global average pooling to 1x1. For 224x224 input the final map is 7x7, so this equals
        # the former AvgPool2d(7), while other input sizes no longer break the linear head.
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.flat = nn.Flatten()
        self.fc = nn.Linear(in_channels, num_classes)

    def make_layer(self, block, in_channels, growth_rate, num_layers):
        """Stack ``num_layers`` instances of ``block`` into one dense block.

        Each layer receives ``growth_rate`` more input channels than the previous one,
        because every layer appends its output to its input.

        Returns:
            nn.Sequential containing the stacked layers.
        """
        layers = []
        for i in range(num_layers):
            layers.append(block(in_channels, growth_rate))
            in_channels += growth_rate

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return raw (un-normalised) scores of shape ``(batch, num_classes)``."""
        out = self.conv1(x)
        out = self.maxpool(out)

        out = self.dense1(out)
        out = self.trans1(out)

        out = self.dense2(out)
        out = self.trans2(out)

        out = self.dense3(out)
        out = self.trans3(out)

        out = self.dense4(out)
        # Final BN is followed by ReLU before pooling, as in the DenseNet reference design.
        out = torch.relu(self.bnorm(out))

        out = self.avgpool(out)
        out = self.flat(out)

        out = self.fc(out)

        return out
    
# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# A single output unit yields one logit per image; the model is moved to `device` once here.
model = DenseNet_Model_Torch(BottleNeck_Layer, [6, 12, 24, 16], 32, 1).to(device)                  ### DenseNet121 Architecture
# BCEWithLogitsLoss takes raw logits, so the model itself has no sigmoid layer.
loss_fn = nn.BCEWithLogitsLoss()
optim = torch.optim.Adam(model.parameters(), lr=0.001)

### Note: 
Since we are using `BCEWithLogitsLoss()`, which applies the sigmoid internally, the model does not need a sigmoid layer at the end (unlike with `BCELoss()`).
<br>Also, since this is a binary classification problem, we don't need to set `num_classes` to 2; instead, the model outputs a single logit for the positive class, and the label is predicted by thresholding it (logit > 0, i.e. probability > 0.5).

In [None]:
print(model)

In [None]:
from torchsummary import summary
# Print a layer-by-layer summary of the model for a single 3x224x224 input.
summary(model, (3, 224, 224))

In [None]:
device

In [None]:
# Number of mini-batches per training epoch. len(DataLoader) counts the final partial batch,
# whereas len(dataset)//batch_size drops it and would inflate the averaged loss.
trainSteps = len(train_dataloader)
trainSteps

In [None]:
# Number of mini-batches per validation pass. len(DataLoader) counts the final partial batch
# and never yields 0 for a non-empty dataset, unlike len(dataset)//batch_size.
valSteps = len(val_dataloader)
valSteps

In [None]:
class ValidationLossEarlyStopping:
    """Signal when validation loss has stopped improving.

    Args:
        patience: number of consecutive non-improving checks tolerated before stopping.
        min_delta: margin by which the loss must undercut the best loss so far
            to count as an improvement.
    """

    def __init__(self, patience=1, min_delta=0.0):
        self.patience = patience  # number of non-improving checks allowed before stopping
        self.min_delta = min_delta  # the minimum change to be counted as improvement
        self.counter = 0  # consecutive checks without improvement
        self.min_validation_loss = np.inf  # best (lowest) validation loss seen so far

    def early_stop_check(self, validation_loss):
        """Record one validation loss and report whether training should stop.

        Returns:
            True once ``patience`` consecutive checks have failed to improve on the
            best loss by more than ``min_delta``; False otherwise.
        """
        if (validation_loss + self.min_delta) < self.min_validation_loss:
            # Improved by more than min_delta: remember the new best and start over.
            self.min_validation_loss = validation_loss
            self.counter = 0
            return False

        # Everything else counts as "no improvement", including an exact tie or a NaN loss
        # (both previously fell through without touching the counter).
        self.counter += 1
        return self.counter >= self.patience

early_stopping = ValidationLossEarlyStopping(patience=5, min_delta=0.01)

Ref.: https://stackoverflow.com/questions/71998978/early-stopping-in-pytorch

# TRAINING MODEL

In [None]:
num_epochs = 50
# Per-epoch history, plotted later in the notebook.
train_loss_list = []
val_loss_list = []
train_acc_list = []
val_acc_list = []
# Sample counts, used as the denominators for accuracy.
count1 = len(train_dataloader.dataset)
count2 = len(val_dataloader.dataset)
# Batch counts, used as the denominators for the mean loss. len(DataLoader) includes the
# final partial batch, so the mean is taken over every batch that was actually summed.
num_train_batches = len(train_dataloader)
num_val_batches = len(val_dataloader)

for epoch in range(num_epochs):
    epoch_acc = 0
    train_loss = 0
    val_loss = 0
    val_epoch_acc = 0

    # The validation pass below puts the model in eval mode; switch back to training mode
    # every epoch so BatchNorm keeps using (and updating) batch statistics.
    model.train()
    for images, labels in train_dataloader:
        images, labels = images.to(device), labels.to(device)

        images = images.float()
        # The model returns one logit per sample with shape (batch, 1); drop the last axis.
        y_pred = model(images).squeeze(-1)
        loss = loss_fn(y_pred, labels.float())

        optim.zero_grad()
        loss.backward()
        optim.step()

        train_loss += loss.item()
        # logit > 0 is equivalent to sigmoid(logit) > 0.5
        y_pred = (y_pred > 0).long()
        epoch_acc += (y_pred == labels).sum().item()

    ### Validation script

    model.eval()
    with torch.no_grad():
        for val_images, val_labels in val_dataloader:

            val_images, val_labels = val_images.to(device), val_labels.to(device)

            val_images = val_images.float()
            y_pred_val = model(val_images).squeeze(-1)
            loss = loss_fn(y_pred_val, val_labels.float())

            val_loss += loss.item()
            y_pred_val = (y_pred_val > 0).long()
            val_epoch_acc += (y_pred_val == val_labels).sum().item()

    mean_train_loss = train_loss / num_train_batches
    mean_val_loss = val_loss / num_val_batches
    train_acc = epoch_acc / count1
    val_acc = val_epoch_acc / count2

    # Early stopping is currently disabled; uncomment to enable it.
#     if early_stopping.early_stop_check(mean_val_loss):
#         print("We are at epoch:", epoch)
#         break

    print('Epoch: {} - Loss: {:.6f}, Training Acc.: {:.3%}, Val. Loss: {:.6f}, Validation Acc.: {:.3%}'.format(epoch + 1, mean_train_loss, train_acc, mean_val_loss, val_acc))
    train_loss_list.append(mean_train_loss)
    val_loss_list.append(mean_val_loss)
    train_acc_list.append(train_acc)
    val_acc_list.append(val_acc)

# MODEL PERFORMANCE

In [None]:
# Accuracy per epoch for the training and validation sets.
plt.plot(train_acc_list)
plt.plot(val_acc_list)
plt.title("Train & Val accuracy")
plt.ylabel("Accuracy")
plt.xlabel("epoch")
# The second curve is the validation set (there is no separate test set in this notebook).
plt.legend(["train", "val"], loc="upper left")
plt.show()

In [None]:
# Mean loss per epoch for the training and validation sets.
plt.plot(train_loss_list)
plt.plot(val_loss_list)
plt.title("Model Loss")
plt.ylabel("Loss")
plt.xlabel("epoch")
# The second curve is the validation set (there is no separate test set in this notebook).
plt.legend(["train", "val"], loc="upper left")
plt.show()

# SAMPLE RESULTS

In [None]:
model.eval()

# Draw one fresh validation batch; images and labels come from the same batch.
val_features, val_labels = next(iter(val_dataloader))

# Predict on the batch just drawn (not on a tensor left over from the training loop).
val_images = val_features.to(device).float()
with torch.no_grad():  # inference only, no gradients needed
    y_pred = model(val_images).squeeze(-1)

# logit > 0 is equivalent to sigmoid(logit) > 0.5
y_pred_val = (y_pred > 0).long()
print(y_pred_val)

# Show every image of the batch with its true and predicted class.
for i in range(len(val_images)):
    plt.imshow(val_images[i].permute(1, 2, 0).cpu())
    plt.title(f"Actual: {encoded_labels[str(val_labels[i].item())]}, Pred: {encoded_labels[str(y_pred_val[i].item())]}")
    plt.show()