In [1]:
# Install the Python packages this notebook depends on
%pip install torch torchvision opencv-python albumentations efficientnet_pytorch kagglehub pandas scikit-learn matplotlib simplejson

Note: you may need to restart the kernel to use updated packages.


In [None]:
# importing required libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torchvision.models as models
from efficientnet_pytorch import EfficientNet
import cv2
import json
import numpy as np
import pandas as pd
import os
from PIL import Image
import kagglehub

In [None]:
# Data preparation
# PyTorch dataset for loading both G1020 and ORIGA data

class GlaucomaDataset(Dataset):
    """Fundus images paired with a segmentation mask and a numeric label.

    Files are read from disk with OpenCV every time an item is requested.

    Args:
        image_paths: Sequence of image file paths.
        mask_paths: Sequence of mask file paths, aligned index-for-index
            with ``image_paths``.
        labels: Sequence of numeric labels, aligned with ``image_paths``.
        transform: Optional callable applied to the RGB image array.
        mask_transform: Optional callable applied to the grayscale mask array.

    Raises:
        ValueError: If the three sequences do not have the same length.
    """

    def __init__(self, image_paths, mask_paths, labels, transform=None, mask_transform=None):
        # Fail early on misaligned inputs instead of with an IndexError (or a
        # silently wrong pairing) in the middle of an epoch.
        if not (len(image_paths) == len(mask_paths) == len(labels)):
            raise ValueError(
                "image_paths, mask_paths and labels must have the same length, got "
                f"{len(image_paths)}, {len(mask_paths)} and {len(labels)}"
            )
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.labels = labels
        self.transform = transform
        self.mask_transform = mask_transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load and return ``(image, mask, label)`` for sample ``idx``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image or the mask.
        """
        image_path = self.image_paths[idx]
        mask_path = self.mask_paths[idx]

        # cv2.imread signals an unreadable file by returning None, so check
        # before cvtColor turns that into an opaque OpenCV assertion error.
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError(f"Image not found at: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR

        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise FileNotFoundError(f"Mask not found at: {mask_path}")

        if self.transform:
            image = self.transform(image)

        if self.mask_transform:
            mask = self.mask_transform(mask)
        else:
            # Without a transform, still hand back a (1, H, W) float tensor.
            mask = torch.tensor(mask, dtype=torch.float32).unsqueeze(0)

        label = torch.tensor(self.labels[idx], dtype=torch.float32)

        return image, mask, label

# Preprocessing pipelines handed to GlaucomaDataset.
# Images: resize to 256x256, convert to a float tensor, then normalise every
# channel with mean 0.5 / std 0.5.
image_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

# Masks: resize with nearest-neighbour interpolation so every output pixel keeps
# a value that actually occurs in the mask file. The default bilinear filter
# blends neighbouring pixels and invents in-between values along region borders.
# NOTE(review): ToTensor rescales uint8 input by 1/255; if the mask files store
# small class indices (e.g. 0/1/2) rather than 0/255 the targets end up close
# to zero — confirm against the data.
mask_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((256, 256), interpolation=transforms.InterpolationMode.NEAREST),
    transforms.ToTensor(),
])

# Download (or reuse the local copy of) the dataset bundle via kagglehub
# https://www.kaggle.com/datasets/arnavjain1/glaucoma-datasets
path = kagglehub.dataset_download("arnavjain1/glaucoma-datasets")

# --- G1020 ---
image_dir_g1020 = os.path.join(path, 'G1020', 'Images_Square')
mask_dir_g1020 = os.path.join(path, 'G1020', 'Masks_Square')
csv_file_g1020 = os.path.join(path, 'G1020', 'G1020.csv')

df_g1020 = pd.read_csv(csv_file_g1020)
image_paths_g1020 = [os.path.join(image_dir_g1020, img_name) for img_name in df_g1020['imageID']]
# Masks share the image file name but use a .png extension
mask_paths_g1020 = [os.path.join(mask_dir_g1020, img_name.replace('.jpg', '.png')) for img_name in df_g1020['imageID']]
labels_g1020 = df_g1020['binaryLabels'].values

# --- ORIGA ---
image_dir_origa = os.path.join(path, 'ORIGA', 'Images_Square')
mask_dir_origa = os.path.join(path, 'ORIGA', 'Masks_Square')
csv_file_origa = os.path.join(path, 'ORIGA', 'OrigaList.csv')

df_origa = pd.read_csv(csv_file_origa)
image_paths_origa = [os.path.join(image_dir_origa, img_name) for img_name in df_origa['Filename']]
mask_paths_origa = [os.path.join(mask_dir_origa, img_name.replace('.jpg', '.png')) for img_name in df_origa['Filename']]
labels_origa = df_origa['Glaucoma'].values

# --- Combine both datasets (G1020 first, then ORIGA) ---
image_paths = image_paths_g1020 + image_paths_origa
mask_paths = mask_paths_g1020 + mask_paths_origa
labels = np.concatenate((labels_g1020, labels_origa))
assert len(image_paths) == len(mask_paths) == len(labels), "images, masks and labels are misaligned"

# Create the dataset and dataloader
dataset = GlaucomaDataset(image_paths, mask_paths, labels, transform=image_transform, mask_transform=mask_transform)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

# Sanity check: pull one batch and show its shapes. The batch is bound to
# separate names so the full `labels` array above is not overwritten by a
# single batch of label tensors.
sample_images, sample_masks, sample_labels = next(iter(dataloader))
print(sample_images.shape, sample_masks.shape, sample_labels.shape)  # Should print shapes accordingly


In [None]:
class UNet(nn.Module):
    """U-Net style segmentation network with three down/up-sampling stages.

    Takes a 3-channel image batch and returns a 1-channel map of raw logits
    (no sigmoid is applied) at the input resolution. Height and width are
    halved three times on the way down and doubled three times on the way up.
    """

    def __init__(self):
        super().__init__()

        # Contracting path: channel width doubles at every stage
        self.enc1 = self.conv_block(3, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)

        # Expanding path: each stage upsamples, then fuses the concatenated
        # skip connection (hence the doubled input channels of every dec block)
        self.upconv4 = nn.ConvTranspose2d(512, 256, 2, stride=2)
        self.dec4 = self.conv_block(512, 256)
        self.upconv3 = nn.ConvTranspose2d(256, 128, 2, stride=2)
        self.dec3 = self.conv_block(256, 128)
        self.upconv2 = nn.ConvTranspose2d(128, 64, 2, stride=2)
        self.dec2 = self.conv_block(128, 64)

        # 1x1 convolution projecting the 64 feature maps to a single logit map
        self.conv_last = nn.Conv2d(64, 1, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        """Build two 3x3 same-padding convolutions, each followed by a ReLU."""
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        ]
        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the encoder, then the decoder with skip connections."""
        halve = nn.functional.max_pool2d

        # Encoder: keep every stage's output for the skip connections
        skip1 = self.enc1(x)
        skip2 = self.enc2(halve(skip1, 2))
        skip3 = self.enc3(halve(skip2, 2))
        features = self.enc4(halve(skip3, 2))

        # Decoder: upsample, concatenate the matching encoder output, refine
        stages = (
            (self.upconv4, skip3, self.dec4),
            (self.upconv3, skip2, self.dec3),
            (self.upconv2, skip1, self.dec2),
        )
        for upsample, skip, refine in stages:
            features = refine(torch.cat((upsample(features), skip), dim=1))

        return self.conv_last(features)


