In [1]:
import copy
import os
import random
import sys
from collections import Counter

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.metrics import cohen_kappa_score, precision_score, recall_score, accuracy_score
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from torchvision.transforms.functional import to_pil_image
from tqdm import tqdm
import cv2

In [2]:
# Hyper Parameters
batch_size = 100
num_classes = 5  # 5 DR levels
learning_rate = 0.0002
num_epochs = 50

In [3]:

import torch.utils
import torch.utils.data


class DeepDRiD(Dataset):
    def __init__(self, ann_file, image_dir, transform=None, mode='dual', test=False):
        self.ann_file = ann_file
        self.image_dir = image_dir
        self.transform = transform

        self.test = test
        self.mode = mode

        if self.mode == 'single':
            self.data = self.load_data()
        else:
            self.data = self.load_data_dual()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        if self.mode == 'single':
            return self.get_item(index)
        else:
            return self.get_item_dual(index)

    # 1. single image
    def load_data(self):
        df = pd.read_csv(self.ann_file)

        data = []
        for _, row in df.iterrows():
            file_info = dict()
            file_info['img_path'] = os.path.join(
                self.image_dir, row['img_path'])
            if not self.test:
                file_info['dr_level'] = int(row['patient_DR_Level'])
            data.append(file_info)
        return data

    def get_item(self, index):
        data = self.data[index]
        img = Image.open(data['img_path']).convert('RGB')
        if self.transform:
            img = self.transform(img)

        if not self.test:
            label = torch.tensor(data['dr_level'], dtype=torch.int64)
            return img, label
        else:
            return img

    # 2. dual image
    def load_data_dual(self):
        df = pd.read_csv(self.ann_file)

        df['prefix'] = df['image_id'].str.split(
            '_').str[0]  # The patient id of each image
        df['suffix'] = df['image_id'].str.split(
            '_').str[1].str[0]  # The left or right eye
        grouped = df.groupby(['prefix', 'suffix'])

        data = []
        for (prefix, suffix), group in grouped:
            file_info = dict()
            file_info['img_path1'] = os.path.join(
                self.image_dir, group.iloc[0]['img_path'])
            file_info['img_path2'] = os.path.join(
                self.image_dir, group.iloc[1]['img_path'])
            if not self.test:
                file_info['dr_level'] = int(group.iloc[0]['patient_DR_Level'])
            data.append(file_info)
        return data

    def get_item_dual(self, index):
        data = self.data[index]
        img1 = Image.open(data['img_path1']).convert('RGB')
        img2 = Image.open(data['img_path2']).convert('RGB')

        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        if not self.test:
            label = torch.tensor(data['dr_level'], dtype=torch.int64)
            return [img1, img2], label
        else:
            return [img1, img2]


def create_weighted_sampler_for_dataset(dataset):
    num_samples = len(dataset)
    labels = [dataset.__getitem__(i)[1].item() for i in range(num_samples)]
    
    c = Counter(labels)
    class_counts = list(dict(sorted(c.items(), key = lambda i: i[0])).values())
    print(class_counts)
    class_weights = [num_samples / class_counts[i] for i in range(len(class_counts))]
    weights = [class_weights[labels[i]] for i in range(num_samples)]

    return torch.utils.data.WeightedRandomSampler(torch.DoubleTensor(weights), max(class_counts) * len(class_counts))

In [4]:

