In [None]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, roc_auc_score

# Assume df is your pandas DataFrame and that the target variable is in column 'target'
# Features X and target y:
df1 = pd.read_csv("../datasets/Global_Dataset.csv", index_col=0)
df2 = pd.read_csv("../datasets/Single_Dataset.csv", index_col=0)
df3 = pd.read_csv("../datasets/GA1628_Dataset.csv", index_col=0)
df4 = pd.read_csv("../datasets/GA1824_Dataset.csv", index_col=0)

df1.set_index("PREGNANCY-ID-ANON", inplace=True)
df2.set_index("PREGNANCY-ID-ANON", inplace=True)
df3.set_index("PREGNANCY-ID-ANON", inplace=True)
df4.set_index("PREGNANCY-ID-ANON", inplace=True)


In [None]:
def nan_in_ds(df):
    # Missing Values in each Column
    print("Number of NaN in each Column")
    for col_name in df.columns:
        print(col_name + ":" + str(df[col_name].isna().sum()))
    for col in df.columns:
        print(f"{col}: {df[col].dtype}")

    return

def dataset_info(dataset):

    print(f"Features in Dataset: {list(dataset.columns)}" )

    print(f"Number if PTB < 37 is 1: {len(dataset.loc[dataset["PTB37"] == 1])}, is 0: {len(dataset.loc[dataset["PTB37"] == 0])}")
    print(
        f"Number if sPTB < 37 is 1: {len(dataset.loc[(dataset['PTB37'] == 1) & (dataset['SpontaneousPTB'] == 1)])}, " +
        f"is 0: {len(dataset.loc[(dataset['PTB37'] == 0) | (dataset['SpontaneousPTB'] == 0)])}"
    )
    print(
        f"Number if sPTB < 34 is 1: {len(dataset.loc[(dataset['PTB34'] == 1) & (dataset['SpontaneousPTB'] == 1)])}, " +
        f"is 0: {len(dataset.loc[(dataset['PTB34'] == 0) | (dataset['SpontaneousPTB'] == 0)])}"
    )
    print(
        f"Number if sPTB < 32 is 1: {len(dataset.loc[(dataset['PTB32'] == 1) & (dataset['SpontaneousPTB'] == 1)])}, " +
        f"is 0: {len(dataset.loc[(dataset['PTB32'] == 0) | (dataset['SpontaneousPTB'] == 0)])}"
    )

    nan_in_ds(dataset)

    return

def undersample_50_50(df, target_col='target', random_state=42):
    """
    Returns a new DataFrame with a 50/50 undersample of the given target column.
    
    Parameters:
        df (pd.DataFrame): The DataFrame to undersample.
        target_col (str): The column name containing the target variable.
        random_state (int): Random seed for reproducibility.
    
    Returns:
        pd.DataFrame: A balanced DataFrame with equal number of samples for each class.
    """
    # Count instances of each class in the target column.
    target_counts = df[target_col].value_counts()
    
    # Identify the minority and majority classes.
    minority_class = target_counts.idxmin()
    majority_class = target_counts.idxmax()
    minority_count = target_counts.min()
    
    # Split the DataFrame into minority and majority subsets.
    df_minority = df[df[target_col] == minority_class]
    df_majority = df[df[target_col] == majority_class]
    
    # Downsample the majority class to the size of the minority class.
    df_majority_downsampled = df_majority.sample(n=minority_count, random_state=random_state)
    
    # Combine the two subsets and shuffle the result.
    df_balanced = pd.concat([df_minority, df_majority_downsampled]).sample(frac=1, random_state=random_state)
    
    return df_balanced



In [None]:
dataset_info(df1)

In [None]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.model_selection import StratifiedKFold, GridSearchCV, RandomizedSearchCV
from sklearn.metrics import make_scorer, accuracy_score, recall_score, precision_score, f1_score, roc_auc_score
from xgboost import XGBClassifier

