In [None]:
import cv2
import torch
import numpy as np
import os
import pandas as pd
from collections import Counter
from torch.utils.data import DataLoader
from torchvision import models, transforms
from PIL import Image

In [None]:
# Colab-only setup: mount Google Drive and switch into the project folder so
# that the relative paths used by the rest of the notebook resolve.
from google.colab import drive

drive.mount('/content/drive')
os.chdir('/content/drive/MyDrive/Colab Notebooks/DeepLearning_Final_Project_2024')

# Sanity checks: list the dataset folder and confirm that two of the trained
# checkpoints are reachable from the new working directory.
print(os.listdir('DeepDRiD'))
for checkpoint_file in ('./model_task_b_resnet18.pth', './model_task_b_densenet121.pth'):
    print(os.path.exists(checkpoint_file))



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
['sample_submission.csv', 'test.csv', 'train.csv', 'val.csv', 'test', 'train', 'val']
True
True


In [None]:
# Define Dataset Class for Test Set
class TestDataset(torch.utils.data.Dataset):
    """Dataset over the test split that yields ``(image, image_id)`` pairs.

    Every row of the CSV must provide an ``img_path`` column (image path
    relative to ``image_dir``) and an ``image_id`` column, which is returned
    next to the image so a prediction can be traced back to its sample.

    Args:
        csv_file: Path of the CSV file listing the test images.
        image_dir: Directory that the ``img_path`` values are relative to.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, csv_file, image_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows (images) listed in the CSV."""
        return len(self.data)

    def __getitem__(self, index):
        """Load the image of row ``index`` and return ``(image, image_id)``."""
        row = self.data.iloc[index]  # fetch the row once, reuse for both columns
        img_path = os.path.join(self.image_dir, row['img_path'])

        # Open through a context manager so the underlying file handle is
        # released deterministically instead of whenever the garbage collector
        # gets to it. convert() hands back a separate, loaded image, so the
        # result stays usable after the file is closed.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, row['image_id']  # Return image + ID

# Evaluation-time preprocessing for the test images: a fixed 224x224 resize,
# conversion to a tensor, then per-channel normalisation. The pipeline is
# deterministic (no random augmentation).
# NOTE(review): the OpenCV helpers defined further down (Ben Graham, circle
# crop, CLAHE) are not part of this pipeline - confirm this matches what the
# checkpoints were trained with.
_test_resize = transforms.Resize((224, 224))
_test_to_tensor = transforms.ToTensor()
_test_normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
)
transform_test = transforms.Compose([_test_resize, _test_to_tensor, _test_normalize])

# ========== IMAGE PREPROCESSING FUNCTIONS ==========
def ben_graham_preprocess(image):
    """Sharpen an image by subtracting a heavily blurred copy of itself.

    The result is ``4 * image - 4 * blur + 128`` as computed by
    ``cv2.addWeighted``, where ``blur`` is a Gaussian blur with sigma 30
    (kernel size ``(0, 0)`` lets OpenCV derive it from sigma).

    Args:
        image: Anything ``np.array`` can turn into an image array
            (e.g. a PIL image).

    Returns:
        A PIL image built from the processed array.
    """
    pixels = np.array(image)
    blurred = cv2.GaussianBlur(pixels, (0, 0), 30)
    sharpened = cv2.addWeighted(pixels, 4, blurred, -4, 128)
    return Image.fromarray(sharpened)

