In [None]:
import os
import glob
import time
import sys
sys.path.insert(0, "..")

#import warnings
#warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import numpy as np
from skimage import io

## Analisi dei primi risultati: 5 gruppi da 120 immagini con threshold che varia; si indivisduano falsi negativi, falsi positivi e tracce vere; si trovano le percentuali per valore di threshold e si mette tutto in grafico. Si può scegliere fra l'algoritmo DoG, LoG e DoH: per il primo si devono tenere 3 cifre decimali, per gli altri 4 (DoH second 5 decimali) da cambiare in linea 50.


In [None]:
def get_all_needed(folder_name: str):
    """
    This function simply upload every needed file
    for the first analysis

    input:
    * folder_name: directory containing data
    * number: iteration 
    * n_t: number of threshold values to consider, -1 corresponds to all of them.

    output:
    * name_list 
    * result_array 
    * img_dict
    * gt_dict = ground_truth
    * float_thresh
    * pred_dict
    """
    # Get the global results
    name_list = glob.glob(os.path.join(folder_name, "New_test", "*.txt"))
    name_list.sort()

    # Extract the names of the analyzed images and collect the ground truth data
    name_list = [name[-7:-4] for name in name_list]
    #print(name_list)
    img_dict = {name : io.imread(os.path.join("..", "Img_test",
                            "{}.jpg".format(name))) for name in name_list}
    gt_dict = {name : np.loadtxt(os.path.join("..", "Img_test",
                            "{}.txt".format(name))) for name in name_list}

    pred_dict = {}
    for name in name_list:
        virtual = np.array([-1,-1,-1])
        # se non trova il file mi inventa una traccia in 
        # un posto impossibile
        # in questo modo i fn vengono bene
        text_path = os.path.join(folder_name, 
                        "New_test",
                        "{name}.txt".format(name=name))
        if os.path.isfile(text_path):
            text = np.loadtxt(text_path)
            pred_dict[name] = text
    
    return name_list, img_dict, gt_dict, pred_dict

def FalseNegative(name_list: list, gt: dict, pred:dict):
    """
    Questa funzione trova il numero di falsi negativi per immagine

    in input prende:
    * gt: array dei risultati veri (di una immagine)
    * pred: le tracce trovate dagli algoritmi per quell'immagine

    in output abbiamo:
    * numero di tracce vere trovate
    * numero di falsi negativi
    """
    num_true = []
    num_false_negative = []
    true_dict = {}
    false_negative_dict = {}
    for name in name_list:
        true = []
        false_negative = []
        gt_track = gt[name]
        pred_track = pred[name]
        if len(pred_track) != 0 :
            if not np.isscalar(pred_track[0]):
                a_blob_i = [i for i in range(len(pred_track))]
                if not np.isscalar(gt_track[0]):
                    for blob in gt_track:
                        y, x, r, c = blob
                        check = False                
                        for a_i in a_blob_i:
                            y_a, x_a, r_a = pred_track[a_i]
                            d2 = (x - x_a)**2 + (y - y_a)**2
                            if d2 <= (r)**2:
                                check = True
                                break
                        if check:
                            true.append(pred_track[a_i])
                            a_blob_i.remove(a_i)
                        else:
                            false_negative.append(blob)
                else:
                    y, x, r, c = gt_track
                    check = False                
                    for a_i in a_blob_i:
                        y_a, x_a, r_a = pred_track[a_i]
                        d2 = (x - x_a)**2 + (y - y_a)**2
                        if d2 <= (r)**2:
                            check = True
                            break
                    if check:
                        true.append(pred_track[a_i])
                        a_blob_i.remove(a_i)
                    else:
                        false_negative.append(gt_track)
            else:
                y_a, x_a, r_a = pred_track
                if not np.isscalar(gt_track[0]):
                    true_i = [i for i in range(len(gt_track))]
                    for t_i in true_i:
                        y, x, r, c = gt_track[t_i]
                        d2 = (x - x_a)**2 + (y - y_a)**2
                        if d2 <= (r)**2:
                            true.append(pred_track)
                            true_i.remove(t_i)
                            break
                    for t_i in true_i:
                        false_negative.append(gt[t_i])
                else:
                    y, x, r, c = gt_track
                    y_a, x_a, r_a = pred_track
                    d2 = (x - x_a)**2 + (y - y_a)**2
                    if d2 <= (r)**2:
                        true.append(pred_track)
                    else:
                        false_negative.append(gt_track)
        else:
            false_negative.append(gt_track)
            
        true_dict.update({name: np.asarray(true)})
        false_negative_dict.update({name: np.asarray(false_negative)})
        num_true.append(len(true))
        num_false_negative.append(len(false_negative))
        
    return num_true, num_false_negative, true_dict, false_negative_dict