# Datasets and label functions should be defined as in your script
datasets = [df1, df2, df3, df4]
target_label_defs = {
    'PTB37': lambda df: df['PTB37'],
    'sPTB37': lambda df: ((df['PTB37'] == 1) & (df['SpontaneousPTB'] == 1)).astype(int),
    'sPTB34': lambda df: ((df['PTB34'] == 1) & (df['SpontaneousPTB'] == 1)).astype(int),
    'sPTB32': lambda df: ((df['PTB32'] == 1) & (df['SpontaneousPTB'] == 1)).astype(int)
}
label_cols = ['PTB37', 'PTB34', 'PTB32', 'SpontaneousPTB']

# Define models and parameter grids
models_param_grids = {
    'RandomForest': {
        'model': RandomForestClassifier(n_jobs=-1, random_state=42),
        'params': {
            'n_estimators': [200, 500],
            'max_depth': [5, 10, 20],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'max_features': ['sqrt', 'log2'],
            'bootstrap': [True, False]
        }
    },
    'LogisticRegression': {
        'model': LogisticRegression(solver='saga', max_iter=2000, random_state=42),
        'params': {
            'penalty': ['l1', 'l2'],
            'C': [0.01, 0.1, 1, 10, 100]
        }
    },
    'SVC': {
        'model': SVC(probability=True, random_state=42),
        'params': {
            'C': [0.1],
            'kernel': ['linear', 'rbf'],
            'gamma': ['auto']
        }
    },
    'XGB': {
        'model': XGBClassifier(use_label_encoder=False, eval_metric='logloss', n_jobs=-1, random_state=42, tree_method="hist"),
        'params': {
            'n_estimators': [100, 500],
            'max_depth': [3, 6, 10],
            'learning_rate': [0.01, 0.1, 0.3],
            'subsample': [0.7, 1],
            'colsample_bytree': [0.7, 1]
        }
    },
    'KNN': {
        'model': KNeighborsClassifier(),
        'params': {
            'n_neighbors': [3, 5, 7, 9],
            'weights': ['uniform', 'distance'],
            'p': [1, 2]  # 1=manhattan, 2=euclidean
        }
    },
}

# Custom scoring dictionary for GridSearchCV
scoring = {
    'accuracy': make_scorer(accuracy_score),
    'recall': make_scorer(recall_score),
    'precision': make_scorer(precision_score),
    'f1': make_scorer(f1_score),
    'roc_auc': make_scorer(roc_auc_score, needs_proba=True)
}

# Output dictionary to store best results
tuning_results = {}

for d_idx, df in enumerate(datasets):
    ds_name = f"Dataset_{d_idx+1}"
    tuning_results[ds_name] = {}
    feature_cols = [col for col in df.columns if col not in label_cols]
    for label_name, label_func in target_label_defs.items():
        print(f"\n[INFO] {ds_name} - Label: {label_name}")
        df_copy = df.copy()
        df_copy['target'] = label_func(df_copy)
        df_copy = undersample_50_50(df=df_copy, target_col='target')
        X = df_copy[feature_cols]
        y = df_copy['target']
        tuning_results[ds_name][label_name] = {}
        cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
        
        for model_name, entry in models_param_grids.items():
            print(f"  Tuning {model_name}...")
            model = entry['model']
            param_grid = entry['params']

            # Use RandomizedSearch for XGB (faster) and GridSearch for others
            if model_name == 'XGB':
                search = RandomizedSearchCV(
                    estimator=model,
                    param_distributions=param_grid,
                    n_iter=10,
                    scoring='roc_auc',
                    cv=cv,
                    refit=True,
                    verbose=2,
                    n_jobs=-1,
                    random_state=42
                )
            else:
                search = GridSearchCV(
                    estimator=model,
                    param_grid=param_grid,
                    scoring='roc_auc',
                    cv=cv,
                    refit=True,
                    verbose=2,
                    n_jobs=-1
                )
            search.fit(X, y)

            # Evaluate all scoring metrics using cross_val_predict if needed
            best_model = search.best_estimator_
            cv_results = search.cv_results_
            best_params = search.best_params_
            best_score = search.best_score_

            tuning_results[ds_name][label_name][model_name] = {
                'best_params': best_params,
                'best_roc_auc': best_score
            }
            print(f"    Best ROC AUC: {best_score:.3f} | Params: {best_params}")

