### Extract test results

This notebook extracts the test results stored under `./ml_training/test_results/` and computes evaluation metrics from them.

In [29]:
import numpy as np
import os
from tqdm import tqdm
import pickle
from sklearn.metrics import (f1_score, precision_score, recall_score)
import pandas as pd

In [31]:
# Build the pickle cache of binarised test results.
# Scanning every CSV takes several minutes, so the scan only runs when the pickle
# is missing. Set FORCE_REBUILD to True to regenerate it from the raw CSV files.
FORCE_REBUILD = False
PICKLE_PATH = "test_results.pkl"

if FORCE_REBUILD or not os.path.exists(PICKLE_PATH):
    results_path = "./ml_training/test_results/"

    # Walk <results_path>/<experiment>/<training name>/ and load the first CSV found in each folder
    results = {}
    for entry in tqdm(os.listdir(results_path)):
        entry_path = os.path.join(results_path, entry)
        if not os.path.isdir(entry_path):
            continue
        results[entry] = {}

        for sub_entry in os.listdir(entry_path):
            sub_entry_path = os.path.join(entry_path, sub_entry)
            if not os.path.isdir(sub_entry_path):
                continue
            # Placeholder; it stays an empty dict when no usable CSV is found (e.g. for "TEST" paths)
            results[entry][sub_entry] = {}

            for sub_sub_entry in os.listdir(sub_entry_path):
                if not sub_sub_entry.endswith(".csv"):
                    continue
                final_path = os.path.join(sub_entry_path, sub_sub_entry)

                # Anything whose path contains "TEST" is skipped entirely
                if "TEST" in final_path:
                    continue

                if "all" in final_path:
                    # Rows of the "all" results have variable length, so they are parsed line by line
                    result = []
                    with open(final_path, "r") as f:
                        for line in f:
                            row = np.array(line.split(",")).astype(float)
                            # Scores >= 0.5 become 1, everything else (including NaN) becomes 0
                            result.append((row >= 0.5).astype(np.int8))
                    results[entry][sub_entry] = result
                else:
                    scores = np.genfromtxt(final_path, delimiter=",", dtype=float)
                    # Scores >= 0.5 become 1, everything else (including NaN) becomes 0
                    results[entry][sub_entry] = (scores >= 0.5).astype(np.int8)

                # Only the first usable CSV of each folder is loaded
                break

    # Save the dict with pickle
    with open(PICKLE_PATH, "wb") as f:
        pickle.dump(results, f)

100%|██████████| 7/7 [04:04<00:00, 34.91s/it]


In [32]:
# Load the cached results. The context manager closes the file handle once loading is done.
# NOTE: pickle can execute arbitrary code on load; only open a test_results.pkl you generated yourself.
with open("test_results.pkl", "rb") as f:
    results = pickle.load(f)

# Print keys and subkeys
for key in results.keys():
    print(key)
    for subkey in results[key].keys():
        print("\t" + subkey)
    print()

print("The names listed above were used for training the models. Get in touch with Michal to find out what the test set was or you can figure it out from ml_running.py")

all
	GRU
	LocalTransformer
	LSTM
	Transformer

cross_animal_comprehensive
	AA034-AA036
	AA034-AA058
	AA034-PL010
	AA036-AA034
	AA036-AA058
	AA036-PL010
	AA058-AA034
	AA058-AA036
	AA058-PL010
	PL010-AA034
	PL010-AA036
	PL010-AA058

cross_day_cross_session
	AA034_D1S1
	AA036_D2S1
	AA058_D1S1
	PL010_D1S1

cross_day_same_session
	AA034_D1S1
	AA036_D2S1
	AA058_D1S1
	PL010_D1S1

cross_session_same_day
	AA034_D1S1
	AA036_D2S1
	AA058_D1S1
	PL010_D1S1

TEST
	cross_animal
	cross_day_cross_session
	cross_day_same_session
	cross_session_same_day
	within_session

within_session
	AA034_D1S1
	AA036_D2S1
	AA058_D1S1
	PL010_D1S1

The names listed above were used for training the models. Get in touch with Michal to find out what the test set was or you can figure it out from ml_running.py


In [21]:
# apply_to_results takes the experiment type, a metric function and (optionally) a training name.
# When the training name is omitted, the values for every subkey of that experiment are accumulated.