def FalsePositive(name_list: list, gt: dict, pred:dict):
    """
    Questa funzione trova il numero di falsi positivi per immagine

    in input prende:
    * gt: array dei risultati veri (di una immagine)
    * pred: le tracce trovate dagli algoritmi per quell'immagine e quel valore di thresh

    in output abbiamo:
    * numero di tracce vere trovate
    * numero di falsi positivi
    """
    num_true = []
    num_false_positive = []
    true_dict = {}
    false_positive_dict = {}
    for name in name_list:
        true = []
        false_positive = []
        gt_track = gt[name]
        pred_track = pred[name]
        if len(pred_track) != 0 :
            if not np.isscalar(pred_track[0]):
                if not np.isscalar(gt_track[0]):
                    true_i = [i for i in range(len(gt_track))]
                    for blob in pred_track:
                        y_a, x_a, r_a = blob
                        check = False                
                        for t_i in true_i:
                            y, x, r, c = gt_track[t_i]
                            d2 = (x - x_a)**2 + (y - y_a)**2
                            if d2 <= (r)**2:
                                check = True
                                break
                        if check:
                            true.append(blob)
                            true_i.remove(t_i)
                        else:
                            false_positive.append(blob)
                else:
                    y, x, r, c = gt_track
                    #check = False    
                    a_blob_i = [i for i in range(len(pred_track))]
                    for a_i in a_blob_i:
                        y_a, x_a, r_a = pred_track[a_i]
                        d2 = (x - x_a)**2 + (y - y_a)**2
                        if d2 <= (r)**2:
                            #check = True
                            true.append(pred_track[a_i])
                            a_blob_i.remove(a_i)
                            break   
                    for a_i in a_blob_i:
                        false_positive.append(pred_track[a_i])
            else:
                y_a, x_a, r_a = pred_track
                if not np.isscalar(gt_track[0]):
                    check = False
                    for blob in gt_track:
                        y, x, r, c = blob                        
                        d2 = (x - x_a)**2 + (y - y_a)**2
                        if d2 <= (r)**2:
                            true.append(pred_track)
                            check = True
                            break
                    if not check:
                        false_positive.append(pred_track)
                else:
                    y, x, r, c = gt_track
                    y_a, x_a, r_a = pred_track
                    d2 = (x - x_a)**2 + (y - y_a)**2
                    if d2 <= (r)**2:
                        true.append(pred_track)
                    else:
                        false_positive.append(pred_track)
                    
        true_dict.update({name: np.asarray(true)})
        false_positive_dict.update({name: np.asarray(false_positive)})
        num_true.append(len(true))
        num_false_positive.append(len(false_positive))
        
    return num_true, num_false_positive, true_dict, false_positive_dict



In [None]:
# name_list, img_dict, gt_dict, pred_dict
print("retrieve data")
start = time.time()
name, img_dict, gt_dict, pred_dict = get_all_needed(os.path.join("Dog_Algorithm"))
true_num_n, negative_num, true_track_fn, false_negative_track = FalseNegative(name, 
                                                                            gt_dict,
                                                                            pred_dict)
true_num_p, positive_num, true_track_fp, false_positive_track = FalsePositive(name, 
                                                                            gt_dict,
                                                                            pred_dict)
end = time.time()
print("data collected in {:.2f}s".format(end-start))
#print(name)

    

In [None]:
false_negative_percent = []
false_positive_percent = []
for indx in range(len(name)):
    total_n = true_num_n[indx] + negative_num[indx]
    percent_n = (negative_num[indx]/total_n)*100
    false_negative_percent.append(percent_n)
    
    total_p = true_num_p[indx] + positive_num[indx]
    if total_p != 0:
        percent_p = (positive_num[indx]/total_p)*100
    else:
        percent_p = 0
    false_positive_percent.append(percent_p)
    
print("Mean negative: {:.2f}\nMean positive {:.2f}".format(np.mean(false_negative_percent),
                                                    np.mean(false_positive_percent)))

In [None]:
radius_array = []
color_array = []
for nm in name:
    false_array = false_negative_track[nm]
    for blob in false_array:
        y, x, r, c = blob
        radius_array.append(r)
        color_array.append(c)
print("Radius mean {:.2f}\nColor mean {:.2f}".format(np.mean(radius_array), 
                                                     np.mean(color_array)))