# Save results to JSON or CSV
import json
with open('tuning_results.json', 'w') as f:
    json.dump(tuning_results, f, indent=2)

print("Results saved to tuning_results.json")


In [None]:
"""
TESTING SCRIPT FOR SVG, RF, XGBOOST, KNN AND LR MODELS
"""
import numpy as np
import pandas as pd
import json
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from xgboost import XGBClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import (accuracy_score, recall_score, precision_score,
                             f1_score, roc_auc_score, confusion_matrix)

# Load your hyperparameter tuning results
with open("tuning_results.json", "r") as f:
    best_hyperparams = json.load(f)

datasets = [df1, df2, df3, df4]
target_label_defs = {
    'PTB37': lambda df: df['PTB37'],
    'sPTB37': lambda df: ((df['PTB37'] == 1) & (df['SpontaneousPTB'] == 1)).astype(int),
    'sPTB34': lambda df: ((df['PTB34'] == 1) & (df['SpontaneousPTB'] == 1)).astype(int),
    'sPTB32': lambda df: ((df['PTB32'] == 1) & (df['SpontaneousPTB'] == 1)).astype(int)
}
label_cols = ['PTB37', 'PTB34', 'PTB32', 'SpontaneousPTB']

model_classes = {
    "RandomForest": RandomForestClassifier,
    "LogisticRegression": LogisticRegression,
    "SVC": SVC,
    "XGB": XGBClassifier,
    "KNN": KNeighborsClassifier,
}

# Store metrics/results for all models
results = {}
outer_preds_all = {}
cv_fold_metrics = {}

