# Benchmark - Error calculation -

In [None]:
import pandas as pd
from glob import glob

from colomoto import minibn
from merrin.bnutils import bn_score_influence_graph

## Parameters

In [None]:
objective = 'Growth'
regulators = ['RPcl', 'RPO2', 'RPh', 'RPb']
inputs = ['Carbon1', 'Carbon2', 'Oxygen', 'Fext', 'Hext']
outputs = ['Biomass', 'Dext', 'Eext']

# Files
reference_simulations = 'data/covert/result/pipeline_nbi/FlexFlux/*.csv'

# Solver statistics
inferring_statistics = './benchmark/results/inferring_statistics.csv'

# BNET
bn_by_instance = './benchmark/results/bn_by_instances.csv'
bnet_dir = './benchmark/results/bnet/BN-{}.bnet'

# Simulations
simulations_dir = './benchmark/results/simulations/*'

# Export
csv_export = './benchmark/results/{}.csv'

## Functions

In [None]:
def residual_sum_of_squares(X: pd.DataFrame, Y: pd.DataFrame, normalize:bool = False):
    assert(X.size == Y.size)
    assert(X.columns.tolist() == Y.columns.tolist())
    
    # normalization
    if normalize:
        X_mean = X.mean(axis=0)
        X_std = X.std(axis=0)
        X_ = (X - X_mean) / X_std
        Y_ = (Y - X_mean) / X_std
    else:
        X_ = X
        Y_ = Y

    # rss
    Z = (X_ - Y_)**2
    rss = Z.sum(axis=1).sum()
    return rss

def mean_squared_error(X, Y):
    assert(X.size == Y.size)
    assert(X.columns.tolist() == Y.columns.tolist())
    rss = residual_sum_of_squares(X, Y)
    mss = rss / X.size
    return mss

In [None]:
def compute_error(ref_df: dict, sim_df: dict):
    assert(sorted(ref_df.keys()) == sorted(sim_df.keys()))

    errors = {'RSS_External_Sum': 0, 'RSS_Regulators_Sum': 0}
    for k in ref_df.keys():
        ref = ref_df[k]
        sim = sim_df[k]

        rss_inout = residual_sum_of_squares(ref[inputs + outputs], sim[inputs + outputs], normalize=True)
        rss_reg = residual_sum_of_squares(ref[regulators], sim[regulators], normalize=False)

        errors[f'RSS_External_{k}'] = rss_inout
        errors[f'RSS_Regulators_{k}'] = rss_reg

        errors['RSS_External_Sum'] += rss_inout
        errors['RSS_Regulators_Sum'] += rss_reg

    return errors

def load_sim(sim_path):
    sims = {}
    for sim in glob(sim_path):
        name = sim.split('/')[-1].strip('.csv')[-4:]
        sims[name] = pd.read_csv(sim, sep='\t')
    return sims

## Boolean networks analysis

### Boolean network error analysis

In [None]:
ref_sims_df = load_sim(reference_simulations)

In [None]:
unreg_sims_df = load_sim('data/covert_no_reg/out/pipeline_nbi/FlexFlux/*.csv')
worst_case_error_ext = compute_error(ref_sims_df, unreg_sims_df)['RSS_External_Sum']
worst_case_error_reg = 5 * 301 * 4 # 5 simulations with 301 time steps with 4 regulators

In [None]:
worst_case_error_ext, worst_case_error_reg

In [None]:
errors = {}

for bn_folder in glob(simulations_dir):
    ref_bn = int(bn_folder.strip('/').split('-')[-1])
    sims_df = load_sim(bn_folder + '/*.csv')
    sim_errors = compute_error(ref_sims_df, sims_df)
    errors[ref_bn] = {'Ref_BN': int(ref_bn)} | sim_errors

In [None]:
df = pd.DataFrame(errors).T
df = df[sorted(df.columns)]

df = df.sort_values(['Ref_BN'])
df['Ref_BN'] = df['Ref_BN'].astype(int)
df = df.round(4)

In [None]:
scores = []
gt_bnet = minibn.BooleanNetwork.load('data/covert/regulatory_network.bnet')
for ref_bn in df.index:
    bnet = minibn.BooleanNetwork.load(bnet_dir.format(ref_bn))
    score = {'Ref_BN': ref_bn} | bn_score_influence_graph(gt_bnet, bnet)
    scores.append(score)
