In [None]:
import os
import pandas as pd
from PIL import Image
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, cohen_kappa_score


In [None]:
def get_y_pred(model, test_loader, threshold=0.5):
    """Run ``model`` over ``test_loader`` and return binarised predictions.

    The model is switched to eval mode and executed on CPU without gradient
    tracking. Raw outputs are passed through a sigmoid and compared against
    ``threshold``.

    Args:
        model: Callable torch module mapping an image batch to logits.
        test_loader: Iterable yielding ``(imgs, labels)`` pairs. The labels
            are ignored, so they do not need to be tensors.
        threshold: Probability strictly above which a prediction is 1.

    Returns:
        Integer numpy array of 0/1 predictions with all batches concatenated
        along axis 0. An empty ``(0, 0)`` array is returned when the loader
        yields no batches.
    """
    model.eval()
    device = 'cpu'
    batch_preds = []
    with torch.no_grad():
        for imgs, _ in test_loader:
            outputs = model(imgs.to(device))
            probs = torch.sigmoid(outputs).cpu().numpy()
            batch_preds.append((probs > threshold).astype(int))

    if not batch_preds:
        return np.empty((0, 0), dtype=int)

    # Concatenate the per-batch arrays directly; building a tensor from a
    # Python list of ndarrays is slow and emits a UserWarning.
    return np.concatenate(batch_preds, axis=0)

    

In [None]:
def submitify(y_pred, csv_submit="../data/onsite_test_submission.csv"):
    """Fill the submission template with predictions.

    The template CSV at ``csv_submit`` is read, and its ``D``, ``G`` and ``A``
    columns are overwritten with columns 0, 1 and 2 of ``y_pred``. All other
    template columns (including ``id``) and the column order are kept.

    Args:
        y_pred: 2-D array-like of shape ``(n_rows, >=3)`` holding one
            prediction per template row and disease.
        csv_submit: Path to the template CSV.

    Returns:
        A ``pandas.DataFrame`` ready to be written as a submission file.

    Raises:
        ValueError: If ``y_pred`` is not 2-D, has fewer than three columns,
            or its row count differs from the template's.
    """
    disease_names = ["D", "G", "A"]
    df_submit = pd.read_csv(csv_submit)
    y_pred = np.asarray(y_pred)

    if y_pred.ndim != 2 or y_pred.shape[1] < len(disease_names):
        raise ValueError(
            f"y_pred must be 2-D with at least {len(disease_names)} columns, "
            f"got shape {y_pred.shape}"
        )
    if y_pred.shape[0] != len(df_submit):
        raise ValueError(
            f"y_pred has {y_pred.shape[0]} rows but {csv_submit} has "
            f"{len(df_submit)} rows"
        )

    # Column i of y_pred corresponds to disease_names[i].
    for i, disease in enumerate(disease_names):
        df_submit[disease] = y_pred[:, i]

    return df_submit

In [None]:
from dl.model import init_model
from omegaconf import OmegaConf
from dl.data import RetinaMultiLabelDataset, aug_test_tf

def init_all(backbone="resnet18"):
    """Build an untrained model and the on-site test loader.

    Args:
        backbone: ``"resnet"`` or ``"resnet18"`` selects the ``resnet18``
            backbone; any other value selects ``"efficientnet"``.

    Returns:
        ``(model, loader)`` where ``model`` comes from ``init_model(cfg)`` and
        ``loader`` iterates the on-site test set in order, 32 items per batch.
    """
    # NOTE(review): the same fine-tuning config is used as a template for both
    # backbones; only the fields overridden below differ — confirm intended.
    cfg = OmegaConf.load("../cfgs/task_1/resnet/fine-tuning.yaml")
    cfg.model.ckpt = None
    cfg.task = "eval"
    cfg.model.pretrained = False
    # Accept the default "resnet18" as well as the short alias "resnet";
    # comparing against "resnet" only made the default pick efficientnet.
    cfg.model.backbone = "resnet18" if backbone in ("resnet", "resnet18") else "efficientnet"
    model = init_model(cfg)

    ds = RetinaMultiLabelDataset("../data/onsite_test_submission.csv",
                                image_dir="../data/images/onsite_test",
                                transform= aug_test_tf)

    # shuffle=False keeps the iteration order deterministic across runs.
    loader = DataLoader(ds, batch_size=32, shuffle=False, num_workers=0)

    return model, loader

In [None]:
def test_method(taskid, model_dir, sub_task):
    model, loader = init_all(backbone= model_dir)
    if taskid in ["1", "2"]:
        models_path = os.path.join("../eval",  "models", f"Task {taskid}", f"{model_dir}", f"{sub_task}/")
        ckpt = torch.load(os.path.join(models_path, "ckpt.pt"), map_location=torch.device('cpu'))

        model.load_state_dict(ckpt['model'])

        y_pred = get_y_pred(model, loader)
        df_submit = submitify(y_pred)

        csv_paths = os.path.join("../eval/preds", f"Task {taskid}", f"{model_dir}", f"{sub_task}", f"Task-{taskid}-{model_dir}-{sub_task}_submission.csv")
        if not os.path.exists(os.path.dirname(csv_paths)):
            os.makedirs(os.path.dirname(csv_paths))
            
        df_submit.to_csv(csv_paths, index=False)
        print(f"Submission file saved at {csv_paths}")

    else:
        print("Evaluation for this task is not implemented yet.")

In [None]:
# Single export: Task 2, "effnet" model directory, "focal" sub-task checkpoint.
test_method(taskid="2", model_dir="effnet", sub_task="focal")

  ckpt = torch.load(os.path.join(models_path, "ckpt.pt"), map_location=torch.device('cpu'))


