In [1]:
import torch
import torchvision
print(torch.__version__)
print(torchvision.__version__)

import random
import numpy as np
import matplotlib.pyplot as plt

from pathlib import Path
from PIL import Image

2.5.1+cu124
0.20.1+cu124


In [2]:
import os
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"  # Or ":16:8"
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
torch.use_deterministic_algorithms(True, warn_only=True)

In [3]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [4]:
SEED = 42

def set_seed(SEED):
    torch.manual_seed(SEED)
    random.seed(SEED)
    np.random.seed(SEED)

In [5]:
def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)

## Dataset

In [6]:
from torchvision import transforms

RESIZE = 256
CROP = 224

transform_list = transforms.Compose([
    transforms.Resize(size=(RESIZE,RESIZE)),
    transforms.CenterCrop(size=(CROP,CROP)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
])

In [7]:
from torch.utils.data import Dataset

class mri_dataset(Dataset):
    def __init__(self, root_dir, transform = None):
        self.root_dir = root_dir
        self.imgs_path_list = list(root_dir.rglob("*.*"))
        self.transform = transform

    def __len__(self):
        return len(self.imgs_path_list)

    def __getitem__(self, idx):
        img_path = self.imgs_path_list[idx]
        img = Image.open(img_path).convert('RGB')
        if self.transform:
            img = self.transform(img)

        if img_path.parent.name == 'no':
            label = 0
        elif img_path.parent.name == 'yes':
            label = 1

        return img, label

In [8]:
root_dir = Path("/kaggle/input/tiny-brain-tumor-mri/brain_tumor_dataset")
total_ds = mri_dataset(root_dir, transform_list)
print(f"Total number of images: {len(total_ds)}")

Total number of images: 253


In [9]:
from torch.utils.data import random_split

train_size = 0.8

generator = torch.Generator().manual_seed(0)
train_ds, test_ds = random_split(total_ds, [train_size, 1-train_size], generator=generator)

In [10]:
from torch.utils.data import DataLoader
import os

NUM_WORKERS = os.cpu_count()
BATCH_SIZE = 64

g = torch.Generator()
g.manual_seed(0)

# You have to turn off shuffle to have sampler
train_dataloader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS, 
                              pin_memory=True, worker_init_fn=seed_worker, generator=g)
test_dataloader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS, 
                              pin_memory=True, worker_init_fn=seed_worker, generator=g)

print(f"No. of batch in training: {len(train_dataloader)}")
print(f"No. of batch in training: {len(test_dataloader)}")

No. of batch in training: 4
No. of batch in training: 1


## Model

In [11]:
from torch import nn
import torch.nn.functional as F

class Conv_Block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()
        self.conv2d = nn.Conv2d(in_channels=in_c, out_channels=out_c, kernel_size=3)
        self.bn = nn.BatchNorm2d(num_features=out_c)

    def forward(self, x):
        return F.relu(self.bn(self.conv2d(x)))

class Dense_Block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()
        self.dense = nn.Linear(in_features=in_c, out_features=out_c, bias=True)
        self.bn = nn.BatchNorm1d(num_features=out_c)

    def forward(self, x):
        return F.relu(self.bn(self.dense(x)))

