In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install torchio seaborn --quiet

# Import Necessary Libraries

In [None]:
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import torchio as tio
import matplotlib.pyplot as plt
import os
from os import getcwd
from pathlib import Path
import torchio
from torch.utils.data import DataLoader
from sklearn.metrics import roc_curve, auc, classification_report, confusion_matrix
from sklearn.preprocessing import label_binarize
import seaborn as sns
import matplotlib.pyplot as plt

# Load the Dataset 

In [None]:
class R2Plus1dStem4MRI(nn.Module):
    def __init__(self, num_classes=3):  # num_classes = 3 (HGG, LGG, IXI)
        super(R2Plus1dStem4MRI, self).__init__()
        self.stem = nn.Sequential(
            nn.Conv3d(1, 45, kernel_size=(1, 7, 7),
                      stride=(1, 2, 2), padding=(0, 3, 3),
                      bias=False),
            nn.BatchNorm3d(45),
            nn.ReLU(inplace=True),

            nn.Conv3d(45, 64, kernel_size=(3, 1, 1),
                      stride=(1, 1, 1), padding=(1, 0, 0),
                      bias=False),
            nn.BatchNorm3d(64),
            nn.ReLU(inplace=True)
        )
        self.global_pool = nn.AdaptiveAvgPool3d(1)  # Output shape will be [batch_size, 64, 1, 1, 1]
        # Fully connected layer to output class scores
        self.fc = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.stem(x)
        x = self.global_pool(x)  # Reduce to [batch_size, 64, 1, 1, 1]
        x = x.view(x.size(0), -1)  # Flatten to [batch_size, 64]
        x = self.fc(x)  # Output shape will be [batch_size, num_classes]
        return x

# Define the dataset
class Datasets:
    def __init__(self, dataset_path, ixi_path, target_shape=(240, 240, 155)):
        self.dataset_path = dataset_path
        self.ixi_path = ixi_path
        self.target_shape = target_shape
        self.transform = tio.CropOrPad(target_shape)

    def brats(self):
        HGG = []  # High-grade glioma samples
        for path, _, files in os.walk(os.path.join(self.dataset_path, 'MICCAI_BraTS_2019_Data_Training/MICCAI_BraTS_2019_Data_Training/HGG/')):
            for file in files:
                if file.endswith('_t1ce.nii'):
                    img_path = Path(path + '/' + file)
                    print(f"Loading HGG sample: {img_path}")
                    subject = tio.Subject(t1=tio.ScalarImage(img_path), label=1)
                    subject = self.transform(subject)
                    HGG.append(subject)
        
        LGG = []  # Low-grade glioma samples
        for path, _, files in os.walk(os.path.join(self.dataset_path, 'MICCAI_BraTS_2019_Data_Training/MICCAI_BraTS_2019_Data_Training/LGG/')):
            for file in files:
                if file.endswith('_t1ce.nii'):
                    img_path = Path(path + '/' + file)
                    print(f"Loading LGG sample: {img_path}")
                    subject = tio.Subject(t1=tio.ScalarImage(img_path), label=0)
                    subject = self.transform(subject)
                    LGG.append(subject)

        brats = HGG+LGG
        
        return brats  # Combine HGG and LGG
    
    def ixi(self):
         ixi = [] # Healthy samples
         for path, _, files in os.walk(self.ixi_path):
            for file in files:
                if file.endswith('.nii'):
                    img_path = Path(path + '/' + file)
                    print(f"Loading IXI sample: {img_path}")
                    subject = tio.Subject(t1=tio.ScalarImage(img_path), label=2)
                    subject = self.transform(subject)
                    ixi.append(subject)
         return ixi
        

    def return_total_samples(self):
        return self.ixi() + self.brats()

# Data loader preparation
def get_data_loaders(dataset, batch_size=1):
    subjects_dataset = tio.SubjectsDataset(dataset.return_total_samples())
    data_loader = DataLoader(subjects_dataset, batch_size=batch_size, shuffle=True)
    return data_loader

# Training loop for 5 samples
def train_model(model, data_loader, num_samples=1, device='cuda'):
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    
    # Track the losses
    loss_values = []

    model.train()
    for i, batch in enumerate(data_loader):
        if i >= num_samples:
            break
        inputs = batch['t1'][tio.DATA].to(device).float()  # Convert input to FloatTensor
        labels = batch['label'].to(device).long()  # Ensure labels are LongTensor for classification
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Store the loss
        loss_values.append(loss.item())

        # Print loss for each sample
        print(f'Sample {i + 1}/{num_samples}, Loss: {loss.item():.4f}')
        
        # Visualize slices from this batch
        visualize_slices(inputs.cpu().detach())

# Visualization function remains the same
def visualize_slices(inputs):
    # inputs should be in the shape [batch_size, channels, depth, height, width]
    batch_size, channels, depth, height, width = inputs.shape
    for i in range(depth):
        plt.imshow(inputs[0, 0, i, :, :], cmap='gray')
        plt.title(f'Slice {i + 1}')
        plt.show()