dict_keys(['id', 'D', 'G', 'A'])
Submission file saved at ../eval/preds/Task 2/effnet/focal/Task-2-effnet-focal_submission.csv


In [None]:
# (taskid, model_dir, sub_task) for every checkpoint to export, in run order.
runs = [
    ("1", "effnet", "probing"),
    ("2", "effnet", "focal"),
    ("2", "effnet", "balanced"),
    # ("1", "effnet", "ft"),  # disabled
    ("1", "resnet", "probing"),
    ("1", "resnet", "ft"),
]

for run_task, run_model_dir, run_sub_task in runs:
    test_method(taskid=run_task, model_dir=run_model_dir, sub_task=run_sub_task)

  ckpt = torch.load(os.path.join(models_path, "ckpt.pt"), map_location=torch.device('cpu'))
  y_true = torch.tensor(y_true).numpy()
  ckpt = torch.load(os.path.join(models_path, "ckpt.pt"), map_location=torch.device('cpu'))


dict_keys(['id', 'D', 'G', 'A'])
Submission file saved at ../eval/preds/Task 1/effnet/probing/Task-1-effnet-probing_submission.csv
dict_keys(['id', 'D', 'G', 'A'])
Submission file saved at ../eval/preds/Task 2/effnet/focal/Task-2-effnet-focal_submission.csv


  ckpt = torch.load(os.path.join(models_path, "ckpt.pt"), map_location=torch.device('cpu'))


dict_keys(['id', 'D', 'G', 'A'])
Submission file saved at ../eval/preds/Task 2/effnet/balanced/Task-2-effnet-balanced_submission.csv


  ckpt = torch.load(os.path.join(models_path, "ckpt.pt"), map_location=torch.device('cpu'))


dict_keys(['id', 'D', 'G', 'A'])
Submission file saved at ../eval/preds/Task 1/resnet/probing/Task-1-resnet-probing_submission.csv


  ckpt = torch.load(os.path.join(models_path, "ckpt.pt"), map_location=torch.device('cpu'))


dict_keys(['id', 'D', 'G', 'A'])
Submission file saved at ../eval/preds/Task 1/resnet/ft/Task-1-resnet-ft_submission.csv


#### Predict

In [None]:
def predict(model, test_loader, cfg):
    """Evaluate ``model`` on ``test_loader`` and tabulate per-label metrics.

    Outputs are passed through a sigmoid and thresholded at 0.5. For each name
    in ``cfg.data.label_names`` the matching column of the label/prediction
    matrices is scored (accuracy, macro precision/recall/F1, Cohen's kappa)
    and the scores are printed. An extra ``"avg"`` row is computed on the full
    label matrix.

    Args:
        model: Callable torch module mapping an image batch to logits.
        test_loader: Iterable yielding ``(imgs, labels)`` tensor pairs.
        cfg: Config exposing ``cfg.data.label_names`` and
            ``cfg.model.backbone`` (the latter is used only in printed text).

    Returns:
        ``pandas.DataFrame`` with one row per label plus ``"avg"``.

    Raises:
        ValueError: If ``test_loader`` yields no batches.
    """
    model.eval()
    device = 'cpu'
    true_batches, pred_batches = [], []
    with torch.no_grad():
        for imgs, labels in test_loader:
            outputs = model(imgs.to(device))
            probs = torch.sigmoid(outputs).cpu().numpy()
            pred_batches.append((probs > 0.5).astype(int))
            true_batches.append(labels.cpu().numpy())

    if not pred_batches:
        raise ValueError("test_loader yielded no batches; nothing to evaluate.")

    # Concatenate the per-batch arrays directly; building a tensor from a
    # Python list of ndarrays is slow and emits a UserWarning.
    y_true = np.concatenate(true_batches, axis=0)
    y_pred = np.concatenate(pred_batches, axis=0)

    res = {}
    for i, disease in enumerate(cfg.data.label_names):  #compute metrics for every disease
        y_t = y_true[:, i]
        y_p = y_pred[:, i]

        acc = accuracy_score(y_t, y_p)
        precision = precision_score(y_t, y_p, average="macro",zero_division=0)
        recall = recall_score(y_t, y_p, average="macro",zero_division=0)
        f1 = f1_score(y_t, y_p, average="macro",zero_division=0)
        kappa = cohen_kappa_score(y_t, y_p)

        print(f"{disease} Results [{cfg.model.backbone}]")
        print(f"Accuracy : {acc:.4f}")
        print(f"Precision: {precision:.4f}")
        print(f"Recall   : {recall:.4f}")
        print(f"F1-score : {f1:.4f}")
        print(f"Kappa    : {kappa:.4f}")

        res[disease] = {
            "accuracy": acc,
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "cohen_kappa": kappa
        }

    # NOTE(review): the "avg" row is scored on the whole 2-D label matrix, not
    # derived from the per-label rows above. On multilabel input sklearn's
    # accuracy_score is exact-match (subset) accuracy, so these values need
    # not equal the mean of the per-label metrics — confirm this is intended.
    avg_acc = accuracy_score(y_true, y_pred)
    avg_precision = precision_score(y_true, y_pred, average="macro",zero_division=0)
    avg_recall = recall_score(y_true, y_pred, average="macro",zero_division=0)
    avg_f1 = f1_score(y_true, y_pred, average="macro",zero_division=0)
    # -1.0 is a placeholder, not a computed score.
    avg_kappa = -1.#cohen_kappa_score(y_true, y_pred) # TODO: Check if kappa can be averaged across multi-label
    res['avg'] = {"f1_score": avg_f1, "accuracy": avg_acc, "precision": avg_precision, "recall": avg_recall, "cohen_kappa": avg_kappa}

    df_result = pd.DataFrame(res).T
    return df_result