In [12]:
class ChannelAttentionModule(nn.Module):
    def __init__(self, in_c, ratio):
        super().__init__()
        self.shared_mlp = nn.Sequential(
            nn.Linear(in_c, in_c//ratio, bias=True),
            nn.ReLU(),
            nn.Linear(in_c//ratio, in_c, bias=True)
        )

    def forward(self, x):
        y1 = F.max_pool2d(x, kernel_size=x.shape[2:])
        y2 = F.avg_pool2d(x, kernel_size=x.shape[2:])

        y1 = y1.view(y1.shape[0], -1)
        y1 = self.shared_mlp(y1)
        y2 = y2.view(y2.shape[0], -1)
        y2 = self.shared_mlp(y2)

        out = y1 + y2
        out = F.sigmoid(out)
        return out

class SpatialAttentionModule(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv2d = nn.Conv2d(in_channels=2, out_channels=1, kernel_size=7, padding=3)

    def forward(self, x):
        y_max = x.max(dim=1, keepdim=True).values
        y_avg = x.mean(dim=1, keepdim=True)
        y = torch.cat((y_max, y_avg), dim=1)
        y = F.sigmoid(self.conv2d(y))
        return y

class CBAM_Block(nn.Module):
    def __init__(self, in_c, ratio=8):
        super().__init__()
        self.cam = ChannelAttentionModule(in_c, ratio=8)
        self.sam = SpatialAttentionModule()

    def forward(self, x):
        y = self.cam(x)
        y_cam = x * y.unsqueeze(2).unsqueeze(3)
        y = self.sam(y_cam)
        out = y_cam * y 
        return out

In [13]:
class Residual_Block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()
        self.conv2d1 = nn.Conv2d(in_channels=in_c, out_channels=out_c, kernel_size=3, padding=1)
        self.conv2d2 = nn.Conv2d(in_channels=out_c, out_channels=out_c, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_c)
        self.bn2 = nn.BatchNorm2d(out_c)

    def forward(self, x):
        y = self.bn2(self.conv2d2(F.leaky_relu(self.bn1(self.conv2d1(x)))))
        out = y + x
        return F.leaky_relu(out)

In [14]:
class BrainMRNet(nn.Module):
    def __init__(self, num_classes):
        super().__init__()

        # Stage 1
        self.convblock11 = Conv_Block(in_c=3, out_c=32)
        self.convblock12 = Conv_Block(in_c=32, out_c=32)
        self.maxpool1 = torch.nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 2
        self.convblock21 = Conv_Block(in_c=32, out_c=64)
        self.cbamblock21 = CBAM_Block(in_c=64, ratio=8)
        self.residualblock21 = Residual_Block(in_c=64, out_c=64)
        self.maxpool2 = torch.nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 3
        self.convblock31 = Conv_Block(in_c=64, out_c=128)
        self.cbamblock31 = CBAM_Block(in_c=128, ratio=8)
        self.residualblock31 = Residual_Block(in_c=128, out_c=128)
        self.maxpool3 = torch.nn.MaxPool2d(kernel_size=2, stride=2)

        self.denseblock1 = Dense_Block(in_c=16, out_c=256)
        self.dropout = nn.Dropout(p=0.3)
        self.denseblock2 = Dense_Block(in_c=256, out_c=256)
        self.classifier = nn.Linear(in_features=256, out_features=num_classes)
        

    def forward(self, x):
        y1 = self.maxpool1(self.convblock12(self.convblock11(x)))
        y2 = self.maxpool2(self.residualblock21(self.cbamblock21(self.convblock21(y1))))
        y3 = self.maxpool3(self.residualblock31(self.cbamblock31(self.convblock31(y2))))

        y1 = F.interpolate(y1, size=(4,4), mode='bilinear')
        y2 = F.interpolate(y2, size=(4,4), mode='bilinear')
        y3 = F.interpolate(y3, size=(4,4), mode='bilinear')
        
        y = torch.cat((y1, y2, y3), dim=1)
        y = y.mean(dim=1, keepdim=True)
        y = y.view(y.shape[0], -1)
        y = self.classifier(self.denseblock2(self.dropout(self.denseblock1(y))))
        return y

In [15]:
set_seed(SEED)
model = BrainMRNet(num_classes=2).to(device)

In [32]:
total_params = sum(p.numel() for p in model.parameters())
print(f"Number of parameters: {total_params / 1e6} M")

Number of parameters: 0.550016 M


## Training

In [16]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer):
    # Put model in train mode
    model.train()

    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0

    # Loop through data loader data batches
    for batch, (X, y) in enumerate(dataloader):
        # Send data to target device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)
        y_pred = y_pred.squeeze() # [1, BATCH_SIZE] -> [BATCH_SIZE]
        
        # 2. Calculate  and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Calculate and accumulate accuracy metric across all batches
        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)

    warmup_scheduler.step() # Step the learning rate scheduler
    current_lr = warmup_scheduler.get_last_lr()[0]
    
    return train_loss, train_acc

In [17]:
def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module):
    # Put model in eval mode
    model.eval()

    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0
    
    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            # Send data to target device
            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred_logits = model(X)
            test_pred_logits = test_pred_logits.squeeze() # [1, BATCH_SIZE] -> [BATCH_SIZE]

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            # Calculate and accumulate accuracy

            y_pred_class = torch.argmax(torch.softmax(test_pred_logits, dim=1), dim=1)
            test_acc += (y_pred_class == y).sum().item()/len(test_pred_logits)

    # Adjust metrics to get average loss and accuracy per batch
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [18]:
class EarlyStopping:
    def __init__(self, patience=10, min_delta=0):
        """
        Args:
            patience (int): How many epochs to wait after the last time the test loss improved.
            min_delta (float): Minimum change in the monitored quantity to qualify as an improvement.
        """
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = float('inf')
        self.early_stop = False

    def __call__(self, test_loss):
        if test_loss < self.best_loss - self.min_delta:
            self.best_loss = test_loss
            self.counter = 0  # reset counter if validation loss improves
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