for d_idx, df in enumerate(datasets):
    ds_name = f"Dataset_{d_idx+1}"
    results[ds_name] = {}
    outer_preds_all[ds_name] = {}
    cv_fold_metrics[ds_name] = {}

    feature_cols = [col for col in df.columns if col not in label_cols]
    for label_name, label_func in target_label_defs.items():
        df_copy = df.copy()
        df_copy['target'] = label_func(df_copy)
        print(f"Original target counts, dataset #: {d_idx}, label: {label_name}:")
        print(df_copy['target'].value_counts())
        df_copy = undersample_50_50(df=df_copy, target_col='target')
        X = df_copy[feature_cols]
        y = df_copy['target']

        cv_fold_metrics[ds_name][label_name] = {}
        results[ds_name][label_name] = {}
        outer_preds_all[ds_name][label_name] = {}

        for model_name in ["RandomForest", "LogisticRegression", "SVC", "XGB", "KNN"]:
            print(f"  Running {model_name} for {ds_name}, {label_name}")

            # Get best params from json
            params = best_hyperparams[ds_name][label_name][model_name]['best_params']
            if model_name == "RandomForest":
                model = RandomForestClassifier(**params, n_jobs=-1, random_state=42)
            elif model_name == "LogisticRegression":
                model = LogisticRegression(**params, solver='saga', max_iter=2000, random_state=42)
            elif model_name == "SVC":
                model = SVC(**params, probability=True, random_state=42, verbose=1,)
            elif model_name == "XGB":
                model = XGBClassifier(**params, use_label_encoder=False, eval_metric='logloss', n_jobs=-1, random_state=42, tree_method='hist')
            elif model_name == "KNN":
                model = KNeighborsClassifier(**params)
            else:
                raise ValueError("Unknown model")

            # Outer CV
            outer_cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
            outer_predictions = []
            outer_true = []
            outer_subject_ids = []
            importances_sum = np.zeros(len(feature_cols), dtype=float)
            fold_count = 0
            fold_accuracies = []
            fold_recalls = []
            fold_precisions = []
            fold_f1s = []
            fold_roc_aucs = []
            fold_fprs = []
            fold_metrics_list = []

            for train_idx, test_idx in outer_cv.split(X, y):
                X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
                y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

                # Convert to numpy for XGBClassifier
                if model_name == "XGB":
                    X_train_ = X_train.to_numpy()
                    X_test_ = X_test.to_numpy()
                    y_train_ = y_train.to_numpy()
                else:
                    X_train_, X_test_, y_train_ = X_train, X_test, y_train

                model.fit(X_train_, y_train_)
                preds = model.predict_proba(X_test_)[:, 1]
                preds_class = (preds >= 0.5).astype(int)

                outer_predictions.extend(preds)
                outer_true.extend(y_test)
                outer_subject_ids.extend(X_test.index)

                # Feature importances
                if model_name in ['RandomForest', 'XGB']:
                    importances_sum += getattr(model, "feature_importances_", np.zeros_like(importances_sum))
                elif model_name == "LogisticRegression":
                    # Use absolute value of coefficients, averaged over classes if multiclass
                    if len(model.coef_.shape) > 1:
                        abs_coefs = np.mean(np.abs(model.coef_), axis=0)
                    else:
                        abs_coefs = np.abs(model.coef_.flatten())
                    importances_sum += abs_coefs

                fold_count += 1
                acc = accuracy_score(y_test, preds_class)
                rec = recall_score(y_test, preds_class)
                prec = precision_score(y_test, preds_class, zero_division=0)
                f1v = f1_score(y_test, preds_class, zero_division=0)
                ra = roc_auc_score(y_test, preds)
                tn, fp, fn, tp = confusion_matrix(y_test, preds_class).ravel()
                fpr_value = fp / (fp + tn) if (fp + tn) > 0 else 0

                fold_accuracies.append(acc)
                fold_recalls.append(rec)
                fold_precisions.append(prec)
                fold_f1s.append(f1v)
                fold_roc_aucs.append(ra)
                fold_fprs.append(fpr_value)
                fold_metrics_list.append({
                    "accuracy": acc,
                    "recall": rec,
                    "precision": prec,
                    "f1": f1v,
                    "roc_auc": ra,
                    "fpr": fpr_value
                })

            def mean_std(arr):
                return float(np.mean(arr)), float(np.std(arr, ddof=1))
            acc_mean, acc_std = mean_std(fold_accuracies)
            rec_mean, rec_std = mean_std(fold_recalls)
            prec_mean, prec_std = mean_std(fold_precisions)
            f1_mean, f1_std = mean_std(fold_f1s)
            roc_mean, roc_std = mean_std(fold_roc_aucs)
            fpr_mean, fpr_std = mean_std(fold_fprs)

            cv_fold_metrics[ds_name][label_name][model_name] = {
                "mean_std": {
                    "accuracy": {"mean": acc_mean, "std": acc_std},
                    "recall": {"mean": rec_mean, "std": rec_std},
                    "precision": {"mean": prec_mean, "std": prec_std},
                    "f1": {"mean": f1_mean, "std": f1_std},
                    "roc_auc": {"mean": roc_mean, "std": roc_std},
                    "fpr": {"mean": fpr_mean, "std": fpr_std}
                },
                "fold_values": fold_metrics_list
            }

            if fold_count > 0:
                avg_importances = importances_sum / fold_count
            else:
                avg_importances = np.zeros(len(feature_cols))

            # For SVC, set importances to zeros (no importances available)
            if model_name == "SVC":
                avg_importances = np.zeros(len(feature_cols))

            outer_predictions = np.array(outer_predictions)
            outer_true = np.array(outer_true)
            outer_subject_ids = [str(sid) for sid in outer_subject_ids]
            outer_preds_all[ds_name][label_name][model_name] = {
                "predictions": outer_predictions.tolist(),
                "true": outer_true.tolist(),
                "subject_ids": outer_subject_ids
            }

            predicted_classes = (outer_predictions >= 0.5).astype(int)
            n_iterations = 1000
            n_size = len(outer_true)
            metrics_bootstrap = {
                'accuracy': [],
                'recall': [],
                'precision': [],
                'f1': [],
                'roc_auc': [],
                'fpr': []
            }

            for _ in range(n_iterations):
                sample_indices = np.random.choice(np.arange(n_size), size=n_size, replace=True)
                sample_true = outer_true[sample_indices]
                sample_preds = outer_predictions[sample_indices]
                sample_classes = predicted_classes[sample_indices]

                acc = accuracy_score(sample_true, sample_classes)
                rec = recall_score(sample_true, sample_classes)
                prec = precision_score(sample_true, sample_classes, zero_division=0)
                f1_val = f1_score(sample_true, sample_classes, zero_division=0)
                ra = roc_auc_score(sample_true, sample_preds)
                tn, fp, fn, tp = confusion_matrix(sample_true, sample_classes).ravel()
                fpr_value = fp / (fp + tn) if (fp + tn) > 0 else 0

                metrics_bootstrap['accuracy'].append(acc)
                metrics_bootstrap['recall'].append(rec)
                metrics_bootstrap['precision'].append(prec)
                metrics_bootstrap['f1'].append(f1_val)
                metrics_bootstrap['roc_auc'].append(ra)
                metrics_bootstrap['fpr'].append(fpr_value)

            metric_summary = {}
            for metric in metrics_bootstrap:
                vals = np.array(metrics_bootstrap[metric])
                mean_val = np.mean(vals)
                lower = np.percentile(vals, 2.5)
                upper = np.percentile(vals, 97.5)
                metric_summary[metric] = {'mean': mean_val, '95CI': (lower, upper)}

            feat_import_dict = {
                feature_cols[i]: float(avg_importances[i])
                for i in range(len(feature_cols))
            }
            metric_summary["feature_importances"] = feat_import_dict
            results[ds_name][label_name][model_name] = metric_summary