In [None]:
print("retrieve f_n and f_p")
    start = time.time()
    f_negative = get_FalseNegative(gt_dict,pred_dict)
    f_positive = get_FalsePositive(gt_dict,pred_dict)
    end = time.time()
    print("data collected in {}s".format(end-start))

    max_thresh_group = global_plot(result,
                                   gt_dict,
                                   f_negative,
                                   f_positive,
                                   i)
    thresh_list[i-1] = max_thresh_group



def global_plot(result_array, gt, fn, fp, i=0):
    """
    Questa funzione genera i plot

    in input prende:
    * result_array : che è l'array dei risultati globali
    * gt: il dizionario dei risultati veri
    * fn: il dizionario dei falsi negativi e tracce vere
    * fp: il dizionario dei falsi positivi
    * i: un indice numerico usato per salvare i dati
    """

    true_mean = np.sum(np.array([ len(gt[k]) for k in gt.keys()]))

    # plot totals
    thresh_values = result_array[0,:,0]
    total_tracks  = np.sum(result_array[:,:,1],axis=0)
    mean_time     = np.mean(result_array[:,:,2],axis=0)
    
#     fig, ax1 = plt.subplots( 1, 1, figsize=(8, 5))
#     fig.suptitle("Algorithm's behaviour\nGroup {}".format(i), y=1.05)

#     ax1.plot(thresh_values, total_tracks, color='red', label="Detected tracks")
#     ax1.axhline(true_mean, label="True track number")    
#     ax1.set_title("Tracks number and process time in function of threshold parameter")
#     #ax1.set_xlim((0, 0.2))
#     #ax1.set_ylim((2e3, 3e6))
#     ax1.tick_params(axis='x', labelrotation=65)
#     ax1.set_xlabel("Threshold")
#     ax1.set_ylabel("Tracks number")
#     ax1.set_yscale("log")
#     ax1.legend(loc=1)
    
#     ax3 = ax1.twinx()
#     ax3.plot(thresh_values, mean_time, color='blue', label="Execution time")
#     ax3.set_ylabel("Time(s)")
#     ax3.legend(loc=7)
    
#     plt.tight_layout()
#     #plt.savefig("Tempo_Tracce_{}.png".format(i), bbox_inches='tight')
#     #plt.show()
#     plt.close()

    
    #fig, ax = plt.subplots( 1, 
    #                        1, 
    #                        constrained_layout=True, 
    #                        figsize=(10, 7))
    #ax.set_title("Track total")  
    #ax.plot(thresh,true, label="true")
    #ax.plot(thresh,neg, label="false negative")
    #ax.plot(thresh,pos, label="false positive")
    #ax.axhline(true_mean, label="True track number")  
    #ax.set_xlim((np.min(thresh), np.max(thresh)))  
    #ax.legend()
    

    #plt.savefig("Tracce_Totali_{}.png".format(i))
    #plt.show()
    #plt.close()
    
    fn_array = np.array([[[fn[n][t][0],fn[n][t][1],t] for t in fn[n].keys()] for n in fn.keys()])
    fp_array = np.array([[[fp[n][t][0],fp[n][t][1],t] for t in fp[n].keys()] for n in fp.keys()])

    thresh = np.mean(fn_array[:,:,2], axis=0)
    true = np.sum(fn_array[:,:,0], axis=0)
    neg = np.sum(fn_array[:,:,1], axis=0)
    pos = np.sum(fp_array[:,:,1], axis=0)
    
    fig, ax = plt.subplots( 1, 1, figsize=(8, 5))
    fig.suptitle("Tracks' type percentage", y=1.0)
    
    true_mean = pos + true_mean                      
    ax.plot(thresh, true/true_mean*100, color='green', label="true")
    ax.plot(thresh, neg/true_mean*100, color='blue', label="false negative")
    ax.plot(thresh, pos/true_mean*100, color='red', label="false positive")
    ax.plot(thresh, true*100/true_mean*(1-neg/true_mean)*(1-pos/true_mean), 
            color='black', label="best function")
    ax.set_xlim((np.min(thresh), 0.4))
    ax.set_xlabel("Threshold")
    ax.set_ylabel("Percentage")
    ax.legend()
    
    plt.tight_layout()
    plt.savefig("Tracce_Percentuali_{}.png".format(i))
    plt.show()
    plt.close()
    
    total = true*100/true_mean*(1-neg/true_mean)*(1-pos/true_mean)
    max_total = np.max(total)
    index_max_total = np.where(total == max_total)
    max_thresh = thresh[index_max_total]
    
    return max_thresh



