In [None]:
import json
import random
import numpy as np
import pandas as pd
from collections import defaultdict
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, classification_report
from scipy.stats import ttest_rel
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC


# ==== Input file paths (relative, so resolved against the current working directory) ====
# JSON file handed to load_embeddings_clustered() in the driver cell.
CLUSTERED_PATH = "malware_clusters_results_js.json"
# Not referenced by any cell visible here; presumably the input for
# load_embeddings_unclustered() -- TODO confirm or remove.
UNCLUSTERED_PATH = "mal_embeddings_js_480.json"
# JSON file handed to load_embeddings_legit() in the driver cell.
LEGIT_PATH = "legiti_embeddings_js.json"

In [None]:
def load_embeddings_clustered(path):
    """Read a clustered-embeddings JSON file into ``{cluster_id: [embedding, ...]}``.

    The file may contain either a single mapping of cluster id -> details, or a
    list of such mappings. In both layouts every details object must carry a
    ``"samples"`` list whose entries each have an ``"embedding"`` key. If a
    cluster id occurs in more than one list item, the later item replaces the
    earlier one.
    """
    with open(path, 'r') as handle:
        payload = json.load(handle)

    # Treat the single-mapping layout as a one-element list so one loop serves both.
    chunks = payload if isinstance(payload, list) else [payload]

    embeddings_by_cluster = {}
    for chunk in chunks:
        for cluster_id, details in chunk.items():
            embeddings_by_cluster[cluster_id] = [
                sample["embedding"] for sample in details["samples"]
            ]
    return embeddings_by_cluster

def load_embeddings_unclustered(path):
    """Parse the JSON file at ``path`` and return the value under its top-level ``"embeddings"`` key.

    Raises ``KeyError`` if the parsed document has no ``"embeddings"`` key.
    """
    with open(path, 'r') as handle:
        payload = json.load(handle)
    return payload["embeddings"]

def load_embeddings_legit(path):
    """Parse the JSON file at ``path`` and return the value under its top-level ``"embeddings"`` key.

    Raises ``KeyError`` if the parsed document has no ``"embeddings"`` key.
    """
    with open(path, 'r') as source:
        document = json.load(source)
    return document["embeddings"]

In [None]:
def _fit_and_score(template, X_train, y_train, X_test, y_test):
    """Fit a fresh, unfitted copy of ``template`` and return its test metrics as a dict."""
    # Function-scope import so this block does not depend on the notebook's import cell.
    from sklearn.base import clone

    clf = clone(template)
    clf.fit(X_train, y_train)
    pred = clf.predict(X_test)
    # Column 1 is the probability of label 1 (labels used here are 0 and 1).
    prob = clf.predict_proba(X_test)[:, 1]
    return {
        "accuracy": accuracy_score(y_test, pred),
        "precision": precision_score(y_test, pred),
        "recall": recall_score(y_test, pred),
        "f1": f1_score(y_test, pred),
        "auc": roc_auc_score(y_test, prob),
    }