def circle_crop(image):
    """Black out everything outside the largest centred circle.

    A filled disk of radius ``min(width, height) // 2`` centred at
    ``(width // 2, height // 2)`` is used as a mask; pixels outside the disk
    become zero, pixels inside are kept unchanged.

    Args:
        image: A 3-dimensional (height, width, channels) image, e.g. an RGB
            PIL image.

    Returns:
        A PIL image with the same size as the input.
    """
    pixels = np.array(image)
    rows, cols, _ = pixels.shape

    center = (cols // 2, rows // 2)
    radius = min(cols, rows) // 2

    disk = np.zeros((rows, cols), dtype=np.uint8)
    cv2.circle(disk, center, radius, 255, -1)  # thickness -1 => filled disk

    return Image.fromarray(cv2.bitwise_and(pixels, pixels, mask=disk))

def apply_clahe(image):
    """Equalise local contrast with CLAHE on the lightness channel only.

    The image is converted from RGB to LAB, CLAHE (clip limit 2.0, 8x8 tile
    grid) is applied to the L channel, and the result is converted back to
    RGB. The A and B channels are passed through untouched.

    Args:
        image: An RGB image (e.g. a PIL image).

    Returns:
        A PIL image holding the contrast-enhanced RGB result.
    """
    rgb = np.array(image)
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(rgb, cv2.COLOR_RGB2LAB))

    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    lab_equalized = cv2.merge((equalizer.apply(lightness), chan_a, chan_b))

    return Image.fromarray(cv2.cvtColor(lab_equalized, cv2.COLOR_LAB2RGB))

# Define Functions for Ensemble Learning:

# Majority Voting (Max Voting)
def majority_voting(predictions_list):
    """Combine per-model class predictions by plurality vote.

    Args:
        predictions_list: Sequence with one indexable collection of predicted
            labels per model. The number of samples is taken from the first
            entry.

    Returns:
        ``np.ndarray`` holding, for every sample, the label predicted by the
        most models. On a tie, the label that appears first in model order
        wins (``Counter.most_common`` keeps first-seen order among equals).
    """
    n_samples = len(predictions_list[0])
    winners = [
        Counter(model_preds[idx] for model_preds in predictions_list).most_common(1)[0][0]
        for idx in range(n_samples)
    ]
    return np.array(winners)

# Weighted Averaging

def weighted_average(predictions_list, weights):
    """Combine the predictions of several models using per-model weights.

    Two input layouts are supported:

    * Hard labels, shape ``(n_models, n_samples)`` (or a single 1-D label
      vector): every model casts a vote for its predicted class, weighted by
      its weight; the class with the largest total weight wins. This is the
      weighted average of the one-hot encoded predictions followed by argmax.
    * Soft scores, shape ``(n_models, n_samples, n_classes)``: the scores are
      averaged across models with the given weights and the argmax is taken
      over the class axis.

    The previous implementation averaged the label *values* and then took
    ``argmax`` over the sample axis, which collapsed the whole test set to a
    single number (the index of one sample) instead of one label per sample.

    Args:
        predictions_list: Per-model predictions in one of the layouts above.
        weights: One weight per model, in the same order as the predictions.

    Returns:
        1-D ``np.ndarray`` of predicted class indices, one per sample. Ties
        go to the lowest class index (``np.argmax`` behaviour).

    Raises:
        ValueError: If the number of weights differs from the number of
            models, or hard predictions are not non-negative integers.
    """
    predictions_array = np.asarray(predictions_list)
    weights = np.asarray(weights, dtype=float)

    if predictions_array.ndim == 1:  # a single model's label vector
        predictions_array = predictions_array.reshape(1, -1)

    n_models = predictions_array.shape[0]
    if weights.shape != (n_models,):
        raise ValueError(
            f"expected {n_models} weights (one per model), got shape {weights.shape}"
        )

    if predictions_array.ndim == 2:
        # Hard labels: weighted vote over classes.
        if predictions_array.size == 0:
            return np.array([], dtype=int)

        labels = predictions_array.astype(int)
        if not np.array_equal(labels, predictions_array) or labels.min() < 0:
            raise ValueError("hard predictions must be non-negative integer class labels")

        num_classes = int(labels.max()) + 1
        # one_hot[m, s, c] is True when model m predicted class c for sample s.
        one_hot = labels[..., np.newaxis] == np.arange(num_classes)
        class_scores = (weights[:, np.newaxis, np.newaxis] * one_hot).sum(axis=0)
        return np.argmax(class_scores, axis=1)

    # Soft scores: average over the model axis, pick the best class per sample.
    averaged_scores = np.average(predictions_array, axis=0, weights=weights)
    return np.argmax(averaged_scores, axis=-1)