class APTOS2019(Dataset):
    def __init__(self, ann_file, image_dir, transform=None, mode='dual'):
        self.ann_file = ann_file
        self.image_dir = image_dir
        self.transform = transform
        self.mode = mode

        self.data = self.load_data()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        if self.mode == 'single':
            return self.get_item(index)
        else:
            return self.get_item_dual(index)

    def load_data(self):
        df = pd.read_csv(self.ann_file)

        data = []
        for _, row in df.iterrows():
            file_info = dict()
            if self.mode == 'single':
                file_info['img_path'] = os.path.join(
                    self.image_dir, row['id_code']) + '.png'
            else:
                file_info['img_path1'] = os.path.join(
                    self.image_dir, row['id_code']) + '.png'
                file_info['img_path2'] = file_info['img_path1']
            file_info['dr_level'] = int(row['diagnosis'])
            data.append(file_info)
        return data

    def get_item(self, index):
        data = self.data[index]
        img = Image.open(data['img_path']).convert('RGB')
        if self.transform:
            img = self.transform(img)

        label = torch.tensor(data['dr_level'], dtype=torch.int64)
        return img, label

    def get_item_dual(self, index):
        data = self.data[index]
        img1 = Image.open(data['img_path1']).convert('RGB')
        img2 = Image.open(data['img_path2']).convert('RGB')

        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        label = torch.tensor(data['dr_level'], dtype=torch.int64)
        return [img1, img2], label


In [5]:
class CutOut(object):
    def __init__(self, mask_size, p=0.5):
        self.mask_size = mask_size
        self.p = p

    def __call__(self, img):
        if np.random.rand() > self.p:
            return img

        # Ensure the image is a tensor
        if not isinstance(img, torch.Tensor):
            raise TypeError('Input image must be a torch.Tensor')

        # Get height and width of the image
        h, w = img.shape[1], img.shape[2]
        mask_size_half = self.mask_size // 2
        offset = 1 if self.mask_size % 2 == 0 else 0

        cx = np.random.randint(mask_size_half, w + offset - mask_size_half)
        cy = np.random.randint(mask_size_half, h + offset - mask_size_half)

        xmin, xmax = cx - mask_size_half, cx + mask_size_half + offset
        ymin, ymax = cy - mask_size_half, cy + mask_size_half + offset
        xmin, xmax = max(0, xmin), min(w, xmax)
        ymin, ymax = max(0, ymin), min(h, ymax)

        img[:, ymin:ymax, xmin:xmax] = 0
        return img


class SLORandomPad:
    def __init__(self, size):
        self.size = size

    def __call__(self, img):
        pad_width = max(0, self.size[0] - img.width)
        pad_height = max(0, self.size[1] - img.height)
        pad_left = random.randint(0, pad_width)
        pad_top = random.randint(0, pad_height)
        pad_right = pad_width - pad_left
        pad_bottom = pad_height - pad_top
        return transforms.functional.pad(img, (pad_left, pad_top, pad_right, pad_bottom))


class FundRandomRotate:
    def __init__(self, prob, degree):
        self.prob = prob
        self.degree = degree

    def __call__(self, img):
        if random.random() < self.prob:
            angle = random.uniform(-self.degree, self.degree)
            return transforms.functional.rotate(img, angle)
        return img

In [6]:

