In [1]:
import matplotlib.pyplot as plt
import numpy as np
import cv2
import os
import random
import pandas as pd
import csv


## Set your paths here

In [2]:
results_path = 'your_root_path/exp_results/'
dataset_path = 'your_root_path/data/COVID-CTset/'
tmp_results_path = 'your_root_path/tmp/'
NN_path = 'your_root_path/data/trained_networks/FPN-fold1.hdf5'

## Precalculate predictions for an initial data from COVID-CTset

In [3]:
import keras
from keras import optimizers
from keras.models import Sequential,Model
from keras.layers import Dropout, Flatten, Dense,Input
from keras.callbacks import ModelCheckpoint
from keras.applications.imagenet_utils import preprocess_input
from keras import backend as K
from keras.preprocessing.image import ImageDataGenerator
from keras.initializers import RandomNormal
import keras.backend as k
from keras_retinanet import layers
import keras_retinanet
import zipfile

def precalc(folder_path, idxs):
    NN_name = 'FPN-fold1'
    k.clear_session()
    custom_object={'UpsampleLike': keras_retinanet.layers._misc.UpsampleLike}
    net=keras.models.load_model(NN_path, custom_objects=custom_object)
        
    for p_idx in idxs:
        data = {}
        result = f"{folder_path}predictions_{p_idx}"
        with open(result, 'r') as f:
            while True:
                a = f.readline()
                if a:
                    a = a.split(",")
                    patient_name = a[0]
                    idx = int(a[1])
                    predictions = list([float(x) for x in a[2:]])
                else:
                    break
                data.update({idx:predictions})
        covid = "covid" in patient_name
        result = f"{folder_path}projections_{p_idx}"
        
        patient_name_ = patient_name.replace('/', '_')
        
        df = f'{dataset_path}{patient_name}'
        list_names = []
        predictions = []
        for r,d,f in os.walk(df):
            for file in f:
                if '.tif' in file:
                    list_names.append(os.path.join(r,file))
        list_names = sorted(list_names)
        
        for img_name in list_names:
            img=cv2.imread(img_name,cv2.IMREAD_UNCHANGED)
            pred_ind = net.predict(np.expand_dims(np.expand_dims(img,axis=0),axis=3))[0][0]
            predictions.append(int(pred_ind > 0.5))
        res_path = f"{tmp_results_path}{patient_name_}.csv"
        with open(res_path, 'w') as f:
            f.write('')
        with open(res_path, 'a') as f:
            for a in predictions:
                f.write(str(a) + '\n')

for name in os.listdir(results_path):
    if "full_experiment" in name:
        label = name[16:]
        idxs = []
        p = results_path + name + '/'
        for patient in os.listdir(p):
            if 'predictions' in patient:
                idxs.append(patient[12:])
        precalc(p, idxs)

## Load last predictions from history and summarize for each rule

In [4]:
def get_quality(folder_path, idxs):
    total_q = 0
    total_count = 0
    sum_dose = 0
    sum_full_dose = 0
    sum_reduced_dose = 0
    for p_idx in idxs:
        data = {}
        result = f"{folder_path}predictions_{p_idx}"
        rule = result.split("L")[-1].split("t")
        L = int(rule[0])
        t = int(rule[1].split("/")[0])
        with open(result, 'r') as f:
            while True:
                a = f.readline()
                if a:
                    a = a.split(",")
                    patient_name = a[0]
                    idx = int(a[1])
                    predictions = list([float(x) for x in a[2:]])
                else:
                    break
                data.update({idx:predictions})
        covid = "covid" in patient_name
        result = f"{folder_path}projections_{p_idx}"
        proj_time = []
        reduced_dose = -1
        with open(result, 'r') as f:
            for row in f:
                proj_time.append(row)
        
        proj_time = proj_time[1:]
        proj_time = np.asarray(sorted(set(proj_time)))
        proj_time = proj_time.astype('int')
        proj_count = np.max(proj_time) + 1
        proj_int = np.zeros((proj_count))
        for i in proj_time:
            proj_int[i - 1] = 1
        
        patient_name_ = patient_name.replace('/', '_')
        y = []
        covid_count = 0
        for d in sorted(data.keys()):
            a = np.array(data[d][-1]) > 0.5
            y.append(a)
            covid_count+=a
            
        predictions = []
        covid_count_full_dose = 0
        res_path = f"{tmp_results_path}{patient_name_}.csv"
        with open(res_path, 'r') as f:
            for row in f:
                predictions.append(int(row))
                covid_count_full_dose += int(row)
        
        patient_q = 0
        for i in range(len(y)):
            patient_q += y[i] == predictions[i]
            total_q += y[i] == predictions[i]
            total_count += 1
        if len(y) == 0:
            continue
            
        patient_q /= len(y)
        sum_dose += proj_time.shape[0]
        sum_full_dose += proj_count
    
    return(total_q, total_count, sum_dose, sum_full_dose)
        
    

