In [None]:
## import the necessary packages
import numpy as np
import os
from scipy.optimize import linear_sum_assignment
import warnings
import csv
import pandas as pd
from tqdm import tqdm
import tifffile
from math import*
import json

# Suppress FutureWarning messages
warnings.simplefilter(action='ignore', category=FutureWarning)

In [None]:
method = 'GNN'
#Choose directory containing the predicted nucleus and golgi centroids 

gt_dir = r"../data/vectors" #directory with the ground truth vectors
img_dir = r"../data/images"#directory with images

#nuclei_thresholds = [8.8]
#golgi_thresholds = [4.4]
nuclei_thresholds = [10]
golgi_thresholds = [6]
levels_ = [0]
lvl_ = 0

numbers_ = [0, 1, 2, 3, 4, 5, 6, 7] #crops

## name of the images
imgs = ['Crop1', 'Crop2', 'Crop3', 'Crop4', 'Crop5_BC', 'Crop6_BC', 'Crop7_BC','Crop8_BC']

image_dimensions = [[0.333,0.333,0.270], [0.333,0.333,0.270], [0.333,0.333,0.270], [0.333,0.333,0.270],
              [0.333,0.333,0.270], [0.333,0.333,0.270], [0.333,0.333,0.400], [0.333,0.333,0.400]] #um

info = ['test']*8

In [None]:
''' Define the metrics ''' 
def square_rooted(x):
    return round(np.sqrt(sum([a*a for a in x])),3)
 
def cosine_similarity(x,y):
    numerator = sum(a*b for a,b in zip(x,y))
    denominator = square_rooted(x)*square_rooted(y)
    return round(numerator/float(denominator),3)

#Euclidean distance computed in um
def distance_um(p, q, dimx, dimy, dimz):
    dist_um = (((p[0]-q[0])*dimx)**2)+(((p[1]-q[1])*dimy)**2)+(((p[2]-q[2])*dimz)**2)
    return np.sqrt(dist_um) 

#ignore the borders of the image
def inside_img(coord,img_dim_x,img_dim_y,img_dim_z,x_y_lim,z_lim):
    return coord[0]<img_dim_x-x_y_lim and coord[0]>x_y_lim and coord[1]<img_dim_y-x_y_lim and coord[1]>x_y_lim and coord[2]<img_dim_z-z_lim and coord[2]>0

#modify Constraints Col
def transform_constraints_column(col):
    """
    This function takes a string value from the 'Constraints' column and returns:
        - False if the value doesn't contain 'constraints_'
        - The substring after 'constraints_' (inclusively) otherwise
    """
    if 'constraints' not in col:
        return False
    else:
        return "constraints"+col.split('constraints')[1]

