In [1]:
# Colab-only setup: mount Google Drive at /content/drive. Later cells read the
# dataset CSV, images and weight files from paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import copy
import os
import random
import sys

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from PIL import Image
from sklearn.metrics import cohen_kappa_score, precision_score, recall_score, accuracy_score
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from torchvision.transforms.functional import to_pil_image
from tqdm import tqdm

# Hyper Parameters
# NOTE(review): batch_size, num_classes, learning_rate and num_epochs are all
# assigned again in a later cell (with learning_rate = 0.0001), so the values
# below only hold until that cell runs.
batch_size = 24
num_classes = 5  # 5 DR levels
learning_rate = 0.001
num_epochs = 20


class RetinopathyDataset(Dataset):
    """Image dataset driven by a CSV annotation file.

    Two layouts are supported:

    * ``mode='single'`` - one sample per CSV row.
    * any other ``mode`` - "dual": rows are grouped by patient id and eye
      (both parsed from ``image_id``) and the first two images of each group
      form one sample.

    With ``test=True`` no label is read from the CSV and samples are returned
    without a label.

    Args:
        ann_file: path of the CSV file (needs an ``img_path`` column, an
            ``image_id`` column in dual mode, and ``patient_DR_Level`` unless
            ``test`` is true).
        image_dir: directory that every ``img_path`` is joined onto.
        transform: optional callable applied to each opened RGB image.
        mode: ``'single'`` or anything else for the dual layout.
        test: when true, labels are neither loaded nor returned.
    """

    def __init__(self, ann_file, image_dir, transform=None, mode='single', test=False):
        self.ann_file = ann_file
        self.image_dir = image_dir
        self.transform = transform

        self.test = test
        self.mode = mode

        # Pick the loader that matches the requested layout, then index the CSV once.
        loader = self.load_data if self.mode == 'single' else self.load_data_dual
        self.data = loader()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        fetch = self.get_item if self.mode == 'single' else self.get_item_dual
        return fetch(index)

    # 1. single image
    def load_data(self):
        """Return one ``{'img_path', ['dr_level']}`` dict per CSV row, in file order."""
        annotations = pd.read_csv(self.ann_file)

        records = []
        for row in annotations.to_dict('records'):
            entry = {'img_path': os.path.join(self.image_dir, row['img_path'])}
            if not self.test:
                entry['dr_level'] = int(row['patient_DR_Level'])
            records.append(entry)
        return records

    def get_item(self, index):
        """Open one image as RGB, transform it, and return it (with its label unless testing)."""
        record = self.data[index]
        img = Image.open(record['img_path']).convert('RGB')
        if self.transform:
            img = self.transform(img)

        if self.test:
            return img
        return img, torch.tensor(record['dr_level'], dtype=torch.int64)

    # 2. dual image
    def load_data_dual(self):
        """Return one dict per (patient, eye) group holding the group's first two image paths.

        The label, when loaded, is taken from the first row of the group.
        Indexing the second row raises ``IndexError`` for a group with a
        single image.
        """
        annotations = pd.read_csv(self.ann_file)

        id_parts = annotations['image_id'].str.split('_')
        annotations['prefix'] = id_parts.str[0]  # The patient id of each image
        annotations['suffix'] = id_parts.str[1].str[0]  # The left or right eye

        pairs = []
        for _, eye_images in annotations.groupby(['prefix', 'suffix']):
            first_row = eye_images.iloc[0]
            second_row = eye_images.iloc[1]
            entry = {
                'img_path1': os.path.join(self.image_dir, first_row['img_path']),
                'img_path2': os.path.join(self.image_dir, second_row['img_path']),
            }
            if not self.test:
                entry['dr_level'] = int(first_row['patient_DR_Level'])
            pairs.append(entry)
        return pairs

    def get_item_dual(self, index):
        """Open both images of a pair and return ``[img1, img2]`` (with the label unless testing)."""
        record = self.data[index]
        img1 = Image.open(record['img_path1']).convert('RGB')
        img2 = Image.open(record['img_path2']).convert('RGB')

        if self.transform:
            # Transform the first image before the second so random draws keep their order.
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        if self.test:
            return [img1, img2]
        return [img1, img2], torch.tensor(record['dr_level'], dtype=torch.int64)


