In [1]:
import os
import numpy as np
import pandas as pd
from pprint import pprint
from scipy.stats import wilcoxon
from itertools import combinations, product
from sanitize_ml_labels import sanitize_ml_labels

In [2]:
RESULT_PATH = "./wilcoxon"
os.makedirs(RESULT_PATH, exist_ok=True)

# Helper

In [3]:
squared = lambda x: (y for y in product(list(x), list(x)) if y[0] != y[1])

In [4]:
accuracy = lambda x: x.Accuracy.values
auprc = lambda x: x.AUPRC.values
auroc = lambda x: x.AUROC.values
metrics = {
    sanitize_ml_labels("accuracy"):accuracy,
    sanitize_ml_labels("auprc"):auprc,
    sanitize_ml_labels("auroc"):auroc
}

In [5]:
def wilcoxon_test(x, y, include_p_value=False, p_threshold=0.05):
    diff = x - y
    if np.isclose(diff, 0).all():
        return {
        "win":0,
        "tie":1,
        "losses":0,
        "pvalue":0,
        }
    
    stats, pvalue = wilcoxon(x, y)
    if pvalue <= p_threshold:
        if (diff > 0).mean() > 0.5:
            win = 1
            lose = 0
        else:
            win = 0
            lose = 1
        tie = 0
    else:
        tie = 1
        win = 0
        lose = 0
    r = {
        "win":win,
        "tie":tie,
        "losses":lose,
    }
    if include_p_value:
        r["pvalue"] = pvalue
    
    return r

In [6]:
def compare_2_models(d1, d2, extract_metric, include_p_value=False):
    
    if d1 is None or d2 is None:
        return {
        }
    
    x = extract_metric(d1)
    y = extract_metric(d2)
    
    assert wilcoxon_test(x, y)["win"] == wilcoxon_test(y, x)["losses"]
    assert wilcoxon_test(x, y)["tie"] == wilcoxon_test(y, x)["tie"]
    
    return wilcoxon_test(x, y, include_p_value)
    

In [7]:
def sanitize_df(df):
    df.columns = sanitize_ml_labels(df.columns)
    df.index.names = list(
        map(lambda x: x[:-1] if x[-1].isdigit() else x , 
            sanitize_ml_labels(df.index.names)
        )
    )
    for col in df.columns[df.dtypes == object]:
        df[col] = sanitize_ml_labels(df[col])
        
    return df

In [8]:
def df_to_latex(df, name, task, metric=None):
    path = f"{name}_{task}"
    caption = f"Win-Tie-Losses table for {sanitize_ml_labels(task)} obtained from Wilcoxon signed-rank test."
    label = f"tab:{task}_{name}"
    if metric:
        path += "_" + metric
        caption = caption[:-1] + f"on {metric}."
        
    df = sanitize_df(df)
    df.to_csv(RESULT_PATH + f"_csv/{path}.csv")
    df = df.reset_index()
    df.columns = [
            "\\textbf{%s}"%x
            for x in df.columns
        ]
    result = df.to_latex(
        index=False,
        column_format="|{}|".format(
                "|".join("c" * len(df.columns))
            ),
        escape=False,
        )
    result = result.replace(r"\end{tabular}", r"\end{tabular}"+f"\n\\caption{{{caption}}}\n\\label{{{label}}}")
    result = result.replace("\\toprule", "") 
    result = result.replace("\\midrule", "")
    result = result.replace("\\bottomrule", "")
    result = result.replace("\\\\\n", "\\\\\n\\hline\n")
    with open(RESULT_PATH + f"_tex/{path}.tex", "w") as f:
        f.write("\\begin{table}[H]\n\\centering\n"+result+r"\end{table}")

# Tabella win tie loss per modello per dati di training (12, 3)

In [9]:
def model_tables(df, task, name="model_training_data_comparison"):
    combined_group = {
        key:val
        for key, val in df.groupby(["Model", "Trained on"])
    }
    res = pd.DataFrame([
        {
            "metric":metric,
            "model1":m1,
            "train1":t1,
            "model2":m2,
            "train2":t2,
            **compare_2_models(combined_group[(m1, t1)], combined_group[(m2, t2)], metric_function),
        }
        for metric, metric_function in metrics.items()
        for (m1, t1), (m2, t2) in squared(list(product(set(df.Model), set(df["Trained on"]))))
    ])
    for metric in metrics.keys():
        r = res[res.metric == metric].groupby(["train1", "model1"]).sum()
        df_to_latex(r, name, task, metric)

# Risultato wilcoxon per tipo di dati di training