In [19]:
MODEL_SAVE_NAME = 'best_brainmrnet_v1_brain_tumor_mri.pth'

In [20]:
from tqdm.notebook import tqdm

# 1. Take in various parameters required for training and test steps
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          epochs):

    # 2. Create empty results dictionary
    results = {"train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": []
    }
    
    best_loss = float('inf')
    early_stopping = EarlyStopping(patience=10, min_delta=0.001)
    
    # 3. Loop through training and testing steps for a number of epochs
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer)
        test_loss, test_acc = test_step(model=model,
            dataloader=test_dataloader,
            loss_fn=loss_fn)

        # 4. Print out what's happening
        print(
            f"Epoch: {epoch} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f} |"
        )

        # 5. Update results dictionary
        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)
        
        if test_loss < best_loss:
            torch.save(model.state_dict(), MODEL_SAVE_NAME)
            print("model saved named " + MODEL_SAVE_NAME)
            best_loss = test_loss

        early_stopping(test_loss)
        if early_stopping.early_stop:
            print(f"Early stopping at epoch {epoch}")
            break

    # 6. Return the filled results at the end of the epochs
    return results

In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import LambdaLR
import torchvision.models as models

class WarmupLRScheduler:
    def __init__(self, optimizer, warmup_epochs, total_epochs, start_lr=1e-5, max_lr=1e-3):
        """
        Create a learning rate warmup scheduler
        
        Args:
            optimizer (torch.optim.Optimizer): The optimizer
            warmup_epochs (int): Number of warmup epochs
            total_epochs (int): Total number of training epochs
            start_lr (float): Initial learning rate during warmup
            max_lr (float): Maximum learning rate
        """
        self.optimizer = optimizer
        self.warmup_epochs = warmup_epochs
        self.total_epochs = total_epochs
        self.start_lr = start_lr
        self.max_lr = max_lr
        
        # Create lambda function for learning rate scheduling
        self.lr_lambda = self._lr_lambda()
        self.scheduler = LambdaLR(optimizer, lr_lambda=self.lr_lambda)
    
    def _lr_lambda(self):
        def lr_lambda(current_epoch):
            if current_epoch < self.warmup_epochs:
                # Linear warmup
                return self.start_lr / self.max_lr + \
                       (current_epoch / self.warmup_epochs) * (1 - self.start_lr / self.max_lr)
            
            # Cosine annealing after warmup
            progress = (current_epoch - self.warmup_epochs) / (self.total_epochs - self.warmup_epochs)
            return max(0.0, 0.5 * (1.0 + torch.cos(torch.tensor(torch.pi * progress))))
        
        return lr_lambda
    
    def step(self):
        """Perform a scheduler step"""
        self.scheduler.step()
    
    def get_last_lr(self):
        """Get the last computed learning rate"""
        return self.scheduler.get_last_lr()

In [22]:
# loss_fn = torch.nn.CrossEntropyLoss(weight=class_weights)
loss_fn = torch.nn.CrossEntropyLoss()

LR = 1e-3
WEIGHT_DECAY = LR / 10 # 1e-4
optimizer = torch.optim.Adam(params=model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)

In [23]:
# set_seed(SEED)

# Set number of epochs
NUM_EPOCHS = 100

# Create warmup learning rate scheduler
warmup_scheduler = WarmupLRScheduler(
    optimizer, 
    warmup_epochs=5, 
    total_epochs=NUM_EPOCHS,
    start_lr=1e-5,  # Starting very low
    max_lr=LR     # Maximum learning rate
)

# Start the timer
from timeit import default_timer as timer
start_time = timer()


model_0_results = train(model=model,
                        train_dataloader=train_dataloader,
                        test_dataloader=test_dataloader,
                        optimizer=optimizer,
                        loss_fn=loss_fn,
                        epochs=NUM_EPOCHS)


# End the timer and print out how long it took
end_time = timer()
print(f"Total training time: {(end_time-start_time)/60.0:.3f} minutes")
torch.save(model.state_dict(), 'last' + MODEL_SAVE_NAME[4:])

  0%|          | 0/100 [00:00<?, ?it/s]