# Initialize the dataset and dataloader
dataset_path_1 = '/kaggle/input/miccaibrats2019'
dataset_path_2 = '/kaggle/input/ixi-t1'
dataset = Datasets(dataset_path_1, dataset_path_2)
data_loader = get_data_loaders(dataset, batch_size=1)

# Initialize and train the model
model = R2Plus1dStem4MRI(num_classes=3)
train_model(model, data_loader, num_samples=1, device='cuda')

In [None]:
class Datasets:
    def __init__(self, dataset_path):
        self.dataset_path = dataset_path

    def brats(self):
        HGG = []  # High-grade glioma samples
        for path, _, files in os.walk(os.path.join(self.dataset_path, 'MICCAI_BraTS_2019_Data_Training/MICCAI_BraTS_2019_Data_Training/HGG/')):
            for file in files:
                if file.endswith('_t1ce.nii'):
                    img_path = Path(path + '/' + file)
                    print(f"Loading HGG sample: {img_path}")  # Print the image path
                    HGG.append(tio.Subject(t1=tio.ScalarImage(img_path), label=1,))
        
        LGG = []  # Low-grade glioma samples
        for path, _, files in os.walk(os.path.join(self.dataset_path, 'MICCAI_BraTS_2019_Data_Training/MICCAI_BraTS_2019_Data_Training/LGG/')):
            for file in files:
                if file.endswith('_t1ce.nii'):
                    img_path = Path(path + '/' + file)
                    LGG.append(tio.Subject(t1=tio.ScalarImage(img_path), label=0,))

        brats = HGG+LGG
        
        return brats

    def ixi(self):
        ixi = []  # Healthy samples
        for path, currentDirectory, files in os.walk(os.path.join(self.dataset_path, '/kaggle/input/ixit2')):
            for file in files:
                if file.endswith('.nii'):
                    img_path = Path(path + '/' + file)
                    ixi.append(tio.Subject(t1=tio.ScalarImage(img_path), label=2,))
        return ixi

    def return_total_samples(self):
        return self.brats()+self.ixi()

# ResNet(2+1)D Architecture

In [None]:
import torch.nn as nn
from torch.nn import Sequential

class R2Plus1dStem4MRI(nn.Sequential):
    def __init__(self, num_classes=3):  # 3 classes: HGG, LGG, Healthy
        super(R2Plus1dStem4MRI, self).__init__()
        self.stem = nn.Sequential(
            nn.Conv3d(1, 45, kernel_size=(1, 7, 7),
                      stride=(1, 2, 2), padding=(0, 3, 3), bias=False),
            nn.BatchNorm3d(45),
            nn.ReLU(inplace=True),

            nn.Conv3d(45, 64, kernel_size=(3, 1, 1),
                      stride=(1, 1, 1), padding=(1, 0, 0), bias=False),
            nn.BatchNorm3d(64),
            nn.ReLU(inplace=True)
        )

        self.global_pool = nn.AdaptiveAvgPool3d(1)  # Global pooling
        self.fc = nn.Linear(64, num_classes)  # Fully connected for class scores

    def forward(self, x):
        x = self.stem(x)
        x = self.global_pool(x)
        x = x.view(x.size(0), -1)  # Flatten
        x = self.fc(x)
        return x