unet_model = UNet()

In [None]:
# Classifier: EfficientNet-B0 loaded via `from_pretrained`, with its final fully
# connected layer (`_fc`) swapped for a new single-output linear layer, i.e. one
# logit per image.
# NOTE(review): no cell visible in this notebook trains the new `_fc` layer
# before the model is evaluated — confirm whether a training step is missing.
efficientnet_model = EfficientNet.from_pretrained('efficientnet-b0')
num_features = efficientnet_model._fc.in_features  # input width of the original head
efficientnet_model._fc = nn.Linear(num_features, 1)

In [None]:
from torch.cuda.amp import GradScaler, autocast

# Training loop for U-Net (segmentation)
NUM_EPOCHS_SEG = 5  # single source of truth for the loop bound and the progress message

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
unet_model = unet_model.to(device)
criterion_seg = nn.BCEWithLogitsLoss()  # Using BCEWithLogitsLoss, so no sigmoid in model
optimizer_seg = optim.Adam(unet_model.parameters(), lr=0.0001)

# Mixed precision is only enabled when running on CUDA; with enabled=False the
# scaler and autocast are pass-throughs, so the same loop runs on CPU without
# CUDA-not-available warnings.
use_amp = device.type == "cuda"
scaler = GradScaler(enabled=use_amp)