def run_multimodel_cluster_vs_random(clusters, legit_all, rounds=10):
    """Compare cluster-guided vs. random selection of positive training samples.

    In every round one embedding per cluster is held out as the positive
    (label 1) test set. Two equally sized positive training sets are then
    built from the leftovers:

    * cluster-based: two embeddings from each cluster (the single leftover is
      repeated when a cluster has only one);
    * random-based: the same number of embeddings drawn from the pooled
      leftovers of all clusters, ignoring cluster membership.

    Both training sets are combined with the same negative (label 0) training
    samples and scored on the same test set with five classifiers.

    Args:
        clusters: mapping of cluster id -> list of embeddings. Clusters with
            fewer than two embeddings are dropped.
        legit_all: indexable sequence of label-0 embeddings. It must contain
            more entries than there are held-out positive samples, so that the
            negative test and training samples can be kept disjoint.
        rounds: number of resampling rounds. Round ``i`` uses a private
            ``random.Random(i)``, so the global ``random`` state is untouched.

    Returns:
        ``(results_cluster, results_random)``. Each is a dict mapping model
        name to a list with one metrics dict per round (keys ``accuracy``,
        ``precision``, ``recall``, ``f1``, ``auc``).

    Raises:
        ValueError: if a round has no usable cluster, or ``legit_all`` is too
            small to supply disjoint negative test and training samples.
    """
    # drop singleton clusters: they cannot supply both a test and a training sample
    clusters = {cid: embs for cid, embs in clusters.items() if len(embs) > 1}
    cluster_ids = list(clusters.keys())

    # model pool; these instances act as templates and are never fitted themselves
    models = {
        "RandomForest": RandomForestClassifier(n_estimators=100, random_state=42),
        "LogisticRegression": LogisticRegression(max_iter=1000),
        "KNN": KNeighborsClassifier(n_neighbors=5),
        "MLP": MLPClassifier(hidden_layer_sizes=(100,), max_iter=300, random_state=42),
        "SVM": SVC(kernel="rbf", probability=True, random_state=42)
    }

    # per-model list of per-round metric dicts
    results_cluster = {name: [] for name in models}
    results_random = {name: [] for name in models}

    for round_idx in range(rounds):
        # Local generator: same stream as random.seed(round_idx), without
        # reseeding the process-wide RNG as a side effect.
        rng = random.Random(round_idx)
        cluster_train, cluster_test = [], []
        train_pool_mal = []

        # Hold out one test sample per cluster; the rest feed both the
        # cluster-based training set and the random-selection pool.
        for cid in cluster_ids:
            embs = clusters[cid]
            test_sample = rng.sample(embs, 1)
            # Value-based removal: exact duplicates of the held-out embedding
            # are dropped too, so they cannot leak into training.
            remaining = [e for e in embs if e not in test_sample]
            if not remaining:
                # Every embedding in this cluster equals the held-out one.
                continue
            elif len(remaining) >= 2:
                train_samples = rng.sample(remaining, 2)
            else:
                train_samples = rng.choices(remaining, k=2)
            cluster_test.extend(test_sample)
            cluster_train.extend(train_samples)
            train_pool_mal.extend(remaining)

        if not cluster_test:
            raise ValueError(
                "no cluster with at least two distinct embeddings; "
                "nothing to train or test on"
            )

        total_cluster_train = len(cluster_train)
        n_legit_test = len(cluster_test)
        if len(legit_all) <= n_legit_test:
            raise ValueError(
                f"legit_all has {len(legit_all)} samples but {n_legit_test} are needed "
                "for testing plus at least one for training"
            )

        # Pick the negative test samples first, then draw the negative training
        # samples (with replacement) only from the rest, so that no negative
        # test sample is also seen during training.
        legit_test_idx = rng.sample(range(len(legit_all)), n_legit_test)
        held_out = set(legit_test_idx)
        legit_train_candidates = [i for i in range(len(legit_all)) if i not in held_out]
        legit_train = [
            legit_all[i] for i in rng.choices(legit_train_candidates, k=total_cluster_train)
        ]
        legit_test = [legit_all[i] for i in legit_test_idx]

        Xc_train = np.array(cluster_train + legit_train)
        yc_train = np.array([1]*len(cluster_train) + [0]*len(legit_train))
        Xc_test = np.array(cluster_test + legit_test)
        yc_test = np.array([1]*len(cluster_test) + [0]*len(legit_test))

        # Random baseline of the same size. The pool can be smaller than the
        # cluster-based set (a one-leftover cluster adds two training samples
        # but only one pool entry); fall back to sampling with replacement
        # then, mirroring the per-cluster logic above.
        if len(train_pool_mal) >= total_cluster_train:
            Xr_train_mal = rng.sample(train_pool_mal, total_cluster_train)
        else:
            Xr_train_mal = rng.choices(train_pool_mal, k=total_cluster_train)
        Xr_train = np.array(Xr_train_mal + legit_train)
        yr_train = np.array([1]*len(Xr_train_mal) + [0]*len(legit_train))

        # Train and evaluate every model under both strategies on the same test set
        for name, model in models.items():
            results_cluster[name].append(
                _fit_and_score(model, Xc_train, yc_train, Xc_test, yc_test)
            )
            results_random[name].append(
                _fit_and_score(model, Xr_train, yr_train, Xc_test, yc_test)
            )

    return results_cluster, results_random

In [None]:
# Load both embedding sets and run the 50-round comparison.
clusters = load_embeddings_clustered(CLUSTERED_PATH)
legit_all = load_embeddings_legit(LEGIT_PATH)
results_cluster, results_random = run_multimodel_cluster_vs_random(
    clusters, legit_all, rounds=50
)

# Per-model report: mean of every metric under each strategy, followed by a
# paired t-test across rounds (the rows of the two frames are matched by round).
for model, cluster_rows in results_cluster.items():
    df_c = pd.DataFrame(cluster_rows)
    df_r = pd.DataFrame(results_random[model])
    print(f"\n🔍 Model: {model}")
    print("📊 Cluster AVE.: \n", df_c.mean())
    print("📊 Random  AVE.: \n", df_r.mean())
    print("🔬 Significance Test:")
    for metric in df_c.columns:
        t, p = ttest_rel(df_c[metric], df_r[metric])
        # A NaN p-value (identical columns) compares False and is reported as not significant.
        verdict = '✅ Significant' if p < 0.05 else '⚠️ not Significant'
        print(f"  {metric:<10} p={p:.5f} {verdict}")