# Get the list of thresholds
    thresh_list = glob.glob(os.path.join(
        folder_name, "Group_{}".format(number),
        "*.txt"))
    thresh_list = [float(thresh[-9:-4]) for thresh in thresh_list]
    float_thresh = list(set(thresh_list))[0:n_t]
    float_thresh.sort()


def get_all_needed_2(folder_name: str, thresh=0.0272):
    """
    This function simply upload every needed file for the second analysis

    input:
    * folder_name: directory containing data
    * thresh: considered thrshold 

    output:
    * name_list 
    * img_dict
    * gt_dict
    * pred_dict
    """
    name_list = ["{:03d}".format(i) for i in range(1000)]
    img_dict = { name: io.imread(os.path.join( "img_PseudoTracks",
                    "{}.jpg".format(name))) for name in name_list}
    gt_dict = { name: np.loadtxt(os.path.join( "img_PseudoTracks",
                            "{}.txt".format(name))) for name in name_list}
    
    pred_dict = { name: {thresh:np.loadtxt(os.path.join(folder_name,
                                          "{name}_{thresh}.txt".format(name=name,thresh=thresh)))} for name in name_list }
    
    return name_list, img_dict, gt_dict, pred_dict

def global_plot_2(gt,
                  fn,
                  fp,
                  t,
                  name_list):

    total_true = np.array([ len(gt[n]) for n in name_list]) #control
    fn_array = np.array([[fn[n][t][0],fn[n][t][1]] for n in name_list])
    fp_array = np.array([[fp[n][t][0],fp[n][t][1]] for n in name_list])
    total_true = total_true + fp_array[:,1]
    true = fn_array[:,0]/total_true*100
    neg = fn_array[:,1]/total_true*100
    pos = fp_array[:,1]/total_true*100
    
    mean_true = np.mean(np.asarray(true))
    std_true = np.std(np.asarray(true))
    mean_positive = np.mean(np.asarray(pos))
    std_positive = np.std(np.asarray(pos))
    mean_negative = np.mean(np.asarray(neg))
    std_negative = np.std(np.asarray(neg))
    name = [int(name) for name in name_list]

    fig, ax = plt.subplots(1, 1, figsize=(10, 5))
    fig.suptitle("True, false negative and false positive tracks percentage per image", y=1.05)

    ax.plot(name, true, color='b', label="True tracks")
    ax.plot(name, neg, color='g', label="False negative")
    ax.plot(name, pos, color='r', label="False positive")
    ax.axhline(100, color='k', label="Controll percentage")
    ax.set_xlim((np.min(name), np.max(name)))
    #ax.tick_params(axis='x', labelrotation=65)
    ax.set_xlabel("Image number")
    ax.set_ylabel("Percentage")
    ax.set_title("""True tracks mean percentage {mt:.2f} +- {st:.2f} 
    False negative mean percentage {mf:.2f} +- {sf:.2f}
    False positive mean percentage {mp:.2f} +- {sp:.2f}""".format(
        mt=mean_true,st=std_true,
        mf=mean_negative,sf=std_negative,
        mp=mean_positive,sp=std_positive))
    #ax.set_xscale("log")
    #ax.set_yscale("log")
    ax.legend() 
    plt.tight_layout()
    plt.savefig("All_images.png", bbox_inches='tight')
    plt.show()
    plt.close()
    
    fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(10, 5))
    fig.suptitle("Dog histogram", y=1.05)

    ax1.hist(true, bins=50, color='b', label="True tracks")
    ax1.set_xlim((0, 100))
    ax1.set_xlabel("Percentage")
    ax1.set_ylabel("Number of images")
    ax1.legend()
    
    ax2.hist(neg, bins=50, color='g', label="False negative")
    ax2.set_xlim((0, 100))
    ax2.set_xlabel("Percentage")
    ax2.set_ylabel("Number of images")
    ax2.legend()
    
    ax3.hist(pos, bins=50, color='r', label="False positive")
    ax3.set_xlim((0, 100))
    ax3.set_xlabel("Percentage")
    ax3.set_ylabel("Number of images")
    ax3.legend()
    
    plt.tight_layout()
    plt.savefig("tre_hist_dog.png", bbox_inches='tight')
    plt.show()
    plt.close()

    
    return true, neg, pos