In [None]:
def eval_results_batch(pred_subfolder, image_nb, allmetrics, metrics_stats):
    pred_vectors = os.path.join(pred_subfolder, imgs[image_nb] + '.csv')
    gt_vectors = os.path.join(gt_dir, imgs[image_nb] + '.csv')

    ## read the image and get its dimensions
    image = tifffile.imread(os.path.join(img_dir, imgs[image_nb] + '.tif'))
    (img_dim_x, img_dim_y, img_dim_z, channels) = np.shape(image)

    #voxel's physical dimensions
    x_spacing = image_dimensions[image_nb][0]
    y_spacing = image_dimensions[image_nb][1]
    z_spacing = image_dimensions[image_nb][2]
    
    #limits to ignore vectors at the borders of the image
    x_y_lim = int(7/x_spacing)  #(voxels)  16
    z_lim = int(5/z_spacing)    #(voxels)  5

    #print('Reading the csv file with the ground truth vectors')
    ## nuclei and golgi centroids
    nuclei_centroids_gt = [] 
    golgi_centroids_gt = []
    
    #open the csv file and save the gt nucleus and Golgi centroids
    file = open(gt_vectors, "rU")
    reader = csv.reader(file, delimiter=';')
    for row in reader:
        if row[0] != 'YN,XN,ZN,YG,XG,ZG':
            aux = row[0].split(",")
            YN = int(float(aux[0]))-1
            XN = int(float(aux[1]))-1
            ZN = int(float(aux[4]))-1
            YG = int(float(aux[2]))-1
            XG = int(float(aux[3]))-1
            ZG = int(float(aux[5]))-1
            
            if inside_img(np.asarray([XN,YN,ZN]), img_dim_x, img_dim_y, img_dim_z, x_y_lim, z_lim) and inside_img(np.asarray([XG,YG,ZG]), img_dim_x,img_dim_y,img_dim_z,x_y_lim,z_lim):
                nuclei_centroids_gt.append((XN,YN,ZN))
                golgi_centroids_gt.append((XG,YG,ZG))     
    
    golgi_centroids_gt = np.asarray(golgi_centroids_gt)
    nuclei_centroids_gt = np.asarray(nuclei_centroids_gt)
    
    #Remove predicted nuclei and golgi at image borders
    n_centroids = []
    g_centroids = []
    #open the csv file and save the gt nucleus and Golgi centroids
    file = open(pred_vectors, "rU")
    reader = csv.reader(file, delimiter=';')
    for row in reader:
        if row[0] != 'YN,XN,ZN,YG,XG,ZG':
            aux = row[0].split(",")
            YN = int(float(aux[0]))-1
            XN = int(float(aux[1]))-1
            ZN = int(float(aux[4]))-1
            YG = int(float(aux[2]))-1
            XG = int(float(aux[3]))-1
            ZG = int(float(aux[5]))-1
            
            if inside_img(np.asarray([XN,YN,ZN]), img_dim_x, img_dim_y, img_dim_z, x_y_lim, z_lim) and inside_img(np.asarray([XG,YG,ZG]), img_dim_x,img_dim_y,img_dim_z,x_y_lim,z_lim):
                if distance_um([XN,YN,ZN], [XG,YG,ZG], x_spacing, y_spacing, z_spacing)<18:
                    n_centroids.append((XN,YN,ZN))
                    g_centroids.append((XG,YG,ZG))     
            
    nuclei_centroids = np.asarray(n_centroids)
    golgi_centroids = np.asarray(g_centroids)
    
    #print('Evaluation')
    ''' Assignment nuclei centroids '''
    ## compute the Euclidean distance between the predicted and ground truth centroids
    matrix = np.zeros((len(nuclei_centroids),len(nuclei_centroids_gt)))
    
    ## build the cost matrix
    for i in range(0,len(nuclei_centroids)):
        for j in range(0,len(nuclei_centroids_gt)):
            matrix[i,j] = distance_um(nuclei_centroids[i], nuclei_centroids_gt[j], x_spacing, y_spacing, z_spacing) + distance_um(golgi_centroids[i], golgi_centroids_gt[j], x_spacing, y_spacing, z_spacing)
    
    matrix[matrix>10] = 2000
    
    ## method to solve the linear assignment problem
    row_ind, col_ind = linear_sum_assignment(matrix)
    
    ''' Compute the metrics for the vectors '''
    for n_th, g_th, thlvl in zip(nuclei_thresholds, golgi_thresholds, levels_):
        metrics = pd.DataFrame(columns = ["Image", "Method", "Type", "NucleusTh", "GolgiTh", "Threshold_level",
                                      "cosine similarity", "vec_error", "nuclei", "golgi"])

        if thlvl==lvl_:
            index_tp = []  ## positions in vectors nuclei_centroids, golgi_centroids, that are
                            ## true positives
                            
            index_tp_gt = [] ## positions in vectors nuclei_centroids_gt and golgi_centroids_gt,
                              ## that correspond to true positives

        for i in range(0, len(row_ind)):
            n_coord = nuclei_centroids[row_ind[i]]
            g_coord = golgi_centroids[row_ind[i]]
        
            vec = g_coord - n_coord
        
            n_coord_gt = nuclei_centroids_gt[col_ind[i]]
            g_coord_gt = golgi_centroids_gt[col_ind[i]]
        
            vec_gt = g_coord_gt - n_coord_gt
            
            dist_n_centroids = distance_um(n_coord, n_coord_gt, x_spacing, y_spacing, z_spacing)
            dist_g_centroids = distance_um(g_coord, g_coord_gt, x_spacing, y_spacing, z_spacing)
            vec_error = distance_um(vec, vec_gt, x_spacing, y_spacing, z_spacing)
            
            cos_sim = cosine_similarity(vec, vec_gt)
            
            if dist_n_centroids<=n_th and dist_g_centroids<=g_th:
                res = {"Image": imgs[image_nb], "Method": method, "Type": info[image_nb], "NucleusTh": n_th, "GolgiTh": g_th,
                       "Threshold_level": thlvl,
                       "cosine similarity": abs(cos_sim), "vec_error": vec_error, 
                       "nuclei": dist_n_centroids, "golgi": dist_g_centroids}
                
                res_aux = {"Image": imgs[image_nb], "Method": method, "Type": info[image_nb], "NucleusTh": n_th, "GolgiTh": g_th,
                       "Threshold_level": thlvl, "index_tp_gt": col_ind[i],
                       "cosine similarity": abs(cos_sim), "vec_error": vec_error, 
                       "nuclei": dist_n_centroids, "golgi": dist_g_centroids}
                
                row_aux = len(allmetrics)
                allmetrics.loc[row_aux] = res_aux
                
                row = len(metrics)
                metrics.loc[row] = res
                
                row_stats = len(metrics_stats)
                metrics_stats.loc[row_stats] = res
                
                if thlvl==lvl_:
                    index_tp.append(row_ind[i])
                    index_tp_gt.append(col_ind[i])
                
        
        metrics_mean = metrics.select_dtypes(include=[np.number]).mean()
        metrics_std = metrics.select_dtypes(include=[np.number]).std()
        
        TP = len(metrics)
        
        FP = np.shape(golgi_centroids)[0] - len(metrics)
        
        FN = np.shape(golgi_centroids_gt)[0] - len(metrics)

        TN = len(pred_vectors)-TP-FP-FN
        
        ACC = (TP+TN)/(TP+FP+TN+FN)
        TPR = TP/(TP+FN)
        
        FPR = FP/(FP+TP)
        
        FNR = FN/(FN+TP)
        
        PRECISION = TP/(TP+FP)
        RECALL = TPR
        F1_SCORE = 2*PRECISION*RECALL/(PRECISION+RECALL)
        
        res = {"Image": imgs[image_nb], "Method": method, "Type": info[image_nb], 
               "NucleusTh": n_th, "GolgiTh": g_th, "Threshold_level": thlvl,
               "CosineSimilarityM": metrics_mean['cosine similarity'],
               "CosineSimilaritySTD": metrics_std['cosine similarity'], 
               "VecErrorM": metrics_mean['vec_error'],
               "VecErrorSTD": metrics_std['vec_error'],
               "DistanceNuM": metrics_mean['nuclei'], 
               "DistanceNuSTD": metrics_std['nuclei'], 
               "DistanceGoM": metrics_mean['golgi'], 
               "DistanceGoSTD": metrics_std['golgi'], 
               "TP": TP, 
               "FP": FP, 
               "FN": FN, 
               "TPR": TPR, 
               "FPR": FPR,
               "FNR": FNR,
               "ACC":ACC,
               "PRECISION":PRECISION,
               "F1_SCORE":F1_SCORE}
        return res