# Saving results
def convert_results_for_json(obj):
    if isinstance(obj, dict):
        return {k: convert_results_for_json(v) for k, v in obj.items()}
    elif isinstance(obj, (tuple, list)):
        return [convert_results_for_json(x) for x in obj]
    else:
        return obj

with open("results.json", "w") as f:
    json.dump(convert_results_for_json(results), f, indent=4)
print("Bootstrapped results saved to results.json")

with open("outer_predictions.json", "w") as f:
    json.dump(convert_results_for_json(outer_preds_all), f, indent=4)
print("Outer predictions saved to outer_predictions.json")

with open("cv_fold_metrics.json", "w") as f:
    json.dump(convert_results_for_json(cv_fold_metrics), f, indent=4)
print("Fold-level results (mean & std + raw fold data) saved to cv_fold_metrics.json")



In [None]:
import json
import numpy as np
import pandas as pd

with open('cv_fold_metrics.json', 'r') as f:
    results = json.load(f)

models = ["RandomForest", "SVC", "LogisticRegression", "XGB", "KNN"]
labels = ["PTB37", "sPTB37", "sPTB34", "sPTB32"]
datasets = ["Dataset_1", "Dataset_2", "Dataset_3", "Dataset_4"]
metrics = ["accuracy", "recall", "precision", "f1", "roc_auc", "fpr"]
metric_names = {
    "accuracy": "Accuracy",
    "recall": "Recall",
    "precision": "Precision",
    "f1": "F1 Score",
    "roc_auc": "ROC AUC",
    "fpr": "False Positive Rate"
}
dataset_map = {
    "Dataset_1": "General",
    "Dataset_2": "Single",
    "Dataset_3": "GA-16-28",
    "Dataset_4": "GA-18-24"
}

def mean_std_str(mean, std):
    return f"{mean:.3f} ± {std:.3f}"