df = df.merge(pd.DataFrame(scores).rename(columns={'recall': 'BN_score_recall', 'precision': 'BN_score_precision'}), on='Ref_BN', how='left')
df

In [None]:
rss_external_col = [f'RSS_External_{s}' for s in ['fig5', 'fig6', 'fig7', 'fig8', 'fig9']]
rss_reg_col = [f'RSS_Regulators_{s}' for s in ['fig5', 'fig6', 'fig7', 'fig8', 'fig9']]
bin_df = df[rss_reg_col + rss_external_col]
for s in ['fig5', 'fig6', 'fig7', 'fig8', 'fig9']:
    bin_df[s] = (bin_df[f'RSS_External_{s}'] + bin_df[f'RSS_Regulators_{s}'])
df['Nb_Perfect_match_simulations'] = bin_df[['fig5', 'fig6', 'fig7', 'fig8', 'fig9']].astype(bool).sum(axis=1).apply(lambda x: 5 - x)
df

In [None]:
df['Simulation_Correctness_External'] = df['RSS_External_Sum'] #1 - df['RSS External - Sum'] / worst_case_error_ext
df['Simulation_Correctness_Regulators'] = df['RSS_Regulators_Sum'] / worst_case_error_reg
df

In [None]:
df.to_csv(csv_export.format('bn_statistics'))

### Instance error analysis

In [None]:
instance_bn = pd.read_csv(bn_by_instance)
instance_bn = instance_bn.drop('Unnamed: 0', axis=1)
# instance_bn = instance_bn.rename({'Data type': 'Data_type', 'Degradation %': 'Degradation', 'Solution ID': 'Solution_ID', 'Ref BN': 'Ref_BN'}, axis=1)
instance_bn

In [None]:
inferring_statistics_df = pd.read_csv(inferring_statistics)
# inferring_statistics_df = inferring_statistics_df.rename({'Data type': 'Data_type', 'Degradation %': 'Degradation'}, axis=1)
inferring_statistics_df[inferring_statistics_df['Status'] != 'SAT']

In [None]:
full_df = instance_bn.merge(df, on='Ref_BN', how='left')
full_df = full_df.sort_values(['Instance', 'Data_type', 'Degradation', 'Seed', 'Solution_ID'])
full_df = full_df.reset_index().drop('index', axis=1)
full_df

In [None]:
full_df.to_csv('./benchmark/results/all_data_statistics.csv')

#### Worst case results - by instance

In [None]:
level_columns = ['Instance', 'Data_type', 'Degradation', 'Seed']

dg = full_df[level_columns].drop_duplicates().reset_index().drop('index', axis=1)

dg['Nb_Solution'] = dg.join(full_df.groupby(level_columns)['Solution_ID'].count(), on=level_columns, rsuffix='_r')['Solution_ID']

dg['Best_Score_External'] = dg.join(full_df.groupby(level_columns)['Simulation_Correctness_External'].min(), on=level_columns, rsuffix='_r')['Simulation_Correctness_External']
dg['Worst_Score_External'] = dg.join(full_df.groupby(level_columns)['Simulation_Correctness_External'].max(), on=level_columns, rsuffix='_r')['Simulation_Correctness_External']

dg['Best_Score_Regulators'] = dg.join(full_df.groupby(level_columns)['Simulation_Correctness_Regulators'].min(), on=level_columns, rsuffix='_r')['Simulation_Correctness_Regulators']
dg['Worst_Score_Regulators'] = dg.join(full_df.groupby(level_columns)['Simulation_Correctness_Regulators'].max(), on=level_columns, rsuffix='_r')['Simulation_Correctness_Regulators']

dg['Best_nb_Perfect_match_Simulation'] = dg.join(full_df.groupby(level_columns)['Nb_Perfect_match_simulations'].max(), on=level_columns, rsuffix='_r')['Nb_Perfect_match_simulations']
dg['Worst_nb_Perfect_match_Simulation'] = dg.join(full_df.groupby(level_columns)['Nb_Perfect_match_simulations'].min(), on=level_columns, rsuffix='_r')['Nb_Perfect_match_simulations']

dg['Best_BN_score_recall'] = dg.join(full_df.groupby(level_columns)['BN_score_recall'].max(), on=level_columns, rsuffix='_r')['BN_score_recall']
dg['Worst_BN_score_recall'] = dg.join(full_df.groupby(level_columns)['BN_score_recall'].min(), on=level_columns, rsuffix='_r')['BN_score_recall']