transform_train = transforms.Compose([
    transforms.Resize((224, 224)),
    #transforms.RandomCrop((210, 210)),
    #SLORandomPad((224, 224)),
    FundRandomRotate(prob=0.5, degree=30),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.ColorJitter(brightness=(0.1, 0.9)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [7]:
def train_model(model, train_loader, val_loader, device, criterion, optimizer, lr_scheduler, num_epochs=25,
                checkpoint_path='model.pth'):
    best_model = model.state_dict()
    best_epoch = None
    best_val_kappa = -1.0  # Initialize the best kappa score

    train_accuracies = []
    train_losses = []
    val_accuracies = []
    val_losses = []

    for epoch in range(1, num_epochs + 1):
        print(f'\nEpoch {epoch}/{num_epochs}')
        running_loss = []
        all_preds = []
        all_labels = []

        model.train()

        with tqdm(total=len(train_loader), desc=f'Training', unit=' batch', file=sys.stdout) as pbar:
            for images, labels in train_loader:
                if not isinstance(images, list):
                    images = images.to(device)  # single image case
                else:
                    images = [x.to(device) for x in images]  # dual images case

                labels = labels.to(device)

                optimizer.zero_grad()

                outputs = model(images)
                loss = criterion(outputs, labels.long())

                loss.backward()
                optimizer.step()

                preds = torch.argmax(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

                running_loss.append(loss.item())

                pbar.set_postfix(
                    {'lr': f'{optimizer.param_groups[0]["lr"]:.1e}', 'Loss': f'{loss.item():.4f}'})
                pbar.update(1)

        lr_scheduler.step()

        epoch_loss = sum(running_loss) / len(running_loss)

        train_metrics = compute_metrics(all_preds, all_labels, per_class=True)
        kappa, accuracy, precision, recall = train_metrics[:4]

        print(f'[Train] Kappa: {kappa:.4f} Accuracy: {accuracy:.4f} '
              f'Precision: {precision:.4f} Recall: {recall:.4f} Loss: {epoch_loss:.4f}')

        if len(train_metrics) > 4:
            precision_per_class, recall_per_class = train_metrics[4:]
            for i, (precision, recall) in enumerate(zip(precision_per_class, recall_per_class)):
                print(
                    f'[Train] Class {i}: Precision: {precision:.4f}, Recall: {recall:.4f}')

        # Evaluation on the validation set at the end of each epoch
        val_metrics, val_loss = evaluate_model(model, val_loader, device, criterion)
        val_kappa, val_accuracy, val_precision, val_recall = val_metrics[:4]
        print(f'[Val] Kappa: {val_kappa:.4f} Accuracy: {val_accuracy:.4f} '
              f'Precision: {val_precision:.4f} Recall: {val_recall:.4f} Loss: {val_loss:.4f}')

        if val_kappa > best_val_kappa:
            best_val_kappa = val_kappa
            best_epoch = epoch
            best_model = model.state_dict()
            torch.save(best_model, checkpoint_path)

        print(f'[Val] Best kappa: {best_val_kappa:.4f}, Epoch {best_epoch}')

        train_accuracies.append(accuracy)
        train_losses.append(epoch_loss)
        val_accuracies.append(val_accuracy)
        val_losses.append(val_loss)

    return model, train_accuracies, val_accuracies, train_losses, val_losses


def evaluate_model(model, test_loader, device, criterion, test_only=False, prediction_path='./test_predictions.csv'):
    model.eval()

    all_preds = []
    all_labels = []
    all_image_ids = []
    all_losses = []

    with tqdm(total=len(test_loader), desc=f'Evaluating', unit=' batch', file=sys.stdout) as pbar:
        for i, data in enumerate(test_loader):

            if test_only:
                images = data
            else:
                images, labels = data

            if not isinstance(images, list):
                images = images.to(device)  # single image case
            else:
                images = [x.to(device) for x in images]  # dual images case

            with torch.no_grad():
                outputs = model(images)
                if not test_only:
                    all_losses.append(criterion(outputs.cpu(), labels.long()))
                preds = torch.argmax(outputs, 1)

            if not isinstance(images, list):
                # single image case
                all_preds.extend(preds.cpu().numpy())
                image_ids = [
                    os.path.basename(test_loader.dataset.data[idx]['img_path']) for idx in
                    range(i * test_loader.batch_size, i *
                          test_loader.batch_size + len(images))
                ]
                all_image_ids.extend(image_ids)
                if not test_only:
                    all_labels.extend(labels.numpy())
            else:
                # dual images case
                for k in range(2):
                    all_preds.extend(preds.cpu().numpy())
                    image_ids = [
                        os.path.basename(test_loader.dataset.data[idx][f'img_path{k + 1}']) for idx in
                        range(i * test_loader.batch_size, i *
                              test_loader.batch_size + len(images[k]))
                    ]
                    all_image_ids.extend(image_ids)
                    if not test_only:
                        all_labels.extend(labels.numpy())

            pbar.update(1)

    # Save predictions to csv file for Kaggle online evaluation
    if test_only:
        df = pd.DataFrame({
            'ID': all_image_ids,
            'TARGET': all_preds
        })
        df.to_csv(prediction_path, index=False)
        print(f'[Test] Save predictions to {os.path.abspath(prediction_path)}')
    else:
        metrics = compute_metrics(all_preds, all_labels)
        return metrics, sum(all_losses)/len(all_losses)


def compute_metrics(preds, labels, per_class=False):
    kappa = cohen_kappa_score(labels, preds, weights='quadratic')
    accuracy = accuracy_score(labels, preds)
    precision = precision_score(
        labels, preds, average='weighted', zero_division=0)
    recall = recall_score(labels, preds, average='weighted', zero_division=0)

    # Calculate and print precision and recall for each class
    if per_class:
        precision_per_class = precision_score(
            labels, preds, average=None, zero_division=0)
        recall_per_class = recall_score(
            labels, preds, average=None, zero_division=0)
        return kappa, accuracy, precision, recall, precision_per_class, recall_per_class

    return kappa, accuracy, precision, recall

In [8]:

# Use GPU device is possible
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device:', device)

backbone = models.resnet18(pretrained=True)

backbone_output_dim = 512


class MyDualModel(nn.Module):
    def __init__(self, num_classes=5, dropout_rate=0.6, attention=False, custom_backbone = None):
        super().__init__()

        self.use_attention = attention

        _backbone = backbone
        _backbone.fc = nn.Identity()
        
        if custom_backbone is not None:
            _backbone.load_state_dict(custom_backbone)

        # Here the two backbones will have the same structure but unshared weights
        self.backbone1 = copy.deepcopy(_backbone)
        self.backbone2 = copy.deepcopy(_backbone)

        fc_input_dim = backbone_output_dim*2

        if self.use_attention == True:

            embed_dim = 512

            self.q_embd = nn.Linear(backbone_output_dim*2, embed_dim)
            self.k_embd = nn.Linear(backbone_output_dim*2, embed_dim)
            self.v_embd = nn.Linear(backbone_output_dim*2, embed_dim)

            self.att = nn.MultiheadAttention(embed_dim, 2, 0.2, batch_first=True)
            #self.norm = nn.LayerNorm(embed_dim)

            fc_input_dim = embed_dim


        self.fc = nn.Sequential(
            nn.Linear(fc_input_dim, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, num_classes)
        )

    def forward(self, images):
        image1, image2 = images

        x1 = self.backbone1(image1)
        x2 = self.backbone2(image2)

        x = torch.cat((x1, x2), dim=1)

        if self.use_attention == True:
            q = self.q_embd(x)
            k = self.k_embd(x)
            v = self.v_embd(x)

            x = self.att(q, k, v, need_weights=False)[0]
            #x = self.norm(x)

        x = self.fc(x)
        return x
    
    def freeze(self, doFreeze):
        for p in self.backbone1.parameters(True):
            p.requires_grad = not doFreeze
        for p in self.backbone2.parameters(True):
            p.requires_grad = not doFreeze


class StackingModel(nn.Module):
    def __init__(self, ensemble_models: list[nn.Module], num_classes=5, dropout_rate=0.6):
        super().__init__()

        self.ensemble_models = copy.deepcopy(ensemble_models)

        for model in self.ensemble_models:
            for p in model.parameters(True):
                p.requires_grad = False

        self.n_models = len(ensemble_models)

        self.meta_model = nn.Sequential(
            nn.Linear(self.n_models * num_classes, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(128, num_classes)
        )

    def forward(self, images):

        meta_inputs = [model(images) for model in self.ensemble_models]
        meta_inputs = torch.cat(meta_inputs, dim=1)
        
        x = self.meta_model(meta_inputs)
        return x
    
    
class VotingModel(nn.Module):
    def __init__(self, ensemble_models: list[nn.Module]):
        super().__init__()

        self.ensemble_models = copy.deepcopy(ensemble_models)

        for model in self.ensemble_models:
            for p in model.parameters(True):
                p.requires_grad = False

        self.n_models = len(ensemble_models)

    def forward(self, images):

        ensemble_outputs = [model(images) for model in self.ensemble_models]

        ensemble_outputs = torch.stack(ensemble_outputs, dim=0)

        votes = torch.argmax(ensemble_outputs, dim=2).T

        most_voted = torch.zeros_like(ensemble_outputs[0])
        
        for i, sample_votes in enumerate(votes):
            max_voted = Counter(sample_votes).most_common(1)[0][0].item()
            most_voted[i, max_voted] = 1.0

        return most_voted

Device: cuda




In [9]:
# Choose between 'single image' and 'dual images' pipeline
# This will affect the model definition, dataset pipeline, training and evaluation

#mode = 'single'  # forward single image to the model each time
mode = 'dual'  # forward two images of the same eye to the model and fuse the features

assert mode in ('dual')

print('Pipeline Mode:', mode)

# Create datasets
drid_train_dataset = DeepDRiD(
	'./DeepDRiD/train.csv', 
	'./DeepDRiD/train/', 
	transform_train, mode)
drid_val_dataset = DeepDRiD(
	'./DeepDRiD/val.csv', 
	'./DeepDRiD/val/', 
	transform_test, mode)
drid_test_dataset = DeepDRiD(
	'./DeepDRiD/test.csv', 
	'./DeepDRiD/test/', 
	transform_test, mode, test=True)

aptos_train_dataset = APTOS2019(
	'./APTOS-2019/train_1.csv', 
	'./APTOS-2019/train_images/train_images/', 
	transform_train, mode)
aptos_val_dataset = APTOS2019(
	'./APTOS-2019/valid.csv', 
	'./APTOS-2019/val_images/val_images/', 
	transform_test, mode)

print(len(drid_train_dataset))
print(len(aptos_train_dataset))

drid_train_sampler = create_weighted_sampler_for_dataset(drid_train_dataset)

# Create dataloaders
drid_train_loader = DataLoader(
	drid_train_dataset, batch_size=batch_size, sampler=drid_train_sampler)
drid_val_loader = DataLoader(drid_val_dataset, batch_size=batch_size, shuffle=False)
drid_test_loader = DataLoader(
	drid_test_dataset, batch_size=batch_size, shuffle=False)

aptos_train_sampler = create_weighted_sampler_for_dataset(aptos_train_dataset)
aptos_train_loader = DataLoader(
	aptos_train_dataset, batch_size=batch_size, sampler=aptos_train_sampler)
aptos_val_loader = DataLoader(aptos_val_dataset, batch_size=batch_size, shuffle=False)

# Define the weighted CrossEntropyLoss
criterion = nn.CrossEntropyLoss()


def print_stats(train_acc, val_acc, train_losses, val_losses, grad_cam_img = None):
    f = plt.figure(figsize=(20, 5))
    f.add_subplot(1, 2, 1)
    plt.title("Accuracy per epoch")
    plt.plot(np.arange(len(train_acc)), train_acc, color='blue', label='Training')
    plt.plot(np.arange(len(val_acc)), val_acc, color='red', label='Validation')
    plt.legend()
    f.add_subplot(1, 2, 2)
    plt.title("Loss per epoch")
    plt.plot(np.arange(len(train_losses)), train_losses, color='blue', label='Training')
    plt.plot(np.arange(len(val_losses)), val_losses, color='red', label='Validation')
    plt.legend()
    f.show()

    if grad_cam_img is None:
        return
    
    f2 = plt.figure(figsize=(10, 10))
    plt.imshow(grad_cam_img)
    f2.show()

    
"""
Source:
https://medium.com/@codetrade/grad-cam-in-pytorch-a-powerful-tool-for-visualize-explanations-from-deep-networks-bdc7caf0b282
"""
def grad_cam(model, image_path):
    model.eval()
    img = Image.open(image_path).convert('RGB')
    img_tensor = transform_test(img).unsqueeze(0)
    img_tensor = img_tensor.cpu()

    layer = model.backbone1.layer4
    activations = []
    gradients = []

    def forward_hook(module, input, output):
        activations.append(output)
    
    def backward_hook(module, grad_input, grad_output):
        gradients.append(grad_output[0])

    layer.register_forward_hook(forward_hook)
    layer.register_full_backward_hook(backward_hook)

    output = model([img_tensor, img_tensor])
    pred = output.argmax(dim=1).item()

    model.zero_grad()
    output[:, pred].backward()
    
    weights = torch.mean(gradients[0], dim=[0, 2, 3])
    for i in range(activations[0].size()[1]):
        activations[0][:, i, :, :] *= weights[i]
    heatmap = torch.mean(activations[0], dim=1).squeeze()
    heatmap = np.maximum(heatmap.cpu().detach().numpy(), 0)
    heatmap /= np.max(heatmap)
    heatmap = cv2.resize(heatmap, (img.size[1], img.size[0]))
    heatmap = cv2.applyColorMap(np.uint8(255 * heatmap), cv2.COLORMAP_JET)

    return Image.blend(img, to_pil_image(heatmap, mode='RGB'), 0.4)

Pipeline Mode: dual
600
2930
[180, 120, 120, 120, 60]
[1434, 300, 808, 154, 234]


In [10]:
def task_a():

    model = MyDualModel()

    #print(model, '\n')
    
    # Move class weights to the device
    model = model.to(device)

    # Optimizer and Learning rate scheduler
    optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(
        optimizer, step_size=10, gamma=0.25)

    # Train and evaluate the model with the training and validation set
    model, train_acc, val_acc, train_losses, val_losses = train_model(
        model, drid_train_loader, drid_val_loader, device, criterion, optimizer,
        lr_scheduler=lr_scheduler, num_epochs=num_epochs,
        checkpoint_path='./model_a.pth'
    )

    # Load the pretrained checkpoint
    state_dict = torch.load('./model_a.pth', map_location='cpu')
    model.load_state_dict(state_dict, strict=True)

    # Make predictions on testing set and save the prediction results
    evaluate_model(model, drid_test_loader, device, criterion, test_only=True, prediction_path='./test_predictions_a.csv')

    model = model.to('cpu')
    img_path = drid_train_dataset.data[0]['img_path1']
    grad_cam_img = grad_cam(model, img_path)

    print_stats(train_acc, val_acc, train_losses, val_losses, grad_cam_img)

#task_a()

In [11]:
def task_b():
    
    aptos_finetune = False

    if aptos_finetune:
    
        model = MyDualModel()

        # Move class weights to the device
        model = model.to(device)

        model.freeze(False)

        # Optimizer and Learning rate scheduler
        optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
        lr_scheduler = torch.optim.lr_scheduler.StepLR(
            optimizer, step_size=10, gamma=0.5)

        # Train and evaluate the model with the training and validation set
        model, train_acc, val_acc, train_losses, val_losses = train_model(
            model, aptos_train_loader, aptos_val_loader, device, criterion, optimizer,
            lr_scheduler=lr_scheduler, num_epochs=num_epochs,
            checkpoint_path='./model_b_aptos.pth'
        )
        
        print_stats(train_acc, val_acc, train_losses, val_losses)

        # Load the pretrained checkpoint
        state_dict = torch.load('./model_b_aptos.pth', map_location=device)
        model.load_state_dict(state_dict, strict=True)

        torch.save(model.backbone1.state_dict(), './ResNet18_aptos_pretrained.pth')

    backbone = torch.load('./ResNet18_aptos_pretrained.pth')

    model = MyDualModel(custom_backbone=backbone)
    
    # We leave all layers unfreezed for DeepDRiD fine-tuning too
    #model.freeze(True)
    model.freeze(False)

    model = model.to(device)

    # Optimizer and Learning rate scheduler
    optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(
        optimizer, step_size=10, gamma=0.5)

    # Train and evaluate the model with the training and validation set
    model, train_acc, val_acc, train_losses, val_losses = train_model(
        model, drid_train_loader, drid_val_loader, device, criterion, optimizer,
        lr_scheduler=lr_scheduler, num_epochs=num_epochs,
        checkpoint_path='./model_b.pth'
    )

    # Load the pretrained checkpoint
    state_dict = torch.load('./model_b.pth', map_location='cpu')
    model.load_state_dict(state_dict, strict=True)

    # Make predictions on testing set and save the prediction results
    evaluate_model(model, drid_test_loader, device, criterion, test_only=True, prediction_path='./test_predictions_b.csv')
    
    model = model.to('cpu')
    img_path = drid_train_dataset.data[0]['img_path1']
    grad_cam_img = grad_cam(model, img_path)

    print_stats(train_acc, val_acc, train_losses, val_losses, grad_cam_img)

#task_b()

In [12]:
def task_c():

    model = MyDualModel(attention = True)

    #print(model, '\n')
    
    # Move class weights to the device
    model = model.to(device)

    # Optimizer and Learning rate scheduler
    optimizer = torch.optim.Adam(params=model.parameters(), lr=0.0001)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(
        optimizer, step_size=10, gamma=0.5)

    # Train and evaluate the model with the training and validation set
    model, train_acc, val_acc, train_losses, val_losses = train_model(
        model, drid_train_loader, drid_val_loader, device, criterion, optimizer,
        lr_scheduler=lr_scheduler, num_epochs=50,
        checkpoint_path='./model_c.pth'
    )

    # Load the pretrained checkpoint
    state_dict = torch.load('./model_c.pth', map_location='cpu')
    model.load_state_dict(state_dict, strict=True)

    # Make predictions on testing set and save the prediction results
    evaluate_model(model, drid_test_loader, device, criterion, test_only=True, prediction_path='./test_predictions_c.csv')

    model = model.to('cpu')
    img_path = drid_train_dataset.data[0]['img_path1']
    grad_cam_img = grad_cam(model, img_path)

    print_stats(train_acc, val_acc, train_losses, val_losses, grad_cam_img)

#task_c()

In [13]:
model_paths = ['./model_b1.pth',
                './model_b2.pth',
                './model_b3.pth']

models = [MyDualModel() for i in range(len(model_paths))]

for i, model_path in enumerate(model_paths):
    models[i].load_state_dict(torch.load(model_path, map_location='cpu'), strict=True)

def task_d_stacking():

    model = StackingModel(models)

    #print(model, '\n')
    
    # Move class weights to the device
    model = model.to(device)
    for _model in model.ensemble_models:
        _model.to(device)

    # Optimizer and Learning rate scheduler
    optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(
        optimizer, step_size=10, gamma=0.5)

    # Train and evaluate the model with the training and validation set
    model, train_acc, val_acc, train_losses, val_losses = train_model(
        model, drid_train_loader, drid_val_loader, device, criterion, optimizer,
        lr_scheduler=lr_scheduler, num_epochs=num_epochs,
        checkpoint_path='./model_d.pth'
    )

    # Load the pretrained checkpoint
    state_dict = torch.load('./model_d.pth', map_location='cpu')
    model.load_state_dict(state_dict, strict=True)

    # Make predictions on testing set and save the prediction results
    evaluate_model(model, drid_test_loader, device, criterion, test_only=True, prediction_path='./test_predictions_d_stacking.csv')

    print_stats(train_acc, val_acc, train_losses, val_losses)

    
def task_d_voting():
    model = VotingModel(models)
    
    model = model.to(device)
    for _model in model.ensemble_models:
        _model.to(device)
    val_metrics, val_loss = evaluate_model(model, drid_val_loader, device, criterion)
    val_kappa, val_accuracy, val_precision, val_recall = val_metrics[:4]
    print(f'[Val] Kappa: {val_kappa:.4f} Accuracy: {val_accuracy:.4f} '
            f'Precision: {val_precision:.4f} Recall: {val_recall:.4f} Loss: {val_loss:.4f}')
    evaluate_model(model, drid_test_loader, device, criterion, test_only=True, prediction_path='./test_predictions_d_voting.csv')


#task_d_stacking()
task_d_voting()


Evaluating: 100%|██████████| 2/2 [00:02<00:00,  1.18s/ batch]
[Val] Kappa: 0.6658 Accuracy: 0.5400 Precision: 0.5628 Recall: 0.5400 Loss: 1.3648
Evaluating: 100%|██████████| 2/2 [00:02<00:00,  1.04s/ batch]
[Test] Save predictions to c:\Users\mk473\source\repos\deeplearning\project\test_predictions_d_voting.csv