# Not Grouped

In [None]:
#Uncomment here to use LATEST results with data WITHOUT normalization
#pred_dir = r"../results/deep_learning/results_real_automatic_not_normalized/trial1/Results_0_constraints/" #GNN w/ Angular Features
#pred_dir = r"../results/deep_learning/results_real_automatic_not_normalized/trial1/Results_1_constraints/" #GNN w/o Angular Features
#pred_dir = r"../results/deep_learning/results_real_automatic_not_normalized/trial1/Results_2_constraints/" #MLP w/ Angular Features
#pred_dir = r"../results/deep_learning/results_real_automatic_not_normalized/trial1/Results_3_constraints/" #MLP w/o Angular Features

#Uncomment here to use results for classical algorithms
#pred_dir = r"../results/HopcroftKarp_RealDataCNNAutomatic_k10/" #Hopcroft-Karp
pred_dir = r"../results/JonkerVolgenant_RealDataCNNAutomatic_k10/" #Jonker-Volgenant


performance_metrics = pd.DataFrame(columns = ["Image", "Method", "Type", "NucleusTh", "GolgiTh", "Threshold_level",
                                              "CosineSimilarityM",
                                              "CosineSimilaritySTD", "VecErrorM","VecErrorSTD",
                                              "DistanceNuM", "DistanceNuSTD", "DistanceGoM",
                                              "DistanceGoSTD", "TP", "FP", "FN","ACC", "TPR", "FPR", "FNR", "PRECISION", "F1_SCORE"])