for metric in metrics:
    file_name = f"results_table_{metric}.xlsx"
    with pd.ExcelWriter(file_name) as writer:
        for model in models:
            table = pd.DataFrame(index=list(dataset_map.values()) + ["Average Label"],
                                 columns=labels + ["Average DS"])
            cell_vals = []
            # Fill in mean ± std for each cell
            for ds_key, ds_name in dataset_map.items():
                row_means = []
                for label in labels:
                    try:
                        mean = results[ds_key][label][model]['mean_std'][metric]["mean"]
                        std = results[ds_key][label][model]['mean_std'][metric]["std"]
                        table.loc[ds_name, label] = mean_std_str(mean, std)
                        row_means.append(mean)
                        cell_vals.append(mean)
                    except Exception:
                        table.loc[ds_name, label] = ""
                # Average across this dataset's labels
                if row_means:
                    mean = np.mean(row_means)
                    std = np.std(row_means, ddof=1) if len(row_means) > 1 else 0
                    table.loc[ds_name, "Average DS"] = mean_std_str(mean, std)
            # Average per label across all datasets
            for label in labels:
                col_means = []
                for ds_key in dataset_map.keys():
                    try:
                        mean = results[ds_key][label][model]['mean_std'][metric]["mean"]
                        col_means.append(mean)
                    except Exception:
                        continue
                if col_means:
                    mean = np.mean(col_means)
                    std = np.std(col_means, ddof=1) if len(col_means) > 1 else 0
                    table.loc["Average Label", label] = mean_std_str(mean, std)
            # Overall grand mean ± std
            if cell_vals:
                mean = np.mean(cell_vals)
                std = np.std(cell_vals, ddof=1)
                table.loc["Average Label", "Average DS"] = mean_std_str(mean, std)
            # Write to sheet for this model
            table.to_excel(writer, sheet_name=model)
    print(f"Excel file '{file_name}' saved.")



In [None]:
import json
import pandas as pd

# Load results.json
with open("results.json", "r") as f:
    results = json.load(f)

models = ["RandomForest", "LogisticRegression"]
labels = ["PTB37", "sPTB37", "sPTB34", "sPTB32"]
datasets = ["Dataset_1", "Dataset_2", "Dataset_3", "Dataset_4"]
dataset_map = {
    "Dataset_1": "General",
    "Dataset_2": "Single",
    "Dataset_3": "GA-16-28",
    "Dataset_4": "GA-18-24"
}
metrics = ["accuracy", "recall", "precision", "f1", "roc_auc", "fpr"]
metric_names = {
    "accuracy": "Accuracy",
    "recall": "Recall",
    "precision": "Precision",
    "f1": "F1 Score",
    "roc_auc": "ROC AUC",
    "fpr": "False Positive Rate"
}

def mean_ci_str(metric_data):
    try:
        mean = metric_data['mean']
        ci_low, ci_high = metric_data['95CI']
        return f"{mean:.3f} ({ci_low:.3f}, {ci_high:.3f})"
    except Exception:
        return ""

for model in models:
    file_name = f"{model}_results.xlsx"
    with pd.ExcelWriter(file_name) as writer:
        for label in labels:
            table = pd.DataFrame(index=[metric_names[m] for m in metrics],
                                 columns=[dataset_map[ds] for ds in datasets])
            for ds in datasets:
                for m in metrics:
                    try:
                        metric_data = results[ds][label][model][m]
                        table.loc[metric_names[m], dataset_map[ds]] = mean_ci_str(metric_data)
                    except Exception:
                        table.loc[metric_names[m], dataset_map[ds]] = ""
            table.to_excel(writer, sheet_name=label)
    print(f"Excel file '{file_name}' saved.")


In [None]:
import json
import numpy as np
from itertools import combinations
from scipy.stats import f_oneway, ttest_rel
from statsmodels.stats.multitest import multipletests

# Models to analyze
MODEL_LIST = ["RandomForest", "LogisticRegression"]

metrics = ["accuracy", "recall", "precision", "f1", "roc_auc", "fpr"]