class CutOut(object):
    """Randomly zero out one square patch of a tensor image.

    The image is indexed as ``img[:, y, x]``, i.e. the last two dimensions are
    treated as height and width. The patch is written **in place** and the
    same tensor object is returned.

    Args:
        mask_size: side length, in pixels, of the erased square.
        p: probability of erasing a patch on a given call.

    Raises:
        TypeError: if the input is not a ``torch.Tensor`` (checked on every
            call, not only when the mask is applied).
    """

    def __init__(self, mask_size, p=0.5):
        self.mask_size = mask_size
        self.p = p

    @staticmethod
    def _sample_center(low, high, extent):
        """Draw a patch centre from [low, high); fall back to the middle when the range is empty.

        The range is empty when the mask is larger than the image along this
        axis. Centring the mask then covers the whole axis after clamping.
        """
        if high > low:
            return np.random.randint(low, high)
        return extent // 2

    def __call__(self, img):
        # Validate first so a misplaced transform (e.g. before ToTensor) fails
        # on every call instead of only when the random draw applies the mask.
        if not isinstance(img, torch.Tensor):
            raise TypeError('Input image must be a torch.Tensor')

        if np.random.rand() > self.p:
            return img

        # Get height and width of the image
        h, w = img.shape[1], img.shape[2]
        mask_size_half = self.mask_size // 2
        offset = 1 if self.mask_size % 2 == 0 else 0

        cx = self._sample_center(mask_size_half, w + offset - mask_size_half, w)
        cy = self._sample_center(mask_size_half, h + offset - mask_size_half, h)

        # The patch spans exactly mask_size pixels from its top-left corner.
        # (Deriving the far edge from the centre plus `offset` made even masks
        # one pixel too wide and odd masks one pixel too narrow.)
        xmin = cx - mask_size_half
        ymin = cy - mask_size_half
        xmax = xmin + self.mask_size
        ymax = ymin + self.mask_size

        # Clamp to the image so oversized masks never produce negative slices.
        xmin, xmax = max(0, xmin), min(w, xmax)
        ymin, ymax = max(0, ymin), min(h, ymax)

        img[:, ymin:ymax, xmin:xmax] = 0
        return img


class SLORandomPad:
    """Pad an image up to a minimum ``(width, height)``, placing it at a random offset.

    Images that already meet or exceed the target along an axis receive no
    padding on that axis. The input must expose ``width`` and ``height``.
    """

    def __init__(self, size):
        self.size = size

    def __call__(self, img):
        target_w, target_h = self.size[0], self.size[1]
        extra_w = max(0, target_w - img.width)
        extra_h = max(0, target_h - img.height)

        # Draw the left share first, then the top share; the remainder goes
        # to the opposite side.
        left = random.randint(0, extra_w)
        top = random.randint(0, extra_h)
        padding = (left, top, extra_w - left, extra_h - top)
        return transforms.functional.pad(img, padding)


class FundRandomRotate:
    """Rotate the image by a random angle in ``[-degree, degree]`` with probability ``prob``."""

    def __init__(self, prob, degree):
        self.prob = prob
        self.degree = degree

    def __call__(self, img):
        # Guard clause: hand the image back untouched when the draw misses.
        if not (random.random() < self.prob):
            return img

        theta = random.uniform(-self.degree, self.degree)
        return transforms.functional.rotate(img, theta)