metrics_stats = pd.DataFrame(columns = ["Image", "Method", "Type", "NucleusTh", "GolgiTh", "Threshold_level",
                                        "cosine similarity", "vec_error", "nuclei", "golgi"])

allmetrics = pd.DataFrame(columns = ["Image", "Method", "Type", "NucleusTh", "GolgiTh", "Threshold_level",
                                      "index_tp_gt", "cosine similarity", "vec_error", "nuclei", "golgi"])

for image_nb in tqdm(numbers_):
        res = eval_results_batch(pred_dir, image_nb, allmetrics, metrics_stats)
        
        
        row = len(performance_metrics)
        performance_metrics.loc[row] = res

final_metrics = performance_metrics.groupby(["Threshold_level"], as_index=False).agg({'CosineSimilarityM': np.mean,
                                                 "CosineSimilaritySTD": np.mean,
                                                 "VecErrorM": np.mean,
                                                 "VecErrorSTD": np.mean,
                                                 "DistanceNuM": np.mean, 
                                                 "DistanceNuSTD": np.mean, 
                                                 "DistanceGoM": np.mean, 
                                                 "DistanceGoSTD": np.mean, 
                                                 "TP": np.sum, 
                                                 "FP": np.sum, 
                                                 "FN": np.sum, 
                                                 "ACC": np.mean,
                                                 "TPR": np.mean, 
                                                 "FPR": np.mean,
                                                 "FNR": np.mean,
                                                 "PRECISION": np.mean,
                                                 "F1_SCORE":np.mean})
final_metrics

In [None]:
print(final_metrics.to_latex())

# Grouped

In [None]:
trials_dfs = []

list_pred_dirs = [r"../results/deep_learning_with_edge_feats_with_node_feats/results_real_automatic_normalized/trial1/",
                    r"../results/deep_learning_with_edge_feats_with_node_feats/results_real_automatic_normalized/trial2/",
                  r"../results/deep_learning_with_edge_feats_with_node_feats/results_real_automatic_normalized/trial3/",
                  ]

