In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import torchvision.models as models

from PIL import Image
import numpy as np
import os

import torch
import torch.nn as nn
import torchvision
from torchvision.transforms import v2

from functions import *
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import recall_score

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device used: {device.type}')

Device used: cpu


In [2]:
# Set seed for reproducibility
seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

In [3]:
train_size = 0.75                     # In percent
test_size = 1 - train_size           # In percent, calculated dynamically from train_size
batch_size = 16                      # Size of batches

In [4]:
# Create an empty array to store the image arrays and class
X = []
Y = []

# Define the folder paths containing the images
folder_paths = ['Dataset/Non_Demented/', 'Dataset/Very_Mild_Demented/', 'Dataset/Mild_Demented/', 'Dataset/Moderate_Demented/']
classes = [r'Non demented', r'Very mildly demented', r'mild demented', r'moderate demented']

# Loop over the images to save them in the list
for path in folder_paths:
    c = folder_paths.index(path)
    items = os.listdir(path)
    for picture in items:
        file_path = os.path.join(path, picture)
        # Open the image and convert it to a NumPy array
        img = Image.open(file_path)
        array_representation = np.asarray(img)

        # Append the NumPy array to the list
        X.append(array_representation)
        Y.append(c)

# Convert the list of image arrays to a NumPy arrayF
X = np.array(X)

from sklearn.preprocessing import MinMaxScaler # , Normalizer, StandardScaler, RobustScaler
# Transpose to make each image a row

X = X.reshape(X.shape[0], -1)

# Normalize each row (i.e., each flattened image)
X = MinMaxScaler().fit_transform(X)
print(X.max())

# Reshape back to the original shape
X = X.reshape(len(X), 128, 128, 1)

# Dynamically calculate the number of classes in dataset
num_classes = len(np.unique(Y))

X.shape

1.0000000000000002


(6400, 128, 128, 1)

In [5]:
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import v2

# Assuming you have a class named MyDataset for your dataset
class MyDataset(Dataset):
    def __init__(self, X, Y, transform=None):
        self.X = X
        self.Y = Y
        self.transform = transform

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        img = self.X[idx]
        label = self.Y[idx]

        if self.transform:
            img = self.transform(img)

        return img, label

# Define transformations, you can adjust these based on your needs
transform = v2.Compose([
    v2.ToImage(), 
    v2.ToDtype(torch.float32, scale=True)
])

# Create an instance of your dataset
dataset = MyDataset(X=X, Y=Y, transform=transform)

In [6]:
train_dataset, test_dataset = torch.utils.data.random_split(
        dataset, [int(train_size * len(dataset)), len(dataset) - int(train_size * len(dataset))]
    )

train_dataset, val_dataset = torch.utils.data.random_split(
        train_dataset, [int(train_size * len(train_dataset)), len(train_dataset) - int(train_size * len(train_dataset))]
    )

# Define DataLoader for training and test sets
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
validation_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# ResNet 18

In [7]:
class CustomResNet18(nn.Module):
    def __init__(self, num_classes, evaluation_metrics):
        super(CustomResNet18, self).__init__()
        # resnet = models.resnet18(pretrained=True)
        resnet = models.resnet18(weights='ResNet18_Weights.DEFAULT')

        # Change number of input channels
        resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

        # Remove the fully connected layer
        self.features = nn.Sequential(*list(resnet.children())[:-1])
        # Add a custom fully connected layer
        self.fc = nn.Linear(resnet.fc.in_features, num_classes)

        # Initialize history dict
        self.evaluation_metrics = evaluation_metrics
        self.history = {key: [] for key in evaluation_metrics.keys()}
        self.history_validation = {key: [] for key in evaluation_metrics.keys()}

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x
    
    def evaluate(self, dataloader):
        '''
        Returns the predicted labels in evaluation mode: y_pred, y_true
        '''
        state = False if self.training is False else True
        if state: self.eval()
        y_true = []
        y_pred = []
        with torch.no_grad():  # Disable gradient computation during validation
            for i, (x_minibatch, y_true_batch) in enumerate(dataloader):
                y_pred_batch = F.softmax(self(x_minibatch), dim=1)
                y_true.extend(y_true_batch.tolist())
                y_pred.extend(y_pred_batch.tolist())
        if state: self.train()
        return y_pred, y_true

