This notebook evaluates a trained ResNet-50 model.

In [None]:
import csv
import math
from pathlib import Path

import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt
from tqdm import tqdm

from src.utils.eval.eval_utils import compute_ROC
from src.utils.training.data_loading import get_load_spectro_for_class
from src.utils.training.keras_models import resnet

## Parameters

In [None]:
# Directory expected to contain the "positives" and "negatives" sub-directories and the dataset.csv index file.
ROOT_DIR = "PATH/TO/DATA"  # path where we expect to find directories named "positives", "negatives" and a csv file
# Sub-directory created under both data/npy and data/figures to receive the results.
OUTPUT_DIR = "ResNet-50/OHASISBIO-3"  # directory where to output files, in the data folder
# Number of samples per batch when iterating over the dataset.
BATCH_SIZE = 64
epoch = 31  # epoch checkpoint that we want to load
# Weights file of the chosen epoch; the epoch number is zero-padded to 4 digits.
CHECKPOINT = f"../../../../data/model_saves/ResNet-50/cp-{epoch:04d}.ckpt"

# NOTE(review): not referenced by the cells visible in this notebook - confirm it is still needed.
ALLOWED_ERROR_S = 10  # tolerance when evaluating and time distance allowed between two peaks in the probabilities distribution

MIN_ANNOTATORS_COUNT = 3  # minimum number of agreeing annotators needed to consider one positive pick

# Function mapped over every dataset row to load one sample.
# Presumably 224 is the spectrogram size and 3 the number of channels - TODO confirm in get_load_spectro_for_class.
load = get_load_spectro_for_class(224, 3)

## Load model

In [None]:
# Build the network, then restore the weights saved at the selected epoch.
m = resnet()
m.load_weights(CHECKPOINT)

# Compile so that m.evaluate() can report the loss and the metrics below.
# NOTE(review): depending on the Keras version, the capitalised "Accuracy" may
# resolve to the exact-match tf.keras.metrics.Accuracy instead of a binary
# accuracy - confirm that the value reported by evaluate() is meaningful.
m.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(),
    loss=tf.losses.binary_crossentropy,
    metrics=['Accuracy', 'AUC'],
)

## Load data

In [None]:
# Read the dataset index. Each row holds an identifier in column 0 (presumably
# the file path consumed by `load` - TODO confirm), a "positive"/"negative"
# label in column 1 and, from column 2 on, (annotation, annotators_count) pairs.
# newline="" is what the csv module expects from the file objects it reads.
with open(Path(ROOT_DIR) / "dataset.csv", "r", newline="") as f:
    csv_reader = csv.reader(f, delimiter=",")
    lines = list(csv_reader)
print(len(lines), "files found")

# A positive row stays positive only if at least one of its annotations has
# MIN_ANNOTATORS_COUNT or more agreeing annotators; otherwise it is relabelled
# "negative". Every row is then cut down to its first two columns so that all
# rows have the same length when they are turned into a dataset.
for i, row in enumerate(lines):
    label = row[1]
    if label == "positive":
        has_agreed_annotation = any(
            int(row[j + 1]) >= MIN_ANNOTATORS_COUNT
            for j in range(2, len(row), 2)
        )
        if not has_agreed_annotation:
            label = "negative"
    lines[i] = [row[0], label]

# `load` turns each [identifier, label] row into the (images, y) pairs the
# model cells iterate over.
dataset = tf.data.Dataset.from_tensor_slices(lines)
dataset = dataset.map(load).batch(batch_size=BATCH_SIZE)

## Model execution

In [None]:
# Run the model batch by batch, collecting the first output column as the
# predicted score together with the matching ground-truth labels.
detected, ground_truth = [], []
n_batches = math.ceil(len(lines) / BATCH_SIZE)

for batch_images, batch_labels in tqdm(dataset, total=n_batches):
    batch_scores = m.predict(batch_images, verbose=False)
    detected += list(batch_scores[:, 0])
    ground_truth += list(batch_labels)

# Convert to arrays so that later cells can use boolean masks on them.
detected = np.array(detected)
ground_truth = np.array(ground_truth)

In [None]:
# Report the loss and the metrics configured in m.compile() over the whole dataset.
m.evaluate(dataset)

In [None]:
# Split the sample indices into true positives, false positives and false
# negatives; a sample is predicted positive when its score exceeds 0.5.
predicted_positive = detected > 0.5
# ravel() keeps the mask one-dimensional even if the labels were collected as
# an (N, 1) column.
actual_positive = np.ravel(ground_truth) == 1

# Kept as plain Python lists of indices, as before.
TP = np.flatnonzero(predicted_positive & actual_positive).tolist()
FP = np.flatnonzero(predicted_positive & ~actual_positive).tolist()
FN = np.flatnonzero(~predicted_positive & actual_positive).tolist()

# Output directories, also used by the ROC cell for its arrays and figure.
npy_dir = Path(f"../../../../data/npy/{OUTPUT_DIR}")
figures_dir = Path(f"../../../../data/figures/{OUTPUT_DIR}")
npy_dir.mkdir(exist_ok=True, parents=True)
figures_dir.mkdir(exist_ok=True, parents=True)

# An empty list would be saved as a float64 array, which cannot be used as an
# index array once reloaded; force an integer dtype so the three files always
# share the same type.
for name, indices in (("TP", TP), ("FP", FP), ("FN", FN)):
    np.save(npy_dir / f"{name}.npy", np.asarray(indices, dtype=np.int64))

## ROC curve computation

In [None]:
# Split the scores by ground-truth class and hand them, with the class sizes,
# to compute_ROC (thresh_delta is presumably the threshold step - confirm in
# compute_ROC).
positive_mask = ground_truth == 1
negative_mask = ground_truth == 0
TPr, FPr = compute_ROC(
    detected[positive_mask],
    np.count_nonzero(positive_mask),
    detected[negative_mask],
    np.count_nonzero(negative_mask),
    thresh_delta=0.001,
)

# Save the curve points next to the other .npy outputs.
np.save(f"../../../../data/npy/{OUTPUT_DIR}/FPr.npy", FPr)
np.save(f"../../../../data/npy/{OUTPUT_DIR}/TPr.npy", TPr)

# Draw the curve on the unit square and export it as a figure.
plt.plot(FPr, TPr)
plt.xlim(0, 1)
plt.ylim(0, 1)
plt.xlabel("FP rate")
plt.ylabel("TP rate")
plt.title("ROC curve")
plt.savefig(f"../../../../data/figures/{OUTPUT_DIR}/ROC.png")