# ResNet3D Architecture

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# 3D Basic Block
class BasicBlock3D(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(BasicBlock3D, self).__init__()
        self.conv1 = nn.Conv3d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm3d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv3d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm3d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out


# 3D ResNet Model
class ResNet3D(nn.Module):
    def __init__(self, block, layers, num_classes=3):
        super(ResNet3D, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv3d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm3d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool3d(1)
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv3d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm3d(out_channels)
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for _ in range(1, num_blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x


# Instantiate the model (e.g., ResNet-18)
def resnet3d(num_classes=3):
    return ResNet3D(BasicBlock3D, [2, 2, 2, 2], num_classes)


# Example usage:
model = resnet3d(num_classes=3)
print(model)

# Model Training for ResNet(2+1)D

In [None]:
def get_data_loaders(dataset, batch_size=2):
    subjects_dataset = tio.SubjectsDataset(dataset.return_total_samples())
    return DataLoader(subjects_dataset, batch_size=batch_size, shuffle=True)

In [None]:
def train_model(model, data_loader, num_epochs=10, device='cuda'):
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    loss_values = []
    all_preds = []
    all_labels = []
    all_outputs = []

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        model.train()
        epoch_loss = 0.0

        for batch_idx, batch in enumerate(data_loader):
            inputs = batch['t1'][tio.DATA].to(device).float()
            labels = batch['label'].to(device).long()
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            labels = labels.cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels)
            all_outputs.append(outputs.detach().cpu().numpy())

            print(f"Batch {batch_idx + 1}/{len(data_loader)}, Loss: {loss.item():.4f}")

        avg_loss = epoch_loss / len(data_loader)
        print(f"Epoch {epoch + 1} Loss: {avg_loss:.4f}")
        loss_values.append(avg_loss)

    torch.save(model.state_dict(), '/kaggle/working/r2plus1d_mri_model.pth')
    print("Model saved as 'r2plus1d_mri_model.pth'")

    # Evaluation
    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds, target_names=['LGG', 'HGG', 'Healthy']))
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['LGG', 'HGG', 'Healthy'], yticklabels=['LGG', 'HGG', 'Healthy'])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

    plt.figure()
    plt.plot(loss_values, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss Over Epochs')
    plt.legend()
    plt.show()

    all_outputs = np.concatenate(all_outputs, axis=0)  # Convert list of arrays to a single array
    all_labels_one_hot = label_binarize(all_labels, classes=[0, 1, 2])  # One-hot encode labels for ROC calculation

    fpr = {}
    tpr = {}
    roc_auc = {}
    for i in range(3):  # Loop over classes (0: LGG, 1: HGG, 2: Healthy)
        fpr[i], tpr[i], _ = roc_curve(all_labels_one_hot[:, i], all_outputs[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Micro-average ROC curve
    fpr['micro'], tpr['micro'], _ = roc_curve(all_labels_one_hot.ravel(), all_outputs.ravel())
    roc_auc['micro'] = auc(fpr['micro'], tpr['micro'])

    # Macro-average ROC curve
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(3)]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(3):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= 3
    fpr['macro'] = all_fpr
    tpr['macro'] = mean_tpr
    roc_auc['macro'] = auc(fpr['macro'], tpr['macro'])

    plt.figure()
    plt.plot(fpr['micro'], tpr['micro'], label=f'micro-average ROC curve (area = {roc_auc["micro"]:.2f})', linestyle=':')
    plt.plot(fpr['macro'], tpr['macro'], label=f'macro-average ROC curve (area = {roc_auc["macro"]:.2f})', linestyle=':')

    for i, class_name in enumerate(['LGG', 'HGG', 'Healthy']):
        plt.plot(fpr[i], tpr[i], label=f'Class {class_name} (area = {roc_auc[i]:.2f})')

    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend(loc='lower right')
    plt.grid()
    plt.show()

In [None]:
# Initialize and train the model
model = R2Plus1dStem4MRI(num_classes=3)
train_model(model, data_loader, num_epochs=3, device='cuda')

# Model Training for ResNet3D

In [None]:
def train_model3(model, data_loader, num_epochs=5, device='cuda'):
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    loss_values = []
    all_preds = []
    all_labels = []
    all_outputs = []

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        model.train()
        epoch_loss = 0.0

        for batch_idx, batch in enumerate(data_loader):
            inputs = batch['t1'][tio.DATA].to(device).float()
            labels = batch['label'].to(device).long()
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            labels = labels.cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels)
            all_outputs.append(outputs.detach().cpu().numpy())

            print(f"Batch {batch_idx + 1}/{len(data_loader)}, Loss: {loss.item():.4f}")

        avg_loss = epoch_loss / len(data_loader)
        print(f"Epoch {epoch + 1} Loss: {avg_loss:.4f}")
        loss_values.append(avg_loss)

    torch.save(model.state_dict(), '/kaggle/working/r3d_mri_model.pth')
    print("Model saved as 'r3d_mri_model.pth'")

    # Evaluation
    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds, target_names=['LGG', 'HGG', 'Healthy']))
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['LGG', 'HGG', 'Healthy'], yticklabels=['LGG', 'HGG', 'Healthy'])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

    plt.figure()
    plt.plot(loss_values, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss Over Epochs')
    plt.legend()
    plt.show()
    
    all_outputs = np.concatenate(all_outputs, axis=0)  # Convert list of arrays to a single array
    all_labels_one_hot = label_binarize(all_labels, classes=[0, 1, 2])  # One-hot encode labels for ROC calculation

    fpr = {}
    tpr = {}
    roc_auc = {}
    for i in range(3):  # Loop over classes (0: LGG, 1: HGG, 2: Healthy)
        fpr[i], tpr[i], _ = roc_curve(all_labels_one_hot[:, i], all_outputs[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Micro-average ROC curve
    fpr['micro'], tpr['micro'], _ = roc_curve(all_labels_one_hot.ravel(), all_outputs.ravel())
    roc_auc['micro'] = auc(fpr['micro'], tpr['micro'])

    # Macro-average ROC curve
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(3)]))
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(3):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
    mean_tpr /= 3
    fpr['macro'] = all_fpr
    tpr['macro'] = mean_tpr
    roc_auc['macro'] = auc(fpr['macro'], tpr['macro'])

    plt.figure()
    plt.plot(fpr['micro'], tpr['micro'], label=f'micro-average ROC curve (area = {roc_auc["micro"]:.2f})', linestyle=':')
    plt.plot(fpr['macro'], tpr['macro'], label=f'macro-average ROC curve (area = {roc_auc["macro"]:.2f})', linestyle=':')

    for i, class_name in enumerate(['LGG', 'HGG', 'Healthy']):
        plt.plot(fpr[i], tpr[i], label=f'Class {class_name} (area = {roc_auc[i]:.2f})')

    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend(loc='lower right')
    plt.grid()
    plt.show()

In [None]:
# Initialize and train the model
model = resnet3d(num_classes=3)
train_model3(model, data_loader, num_epochs=3, device='cuda')