# Optimizing data loading (pinned host memory only helps host-to-GPU copies)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=use_amp)

unet_model.train()
for epoch in range(NUM_EPOCHS_SEG):
    print(f"Starting epoch {epoch + 1} for U-Net...")
    running_loss = 0.0

    # The classification label is not used for segmentation; it is bound to a
    # throwaway name so the module-level `labels` array is left untouched.
    for batch_idx, (images, masks, _batch_labels) in enumerate(dataloader):
        images = images.to(device)
        masks = masks.to(device)

        # Forward pass with mixed precision
        with autocast(enabled=use_amp):
            outputs = unet_model(images)
            loss = criterion_seg(outputs, masks)

        # Backward pass: scale the loss, step through the scaler, then update
        # the scale factor for the next iteration
        optimizer_seg.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer_seg)
        scaler.update()

        running_loss += loss.item()

        # Print progress for every 10 batches
        if (batch_idx + 1) % 10 == 0:
            print(f"Epoch [{epoch + 1}/{NUM_EPOCHS_SEG}], Batch [{batch_idx + 1}/{len(dataloader)}], Loss: {loss.item():.4f}")

    # Reported once per epoch, inside the epoch loop, so every epoch's average
    # is shown rather than only the last one.
    print(f"Epoch {epoch + 1} completed. Average Loss: {running_loss / len(dataloader):.4f}\n")

In [None]:
from sklearn.metrics import accuracy_score, classification_report

def evaluate_accuracy(model, dataloader):
    """Compute and print the binary accuracy of ``model`` over ``dataloader``.

    The model is switched to eval mode and moved to the module-level
    ``device``. Its output is treated as one logit per sample: a sigmoid is
    applied and the result thresholded at 0.5.

    Args:
        model: Module mapping an image batch to one logit per sample.
        dataloader: Iterable of ``(images, masks, labels)`` or
            ``(images, labels)`` batches.

    Returns:
        The accuracy computed by ``accuracy_score``.

    Raises:
        ValueError: If a batch has neither two nor three elements.
    """
    model.eval().to(device)  # Ensure the model is on the right device
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in dataloader:
            if len(batch) == 3:
                images, _masks, labels = batch  # masks are not needed here
            elif len(batch) == 2:
                images, labels = batch
            else:
                # Previously this fell through and reused stale variables from
                # the last iteration (or raised NameError on the first one).
                raise ValueError(
                    f"Expected batches of 2 or 3 elements, got {len(batch)}"
                )

            images = images.to(device)

            # Logits -> probabilities -> hard 0/1 predictions
            outputs = model(images)
            probs = torch.sigmoid(outputs).cpu().numpy()
            all_preds.extend((probs > 0.5).astype(int).flatten())

            # Labels are only compared on the host, so they never need to
            # visit the device.
            all_labels.extend(labels.cpu().numpy().flatten())

    accuracy = accuracy_score(all_labels, all_preds)
    print(f"Accuracy: {accuracy:.4f}")
    return accuracy

# Run the accuracy evaluation on the current `dataloader`.
# NOTE(review): no visible cell trains efficientnet_model's replaced `_fc`
# layer before this call — confirm the reported accuracy is meaningful.
evaluate_accuracy(efficientnet_model, dataloader)


In [None]:
# Locations of the REFUGE validation split inside the downloaded bundle
refuge_val_dir = os.path.join(path, 'REFUGE', 'val')
refuge_json_path = os.path.join(refuge_val_dir, 'index.json')
image_dir_refuge = os.path.join(refuge_val_dir, 'Images_Cropped')
mask_dir_refuge = os.path.join(refuge_val_dir, 'Masks_Cropped')

# index.json maps a key to a record holding the image file name and its label
with open(refuge_json_path) as f:
    refuge_data = json.load(f)

refuge_records = list(refuge_data.values())

# Build the aligned image-path / mask-path / label sequences
image_paths_refuge = [
    os.path.join(image_dir_refuge, record['ImgName']) for record in refuge_records
]
mask_paths_refuge = [
    # Assuming masks are PNGs named after their image
    os.path.join(mask_dir_refuge, record['ImgName'].replace('.jpg', '.png'))
    for record in refuge_records
]
labels_refuge = np.array([record['Label'] for record in refuge_records])