Epoch: 0 | train_loss: 0.6585 | train_acc: 0.6577 | test_loss: 0.6875 | test_acc: 0.6000 |
model saved named best_brainmrnet_v1_brain_tumor_mri.pth
Epoch: 1 | train_loss: 0.6388 | train_acc: 0.6136 | test_loss: 0.6761 | test_acc: 0.6000 |
model saved named best_brainmrnet_v1_brain_tumor_mri.pth
Epoch: 2 | train_loss: 0.5282 | train_acc: 0.7795 | test_loss: 0.6690 | test_acc: 0.6000 |
model saved named best_brainmrnet_v1_brain_tumor_mri.pth
Epoch: 3 | train_loss: 0.4991 | train_acc: 0.7379 | test_loss: 0.6911 | test_acc: 0.6000 |
Epoch: 4 | train_loss: 0.4703 | train_acc: 0.7379 | test_loss: 0.7413 | test_acc: 0.6000 |
Epoch: 5 | train_loss: 0.3416 | train_acc: 0.8679 | test_loss: 0.8049 | test_acc: 0.6000 |
Epoch: 6 | train_loss: 0.3435 | train_acc: 0.8491 | test_loss: 0.7383 | test_acc: 0.6000 |
Epoch: 7 | train_loss: 0.3117 | train_acc: 0.8835 | test_loss: 0.5979 | test_acc: 0.6400 |
model saved named best_brainmrnet_v1_brain_tumor_mri.pth
Epoch: 8 | train_loss: 0.3082 | train_acc: 0

## Evaluation

In [24]:
checkpoint_file_path = Path("/kaggle/working/" + MODEL_SAVE_NAME)
model.load_state_dict(torch.load(checkpoint_file_path, map_location=device, weights_only=True))

<All keys matched successfully>

In [25]:
import numpy as np

def evaluation_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module):
    # Put model in eval mode
    model.eval()

    all_preds = []
    all_labels = []
    all_probs = []  # To store probabilities for AUC calculation
    
    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)

            test_pred_logits = model(X)
            test_pred_logits = test_pred_logits.squeeze() # [1, BATCH_SIZE] -> [BATCH_SIZE]

            probs = torch.softmax(test_pred_logits, dim=1)
            preds = torch.argmax(probs, dim=1)
            
            # Append predictions and labels to the list         
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())  # For AUC calculation

    return all_preds, all_labels, all_probs

In [26]:
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score

def print_report(y_true, y_pred, y_score, TB_label, TB_class_index):
    accuracy = accuracy_score(y_true, y_pred)
    auc = roc_auc_score(y_true == TB_label, y_score[:, TB_class_index])
#     sensitivity = recall_score(y_true, y_pred, labels=[TB_label], average='macro')
    true_positives = np.sum((all_preds == TB_label) & (all_labels == TB_label))
    false_negatives = np.sum((all_preds != TB_label) & (all_labels == TB_label))
    sensitivity = true_positives / (true_positives + false_negatives) # Sensitivity (Recall for TB class)
    
    # specificity calculation
    true_negatives = np.sum((all_preds != TB_label) & (all_labels != TB_label)) # True Negatives: non-TB cases (Healthy or Sick but Non-TB) predicted as non-TB
    false_positives = np.sum((all_preds == TB_label) & (all_labels != TB_label)) # False Positives: non-TB cases incorrectly predicted as TB
    specificity = true_negatives / (true_negatives + false_positives) # Specificity (Recall for non-TB classes)
    
    ap = precision_score(y_true, y_pred, average='macro')
    ar = recall_score(y_true, y_pred, average='macro')
    
    print(f"accuracy: {accuracy:.4f}")
    print(f"auc: {auc:.4f}")
    print(f"sensitivity: {sensitivity:.4f}")
    print(f"specificity: {specificity:.4f}")
    print(f"average precision: {ap:.4f}")
    print(f"average recall: {ar:.4f}")

In [27]:
TB_label = 1  # TB is represented by label 1 in the dataset
TB_class_index = 1 # TB corresponds to index 1 in the probability outputs

all_preds, all_labels, all_probs = evaluation_step(model, test_dataloader, loss_fn)
# Convert lists to numpy arrays for metric calculations
all_preds = np.array(all_preds)
all_labels = np.array(all_labels)
all_probs = np.array(all_probs)
print_report(y_true=all_labels, y_pred=all_preds, y_score=all_probs, TB_label=TB_label, TB_class_index=TB_class_index)

accuracy: 0.8800
auc: 0.9167
sensitivity: 0.9333
specificity: 0.8000
average precision: 0.8819
average recall: 0.8667