def apply_to_results(experiment_type, func, training_name=None, average=False):
    """Apply ``func`` to every (ground truth, prediction) row pair of an experiment.

    Parameters
    ----------
    experiment_type : str
        Top-level key of the module-level ``results`` dict.
    func : callable
        Called as ``func(ground_truth, predictions)`` for each row pair.
    training_name : str, optional
        Subkey of ``results[experiment_type]``. When ``None``, every subkey is
        processed and the values are accumulated in subkey order.
    average : bool, optional
        When True, each list of values is replaced by its ``np.mean``.

    Returns
    -------
    dict
        Maps each cell count (2, 3, 5, 10, 15, 20, or ``"max"`` for the ``"all"``
        experiment) to the list of ``func`` values, or to their mean when
        ``average`` is True.

    Raises
    ------
    ValueError
        If ``experiment_type`` or ``training_name`` is not present in ``results``.
    """
    if experiment_type not in results.keys():
        raise ValueError("The experiment type provided is not in the results dict")
    if training_name is not None:
        if training_name not in results[experiment_type].keys():
            raise ValueError("The training name provided is not in the results dict for this experiment type")
        training_names = [training_name]
    else:
        training_names = results[experiment_type].keys()

    # Rows come in pairs: ground truth first, then predictions.
    # For regular experiments 10 consecutive pairs are read for each cell count in
    # [2, 3, 5, 10, 15, 20], i.e. 60 pairs / 120 rows. For "all", only the last
    # 20 rows (10 pairs) are used and are labelled "max".
    # NOTE(review): an earlier comment here mentioned 5 repetitions and cell counts
    # starting at 1; the indexing below is what the code actually does - confirm against ml_running.py.
    if experiment_type == "all":
        cell_length_list = ["max"] * 10
    else:
        cell_length_list = [size for size in (2, 3, 5, 10, 15, 20) for _ in range(10)]

    metrics = {}
    for name in training_names:
        arr = results[experiment_type][name]
        if experiment_type == "all":
            arr = arr[-20:]
        for i, size in enumerate(cell_length_list):
            ground_truth = arr[i * 2]
            predictions = arr[i * 2 + 1]
            metrics.setdefault(size, []).append(func(ground_truth, predictions))

    if average:
        for key in metrics.keys():
            metrics[key] = np.mean(metrics[key])

    return metrics


def true_positives(y_true, y_pred, target=1):
    """Count the samples where both the ground truth and the prediction equal ``target``."""
    truth_is_target = y_true == target
    prediction_is_target = y_pred == target
    return np.sum(truth_is_target & prediction_is_target)

def true_negatives(y_true, y_pred, target=0):
    """Count the samples where both the ground truth and the prediction equal ``target``.

    Here ``target`` is the label treated as the negative class (0 by default).
    """
    truth_is_negative = y_true == target
    prediction_is_negative = y_pred == target
    return np.sum(truth_is_negative & prediction_is_negative)

def false_positives(y_true, y_pred, target=1):
    """Count the samples predicted as ``target`` whose ground truth is not ``target``.

    ``target`` is the label treated as the positive class (1 by default).
    The previous implementation compared the arrays the other way round
    (truth == target, prediction != target), which counts missed targets,
    i.e. false negatives.
    """
    return np.sum(np.logical_and(y_true != target, y_pred == target))

def false_negatives(y_true, y_pred, target=0):
    """Count the samples predicted as ``target`` whose ground truth is not ``target``.

    As in ``true_negatives``, ``target`` is the label treated as the negative
    class (0 by default), so this counts samples wrongly predicted negative.
    The previous implementation compared the arrays the other way round
    (truth == target, prediction != target), which counts negatives predicted
    as positive, i.e. false positives.
    """
    return np.sum(np.logical_and(y_true != target, y_pred == target))

def _extract_indices(indices):
        indices = indices.nonzero()[0]
        if indices.any():
            # Split up the indices into groups
            indices = np.split(indices, np.where(np.diff(indices) != 1)[0]+1)
            # Now Split the indices into pairs of first and last indices
            indices = [(indices_group[0], indices_group[-1]+1) for indices_group in indices]

        return indices