# Wrap the REFUGE samples in the same dataset class and transforms used for training
refuge_dataset = GlaucomaDataset(
    image_paths_refuge,
    mask_paths_refuge,
    labels_refuge,
    transform=image_transform,
    mask_transform=mask_transform,
)
refuge_dataloader = DataLoader(refuge_dataset, batch_size=16, shuffle=False)

# Set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
efficientnet_model.to(device)

# Evaluate the REFUGE dataset
evaluate_accuracy(efficientnet_model, refuge_dataloader)

y_true = []
y_pred = []

efficientnet_model.eval()
with torch.no_grad():  # inference only: no autograd graph needed
    for images, _masks, batch_labels in refuge_dataloader:
        images = images.to(device)
        logits = efficientnet_model(images)

        # The classifier emits ONE logit per image, so an argmax over dim 1
        # (size 1) would always return index 0. Threshold the sigmoid instead,
        # matching what evaluate_accuracy does.
        predicted = (torch.sigmoid(logits) > 0.5).long().view(-1)

        y_true.extend(batch_labels.long().view(-1).tolist())
        y_pred.extend(predicted.cpu().tolist())

# Print classification report. `labels=[0, 1]` keeps both rows (and the two
# target names valid) even when only one class shows up in the data.
print(classification_report(y_true, y_pred, labels=[0, 1], target_names=["No Glaucoma", "Glaucoma"]))


In [None]:
import time

def _largest_contour_height(mask):
    """Return the bounding-box height of the largest external contour, or 0 if there is none."""
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if len(contours) == 0:
        return 0
    largest = max(contours, key=cv2.contourArea)
    # boundingRect returns (x, y, width, height)
    return cv2.boundingRect(largest)[3]


def calculate_cdr(disc_mask, cup_mask):
    """Compute the vertical cup-to-disc ratio from two binary masks.

    For each mask the largest external contour (by area) is taken and the
    height of its bounding rectangle is measured; the ratio is cup height
    divided by disc height.

    Args:
        disc_mask: Single-channel uint8 mask of the optic disc.
        cup_mask: Single-channel uint8 mask of the optic cup.

    Returns:
        ``cup_height / disc_height`` as a float. Returns 0.0 when no disc
        contour is found: the ratio is undefined in that case, and the
        previous fallback (dividing by 1) returned the cup's raw pixel height
        as if it were a ratio.
    """
    disc_height = _largest_contour_height(disc_mask)
    if disc_height == 0:
        return 0.0

    cup_height = _largest_contour_height(cup_mask)
    return cup_height / disc_height
unet_model.eval()
cdr_list = []
total_inference_time = 0.0

with torch.no_grad():
    # The dataset yields (image, mask, label) triples; unpacking into two names
    # raises "too many values to unpack", so all three are bound here.
    for images, _masks, _batch_labels in dataloader:
        images = images.to(device)

        # NOTE(review): CUDA kernels launch asynchronously, so on GPU this
        # wall-clock figure is only a rough indication — confirm if it matters.
        start_time = time.time()
        output_logits = unet_model(images)
        total_inference_time += time.time() - start_time

        # The U-Net returns raw logits (it is trained with BCEWithLogitsLoss),
        # so convert to probabilities before thresholding at 0.5.
        output_probs = torch.sigmoid(output_logits)

        for i in range(images.size(0)):
            output_mask = output_probs[i].cpu().numpy().squeeze()
            # OpenCV contour extraction expects a contiguous uint8 image
            output_mask = np.ascontiguousarray((output_mask > 0.5).astype(np.uint8))

            # Assume disc_mask and cup_mask separation logic here.
            # NOTE(review): both masks are the same array for now, so every CDR
            # is 1.0 (or 0.0 for an empty mask) until this is replaced.
            disc_mask = output_mask  # Modify this based on your actual model output
            cup_mask = output_mask    # Modify this based on your actual model output

            # Calculate CDR
            cdr = calculate_cdr(disc_mask, cup_mask)
            cdr_list.append(cdr)

print(f"U-Net forward time: {total_inference_time:.3f}s for {len(cdr_list)} images")
print(cdr_list)  # Output the CDRs for verification