In [8]:
criterion = nn.CrossEntropyLoss()

# Now define the metrics you want to monitor during training and save them in a dict (Important: All need to take y_pred, y_cls, y_true as input (This order!))
def criterion_function(y_pred, y_cls, y_true):
    return criterion(torch.tensor(y_pred), torch.tensor(y_true))
def accuracy_function(y_pred, y_cls, y_true):
    return accuracy_score(y_true, y_cls)
def recall_function(y_pred, y_cls, y_true):
    return recall_score(y_true, y_cls, average='macro')
metrics = {'loss' : criterion_function, 'acc' : accuracy_function, 'macro recall' : recall_function}

# Define Model
model18 = CustomResNet18(num_classes=4, evaluation_metrics=metrics)

# Now define the loss (criterion), optimizer, lr_scheduler, 
early_stopper = EarlyStopper(model18, patience=5, min_delta=0)
optimizer = torch.optim.Adam(model18.parameters(), lr=0.01)
scheduler = StepLR(optimizer, step_size=5, gamma=0.5)

train_network(model18, train_loader, criterion, optimizer, 70, scheduler, validation_loader, device, early_stopper)

torch.save(model18, 'model_resnet_18.pth')

Device used for training: cpu


KeyboardInterrupt: 

In [None]:
y_pred, y_true = model18.evaluate(test_loader)
y_cls = np.argmax(y_pred, axis=1)
print(classification_report(y_true, y_cls))
print(confusion_matrix(y_true, y_cls))

# ResNet 50

In [None]:
class CustomResNet50(nn.Module):
    def __init__(self, num_classes, evaluation_metrics):
        super(CustomResNet50, self).__init__()
        # resnet = models.resnet50(pretrained=True)
        resnet = models.resnet50(weights='ResNet50_Weights.DEFAULT')

        # Change number of input channels
        resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

        # Remove the fully connected layer
        self.features = nn.Sequential(*list(resnet.children())[:-1])
        # Add a custom fully connected layer
        self.fc = nn.Linear(resnet.fc.in_features, num_classes)

        # Initialize history dict
        self.evaluation_metrics = evaluation_metrics
        self.history = {key: [] for key in evaluation_metrics.keys()}
        self.history_validation = {key: [] for key in evaluation_metrics.keys()}

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x
    
    def evaluate(self, dataloader):
        '''
        Returns the predicted labels in evaluation mode: y_pred, y_true
        '''
        state = False if self.training is False else True
        if state: self.eval()
        y_true = []
        y_pred = []
        with torch.no_grad():  # Disable gradient computation during validation
            for i, (x_minibatch, y_true_batch) in enumerate(dataloader):
                y_pred_batch = F.softmax(self(x_minibatch), dim=1)
                y_true.extend(y_true_batch.tolist())
                y_pred.extend(y_pred_batch.tolist())
        if state: self.train()
        return y_pred, y_true

In [None]:
criterion = nn.CrossEntropyLoss()

# Now define the metrics you want to monitor during training and save them in a dict (Important: All need to take y_pred, y_cls, y_true as input (This order!))
def criterion_function(y_pred, y_cls, y_true):
    return criterion(torch.tensor(y_pred), torch.tensor(y_true))
def accuracy_function(y_pred, y_cls, y_true):
    return accuracy_score(y_true, y_cls)
def recall_function(y_pred, y_cls, y_true):
    return recall_score(y_true, y_cls, average='macro')
metrics = {'loss' : criterion_function, 'acc' : accuracy_function, 'macro recall' : recall_function}

# Define Model
model50 = CustomResNet50(num_classes=4, evaluation_metrics=metrics)