def _overlaps(range1, range2):
    return len(range(max(range1[0], range2[0]), min(range1[1], range2[1]))) != 0

def transient_overlap(y_true, y_pred, threshold=0.5):
    """Match runs of positives in ``y_true`` and ``y_pred`` and count tp / fp / fn.

    Both inputs are converted to ``(start, stop)`` ranges with ``_extract_indices``.
    The two range lists are then consumed from the front: an overlapping pair whose
    overlap covers at least ``threshold`` of the *predicted* range counts as a true
    positive; otherwise the range that ends first is dropped and counted as a false
    negative (true range) or a false positive (predicted range).

    Returns a tuple ``(tp, fp, fn)``.

    NOTE(review): the original author was unsure whether this works as intended;
    treat the results with care. The loop relies on ``_extract_indices`` returning
    Python lists (truthiness, ``pop`` and ``len`` are used) - confirm before use.
    """
    # Not sure if this works as necessary avoid for the time being.    
    indices_true = _extract_indices(y_true)
    indices_pred = _extract_indices(y_pred)

    # Now iterate through the indices and check if they overlap and if they do overlap what is the percentage of overlap
    tp = 0
    fp = 0
    fn = 0

    # We'll iterate through both simultaneously and check their ranges, we'll pop from the list anything that is outside the range or already counted
    while indices_true and indices_pred:
        true_range = indices_true[0]
        pred_range = indices_pred[0]

        # Check if there is overlap
        if _overlaps(true_range, pred_range):
            # Check the percentage of overlap (measured relative to the length of the predicted range)
            overlap = len(range(max(true_range[0], pred_range[0]), min(true_range[1], pred_range[1])))
            overlap_percentage = overlap / (pred_range[1] - pred_range[0])
            if overlap_percentage >= threshold:
                tp += 1
                # Pop both indices
                indices_true.pop(0)
                indices_pred.pop(0)
            elif true_range[1] < pred_range[1]:
                # Insufficient overlap and the true range ends first: count it as missed
                indices_true.pop(0)
                fn += 1
            else:
                # Insufficient overlap and the predicted range ends first (or both end together)
                indices_pred.pop(0)
                fp += 1

        else:
            # Whichever ends first, pop it (the predicted range is popped when both end together)
            if true_range[1] < pred_range[1]:
                indices_true.pop(0)
                fn += 1
            else:
                indices_pred.pop(0)
                fp += 1
        
    # Now we need to add the remaining indices: leftover true ranges are misses, leftover predictions are false alarms
    fn += len(indices_true)
    fp += len(indices_pred)

    return (tp, fp, fn)

def dict_to_flattened(dict_):
    """Concatenate all values of ``dict_`` (each an iterable) into one flat list, in dict order."""
    return [item for group in dict_.values() for item in group]

def to_csv(metrics, filename, experiment_type):
    """Write the collected metrics to ``filename`` as a CSV, one row per metric.

    Parameters
    ----------
    metrics : list of (str, dict)
        ``(row label, metric dict)`` pairs, where each dict is the output of
        ``apply_to_results`` (cell count -> list of values).
    filename : str
        Path of the CSV file to create (overwritten if it exists).
    experiment_type : str
        ``"all"`` selects the single ``"max"`` cell count; any other value uses
        the cell counts 2, 3, 5, 10, 15 and 20.

    Columns are named ``C<cells> R<run>`` when a single training set is exported
    (10 values per cell count) and ``<session> C<cells> R<run>`` otherwise.
    """
    # Number of values stored per cell count, read from the first metric (cell count 2)
    length = len(metrics[0][1][2]) if experiment_type != "all" else 10

    no_cells_list = [2, 3, 5, 10, 15, 20] if experiment_type != "all" else ["max"]
    # NOTE(review): these hard-coded names only fit the per-session experiments;
    # other experiments (e.g. cross_animal_comprehensive) use different training names.
    session_names = ["AA034_D1S1", "AA036_D2S1", "AA058_D1S1", "PL010_D1S1"]

    column_names = []
    for no_cells in no_cells_list:
        if length == 10:
            column_names.extend(f"C{no_cells} R{run}" for run in range(1, 11))
        else:
            # apply_to_results appends all 10 runs of one training set before moving
            # on to the next, so the session must be the outer loop and the run the
            # inner loop for the labels to line up with the flattened values.
            for name in session_names:
                column_names.extend(f"{name} C{no_cells} R{run}" for run in range(1, 11))

    # Create pandas dataframe and insert one row per metric
    df = pd.DataFrame(columns=column_names)
    for name, metric in metrics:
        df.loc[name] = dict_to_flattened(metric)

    # newline="" stops the text layer from translating "\n", so lineterminator is honoured on every platform
    with open(filename, "w", newline="") as f:
        df.to_csv(f, lineterminator="\n")
        
                