In [10]:
def train_tables(df, task, name="training_data_comparison"):
    train_groupby = {
        key:val
        for key, val in df.groupby(["Trained on"])
    }
    res = pd.DataFrame([
        {
            "metric":metric,
            "train1":t1,
            "train2":t2,
            **compare_2_models(train_groupby[t1], train_groupby[t2], metric_function, True),
        }
        for metric, metric_function in metrics.items()
        for t1, t2 in combinations(set(df["Trained on"]), 2)

    ])
    res = res.set_index("metric")
    df_to_latex(res, name, task)

# Tabella win tie losses per modello (6, 3)

In [11]:
def model_total_tables(df, task, name="model_comparison", models=None):
    if not models:
        models = df.Model.unique()
    model_groupby = {
        key:val
        for key, val in df.groupby(["Model"])
    }
    res = pd.DataFrame([
        {
            "metric":metric,
            "model1":m1,
            "model2":m2,
            **compare_2_models(model_groupby[m1], model_groupby[m2], metric_function),
        }
        for metric, metric_function in metrics.items()
        for m1, m2 in squared(models)

    ])
    for metric in metrics.keys():
        r = res[res.metric == metric].groupby(["model1"]).sum()
        df_to_latex(r, name, task, metric)

# Weighted comparison of CAE

In [12]:
def weighted_cae_comparison(df, task, name="weighted_cae_comparison"):
    df = df[df.Target == "All nucleotides"]
    models = [x for x in df.Model.unique() if "cae" in x.lower()]
    model_groupby = {
        key:val
        for key, val in df.groupby(["Model", "Weight"])
    }
    res = pd.DataFrame([
        {
            "metric":metric,
            "model":m,
            "weight1":w1,
            "weight2":w2,
            **compare_2_models(model_groupby[m, w1], model_groupby[m, w2], metric_function),
        }
        for metric, metric_function in metrics.items()
        for m in models
        for w1, w2 in product(list(df.Weight.unique()), list(df.Weight.unique()))

    ])
    for metric in metrics.keys():
        r = res[res["metric"] == metric].groupby(["weight1"]).sum()
        df_to_latex(r, name, task, metric)

In [13]:
def weighted_comparison_all(df, task, name="weighted_comparison_all"):
    df = df[df.Target == "All nucleotides"]
    models = df.Model.unique()
    model_groupby = {
        key:val
        for key, val in df.groupby(["Model", "Weight"])
    }
    res = pd.DataFrame([
        {
            "metric":metric,
            "model1":m1,
            "model2":m2,
            "weight1":w1,
            "weight2":w2,
            **compare_2_models(model_groupby.get((m1, w1), None), model_groupby.get((m2, w2), None), metric_function),
        }
        for metric, metric_function in metrics.items()
        for m1, m2 in product(list(models), list(models))
        for w1, w2 in product(list(df.Weight.unique()), list(df.Weight.unique()))
        if (m1, w1) != (m2, w2)

    ])
    res = res[~res.win.isna()]
    for metric in metrics.keys():
        r = res[res["metric"] == metric].groupby(["model1", "weight1"]).sum()
        df_to_latex(r, name, task, metric)

# Nucleotides performance check

In [14]:
def nucletoide_tables_wtl(df, task, name="nucleotides_comparison", model_type=None):
    df = df[df.Target != "All nucleotides"]
    targets = df.Target.unique()
    if model_type:
        df = df[df.Model.str.contains(model_type)]
    target_groupby = {
        key:val
        for key, val in df.groupby(["Target"])
    }
    res = pd.DataFrame([
        {
            "metric":metric,
            "target1":t1,
            "target2":t2,
            **compare_2_models(target_groupby[t1], target_groupby[t2], metric_function),
        }
        for metric, metric_function in metrics.items()
        for t1, t2 in squared(targets)

    ])
    for metric in metrics.keys():
        r = res[res.metric == metric].groupby(["target1"]).sum()
        df_to_latex(r, name, task, metric)

In [15]:
def nucletoide_means(df, task, name="nucleotides_means", model_type=None):
    df = df[df.Target != "All nucleotides"]
    targets = df.Target.unique()
    if model_type:
        df = df[df.Model.str.contains(model_type)]
    target_groupby = {
        key:val
        for key, val in df.groupby(["Target"])
    }
    res = pd.DataFrame([
        {
            "metric":metric,
            "target":t,
            "mean":target_groupby[t][metric].mean(),
        }
        for metric, metric_function in metrics.items()
        for t in targets

    ])
    for metric in metrics.keys():
        r = res[res.metric == metric]
        df_to_latex(r, name, task, metric)