def load_cv_fold_metrics(path):
    with open(path, "r") as f:
        data = json.load(f)
    return data

def run_anova_and_pairwise(cv_data, model_name):
    """
    For a single model (RF or LR), perform ANOVA and paired t-tests on metrics across datasets.
    Returns a nested dictionary: results[label][metric] = {...}
    """
    results = {}
    dataset_names = sorted(cv_data.keys())
    # Find all labels present in at least one dataset for this model
    all_labels = set()
    for ds in dataset_names:
        all_labels.update(cv_data[ds].keys())
    all_labels = sorted(all_labels)
    for label in all_labels:
        results[label] = {}
        for metric in metrics:
            group_data = []
            group_names = []
            for ds in dataset_names:
                # Check if label & model exist for this dataset
                if label not in cv_data[ds] or model_name not in cv_data[ds][label]:
                    continue
                fold_list = cv_data[ds][label][model_name]["fold_values"]
                fold_vals = [fold_dict[metric] for fold_dict in fold_list if metric in fold_dict]
                if len(fold_vals) < 2:  # skip if not enough data
                    continue
                group_data.append(fold_vals)
                group_names.append(ds)
            if len(group_data) < 2:
                continue
            # ANOVA
            f_stat, p_anova = f_oneway(*group_data)
            results[label][metric] = {
                "anova": {"f-stat": float(f_stat), "p-value": float(p_anova)},
                "posthoc": {}
            }
            # Posthoc paired t-tests (Bonferroni correction)
            if p_anova < 0.05:
                idx_pairs = list(combinations(range(len(group_data)), 2))
                pvals = []
                pair_keys = []
                for (i, j) in idx_pairs:
                    data_i = group_data[i]
                    data_j = group_data[j]
                    t_stat, p_val = ttest_rel(data_i, data_j)
                    pvals.append(p_val)
                    pair_keys.append((group_names[i], group_names[j]))
                # Bonferroni correction
                reject, pvals_corrected, _, _ = multipletests(pvals, alpha=0.05, method='bonferroni')
                for idx, (g1, g2) in enumerate(pair_keys):
                    results[label][metric]["posthoc"][f"{g1} vs {g2}"] = {
                        "p-uncorrected": float(pvals[idx]),
                        "p-corrected": float(pvals_corrected[idx]),
                        "significant": bool(reject[idx])
                    }
    return results

def main():
    cv_path = "cv_fold_metrics.json"
    cv_data = load_cv_fold_metrics(cv_path)
    for model_name in MODEL_LIST:
        print(f"\n{model_name}")
        stats_results = run_anova_and_pairwise(cv_data, model_name)
        # Print results
        for label, metrics_dict in stats_results.items():
            print(f"\nLabel: {label}")
            for metric, anova_dict in metrics_dict.items():
                f_val = anova_dict["anova"]["f-stat"]
                p_val = anova_dict["anova"]["p-value"]
                print(f"  Metric: {metric}")
                print(f"    ANOVA: F={f_val:.3f}, p={p_val:.3e}")
                if p_val < 0.05:
                    for comp, vals in anova_dict["posthoc"].items():
                        p_unc = vals["p-uncorrected"]
                        p_cor = vals["p-corrected"]
                        sig = vals["significant"]
                        print(f"    {comp}: p-unc={p_unc:.3e}, p-cor={p_cor:.3e}, sig={sig}")
        # Save each model's results to its own JSON file
        with open(f"anova_ttest_{model_name}.json", "w") as f:
            json.dump(stats_results, f, indent=4)
        print(f"\nResults saved to anova_ttest_{model_name}.json")

if __name__ == "__main__":
    main()


In [None]:
import json
import matplotlib.pyplot as plt
import numpy as np

# Load results
with open('results.json', 'r') as f:
    results = json.load(f)

anova_dict = {
    "RandomForest": "anova_ttest_RandomForest.json",
    "LogisticRegression": "anova_ttest_LogisticRegression.json",
}
anova_results = {model: json.load(open(fname)) for model, fname in anova_dict.items()}