In [22]:
# Metric wrappers with the (ground_truth, predictions) signature expected by apply_to_results.
# The *_nt variants pass the non-default class label to the underlying function.

def f1_macro(x, y):
    """F1 score with macro averaging."""
    return f1_score(x, y, average="macro")


def f1_standard(x, y):
    """F1 score with scikit-learn's default settings."""
    return f1_score(x, y)


def f1_standard_nt(x, y):
    """F1 score with label 0 as the positive class."""
    return f1_score(x, y, pos_label=0)


def tp_t(x, y):
    """true_positives with its default target."""
    return true_positives(x, y)


def tp_nt(x, y):
    """true_positives with target 0."""
    return true_positives(x, y, target=0)


def tn_t(x, y):
    """true_negatives with its default target."""
    return true_negatives(x, y)


def tn_nt(x, y):
    """true_negatives with target 1."""
    return true_negatives(x, y, target=1)


def fp_t(x, y):
    """false_positives with its default target."""
    return false_positives(x, y)


def fp_nt(x, y):
    """false_positives with target 0."""
    return false_positives(x, y, target=0)


def fn_t(x, y):
    """false_negatives with its default target."""
    return false_negatives(x, y)


def fn_nt(x, y):
    """false_negatives with target 1."""
    return false_negatives(x, y, target=1)


def precision_t(x, y):
    """Precision with scikit-learn's default settings."""
    return precision_score(x, y)


def precision_nt(x, y):
    """Precision with label 0 as the positive class."""
    return precision_score(x, y, pos_label=0)


def recall_t(x, y):
    """Recall with scikit-learn's default settings."""
    return recall_score(x, y)


def recall_nt(x, y):
    """Recall with label 0 as the positive class."""
    return recall_score(x, y, pos_label=0)

#trans = lambda x, y: transient_overlap(x, y)

In [23]:
def get_results_and_output(experiment_type="cross_animal_comprehensive", training_name=None):
    """Compute every metric for one experiment and write them to a CSV file.

    Parameters
    ----------
    experiment_type : str
        Key of the ``results`` dict to evaluate.
    training_name : str, optional
        Subkey to evaluate. When ``None``, all subkeys are accumulated and the
        output file is suffixed with ``all``.

    The file is written to ``<experiment_type>_<training_name>.csv`` with one
    row per metric, in the order listed below.
    """
    # (row label in the CSV, metric function) - the order defines the row order of the output
    metric_functions = [
        ("TP_Transient", tp_t), ("TP_No_Transient", tp_nt),
        ("TN_Transient", tn_t), ("TN_No_Transient", tn_nt),  # label previously misspelled "TN_No_Tansient"
        ("FP_Transient", fp_t), ("FP_No_Transient", fp_nt),
        ("FN_Transient", fn_t), ("FN_No_Transient", fn_nt),
        ("Precision_Transient", precision_t), ("Precision_No_Transient", precision_nt),
        ("Recall_Transient", recall_t), ("Recall_No_Transient", recall_nt),
        ("F1_Macro", f1_macro), ("F1_Standard", f1_standard), ("F1_Standard_No_Transient", f1_standard_nt),
    ]

    metrics = [
        (label, apply_to_results(experiment_type, metric_func, training_name=training_name, average=False))
        for label, metric_func in metric_functions
    ]

    output_suffix = "all" if training_name is None else training_name
    to_csv(metrics, f"{experiment_type}_{output_suffix}.csv", experiment_type)


In [33]:
# Export one metrics CSV per training set of the selected experiment
experiment_type = "cross_session_same_day"
for training_name in results[experiment_type]:
    get_results_and_output(experiment_type, training_name)