In [None]:
n_true = []
    n_false_positive = []
    if len(pred) != 0:
        if not np.isscalar(pred[0]):
            if not np.isscalar(gt[0]):
                blob_j = [i for i in range(len(gt))]
                for a_blob in pred:
                    y_a, x_a, r_a = a_blob
                    check = False
                    for b_i in blob_j:
                        y, x, r, c = gt[b_i]
                        d2 = (x - x_a)**2 + (y - y_a)**2
                        if d2 <= (r)**2:
                            check = True
                            true.append(a_blob)
                            blob_j.remove(b_i)
                            break
                    if not check:
                        false_positive.append(a_blob)
            else:
                a_blob_i = [i for i in range(len(pred))]
                for a_i in a_blob_i:
                    y_a, x_a, r_a = pred[a_i]
                    y, x, r, c = gt
                    d2 = (x - x_a)**2 + (y - y_a)**2
                    if d2 <= (r)**2:
                        true.append(pred[a_i])
                        a_blob_i.remove(a_i)
                        break
                for a_i in a_blob_i:
                    false_positive.append(pred[a_i])                       
            
        else:
            if not np.isscalar(gt[0]):
                blob_j = [i for i in range(len(gt))]
                y_a, x_a, r_a = pred
                check = False
                for b_i in blob_j:
                    y, x, r, c = gt[b_i]
                    d2 = (x - x_a)**2 + (y - y_a)**2
                    if d2 <= (r)**2:
                        check = True
                        true.append(pred)
                        blob_j.remove(b_i)
                        break
                if not check:
                    false_positive.append(pred)
            else:
                y_a, x_a, r_a = pred
                y, x, r, c = gt
                d2 = (x - x_a)**2 + (y - y_a)**2
                if d2 <= (r)**2:
                    true.append(pred)
                else:
                    false_positive.append(pred)
    return len(true), len(false_positive)

def get_FalsePositive(gt: dict, predict: dict):
    """
    Questa funzione genera il dizionario di risultati per immagine e valore di threshold.
    Il risultato viene salvato in un dizionario dove la prima chiave è il nome dell'immagine.
    A questa chiave viene associato un altro dizionario dove la chiave in questo caso è threshold.
    Associato alla combinazione delle due chiavi avremo un array con due valori,
    il primo il numero di tracce vere e il secondo il numero di falsi positivi.

    in input prende:
    * gt: il dizionario dei risultati veri
    * pred: le tracce trovate dagli algoritmi

    in output abbiamo:
    * dizionario dei risultati
    """
    result = {}
    for n in gt.keys():
        result[n] = {}
        for t in predict[n].keys():
            result[n][t] = FalsePositive(gt[n],predict[n][t])
    return result

def get_FalseNegative(gt: dict, predict: dict):
    """
    Questa funzione genera il dizionario di risultati per immagine e valore di threshold.
    Il risultato viene salvato in un dizionario dove la prima chiave è il nome dell'immagine.
    A questa chiave viene associato un altro dizionario dove la chiave in questo caso è threshold.
    Associato alla combinazione delle due chiavi avremo un array con due valori,
    il primo il numero di tracce vere e il secondo il numero di falsi negativi.

    in input prende:
    * gt: il dizionario dei risultati veri
    * pred: il dizionario delle tracce trovate dagli algoritmi

    in output abbiamo:
    * dizionario dei risultati
    """
    result = {}
    for n in gt.keys():
        result[n] = {}
        for t in predict[n].keys():
            result[n][t] = FalseNegative(gt[n],predict[n][t])
    return result

from utils import *
import time

t=0.0272

# name_list, result_array, img_dict, gt_dict, float_thresh, pred_dict
print("retrieve data")
start = time.time()
n, img_d, gt_d, pred_d = get_all_needed_2("Second", t)
end = time.time()
print("data collected in ",end - start,"s")
#

print("retrieve fn and fp")
start = time.time()
fn = get_FalseNegative(gt_d,pred_d)
fp = get_FalsePositive(gt_d,pred_d)
end = time.time()
print("data collected in ",end - start,"s")

global_plot_2(gt_d,fn,fp,t,n)

In [None]:
print(thresh_list) 
mean_thresh = np.round(np.mean(np.asarray(thresh_list)), 4)
std_thresh = np.round(np.std(thresh_list), 4)
arr = np.array([mean_thresh, std_thresh])
np.save("mean_thresh_value", arr, allow_pickle=True)
print(mean_thresh, std_thresh)

In [None]:
d = np.load("mean_thresh_value.npy", allow_pickle=True)
print("After the complete analysis of the data, the best threshold value for this algorithm is {}+-{}".format(d[0], d[1]))