metrics = ['accuracy', 'recall', 'precision', 'f1', 'roc_auc']
labels = ['PTB37', 'sPTB37', 'sPTB34', 'sPTB32']
datasets = ['Dataset_1', 'Dataset_2', 'Dataset_3', 'Dataset_4']
dataset_names = ["General", "Single", "GA-16-28", "GA-18-24"]
colors = ['tab:blue', 'tab:orange', 'tab:green', 'tab:red']

for model in ["RandomForest", "LogisticRegression"]:
    for lab in labels:
        fig, ax = plt.subplots(figsize=(10, 6))
        n_metrics = len(metrics)
        n_datasets = len(datasets)
        x = np.arange(n_metrics)
        width = 0.18

        x_positions_dict = {m: {} for m in metrics}
        upper_vals_dict = {m: {} for m in metrics}

        for i, ds in enumerate(datasets):
            means = []
            err_low = []
            err_high = []
            for m in metrics:
                data = results[ds][lab][model][m]
                mean_val = data['mean']
                ci_lower, ci_upper = data['95CI']
                means.append(mean_val)
                err_low.append(mean_val - ci_lower)
                err_high.append(ci_upper - mean_val)

                m_idx = metrics.index(m)
                x_pos = m_idx - (width * (n_datasets - 1) / 2) + i * width

                x_positions_dict[m][ds] = x_pos
                upper_vals_dict[m][ds] = ci_upper

            x_positions = [x_positions_dict[m][ds] for m in metrics]

            ax.errorbar(
                x_positions, means, yerr=[err_low, err_high],
                fmt='s', capsize=5, color=colors[i],
                label=dataset_names[i], markersize=8, linestyle='None'
            )

        # Cosmetic settings
        xlabels = ['Accuracy', 'Recall', 'Precision', 'F1', 'AUC']
        ax.set_xticks(x)
        ax.set_xticklabels(xlabels)
        ax.set_ylabel('Metric Value')
        ax.set_title(f'{model} Performance Metrics for {lab}')
        ax.legend(labels=dataset_names, loc='lower right')
        ax.set_ylim(0, 1)
        ax.grid(True, linestyle='--', alpha=0.6)

        # Draw significance bars if any
        if lab == "sPTB32":
            anova_file = anova_results[model]
            sig_bar_top = {}
            for m in metrics:
                max_ci = max(upper_vals_dict[m].values())
                sig_bar_top[m] = max_ci + 0.02
            bar_inset = 0.00
            stack_increase = 0.04
            vertical_length = 0.02

            for m_idx, m in enumerate(metrics):
                posthoc = anova_file.get(lab, {}).get(m, {}).get("posthoc", {})
                for comp, comp_data in posthoc.items():
                    if comp_data.get("significant", False):
                        ds1, ds2 = comp.split(" vs ")
                        x1 = x_positions_dict[m][ds1]
                        x2 = x_positions_dict[m][ds2]
                        if x2 < x1:
                            x1, x2 = x2, x1
                        y_sig = sig_bar_top[m]
                        x1_inset = x1 + bar_inset
                        x2_inset = x2 - bar_inset
                        y_low = y_sig - vertical_length

                        ax.plot([x1_inset, x2_inset], [y_sig, y_sig], color='black', linewidth=1.0)
                        ax.plot([x1_inset, x1_inset], [y_low, y_sig], color='black', linewidth=1.0)
                        ax.plot([x2_inset, x2_inset], [y_low, y_sig], color='black', linewidth=1.0)
                        x_text = (x1_inset + x2_inset) / 2
                        ax.text(x_text, y_sig + 0.005, '*', ha='center', va='bottom', fontsize=12)
                        sig_bar_top[m] += stack_increase

        plt.savefig(f"box_plot_{lab}_{model}.png", dpi=300, bbox_inches='tight')
        plt.show()