# Compare best cnn and cae models

In [16]:
def confront_best_models(df, task, name="best_models_comparison", models=None):
    if not models:
        models = df.Model.unique()
    model_groupby = {
        key:val
        for key, val in df.groupby(["Model"])
    }
    res = pd.DataFrame([
        {
            "metric":metric,
            "model":"%s vs %s"%(m, m2),
            **compare_2_models(model_groupby[m], model_groupby[m2], metric_function, True),
        }
        for metric, metric_function in metrics.items()
        for m, m2 in combinations(models, 2)

    ])
    res = res.set_index("metric")
    df_to_latex(res, name, task)

# Gap filling

In [17]:
def get_gap_filling_data(path="./reports/"):
    df = pd.concat([
        pd.read_csv(path+file, index_col=0)
        for file in os.listdir(path)
    ])
    df = df[df.task == "gap_filling"]
    df = df.drop("dataset", axis=1)
    df = df[df.run_type != "biological validation"]
    df = sanitize_df(df)
    df["Weight"] = "0"
    
    weights = df["Weight"].values
    trainedon = df["Trained on"].values
    
    mask = df["Trained on"].str.contains("weight")
    
    mask_w_2 = df["Trained on"].str.contains("2")
    mask_w_10 = df["Trained on"].str.contains("10")
    
    mask_multigap = df["Trained on"].str.contains("Multivariate")
    mask_singlegap = df["Trained on"].str.contains("Single")
    
    trainedon[mask & mask_multigap] = "Multivariate gaps"
    trainedon[mask & mask_singlegap] = "Single gap"
    weights[mask & mask_w_2] = "2"
    weights[mask & mask_w_10] = "10"
    
    return df

In [18]:
df = get_gap_filling_data()
no_weights = df[df.Weight.values == "0"]

In [19]:
model_tables(no_weights, "gap_filling")
train_tables(no_weights, "gap_filling")
model_total_tables(no_weights, "gap_filling")
model_total_tables(no_weights, "gap_filling_cnn", models=["CNN 200", "CNN 500", "CNN 1000"])
model_total_tables(no_weights, "gap_filling_cae", models=["CAE 200", "CAE 500", "CAE 1000"])

In [20]:
confront_best_models(no_weights, "gap_filling", models=["CNN 1000", "CAE 1000"])

In [21]:
nucletoide_tables_wtl(no_weights, "gap_filling_cae", model_type="CAE")
nucletoide_tables_wtl(no_weights, "gap_filling_cnn", model_type="CNN")
nucletoide_means(no_weights, "gap_filling_cae", model_type="CAE")
nucletoide_means(no_weights, "gap_filling_cnn", model_type="CNN")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  if __name__ == '__main__':


In [22]:
weighted_cae_comparison(df, "gap_filling_weighted")



In [23]:
weighted_comparison_all(df, "gap_filling_weighted")

# Reconstruction

In [24]:
def get_reconstruction_data(path="./reports/"):
    df = pd.concat([
        pd.read_csv(path+file, index_col=0)
        for file in os.listdir(path)
    ])
    df = df[df.task == "reconstruction"]
    df = df.drop("dataset", axis=1)
    df = df[df.run_type != "biological validation"]
    df = sanitize_df(df)
    
    df["Weight"] = "0"
    
    weights = df["Weight"].values
    trainedon = df["Trained on"].values
    
    mask = df["Trained on"].str.contains("weight")
    
    mask_w_2 = df["Trained on"].str.contains("2")
    mask_w_10 = df["Trained on"].str.contains("10")
    
    mask_multigap = df["Trained on"].str.contains("Multivariate")
    mask_singlegap = df["Trained on"].str.contains("Single")
    
    trainedon[mask & mask_multigap] = "Multivariate gaps"
    trainedon[mask & mask_singlegap] = "Single gap"
    weights[mask & mask_w_2] = "2"
    weights[mask & mask_w_10] = "10"
    
    return df

In [25]:
df = get_reconstruction_data()
no_weights = df[df.Weight.values == "0"]

In [26]:
model_tables(no_weights, "reconstruction")
train_tables(no_weights, "reconstruction")
model_total_tables(no_weights, "reconstruction")

In [27]:
nucletoide_tables_wtl(no_weights, "reconstruction")
nucletoide_means(no_weights, "reconstruction")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  if __name__ == '__main__':


In [28]:
weighted_cae_comparison(df, "reconstruction")

In [29]:
weighted_comparison_all(df, "reconstruction_weighted")