dg['Best_BN_score_precision'] = dg.join(full_df.groupby(level_columns)['BN_score_precision'].max(), on=level_columns, rsuffix='_r')['BN_score_precision']
dg['Worst_BN_score_precision'] = dg.join(full_df.groupby(level_columns)['BN_score_precision'].min(), on=level_columns, rsuffix='_r')['BN_score_precision']

# dg = dg.round(4)
dg

In [None]:
for i, r in inferring_statistics_df[inferring_statistics_df['Status'] != 'SAT'].iterrows():
    instance = r['Instance']
    datatype = r['Data_type']
    degradation = r['Degradation']
    seed = r['Seed']
    assert(not ((dg['Instance'] == instance) & (dg['Data_type'] == datatype) & (dg['Degradation'] == degradation) & (dg['Seed'] == seed)).any())
    added_row = {
        'Instance': instance,
        'Data_type': datatype,
        'Degradation': degradation,
        'Seed': seed
    }
    dg = dg.append(added_row, ignore_index = True)
dg

In [None]:
dg.to_csv('./benchmark/results/instance_statistics.csv')

#### Worst case results - by degradation level

In [None]:
level_columns = ['Instance', 'Data_type', 'Degradation']

dh = dg[level_columns].drop_duplicates().reset_index().drop('index', axis=1)

dh['Min_nb_Solution'] = dh.join(dg.groupby(level_columns)['Nb_Solution'].min(), on=level_columns, rsuffix='_r')['Nb_Solution']
dh['Max_nb_Solution'] = dh.join(dg.groupby(level_columns)['Nb_Solution'].max(), on=level_columns, rsuffix='_r')['Nb_Solution']

dh['Nb_Different_Solution'] = dh.join(full_df.groupby(level_columns)['Ref_BN'].nunique(), on=level_columns, rsuffix='_r')['Ref_BN']

dh['Best_Score_External'] = dh.join(dg.groupby(level_columns)['Best_Score_External'].min(), on=level_columns, rsuffix='_r')['Best_Score_External']
dh['Worst_Score_External'] = dh.join(dg.groupby(level_columns)['Worst_Score_External'].max(), on=level_columns, rsuffix='_r')['Worst_Score_External']

dh['Best_Score_Regulators'] = dh.join(dg.groupby(level_columns)['Best_Score_Regulators'].min(), on=level_columns, rsuffix='_r')['Best_Score_Regulators']
dh['Worst_Score_Regulators'] = dh.join(dg.groupby(level_columns)['Worst_Score_Regulators'].max(), on=level_columns, rsuffix='_r')['Worst_Score_Regulators']

dh['Best_nb_Perfect_match_Simulation'] = dh.join(dg.groupby(level_columns)['Best_nb_Perfect_match_Simulation'].max(), on=level_columns, rsuffix='_r')['Best_nb_Perfect_match_Simulation']
dh['Worst_nb_Perfect_match_Simulation'] = dh.join(dg.groupby(level_columns)['Worst_nb_Perfect_match_Simulation'].min(), on=level_columns, rsuffix='_r')['Worst_nb_Perfect_match_Simulation']

dh['Best_BN_score_recall'] = dh.join(dg.groupby(level_columns)['Best_BN_score_recall'].max(), on=level_columns, rsuffix='_r')['Best_BN_score_recall']
dh['Worst_BN_score_recall'] = dh.join(dg.groupby(level_columns)['Worst_BN_score_recall'].min(), on=level_columns, rsuffix='_r')['Worst_BN_score_recall']

dh['Best_BN_score_precision'] = dh.join(dg.groupby(level_columns)['Best_BN_score_precision'].max(), on=level_columns, rsuffix='_r')['Best_BN_score_precision']
dh['Worst_BN_score_precision'] = dh.join(dg.groupby(level_columns)['Worst_BN_score_precision'].min(), on=level_columns, rsuffix='_r')['Worst_BN_score_precision']

dh['Nb_Unsat'] = dh.join(dg.groupby(level_columns)['Nb_Solution'].apply(lambda x : x.isna().sum()), on=level_columns, rsuffix='_r')['Nb_Solution']

dh = dh.sort_values(level_columns).reset_index().drop('index', axis=1)

# dh = dh.round(4)
dh

In [None]:
dh.to_csv('./benchmark/results/global_statistics.csv')