for pred_dir in tqdm(list_pred_dirs):
    records = []
    for pred_subfolder in os.listdir(pred_dir):
        
        pred_subfolder = os.path.join(pred_dir, pred_subfolder)

        performance_metrics = pd.DataFrame(columns = ["Image", "Method", "Type", "NucleusTh", "GolgiTh", "Threshold_level",
                                              "CosineSimilarityM",
                                              "CosineSimilaritySTD", "VecErrorM","VecErrorSTD",
                                              "DistanceNuM", "DistanceNuSTD", "DistanceGoM",
                                              "DistanceGoSTD", "TP", "FP", "FN", "ACC", "TPR", "FPR", "FNR", "PRECISION", "F1_SCORE"])

        metrics_stats = pd.DataFrame(columns = ["Image", "Method", "Type", "NucleusTh", "GolgiTh", "Threshold_level",
                                                "cosine similarity", "vec_error", "nuclei", "golgi"])

        allmetrics = pd.DataFrame(columns = ["Image", "Method", "Type", "NucleusTh", "GolgiTh", "Threshold_level",
                                            "index_tp_gt", "cosine similarity", "vec_error", "nuclei", "golgi"])
        for image_nb in numbers_:

            res = eval_results_batch(pred_subfolder, image_nb, allmetrics, metrics_stats)
            row = len(performance_metrics)
            performance_metrics.loc[row] = res
            

        #Aggregate Metrics
        final_metrics = performance_metrics.groupby(["Threshold_level"], as_index=False).agg({'CosineSimilarityM': np.mean,
                                                 "CosineSimilaritySTD": np.mean,
                                                 "VecErrorM": np.mean,
                                                 "VecErrorSTD": np.mean,
                                                 "DistanceNuM": np.mean, 
                                                 "DistanceNuSTD": np.mean, 
                                                 "DistanceGoM": np.mean, 
                                                 "DistanceGoSTD": np.mean, 
                                                 "TP": np.sum, 
                                                 "FP": np.sum, 
                                                 "FN": np.sum, 
                                                 "ACC": np.mean,
                                                 "TPR": np.mean, 
                                                 "FPR": np.mean,
                                                 "FNR": np.mean,
                                                 "PRECISION": np.mean,
                                                 "F1_SCORE":np.mean})

        #print(final_metrics)
        record = {}
        params_path = os.path.join(pred_subfolder, "params.json")
        with open(params_path, 'r') as file:
            params_data = json.load(file)
        record.update(params_data["job_parameters"])
        record.update(final_metrics.to_dict('records')[0])
        record["Constraints"] = os.path.basename(pred_subfolder)
        records.append(record)

    #Get the dataframe of the records
    records_df = pd.DataFrame(records)
    records_df = records_df[["edge_feats", "model_type",  "Constraints",
                            'Threshold_level',
                            'CosineSimilarityM', 'CosineSimilaritySTD', 'VecErrorM', 'VecErrorSTD',
                            'DistanceNuM', 'DistanceNuSTD', 'DistanceGoM', 'DistanceGoSTD', 'TP',
                            'FP', 'FN', 'ACC', 'TPR', 'FPR', 'FNR', 'PRECISION', 'F1_SCORE']]

    #modify angular features col
    records_df = records_df.rename(columns={'edge_feats': 'Edge Feats'})
    records_df["Edge Feats"] = records_df["Edge Feats"].apply(lambda x: ("New " if "x1" in x else "") + 
                                                           ("Angular" if any("angle" in item for item in x) else "Cartesian"))

    #modify model_type col
    model_type_transform = {"GNN_Classifier":"GNN"}
    records_df["model_type"] = records_df["model_type"].apply(lambda x: model_type_transform.get(x, x)) 

    # Apply the transformation using vectorized function
    records_df['Constraints'] = records_df['Constraints'].apply(transform_constraints_column)
    constraints_transform = {"constraints":"Greedy", "constraints_threshold":"Greedy w/ Threshold", "constraints_opt": "Optimization"}
    records_df['Constraints'] = records_df['Constraints'].apply(lambda x : constraints_transform.get(x,x))
    trials_dfs.append(records_df)

In [None]:
for _trial_df in trials_dfs:
    display(_trial_df[["Edge Feats","model_type","Constraints","CosineSimilarityM","CosineSimilaritySTD","VecErrorM","VecErrorSTD","ACC","TPR","FPR","PRECISION","F1_SCORE"]].sort_values(by=['Edge Feats', 'Constraints','model_type']))