## Loop over all full_experiment's

In [5]:
full_table  = []
description = []
quality     = []
dose        = []
count = 0
sum_sclices = 0
for name in os.listdir(results_path):
    if "full_experiment" in name:
        label = name[16:]
        idxs = []
        p = results_path + name + '/'
        for patient in os.listdir(p):
            if 'predictions' in patient:
                idxs.append(patient[12:])
                count += 1
        total_correct, total_slices, reduced_dose, full_dose = get_quality(p, idxs)
        full_table.append((label, total_correct/total_slices, reduced_dose/full_dose)) 
        description.append(label)
        quality.append(total_correct/total_slices)
        dose.append(reduced_dose/full_dose)

## Table 2

In [None]:
full_dict = {}
for name in os.listdir(results_path):
    if "full_experiment" in name:
        label = name[16:]
        idxs = []
        p = results_path + name + '/'
        for patient in os.listdir(p):
            if 'predictions' in patient:
                idxs.append(patient[12:])
        total_correct, total_slices, reduced_dose, full_dose = get_quality(p, idxs)
        t_c, t_s, r_d, f_d = full_dict.get(label, (0,0,0,0))
        full_dict.update({label: (t_c + total_correct, t_s+total_slices, r_d+reduced_dose, f_d + full_dose)})
full_table  = []
description = []
quality     = []
dose        = []
for label in full_dict.keys():
    description.append(label)
    total_correct, total_slices, reduced_dose, full_dose = full_dict.get(label, (0,0,0,0))
    quality.append(total_correct/total_slices)
    dose.append(reduced_dose/full_dose)

print(description)
print(quality)
print(dose)

## Results for positive and negative cases

In [None]:
full_dict = [{}, {}]
for name in os.listdir(results_path):
    if "full_experiment" in name:
        label = name[16:]
        p = results_path + name + '/'
        for patient in os.listdir(p):
            idxs = []
            if 'predictions' in patient:
                idxs.append(patient[12:])
                idx = 1
                if 'covid' in patient:
                    idx = 0
                total_correct, total_slices, reduced_dose, full_dose = get_quality(p, idxs)
                t_c, t_s, r_d, f_d = full_dict[idx].get(label, (0,0,0,0))
                full_dict[idx].update({label: (t_c + total_correct, t_s+total_slices, r_d+reduced_dose, f_d + full_dose)})

description_b = [[],[]]
quality_b     = [[],[]]
dose_b        = [[],[]]
for i in range(2):
    for label in full_dict[i].keys():
        description_b[i].append(label)
        total_correct, total_slices, reduced_dose, full_dose = full_dict[i].get(label, (0,0,0,0))
        quality_b[i].append(total_correct/total_slices)
        dose_b[i].append(reduced_dose/full_dose)
print('covid')
print(description_b[0])
print(quality_b[0])
print(dose_b[0])
print('non_covid')
print(description_b[1])
print(quality_b[1])
print(dose_b[1])

## Plot the results

In [32]:
def plot_scatter(description, quality, dose):
    fig, ax = plt.subplots()
    ax.scatter(dose, quality)
    ax.set_ylabel('quality')
    ax.set_xlabel('dose')
    ax.set_ylim([0.8, 1.00])
    ax.set_xlim([0.65, 1.00])
    for i, txt in enumerate(description):
        ax.annotate(txt, (dose[i]-0.006, quality[i]+0.003))
    plt.show()

In [None]:
plot_scatter(description, quality, dose)
plot_scatter(description_b[0], quality_b[0], dose_b[0])
plot_scatter(description_b[1], quality_b[1], dose_b[1])