# Load Trained Models
def load_model(model_name, checkpoint_path):
    """Build one of the supported architectures and load its trained weights.

    The network is created without pretrained weights and its classification
    head is replaced by a 5-output linear layer before the checkpoint is
    loaded.

    Checkpoints whose keys carry a ``backbone.`` prefix are supported by
    stripping that prefix so the keys line up with the plain torchvision
    model. (The previous approach wrapped the model in ``nn.Sequential``,
    which renames the model's keys to ``1.*``; combined with ``strict=False``
    no weight was loaded and the model silently stayed randomly initialised.)

    Args:
        model_name: One of ``"resnet18"``, ``"resnet34"``, ``"densenet121"``,
            ``"efficientnet_b0"`` or ``"vgg16"``.
        checkpoint_path: Path of the saved ``state_dict``.

    Returns:
        The model in evaluation mode with the checkpoint weights loaded.

    Raises:
        ValueError: If ``model_name`` is not supported.
        RuntimeError: If no checkpoint key matches any model parameter (or a
            tensor shape mismatches, raised by ``load_state_dict``).
    """
    import warnings

    if model_name == "resnet18":
        model = models.resnet18(weights=None)  # trained weights come from the checkpoint
        model.fc = torch.nn.Linear(512, 5)  # Match output layer

    elif model_name == "resnet34":
        model = models.resnet34(weights=None)
        model.fc = torch.nn.Linear(512, 5)

    elif model_name == "densenet121":
        model = models.densenet121(weights=None)
        model.classifier = torch.nn.Linear(1024, 5)

    elif model_name == "efficientnet_b0":
        model = models.efficientnet_b0(weights=None)
        # NOTE(review): replaces the whole classifier with one Linear layer;
        # presumably mirrors the training code - confirm against it.
        model.classifier = torch.nn.Linear(1280, 5)

    elif model_name == "vgg16":
        model = models.vgg16(weights=None)
        model.classifier[6] = torch.nn.Linear(4096, 5)

    else:
        raise ValueError("Invalid model name")

    # weights_only=True restricts unpickling to tensors and plain containers,
    # which is all a state_dict needs, and avoids executing arbitrary pickled
    # code from the checkpoint file.
    state_dict = torch.load(
        checkpoint_path,
        map_location="cuda" if torch.cuda.is_available() else "cpu",
        weights_only=True,
    )

    # Checkpoints saved from a wrapper that held the network as `.backbone`
    # prefix every key with "backbone."; drop it so the keys match `model`.
    prefix = "backbone."
    if any(key.startswith(prefix) for key in state_dict):
        state_dict = {
            (key[len(prefix):] if key.startswith(prefix) else key): value
            for key, value in state_dict.items()
        }

    # strict=False tolerates partial checkpoints, but mismatches must not go
    # unnoticed: a model with unloaded weights produces meaningless predictions.
    load_result = model.load_state_dict(state_dict, strict=False)
    if len(load_result.missing_keys) == len(model.state_dict()):
        raise RuntimeError(
            f"No weights from {checkpoint_path!r} matched the {model_name} architecture"
        )
    if load_result.missing_keys or load_result.unexpected_keys:
        warnings.warn(
            f"{model_name}: checkpoint only partially loaded; "
            f"missing keys: {list(load_result.missing_keys)}, "
            f"unexpected keys: {list(load_result.unexpected_keys)}"
        )

    model.eval()  # Set to evaluation mode
    return model

# Run Predictions
def get_predictions(model, dataloader, device):
    """Run ``model`` over ``dataloader`` and collect the predicted classes.

    The model is moved to ``device`` and inference runs with gradient
    tracking disabled. The predicted class of a sample is the argmax of the
    model output along dimension 1.

    Args:
        model: Callable network whose output has the class scores on dim 1.
        dataloader: Iterable yielding ``(images, ids)`` batches; the ids are
            ignored here.
        device: Device that the model and each image batch are moved to.

    Returns:
        ``np.ndarray`` with one predicted class index per sample, in the
        order in which the loader produced them.
    """
    model.to(device)
    collected = []

    with torch.no_grad():
        for batch_images, _ids in dataloader:
            scores = model(batch_images.to(device))
            batch_labels = scores.argmax(dim=1)
            collected += list(batch_labels.cpu().numpy())

    return np.array(collected)