In [None]:
combined_df = pd.concat(trials_dfs)
grouped = combined_df.groupby(['Edge Feats', 'model_type', 'Constraints']).agg(
    {'Threshold_level': ['mean', 'std', 'min', 'max'],
    'CosineSimilarityM': ['mean', 'std', 'min', 'max'],
    'CosineSimilaritySTD': ['mean', 'std', 'min', 'max'],
    'VecErrorM': ['mean', 'std', 'min', 'max'],
    'VecErrorSTD': ['mean', 'std', 'min', 'max'],

    'DistanceNuM': ['mean', 'std', 'min', 'max'],
    'DistanceNuSTD': ['mean', 'std', 'min', 'max'],
    'DistanceGoM': ['mean', 'std', 'min', 'max'],
    'DistanceGoSTD': ['mean', 'std', 'min', 'max'],

    'TP': ['mean', 'std', 'min', 'max'],
    'FP': ['mean', 'std', 'min', 'max'],
    'FN': ['mean', 'std', 'min', 'max'],
    'ACC': ['mean', 'std', 'min', 'max'],
    'TPR': ['mean', 'std', 'min', 'max'],
    'FPR': ['mean', 'std', 'min', 'max'],
    'FNR': ['mean', 'std', 'min', 'max'],
    'PRECISION': ['mean', 'std', 'min', 'max'],
    'F1_SCORE': ['mean', 'std', 'min', 'max']
    })

grouped = grouped.reset_index(level=['Edge Feats', 'model_type', 'Constraints'])

def format_column(_mean, _min, _max, _std):
    reference = abs(_min) if abs(_min) > abs(_max) else abs(_max)
    difference = abs(_mean-reference)
    return str(round(_mean, 3)) + "±" + str(round(difference, 3))

numeric_cols = ['Threshold_level', 'CosineSimilarityM', 'CosineSimilaritySTD',
     'VecErrorM', 'VecErrorSTD', 'DistanceNuM', 'DistanceNuSTD', 'DistanceGoM',
     'DistanceGoSTD', 'TP', 'FP', 'FN', 'ACC', 'TPR', 'FPR', 'FNR', 'PRECISION', 'F1_SCORE']
for col in numeric_cols:  # Iterate over the first level of multi-level columns
    cols = [(col, 'mean'), (col, 'min'), (col, 'max'), (col, 'std')]
    grouped[(col,"")] = grouped.apply(lambda row: format_column(row[cols[0]], row[cols[1]], row[cols[2]], row[cols[3]]), axis=1)
    grouped = grouped.drop(columns= cols)

grouped.columns = grouped.columns.map(''.join)
#grouped = grouped[["Data Train","Algorithm", "Constraints","Angles",	"ROC AUC Score","Accuracy","Precision",	"Recall","F1-Score"]]
display(grouped.sort_values(by=['Edge Feats', 'Constraints','model_type']))

In [None]:
grouped = grouped[["Edge Feats","model_type","Constraints","CosineSimilarityM","VecErrorM","ACC","TPR","FPR", "PRECISION", "F1_SCORE"]]
grouped

In [None]:
# Transform Algorithm column
grouped = grouped.sort_values(by=["Edge Feats","model_type"], ascending=[True, False]).reset_index(drop=True)

grouped['model_type'] = grouped['model_type'].replace({
    'GNN_Classifier_NonRecurrent': 'MPNN Non-Recurrent',
    'GNN_Classifier_Recurrent': 'MPNN Recurrent'
})

# Add Angular Features information to Algorithm column
grouped['model_type'] = grouped.apply(lambda row: f"{row['model_type']} {'w/ Angular Features' if 'Angular' in row['Edge Feats'] else 'w/o Angular Features'}", axis=1)

# Drop unnecessary columns
grouped = grouped.drop(columns=["Edge Feats"])

print(grouped.to_latex())
grouped