# Training-time pipeline: geometric and photometric augmentation, then
# conversion to a normalized tensor. The dataset applies it to the opened RGB image.
transform_train = transforms.Compose([
    transforms.Resize((256, 256)),
    # Take a random 210x210 crop, then pad it back up to 224x224 at a random
    # offset so the final size matches transform_test.
    transforms.RandomCrop((210, 210)),
    SLORandomPad((224, 224)),
    FundRandomRotate(prob=0.5, degree=30),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    # NOTE(review): the brightness range (0.1, 0.9) lies entirely below 1.0, so
    # presumably this only ever darkens the image - confirm that is intended.
    transforms.ColorJitter(brightness=(0.1, 0.9)),
    transforms.ToTensor(),
    # Presumably the usual ImageNet channel statistics - TODO confirm they match
    # the statistics the pretrained backbones were trained with.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Evaluation-time pipeline: deterministic resize to 224x224 with the same
# normalization as training, and no augmentation.
transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


def train_model(model, train_loader, val_loader, device, criterion, optimizer, lr_scheduler, num_epochs=25,
                checkpoint_path='model.pth'):
    """Train ``model`` and checkpoint the weights with the best validation kappa.

    Each epoch runs one pass over ``train_loader``, steps ``lr_scheduler``
    once, prints overall and per-class training metrics, then evaluates on
    ``val_loader`` with ``evaluate_model``. Whenever the validation kappa
    improves, the current ``state_dict`` is written to ``checkpoint_path``.

    Args:
        model: network to train; called as ``model(images)``.
        train_loader: iterable of ``(images, labels)`` batches, where
            ``images`` is a tensor (single mode) or a list of tensors (dual mode).
        val_loader: loader passed to ``evaluate_model`` after every epoch.
        device: device that inputs and labels are moved to.
        criterion: loss called as ``criterion(outputs, labels.long())``.
        optimizer: optimizer stepped once per batch.
        lr_scheduler: scheduler stepped once per epoch.
        num_epochs: number of epochs to run.
        checkpoint_path: file that receives the best ``state_dict``.

    Returns:
        The same ``model`` object as it stands after the final epoch. The
        best-kappa weights are only available from ``checkpoint_path``; they
        are not loaded back into the returned model.
    """
    # NOTE(review): state_dict() is stored without copying here and below;
    # presumably it aliases the live parameters, so `best_model` is only a
    # reliable snapshot at the moment it is saved - confirm before reusing it.
    best_model = model.state_dict()
    best_epoch = None
    best_val_kappa = -1.0  # Initialize the best kappa score

    for epoch in range(1, num_epochs + 1):
        print(f'\nEpoch {epoch}/{num_epochs}')
        # Per-epoch accumulators for the training loss and predictions.
        running_loss = []
        all_preds = []
        all_labels = []

        model.train()

        with tqdm(total=len(train_loader), desc=f'Training', unit=' batch', file=sys.stdout) as pbar:
            for images, labels in train_loader:
                if not isinstance(images, list):
                    images = images.to(device)  # single image case
                else:
                    images = [x.to(device) for x in images]  # dual images case

                labels = labels.to(device)

                optimizer.zero_grad()

                outputs = model(images)
                loss = criterion(outputs, labels.long())

                loss.backward()
                optimizer.step()

                # Predicted class = index of the largest output along dim 1.
                preds = torch.argmax(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

                running_loss.append(loss.item())

                pbar.set_postfix({'lr': f'{optimizer.param_groups[0]["lr"]:.1e}', 'Loss': f'{loss.item():.4f}'})
                pbar.update(1)

        # The learning-rate schedule advances once per epoch, after all optimizer steps.
        lr_scheduler.step()

        # Mean of the per-batch losses (divides by zero if the loader yielded no batch).
        epoch_loss = sum(running_loss) / len(running_loss)

        train_metrics = compute_metrics(all_preds, all_labels, per_class=True)
        kappa, accuracy, precision, recall = train_metrics[:4]

        print(f'[Train] Kappa: {kappa:.4f} Accuracy: {accuracy:.4f} '
              f'Precision: {precision:.4f} Recall: {recall:.4f} Loss: {epoch_loss:.4f}')

        if len(train_metrics) > 4:
            precision_per_class, recall_per_class = train_metrics[4:]
            # The loop variables reuse the names `precision` / `recall`; the
            # overall values were already printed above.
            for i, (precision, recall) in enumerate(zip(precision_per_class, recall_per_class)):
                print(f'[Train] Class {i}: Precision: {precision:.4f}, Recall: {recall:.4f}')

        # Evaluation on the validation set at the end of each epoch
        val_metrics = evaluate_model(model, val_loader, device)
        val_kappa, val_accuracy, val_precision, val_recall = val_metrics[:4]
        print(f'[Val] Kappa: {val_kappa:.4f} Accuracy: {val_accuracy:.4f} '
              f'Precision: {val_precision:.4f} Recall: {val_recall:.4f}')

        # Keep (and persist) the weights only on a strict improvement of validation kappa.
        if val_kappa > best_val_kappa:
            best_val_kappa = val_kappa
            best_epoch = epoch
            best_model = model.state_dict()
            torch.save(best_model, checkpoint_path)

    print(f'[Val] Best kappa: {best_val_kappa:.4f}, Epoch {best_epoch}')

    return model


def evaluate_model(model, test_loader, device, test_only=False, prediction_path='./test_predictions.csv'):
    """Run ``model`` over ``test_loader`` and either score or export its predictions.

    Image file names are recovered by position (``batch index * batch_size``
    into ``test_loader.dataset.data``), so the loader must yield samples in
    dataset order. In the dual-image case each prediction (and label) is
    recorded twice, once per image of the pair.

    Args:
        model: network called as ``model(images)``; put into eval mode here.
        test_loader: loader yielding ``images`` when ``test_only`` is true,
            otherwise ``(images, labels)``. ``images`` is a tensor (single
            mode) or a list of two tensors (dual mode).
        device: device the inputs are moved to.
        test_only: when true, write a CSV instead of computing metrics.
        prediction_path: destination of the ``ID``/``TARGET`` CSV.

    Returns:
        The result of ``compute_metrics`` when ``test_only`` is false,
        otherwise ``None`` (the predictions are saved to ``prediction_path``).
    """
    model.eval()

    pred_list, label_list, id_list = [], [], []

    def batch_file_names(batch_idx, count, path_key):
        # File names of the `count` dataset entries that make up batch `batch_idx`.
        first = batch_idx * test_loader.batch_size
        last = batch_idx * test_loader.batch_size + count
        return [
            os.path.basename(test_loader.dataset.data[pos][path_key])
            for pos in range(first, last)
        ]

    with tqdm(total=len(test_loader), desc='Evaluating', unit=' batch', file=sys.stdout) as pbar:
        for batch_idx, batch in enumerate(test_loader):
            if test_only:
                images = batch
            else:
                images, labels = batch

            is_dual = isinstance(images, list)
            if is_dual:
                images = [x.to(device) for x in images]  # dual images case
            else:
                images = images.to(device)  # single image case

            with torch.no_grad():
                preds = torch.argmax(model(images), 1)

            if is_dual:
                # One entry per image of the pair: same prediction, own file name.
                for k in range(2):
                    pred_list.extend(preds.cpu().numpy())
                    id_list.extend(batch_file_names(batch_idx, len(images[k]), f'img_path{k + 1}'))
                    if not test_only:
                        label_list.extend(labels.numpy())
            else:
                pred_list.extend(preds.cpu().numpy())
                id_list.extend(batch_file_names(batch_idx, len(images), 'img_path'))
                if not test_only:
                    label_list.extend(labels.numpy())

            pbar.update(1)

    if not test_only:
        return compute_metrics(pred_list, label_list)

    # Save predictions to csv file for Kaggle online evaluation
    submission = pd.DataFrame({
        'ID': id_list,
        'TARGET': pred_list
    })
    submission.to_csv(prediction_path, index=False)
    print(f'[Test] Save predictions to {os.path.abspath(prediction_path)}')


def compute_metrics(preds, labels, per_class=False):
    """Score predictions against labels.

    Args:
        preds: predicted class indices.
        labels: ground-truth class indices.
        per_class: when true, also return per-class precision and recall.

    Returns:
        ``(kappa, accuracy, precision, recall)`` where kappa is quadratic
        weighted and precision/recall are weighted averages. With
        ``per_class=True`` two more items follow: the per-class precision and
        the per-class recall.
    """
    weighted_avg = dict(average='weighted', zero_division=0)
    summary = (
        cohen_kappa_score(labels, preds, weights='quadratic'),
        accuracy_score(labels, preds),
        precision_score(labels, preds, **weighted_avg),
        recall_score(labels, preds, **weighted_avg),
    )
    if not per_class:
        return summary

    # Calculate precision and recall for each class
    unaveraged = dict(average=None, zero_division=0)
    return summary + (
        precision_score(labels, preds, **unaveraged),
        recall_score(labels, preds, **unaveraged),
    )


class MyModel(nn.Module):
    """ResNet-18 feature extractor followed by a three-layer classification head.

    Args:
        num_classes: size of the final linear layer's output.
        dropout_rate: dropout probability used after each hidden layer.

    Note that ``forward`` returns a ``(logits, layer4_module)`` tuple rather
    than the logits alone.
    """

    def __init__(self, num_classes=5, dropout_rate=0.5):
        super().__init__()

        resnet = models.resnet18(pretrained=True)
        resnet.fc = nn.Identity()  # Remove the original classification layer
        self.backbone = resnet

        # Head: 512 -> 256 -> 128 -> num_classes, with ReLU + dropout after
        # each hidden layer.
        head_layers = [
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(128, num_classes),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        features = self.backbone(x)
        logits = self.fc(features)
        # The second element is the backbone's layer4 module itself, not its activations.
        return logits, self.backbone.layer4


class MyDualModel(nn.Module):
    """Two independent ResNet-18 feature extractors feeding one classification head.

    ``forward`` takes a pair of image batches, runs each through its own
    backbone, concatenates the two feature vectors and classifies the result.

    Args:
        num_classes: size of the final linear layer's output.
        dropout_rate: dropout probability used after each hidden layer.
    """

    def __init__(self, num_classes=5, dropout_rate=0.5):
        super().__init__()

        template = models.resnet18(pretrained=True)
        template.fc = nn.Identity()

        # Here the two backbones will have the same structure but unshared weights
        self.backbone1, self.backbone2 = (copy.deepcopy(template) for _ in range(2))

        # Head input is twice the per-backbone feature width (512 * 2).
        head_layers = [
            nn.Linear(512 * 2, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(128, num_classes),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, images):
        first, second = images

        feats_first = self.backbone1(first)
        feats_second = self.backbone2(second)

        merged = torch.cat((feats_first, feats_second), dim=1)
        return self.fc(merged)




In [4]:
from torchvision import models, transforms

class CustomResnet34(nn.Module):
    """ResNet-34 backbone initialised from a local weights file, plus a three-layer head.

    Args:
        num_classes: size of the final linear layer's output.
        dropout_rate: dropout probability used after each hidden layer.
        model_path: path of the backbone ``state_dict`` file. Defaults to
            ``DEFAULT_WEIGHTS_PATH`` (the Google Drive location this notebook
            was written against).
    """

    DEFAULT_WEIGHTS_PATH = '/content/drive/MyDrive/Colab Notebooks/Deep Learning/pretrained_DR_resize/pretrained/resnet34.pth'

    def __init__(self, num_classes=5, dropout_rate=0.3, model_path=None):
        super().__init__()

        if model_path is None:
            model_path = self.DEFAULT_WEIGHTS_PATH

        self.backbone = models.resnet34(pretrained=False)  # custom weights are loaded below
        # weights_only=True restricts unpickling to tensors and plain
        # containers, which is all a state_dict needs. It also avoids the
        # FutureWarning torch.load emits when the flag is left unset.
        state_dict = torch.load(model_path, map_location='cpu', weights_only=True)

        # strict=False: the checkpoint may lack the classification layer,
        # which is replaced right below anyway.
        info = self.backbone.load_state_dict(state_dict, strict=False)
        print('missing keys:', info.missing_keys)  # The missing fc or classifier layer is normal here
        print('unexpected keys:', info.unexpected_keys)

        self.backbone.fc = nn.Identity()  # Remove the original classification layer

        # Head: 512 -> 256 -> 128 -> num_classes.
        self.fc = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.backbone(x)
        x = self.fc(x)
        return x



class CustomDensenet121(nn.Module):
    """DenseNet-121 backbone initialised from a local weights file, plus a three-layer head.

    Args:
        num_classes: size of the final linear layer's output.
        dropout_rate: dropout probability used after each hidden layer.
        model_path: path of the backbone ``state_dict`` file. Defaults to
            ``DEFAULT_WEIGHTS_PATH`` (the Google Drive location this notebook
            was written against).
    """

    DEFAULT_WEIGHTS_PATH = '/content/drive/MyDrive/Colab Notebooks/Deep Learning/pretrained_DR_resize/pretrained/densenet121.pth'

    def __init__(self, num_classes=5, dropout_rate=0.3, model_path=None):
        super().__init__()

        if model_path is None:
            model_path = self.DEFAULT_WEIGHTS_PATH

        # Define the backbone using DenseNet121
        self.backbone = models.densenet121(pretrained=False)  # Use pretrained=False as custom weights are loaded

        # Load the custom pretrained weights.
        # weights_only=True restricts unpickling to tensors and plain
        # containers, which is all a state_dict needs. It also avoids the
        # FutureWarning torch.load emits when the flag is left unset.
        state_dict = torch.load(model_path, map_location='cpu', weights_only=True)
        # strict=False: the checkpoint may lack the classifier, which is
        # replaced right below anyway.
        info = self.backbone.load_state_dict(state_dict, strict=False)
        print('missing keys:', info.missing_keys)  # For PyTorch >= 1.9.0, use `.missing_keys`
        print('unexpected keys:', info.unexpected_keys)

        # Replace the original classifier
        self.backbone.classifier = nn.Identity()  # DenseNet uses `classifier` for its classification head

        # Define custom classification head
        self.fc = nn.Sequential(
            nn.Linear(1024, 256),  # Output from DenseNet121 backbone is 1024 features
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.backbone(x)  # Pass through DenseNet121 backbone
        x = self.fc(x)        # Pass through custom fully connected layers
        return x


class CustomEficientnet(nn.Module):
    """EfficientNet-B0 backbone initialised from a local weights file, plus a three-layer head.

    Args:
        num_classes: size of the final linear layer's output.
        dropout_rate: dropout probability used after each hidden layer.
        model_path: path of the backbone ``state_dict`` file. Defaults to
            ``DEFAULT_WEIGHTS_PATH`` (the Google Drive location this notebook
            was written against).
    """

    DEFAULT_WEIGHTS_PATH = '/content/drive/MyDrive/Colab Notebooks/Deep Learning/pretrained_DR_resize/pretrained/efficientnet_b0.pth'

    def __init__(self, num_classes=5, dropout_rate=0.3, model_path=None):
        super().__init__()

        if model_path is None:
            model_path = self.DEFAULT_WEIGHTS_PATH

        # Define the backbone using EfficientNet-B0
        self.backbone = models.efficientnet_b0(pretrained=False)  # Use pretrained=False as custom weights are loaded

        # Load the custom pretrained weights.
        # weights_only=True restricts unpickling to tensors and plain
        # containers, which is all a state_dict needs. It also avoids the
        # FutureWarning torch.load emits when the flag is left unset.
        state_dict = torch.load(model_path, map_location='cpu', weights_only=True)
        # strict=False: the checkpoint may lack the classifier, which is
        # replaced right below anyway.
        info = self.backbone.load_state_dict(state_dict, strict=False)
        print('missing keys:', info.missing_keys)  # For PyTorch >= 1.9.0, use `.missing_keys`
        print('unexpected keys:', info.unexpected_keys)

        # Replace the original classification layer
        self.backbone.classifier = nn.Identity()  # EfficientNet uses `classifier` instead of `fc`

        # Define custom classification head
        self.fc = nn.Sequential(
            nn.Linear(1280, 256),  # Output from EfficientNet-B0 backbone is 1280 features
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout_rate),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.backbone(x)  # Pass through EfficientNet backbone
        x = self.fc(x)        # Pass through custom fully connected layers
        return x


In [10]:
def weighted_average_ensemble(models, weights, dataloader, device=None):
    """Predict class indices from a weighted sum of the models' softmax outputs.

    NOTE(review): a later cell defines a function with this same name and an
    extended signature, which replaces this one once that cell has run.

    Args:
        models: sequence of ``nn.Module`` classifiers returning logits of
            shape ``(batch, classes)``.
        weights: one scalar per model, multiplied onto that model's
            probabilities. The weights are used as given (not normalised).
        dataloader: iterable of batches; a 2-element list/tuple is treated as
            ``(inputs, labels)``, anything else as the inputs themselves.
        device: device the inputs are moved to. When omitted, the device of
            the first model's first parameter is used (CPU if it has none),
            so the function no longer depends on a notebook-global ``device``.

    Returns:
        1-D numpy array with the predicted class index of every sample, in
        loader order (empty if the loader yields nothing).

    Raises:
        ValueError: if no model is given or the number of weights differs
            from the number of models.
    """
    models = list(models)
    weights = list(weights)
    if not models:
        raise ValueError('at least one model is required')
    if len(models) != len(weights):
        # zip() would silently drop the surplus models or weights.
        raise ValueError(f'got {len(models)} models but {len(weights)} weights')

    if device is None:
        first_param = next(iter(models[0].parameters()), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    # Evaluation mode is a per-model setting; set it once instead of per batch.
    for model in models:
        model.eval()

    batch_predictions = []
    with torch.no_grad():
        for batch in dataloader:
            # Unpack the batch
            if isinstance(batch, (list, tuple)) and len(batch) == 2:
                inputs, _ = batch  # If labels are present
            else:
                inputs = batch  # If labels are absent (test mode)

            inputs = inputs.to(device)  # Ensure inputs are on the correct device

            # Weighted sum of the per-model class probabilities.
            weighted = [
                torch.softmax(model(inputs), dim=1) * weight
                for model, weight in zip(models, weights)
            ]
            combined = torch.sum(torch.stack(weighted), dim=0)
            batch_predictions.append(combined.argmax(dim=1).cpu().numpy())

    if not batch_predictions:
        # np.concatenate raises on an empty list.
        return np.array([], dtype=np.int64)
    return np.concatenate(batch_predictions)

In [14]:
import torch
import numpy as np
import pandas as pd
import os
from tqdm import tqdm

def weighted_average_ensemble(models, weights, dataloader, device, prediction_path='/kaggle/working/test_predictions.csv'):
    """Run a weighted soft-voting ensemble over ``dataloader`` and save the predictions.

    Every model's softmax output is multiplied by its weight, the products
    are summed, and the arg-max class is taken per sample. The predictions
    are written to ``prediction_path`` as a CSV with columns ``ID`` (image
    file name) and ``TARGET`` (class index).

    File names are recovered by position (``batch index * batch_size`` into
    ``dataloader.dataset.data``), so the loader must yield samples in dataset
    order.

    Args:
        models: sequence of ``nn.Module`` classifiers returning logits of
            shape ``(batch, classes)``.
        weights: one scalar per model. The weights are used as given (not
            normalised).
        dataloader: loader whose batches are either the inputs or a
            2-element ``(inputs, labels)`` list/tuple; it must expose
            ``batch_size`` and ``dataset.data`` entries with an ``img_path``.
        device: device the inputs are moved to.
        prediction_path: CSV destination; missing parent directories are created.

    Returns:
        List with the predicted class index of every sample, in loader order.

    Raises:
        ValueError: if the number of weights differs from the number of models.
    """
    models = list(models)
    weights = list(weights)
    if len(models) != len(weights):
        # zip() would silently drop the surplus models or weights.
        raise ValueError(f'got {len(models)} models but {len(weights)} weights')

    # Evaluation mode is a per-model setting; set it once instead of per batch.
    for model in models:
        model.eval()

    final_predictions = []
    all_image_ids = []

    with tqdm(total=len(dataloader), desc=f'Evaluating', unit=' batch') as pbar:
        for i, batch in enumerate(dataloader):
            # Unpack the batch
            if isinstance(batch, (list, tuple)) and len(batch) == 2:
                inputs, _ = batch  # If labels are present
            else:
                inputs = batch  # If labels are absent (test mode)

            inputs = inputs.to(device)  # Ensure inputs are on the correct device

            with torch.no_grad():
                predictions = [
                    torch.softmax(model(inputs), dim=1) * weight
                    for model, weight in zip(models, weights)
                ]

            # Combine predictions from all models (weighted sum)
            combined = torch.sum(torch.stack(predictions), dim=0)
            final_preds = combined.argmax(dim=1).cpu().numpy()  # Get class indices

            final_predictions.extend(final_preds)

            # Collect image IDs for the current batch
            start = i * dataloader.batch_size
            all_image_ids.extend(
                os.path.basename(dataloader.dataset.data[idx]['img_path'])
                for idx in range(start, start + len(inputs))
            )

            pbar.update(1)

    # Create the output directory first, so a missing folder cannot fail the
    # write after all inference has already been done.
    out_dir = os.path.dirname(prediction_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # Save predictions to CSV file for testing purposes
    df = pd.DataFrame({
        'ID': all_image_ids,
        'TARGET': final_predictions
    })
    df.to_csv(prediction_path, index=False)
    print(f'[Test] Saved predictions to {os.path.abspath(prediction_path)}')

    return final_predictions


In [5]:
# Run configuration. These assignments replace the values given to the same
# names in the first code cell (learning_rate differs: 0.0001 here).
# NOTE(review): nothing else in this cell reads batch_size, learning_rate or
# num_epochs, and the constructors below pass the literal 5 rather than
# num_classes - confirm whether these are still needed.
batch_size = 24
num_classes = 5  # 5 DR levels
learning_rate = 0.0001
num_epochs = 20
mode = 'single'

assert mode in ('single', 'dual')

# Define the model
# Each constructor loads its backbone weights from Drive and prints the
# missing / unexpected keys (see the output below this cell).
custom_resnet34 = CustomResnet34(num_classes=5, dropout_rate=0.3)
custom_efficientnet = CustomEficientnet(num_classes=5, dropout_rate=0.3)
custom_densenet = CustomDensenet121(num_classes=5, dropout_rate=0.3)

# Load the trained weights
custom_resnet34_weight_path = '/content/drive/MyDrive/Colab Notebooks/Deep Learning/Part D/Weighted Ensemble/Resnet34.pth'
custom_efficientnet_weight_path = '/content/drive/MyDrive/Colab Notebooks/Deep Learning/Part D/Weighted Ensemble/Efficientnet.pth'
custom_densenet_weight_path = '/content/drive/MyDrive/Colab Notebooks/Deep Learning/Part D/Weighted Ensemble/Densenet121.pth'


# strict=True: every key of the full model (backbone + head) must be present
# in the checkpoint, unlike the strict=False backbone load inside the constructors.
# NOTE(review): torch.load is called without weights_only, which is presumably
# what triggers the warnings shown in this cell's output - only load trusted files.
state_dict1 = torch.load(custom_resnet34_weight_path, map_location='cpu')
custom_resnet34.load_state_dict(state_dict1, strict=True)

state_dict2 = torch.load(custom_efficientnet_weight_path, map_location='cpu')
custom_efficientnet.load_state_dict(state_dict2, strict=True)

state_dict3 = torch.load(custom_densenet_weight_path, map_location='cpu')
# Last expression of the cell: its result is what the notebook displays.
custom_densenet.load_state_dict(state_dict3, strict=True)

  state_dict = torch.load(model_path, map_location='cpu')


missing keys: ['fc.weight', 'fc.bias']
unexpected keys: []


  state_dict = torch.load(model_path, map_location='cpu')


missing keys: ['classifier.1.weight', 'classifier.1.bias']
unexpected keys: []


  state_dict = torch.load(model_path, map_location='cpu')


missing keys: ['classifier.weight', 'classifier.bias']
unexpected keys: []


  state_dict1 = torch.load(custom_resnet34_weight_path, map_location='cpu')
  state_dict2 = torch.load(custom_efficientnet_weight_path, map_location='cpu')
  state_dict3 = torch.load(custom_densenet_weight_path, map_location='cpu')


<All keys matched successfully>

In [16]:
# Ensemble members; the order must line up with `weights` below.
ensemble_models = [custom_densenet, custom_efficientnet, custom_resnet34]

# NOTE(review): these weights sum to 1.1, not 1.0. The ensemble takes an
# arg-max of the weighted sum, so the scale does not change the predicted
# class - confirm the intended values.
weights = [0.4, 0.3, 0.4]  # Model weights based on performance

test_csv_path = '/content/drive/MyDrive/Colab Notebooks/Deep Learning/DeepDRiD/test.csv'
test_data_path = '/content/drive/MyDrive/Colab Notebooks/Deep Learning/DeepDRiD/test/'

# test=True: the dataset yields images only (no labels).
test_dataset = RetinopathyDataset(test_csv_path, test_data_path, transform_test, mode='single', test=True)
# shuffle=False keeps dataset order, which weighted_average_ensemble relies on
# to map each prediction back to its image file name.
test_loader = DataLoader(test_dataset, batch_size=24, shuffle=False)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device:', device)

# Move models to the correct device
for model in ensemble_models:
    model.to(device)

# Writes the ID/TARGET CSV to Drive and returns the predicted class indices.
prediction_paths = '/content/drive/MyDrive/Colab Notebooks/Deep Learning/Part D/Weighted Ensemble/weighted_ensemble_prediction.csv'
final_predictions = weighted_average_ensemble(ensemble_models, weights, test_loader, device, prediction_path = prediction_paths)

Device: cuda


Evaluating: 100%|██████████| 17/17 [00:06<00:00,  2.72 batch/s]

[Test] Saved predictions to /content/drive/MyDrive/Colab Notebooks/Deep Learning/Part D/Weighted Ensemble/weighted_ensemble_prediction.csv





In [13]:
# Show the predicted class index of every test image.
# NOTE(review): this cell's execution count (13) is lower than that of the
# cell that assigns final_predictions (16), so the output below presumably
# comes from an earlier run - re-run top to bottom to confirm it is current.
print(final_predictions)

[4 1 4 4 3 3 3 4 3 3 3 3 3 3 3 3 3 2 3 2 4 4 3 4 2 3 4 4 3 2 3 3 3 3 4 4 0
 2 0 0 3 3 3 3 3 3 3 3 4 4 1 1 2 2 2 3 3 3 4 4 3 3 3 3 3 2 3 2 4 3 3 4 3 3
 3 3 3 3 3 4 0 0 0 0 3 3 1 2 0 0 0 0 0 0 0 0 0 0 0 0 2 1 2 3 0 0 0 0 0 0 0
 0 3 3 3 3 2 3 2 2 3 3 2 2 0 0 0 0 3 2 2 1 0 0 0 0 2 2 3 3 2 2 1 1 0 0 0 0
 0 0 0 0 2 3 4 2 2 2 2 1 2 2 1 2 2 2 3 2 3 4 3 3 3 3 3 3 0 0 0 0 3 3 3 3 1
 1 1 1 0 0 0 0 0 0 0 0 3 3 2 2 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 2 2 2 0 0
 0 0 3 2 3 2 1 1 1 1 2 3 2 2 0 0 0 0 1 1 1 1 0 0 0 0 0 2 0 0 0 0 0 0 2 2 2
 2 2 2 2 2 0 0 0 0 3 3 2 3 0 0 0 0 2 2 2 0 1 1 1 1 1 1 1 1 1 1 1 1 3 3 2 1
 1 1 1 1 1 1 1 2 0 0 0 0 1 2 1 2 0 0 0 0 0 0 0 0 3 1 4 4 0 0 0 0 0 0 0 0 0
 0 0 0 1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 2 3 3 0 0
 0 0 0 0 0 0 0 0 0 0 1 1 1 2 0 0 0 0 2 2 2 3 2 2 1 2 0 0 0 0]
