Inputs should be the gene network obtained from GRN inference techniques. It should have the columns "Gene1","Gene2", "EdgeWeight" and "Direction". The sample files are provided in the data folder.

In [None]:
import csv

def smart_read_csv(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        # Read a small portion to detect delimiter
        sample = f.read(2048)
        sniffer = csv.Sniffer()
        try:
            dialect = sniffer.sniff(sample, delimiters=[',', '\t'])
        except csv.Error:
            # Fallback if delimiter can't be detected
            dialect = csv.get_dialect('excel')  # defaults to comma
        f.seek(0)
        return pd.read_csv(f, sep=dialect.delimiter)

In [None]:
import pandas as pd
import os
import glob
import itertools
import numpy as np
from scipy.stats import norm
from sklearn.metrics import precision_recall_curve, roc_curve, auc, precision_score, recall_score, f1_score, matthews_corrcoef, log_loss, cohen_kappa_score, top_k_accuracy_score

method_alphas = {'GRNBOOST2': 0.55, 'SINCERITIES': 0.42, 'GENIE3': 0.56, 'PPCOR': 0.38}

def contains_evaluation_results_bayesian(subfolder):
    for root, dirs, files in os.walk(subfolder):
        if "evaluation_results_EBEA.csv" in files:
            return True
def compute_method_weights(method_alphas):
    min_aupr, max_aupr = min(method_alphas.values()), max(method_alphas.values())
    method_weights = {method: (aupr - min_aupr) / (max_aupr - min_aupr) + 0.1 for method, aupr in method_alphas.items()}
    return method_weights

def hierarchical_bayesian_update(edge_weights, method_names, method_weights):
    """Perform Hierarchical Bayesian Update based on latent edge trustworthiness."""
    global_mu = np.mean(edge_weights)
    global_sigma = np.std(edge_weights) + 1e-6  # Avoid zero std

    weighted_sum = sum(method_weights[method] * edge for edge, method in zip(edge_weights, method_names))
    weight_sum = sum(method_weights[method] for method in method_names)

    edge_trustworthiness = (weighted_sum / weight_sum) if weight_sum > 0 else global_mu
    credibility_score = norm.cdf(edge_trustworthiness, loc=global_mu, scale=global_sigma)

    return edge_trustworthiness, credibility_score

def merge_edges_with_hierarchical_bayesian(methods, method_names, method_alphas, weight_threshold=0):
    method_weights = compute_method_weights(method_alphas)
    merged_df = pd.concat(methods, ignore_index=True)
    merged_df["Method"] = list(itertools.chain(*[[name] * len(df) for name, df in zip(method_names, methods)]))

    grouped = merged_df.groupby("Direction")
    edge_summary = []

    for name, group in grouped:
        edge_weights = group["EdgeWeight"].values
        method_names_list = group["Method"].values
        trustworthiness, credibility = hierarchical_bayesian_update(edge_weights, method_names_list, method_weights)
        edge_summary.append([name, trustworthiness, credibility])

    edge_df = pd.DataFrame(edge_summary, columns=["Direction", "Trustworthiness", "Credibility"])
    edge_df = edge_df.merge(merged_df[["Direction", "Gene1", "Gene2"]].drop_duplicates(), on="Direction", how="left")
    credible_edges = edge_df[edge_df["Credibility"] >= weight_threshold]

    return credible_edges

def evaluate_edges(predicted_edges, ref_network):
    true_edges = set(ref_network["Direction"])
    predicted_edges_list = list(zip(predicted_edges["Direction"], predicted_edges["Credibility"]))

    y_true = [1 if direction in true_edges else 0 for direction, _ in predicted_edges_list]
    y_scores = [float(prob) for _, prob in predicted_edges_list]

    precision, recall, _ = precision_recall_curve(y_true, y_scores)
    aupr = auc(recall, precision)
    fpr, tpr, _ = roc_curve(y_true, y_scores)
    auroc = auc(fpr, tpr)

    threshold = 0.5  # Default threshold for binary classification
    y_pred = [1 if prob >= threshold else 0 for prob in y_scores]

    precision_metric = precision_score(y_true, y_pred, zero_division=0)
    recall_metric = recall_score(y_true, y_pred, zero_division=0)
    f1_metric = f1_score(y_true, y_pred, zero_division=0)
    mcc_metric = matthews_corrcoef(y_true, y_pred)
    log_loss_metric = log_loss(y_true, y_scores, labels=[0, 1]) if len(set(y_true)) > 1 else None
    kappa_metric = cohen_kappa_score(y_true, y_pred)
    top_k_acc = top_k_accuracy_score(y_true, np.array(y_scores).reshape(-1, 1), k=1)  # Top-1 Accuracy

    return {
        "AUPR": aupr,
        "AUROC": auroc,
        "Precision": precision_metric,
        "Recall": recall_metric,
        "F1-Score": f1_metric,
        "MCC": mcc_metric,
        "Log Loss": log_loss_metric,
        "Kappa": kappa_metric,
        "Top-K Acc": top_k_acc
    }
def load_files(basefolder):
    """Load all CSV files from the basefolder and normalize edge weights."""
    all_files = glob.glob(os.path.join(basefolder, "*.csv"))
    method_data = {}

    for file in all_files:
        method_name = os.path.basename(file).split('.')[0]
        df = smart_read_csv(file)
        print("df columns")
        print(df.columns)
        # Check if 'Direction' column (case-sensitive) is missing
        if "Direction" not in df.columns:
            # Determine which case of Gene1 and Gene2 is present
            gene1_col = "Gene1" if "Gene1" in df.columns else ("gene1" if "gene1" in df.columns else None)
            gene2_col = "Gene2" if "Gene2" in df.columns else ("gene2" if "gene2" in df.columns else None)

            # Ensure both columns are found
            if gene1_col and gene2_col:
                # Create the 'Direction' column
                df["Direction"] = df[gene1_col].astype(str) + " -> " + df[gene2_col].astype(str)
            else:
                raise ValueError("Required columns 'Gene1'/'gene1' and/or 'Gene2'/'gene2' are missing.")

        if "EdgeWeight" in df.columns:
            min_val = df["EdgeWeight"].min()
            max_val = df["EdgeWeight"].max()
            # Avoid division by zero
            if max_val > min_val:
                df["EdgeWeight"] = (df["EdgeWeight"] - min_val) / (max_val - min_val)
            else:
                df["EdgeWeight"] = 0.5  # Arbitrary constant if all weights are equal

        method_data[method_name] = df

    return method_data
def main(basefolder, ref_network_file, weight_threshold=0):
    method_data = load_files(basefolder)
    ref_network = smart_read_csv(os.path.join(basefolder, ref_network_file))
    if "Direction" not in ref_network.columns:
        # Determine which case of Gene1 and Gene2 is present
        gene1_col = "Gene1" #if "Gene1" in df.columns else ("gene1" if "gene1" in df.columns else None)
        gene2_col = "Gene2" #if "Gene2" in df.columns else ("gene2" if "gene2" in df.columns else None)

        # Ensure both columns are found
        if gene1_col and gene2_col:
            # Create the 'Direction' column
            ref_network["Direction"] = ref_network[gene1_col].astype(str) + " -> " + ref_network[gene2_col].astype(str)
        else:
            raise ValueError("Required columns 'Gene1'/'gene1' and/or 'Gene2'/'gene2' are missing.")

    # Check if "Direction" column is missing
    if "Direction" not in ref_network.columns:
        # Create the "Direction" column by combining "Gene1" and "Gene2"
        ref_network["Direction"] = ref_network["Gene1"].astype(str) + " -> " + ref_network["Gene2"].astype(str)
    excluded_files = {"Final_XGBoost_MPCM", "refNetwork", "coffee_rankedEdges", "Final_XGBoost_PCM"}
    method_names = [name for name in method_data.keys() if name not in excluded_files]

    results = []
    MAX_COMBO_SIZE = 5

    for r in range(2, min(len(method_names), MAX_COMBO_SIZE) + 1):
        for combo in itertools.combinations(method_names, r):
            selected_methods = [method_data[method] for method in combo]
            merged_edges = merge_edges_with_hierarchical_bayesian(selected_methods, combo, method_auprs, weight_threshold=weight_threshold)
            metrics = evaluate_edges(merged_edges, ref_network)
            results.append({"Combination": combo, **metrics})

    results_df = pd.DataFrame(results)
    output_dir = os.path.join(basefolder, "myresult")
    os.makedirs(output_dir, exist_ok=True)
    results_df.to_csv(os.path.join(output_dir, "evaluation_results_EBEA.csv"), index=False)
    print("Evaluation results saved.")

def process_all_subfolders(main_function, base_folder, csv_file, weight_threshold):
    subfolders = [
        os.path.join(base_folder, name)
        for name in os.listdir(base_folder)
        if os.path.isdir(os.path.join(base_folder, name))
        and not contains_evaluation_results_bayesian(os.path.join(base_folder, name))
    ]
    for subfolder in subfolders:
        print(f"Processing folder: {subfolder}")
        main_function(subfolder, csv_file, weight_threshold)

names = ["GSD"]
for name in names:
    process_all_subfolders(main_function=main, base_folder="your folder path" + name, csv_file="reference network path", weight_threshold=0)