# Main Script
if __name__ == '__main__':
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Define test dataset & dataloader (shuffle=False keeps predictions in CSV order)
    test_dataset = TestDataset('./DeepDRiD/test.csv', './DeepDRiD/test/', transform_test)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Checkpoint of every ensemble member, keyed by architecture name
    model_paths = {
        "resnet18": "./model_task_b_resnet18.pth",
        "resnet34": "./model_task_b_resnet34.pth",
        "densenet121": "./model_task_b_densenet121.pth",
        "efficientnet_b0": "./model_task_b_efficientnet_b0.pth",
        "vgg16": "./model_task_b_vgg16.pth"
    }

    # Keep the loaded ensemble under its own name. Binding it to `models`
    # would replace the module-level `torchvision.models` import that
    # load_model() relies on, so re-running this cell would fail with an
    # AttributeError on the dict.
    ensemble_models = {name: load_model(name, path) for name, path in model_paths.items()}

    # Generate predictions for each model (same order as model_paths)
    predictions_list = []
    for model_name, model in ensemble_models.items():
        print(f"Generating predictions for {model_name}...")
        predictions = get_predictions(model, test_loader, device)
        predictions_list.append(predictions)

    # Perform Majority Voting
    majority_preds = majority_voting(predictions_list)

    # Normalize the Kappa Scores so they sum to 1 and can be used as weights.
    # NOTE(review): the scores are matched to the models by position - confirm
    # that their order follows model_paths.
    kappa_scores = [0.8714, 0.8644, 0.8554, 0.8314, 0.8283]  # our best Kappa scores
    assert len(kappa_scores) == len(predictions_list), "need exactly one kappa score per model"
    kappa_total = sum(kappa_scores)
    normalized_weights = [k / kappa_total for k in kappa_scores]
    print('normalized weights',normalized_weights)

    # Perform Weighted Averaging
    weights = normalized_weights
    weighted_preds = weighted_average(predictions_list, weights)

    # Save Final Predictions
    # NOTE(review): assumes sample_submission.csv lists the samples in the
    # same order as test.csv - confirm.
    submission_df = pd.read_csv('./DeepDRiD/sample_submission.csv')

    # Each output file is written from a fresh frame so the loaded template
    # is never mutated between the two saves.
    submission_df.assign(TARGET=majority_preds).to_csv('./sample_submission_task_d_majority.csv', index=False)
    print("Saved Majority Voting predictions to sample_submission_task_d_majority.csv")

    submission_df.assign(TARGET=weighted_preds).to_csv('./sample_submission_task_d_weighted.csv', index=False)
    print("Saved Weighted Averaging predictions to sample_submission_task_d_weighted.csv")


  state_dict = torch.load(checkpoint_path, map_location="cuda" if torch.cuda.is_available() else "cpu")
  state_dict = torch.load(checkpoint_path, map_location="cuda" if torch.cuda.is_available() else "cpu")
  state_dict = torch.load(checkpoint_path, map_location="cuda" if torch.cuda.is_available() else "cpu")
  state_dict = torch.load(checkpoint_path, map_location="cuda" if torch.cuda.is_available() else "cpu")


Generating predictions for resnet18...
Generating predictions for resnet34...
Generating predictions for densenet121...
Generating predictions for efficientnet_b0...
Generating predictions for vgg16...
normalized weights [0.20499188407160837, 0.20334517396316074, 0.20122797525229955, 0.19558211202333625, 0.19485285468959518]
Saved Majority Voting predictions to sample_submission_task_d_majority.csv
Saved Weighted Averaging predictions to sample_submission_task_d_weighted.csv