# Now define the loss (criterion), optimizer, lr_scheduler, 
early_stopper = EarlyStopper(model50, patience=5, min_delta=0)
optimizer = torch.optim.Adam(model50.parameters(), lr=0.01)
scheduler = StepLR(optimizer, step_size=5, gamma=0.5)

train_network(model50, train_loader, criterion, optimizer, 70, scheduler, validation_loader, device, early_stopper)

torch.save(model50, 'model_resnet_50.pth')

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /home/toby_linux/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:30<00:00, 3.37MB/s]


Device used for training: cpu
Epoch [1/1]  loss: 1.2698, acc: 0.4635, macro recall: 0.2500


In [None]:
y_pred, y_true = model50.evaluate(test_loader)
y_cls = np.argmax(y_pred, axis=1)
print(classification_report(y_true, y_cls))
print(confusion_matrix(y_true, y_cls))

# GoogleNet

In [None]:
class CustomGoogleNet(nn.Module):
    def __init__(self, num_classes, evaluation_metrics):
        super(CustomGoogleNet, self).__init__()
        # googlenet = models.googlenet(pretrained=True)
        googlenet = models.googlenet(weights='GoogLeNet_Weights.DEFAULT')

        # Change number of input channels
        googlenet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

        # Remove the fully connected layer
        self.features = nn.Sequential(*list(googlenet.children())[:-1])
        # Add a custom fully connected layer
        self.fc = nn.Linear(googlenet.fc.in_features, num_classes)

        # Initialize history dict
        self.evaluation_metrics = evaluation_metrics
        self.history = {key: [] for key in evaluation_metrics.keys()}
        self.history_validation = {key: [] for key in evaluation_metrics.keys()}

    def forward(self, x):
        # Ensure input has spatial dimensions (e.g., [batch_size, channels, height, width])
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

    def evaluate(self, dataloader):
        '''
        Returns the predicted labels in evaluation mode: y_pred, y_true
        '''
        state = False if self.training is False else True
        if state: self.eval()
        y_true = []
        y_pred = []
        with torch.no_grad():  # Disable gradient computation during validation
            for i, (x_minibatch, y_true_batch) in enumerate(dataloader):
                y_pred_batch = F.softmax(self(x_minibatch), dim=1)
                y_true.extend(y_true_batch.tolist())
                y_pred.extend(y_pred_batch.tolist())
        if state: self.train()
        return y_pred, y_true

In [None]:
criterion = nn.CrossEntropyLoss()

# Now define the metrics you want to monitor during training and save them in a dict (Important: All need to take y_pred, y_cls, y_true as input (This order!))
def criterion_function(y_pred, y_cls, y_true):
    return criterion(torch.tensor(y_pred), torch.tensor(y_true))
def accuracy_function(y_pred, y_cls, y_true):
    return accuracy_score(y_true, y_cls)
def recall_function(y_pred, y_cls, y_true):
    return recall_score(y_true, y_cls, average='macro')
metrics = {'loss' : criterion_function, 'acc' : accuracy_function, 'macro recall' : recall_function}

# Define Model
model_googlenet = CustomGoogleNet(num_classes=4, evaluation_metrics=metrics)

# Now define the loss (criterion), optimizer, lr_scheduler, 
early_stopper = EarlyStopper(model_googlenet, patience=5, min_delta=0)
optimizer = torch.optim.Adam(model_googlenet.parameters(), lr=0.01)
scheduler = StepLR(optimizer, step_size=5, gamma=0.5)

train_network(model_googlenet, train_loader, criterion, optimizer, 70, scheduler, validation_loader, device, early_stopper)

torch.save(model_googlenet, 'model_googlenet.pth')

Device used for training: cpu
Epoch [1/1]  loss: 1.2801, acc: 0.4635, macro recall: 0.2500


In [None]:
y_pred, y_true = model_googlenet.evaluate(test_loader)
y_cls = np.argmax(y_pred, axis=1)
print(classification_report(y_true, y_cls))
print(confusion_matrix(y_true, y_cls))