In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from transformers import ConvNextV2ForImageClassification
from sklearn.metrics import precision_recall_curve


  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(
  _torch_pytree._register_pytree_node(


Model Weights obtained from David Restrepo: https://drive.google.com/file/d/1ExReZmG3yKUbNWgrovIKNSdRJq6ZWV3O/view

In [2]:
model_weights = torch.load('fine_tuned_convnextv2_binary_DR_ICDR_byol_best.pth')
corrected_weights = {}
for k, v in model_weights.items():
    corrected_weights[k.replace('module.', '')] = v

In [3]:
# Copied and adapted from https://github.com/luisnakayama/BRSET/blob/main/src/model.py
class FoundationalCVModel(torch.nn.Module):
    def __init__(self):
        super(FoundationalCVModel, self).__init__()
        self.backbone = ConvNextV2ForImageClassification.from_pretrained('facebook/convnextv2-base-22k-224')
        self.backbone = nn.Sequential(*list(self.backbone.children())[:-1])

    def forward(self, x):
        features = self.backbone(x)
        features = features['pooler_output']
        return features

class FoundationalCVModelWithClassifier(torch.nn.Module):
    def __init__(self, backbone, hidden, num_classes, mode='eval', backbone_mode='eval'):
        super(FoundationalCVModelWithClassifier, self).__init__()
        self.backbone = backbone
        self.hidden = hidden
        output_dim = self.calculate_backbone_out()
        layers = []
        
        # Add the linear layer and ReLU activation if 'hidden' is an integer
        if isinstance(hidden, int):
            layers.append(nn.Linear(output_dim, hidden))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(p=0.2))
            layers.append(nn.BatchNorm1d(hidden))
            output_dim = hidden
            
        # Add the linear layer and ReLU activation for each element in 'hidden' if it's a list
        elif isinstance(hidden, list):
            for h in hidden:
                layers.append(nn.Linear(output_dim, h))
                layers.append(nn.ReLU())
                layers.append(nn.Dropout(p=0.2))
                layers.append(nn.BatchNorm1d(h))
                output_dim = h
        
        if hidden:
            self.hidden_layers = nn.Sequential(*layers)
        else:
            self.norm = nn.BatchNorm1d(output_dim)

        self.classifier = nn.Linear(output_dim, num_classes)
            
        # Set the mode
        self.mode = mode
        self.backbone_mode = backbone_mode
        
        if backbone_mode == 'eval':
            self.backbone.eval()
        elif backbone_mode == 'fine_tune':
            self.backbone.train()
            
        if mode == 'eval':
            self.eval()
        elif mode == 'fine_tune':
            self.train()    

    def calculate_backbone_out(self):
        sample_input = torch.randn(1, 3, 224, 224)
        
        self.backbone.eval()
        # Forward pass the sample input through the model
        with torch.no_grad():
            output = self.backbone(sample_input)
        return output.shape[1]
    
    def forward(self, x):
        """
        Forward pass to obtain class predictions from input data.

        Args:
        - x (torch.Tensor): Input data to obtain class predictions for.

        Returns:
        torch.Tensor: Class predictions generated by the model for the input data.
        """
        # Pass the input through the backbone
        features = self.backbone(x)
        
        if self.hidden:
            features = self.hidden_layers(features)
        else:
            features = self.norm(features)

        # Apply the classifier to obtain class predictions
        logits = self.classifier(features)
        
        # Get the probabilities
        # probabilities = self.activation_f(logits)

        return logits    

HIDDEN = [128]
num_classes = 2
MODE = 'eval'
backbone_mode = 'eval'

backbone_model = FoundationalCVModel()
model = FoundationalCVModelWithClassifier(backbone_model, hidden=HIDDEN, num_classes=num_classes, mode=MODE, backbone_mode=backbone_mode)
model.load_state_dict(corrected_weights)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
_ = model.to(device)


In [4]:
brset_labels = pd.read_csv('labels.csv') # From Original BRSET Dataset
brset_embed = pd.read_csv('embeddings.csv') # From Embeddings archive

In [5]:
brset_labels['split'] = brset_embed['split']
brset_labels['DR_2'] = brset_embed['DR_2']
image_dir = '/home/doug/data/brset/images/'
image_list = brset_labels['image_id'] + ".jpg"
test_list = image_list[brset_labels['split'] == 'test']
test_labels = brset_labels[brset_labels['split'] == 'test']['DR_2']

In [6]:
from torchvision import transforms
from PIL import Image
import os

load_transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
])

class ImageDataset(Dataset):
    def __init__(self, image_paths, labels, image_dir, transform):
        self.image_dir = image_dir
        self.image_list = image_list
        self.transform = transform
        self.labels = labels
        self.images = []
        for img_name in image_paths:
            img_path = os.path.join(image_dir, img_name)
            img = Image.open(img_path)
            img = self.transform(img)
            self.images.append(img)


    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels.iloc[idx]
        label = torch.tensor(label, dtype=torch.long)
        return image, label
    
test_dataset = ImageDataset(test_list, test_labels, image_dir, load_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [7]:
def get_predicted_probs(model, loader):
    model.eval()
    model.to(device)
    all_logits = []
    with torch.no_grad():
        for X,_ in loader:
            X = X.to(device)
            model_output = model(X).cpu()
            all_logits.append(model_output[:,0].numpy())
            # y_hat = torch.cat((y_hat, model(X)))
    all_logits = np.concatenate(all_logits)
    return nn.Sigmoid()(torch.tensor(all_logits)).numpy()

def get_optimal_f1_threshold(y_true, y_pred):
    epsilon = 1e-10
    precision, recall, thresholds = precision_recall_curve(y_true, y_pred)
    f1 = 2 * precision * recall / (precision + recall + epsilon)
    return thresholds[np.argmax(f1)]

In [9]:
test_probs = get_predicted_probs(model, test_loader)
threshold = get_optimal_f1_threshold(test_labels, test_probs)
test_preds = (test_probs > threshold).astype(int)

test_roc = roc_auc_score(test_labels, test_probs)
test_acc = accuracy_score(test_labels, test_preds)
test_f1 = f1_score(test_labels, test_preds)

print(f"Test ROC: {test_roc}")
print(f"Test Accuracy: {test_acc}")
print(f"Test F1: {test_f1}")
np.save('probs/convnextv2-base_test_emsplit_probs.npy', test_probs)

Test ROC: 0.9911692284210674
Test Accuracy: 0.9849416103257529
Test F1: 0.8778054862842892
