## CatBoost

**Cell 1: Imports, Paths, Labels, Part files**

In [None]:
from pathlib import Path
import json
import numpy as np
import scipy.sparse as sp
from glob import glob
import re

# Project locations (same layout as the other notebooks).
BASE = Path.home().joinpath("Desktop", "Malware Project")
VEC = BASE.joinpath("data", "behavior_vectors_paper")

# Output folders for the CatBoost models and the evaluation artefacts.
MODELS_DIR = VEC / "models_catboost_capped"
RESULTS_DIR = VEC / "results_catboost_capped_eval"
for out_dir in (MODELS_DIR, RESULTS_DIR):
    out_dir.mkdir(parents=True, exist_ok=True)

# Candidate label maps in priority order: label_map_balanced.json is tried first.
label_map_paths = [
    VEC / map_name
    for map_name in ("label_map_balanced.json", "label_map.json")
]

label_names = None
for map_path in label_map_paths:
    if not map_path.exists():
        continue
    with open(map_path, "r") as fh:
        payload = json.load(fh)
    # Accept only a JSON object that carries a "labels" entry; otherwise
    # fall through to the next candidate file.
    if isinstance(payload, dict) and "labels" in payload:
        label_names = payload["labels"]
        break

if label_names is None:
    raise FileNotFoundError("Δεν βρέθηκε ούτε label_map_balanced.json ούτε label_map.json στο VEC.")

n_labels = len(label_names)
print(f"Loaded labels: {n_labels}")
# Preview at most the first ten label names.
preview = ", ".join(label_names[:10])
if n_labels > 10:
    preview += " ..."
print(preview)

# Train parts: capped X matrices and capped Y arrays. Each list is sorted by
# path on its own; later cells pair them positionally.
XTRN = sorted(Path(hit) for hit in glob(str(VEC / "train_part_capped*.npz")))
YTRN_CAP = sorted(Path(hit) for hit in glob(str(VEC / "y_train_part_capped*.npy")))

# Load TEST parts: pair each capped X part with its capped Y part via the trailing part index
def _index_from_name(p: Path) -> int:
    m = re.search(r'(\d+)$', p.stem)
    if not m:
        raise ValueError(f"No trailing index found in filename: {p.name}")
    return int(m.group(1))

# Discover the capped test feature parts (sorted by path) and index the
# capped test label files by their trailing part number.
XTE = sorted(map(Path, glob(str(VEC / "test_part_capped*.npz"))))
all_YTE = list(map(Path, glob(str(VEC / "y_test_part_capped*.npy"))))
map_Y = {_index_from_name(p): p for p in all_YTE}

# X and Y must stay positionally aligned because later cells zip them.
# Previously an X part without a label file was dropped from YTE only, which
# shifted every following pair. Now the X part is dropped as well, and the
# omission is reported instead of happening silently.
missing_te = [p.name for p in XTE if _index_from_name(p) not in map_Y]
if missing_te:
    print(f"WARNING: {len(missing_te)} test X part(s) have no matching "
          f"y_test_part_capped*.npy and are skipped: {missing_te}")
    XTE = [p for p in XTE if _index_from_name(p) in map_Y]

idx_te = [_index_from_name(p) for p in XTE]
YTE = [map_Y[i] for i in idx_te]

print("Train parts (X):", len(XTRN), "| Train parts (Y_cap):", len(YTRN_CAP))
print("Test  parts (X):", len(XTE),  "| Test  parts (Y):", len(YTE))

# Έλεγχος row counts
def _rows_X(path): return sp.load_npz(path).shape[0]
def _rows_Y(path): return np.load(path, allow_pickle=False).shape[0]

# Fail early, with a readable message, when part lists are uneven or empty
# (indexing [0] on an empty list would otherwise raise a bare IndexError).
assert len(XTRN) == len(YTRN_CAP), "Αναντιστοιχία train X με y_train_part_capped*.npy"
assert len(XTRN) > 0, "No train_part_capped*.npz files found in VEC"
assert len(XTE) > 0 and len(YTE) > 0, "No matching test X/Y parts found in VEC"

# Spot-check the first part of each split.
n_x_tr, n_y_tr = _rows_X(XTRN[0]), _rows_Y(YTRN_CAP[0])
n_x_te, n_y_te = _rows_X(XTE[0]), _rows_Y(YTE[0])

print(f"[train part 0] rows X={n_x_tr}, Y={n_y_tr}")
print(f"[test  part 0] rows X={n_x_te},  Y={n_y_te}")

# The counts were previously only printed; a mismatch means X and Y files are
# not paired correctly, so stop here rather than train on misaligned rows.
assert n_x_tr == n_y_tr, f"train part 0: X has {n_x_tr} rows but Y has {n_y_tr}"
assert n_x_te == n_y_te, f"test part 0: X has {n_x_te} rows but Y has {n_y_te}"


Loaded labels: 64
adware, antiav, antifw, autorun, backdoor, banker, bho, binder, blocker, bundler ...
Train parts (X): 551 | Train parts (Y_cap): 551
Test  parts (X): 45 | Test  parts (Y): 45
[train part 0] rows X=2851, Y=2851
[test  part 0] rows X=12,  Y=12


**Cell 2: Assemble X_train (sparse), Y_train (dense), test parts**

In [None]:
import numpy as np
import scipy.sparse as sp

# Train X: stack every capped part into one CSR matrix.
X_train = sp.vstack([sp.load_npz(p) for p in XTRN]).tocsr()

# Train Y: multi-label indicator array, one column per label.
Y_train = np.concatenate([np.load(p, allow_pickle=False) for p in YTRN_CAP])
print("X_train:", X_train.shape, "| Y_train:", Y_train.shape)

# Test parts are kept as per-part lists (as in the other notebooks).
X_test_parts = [sp.load_npz(p).tocsr() for p in XTE]
Y_test_parts = [np.load(p, allow_pickle=False) for p in YTE]

print("Loaded", len(X_test_parts), "test X parts &", len(Y_test_parts), "test Y parts")

# Sanity checks.
# Train rows must agree, otherwise row indices taken from Y_train would select
# the wrong (or non-existent) rows of X_train during training.
assert X_train.shape[0] == Y_train.shape[0], (
    f"X_train has {X_train.shape[0]} rows but Y_train has {Y_train.shape[0]}"
)
assert Y_train.shape[1] == len(label_names)
# zip() stops at the shorter list, so unequal lengths would silently drop parts.
assert len(X_test_parts) == len(Y_test_parts), (
    f"{len(X_test_parts)} test X parts vs {len(Y_test_parts)} test Y parts"
)
for Xt, Yt in zip(X_test_parts, Y_test_parts):
    assert Xt.shape[0] == Yt.shape[0]
    assert Yt.shape[1] == len(label_names)
    # Models trained on X_train need the same number of feature columns.
    assert Xt.shape[1] == X_train.shape[1], (
        f"test part has {Xt.shape[1]} features, train has {X_train.shape[1]}"
    )

print(" Multi-label data loaded correctly.")


X_train: (1233020, 2381) | Y_train: (1233020, 64)
Loaded 45 test X parts & 45 test Y parts
 Multi-label data loaded correctly.


**Cell 3: Parameters & Training**

In [None]:
from catboost import CatBoostClassifier
from collections import Counter
import time
import pandas as pd
import numpy as np
import os

def cat_model_path(li: int):
    """Return the path of the saved CatBoost model for label index ``li``.

    The file sits in ``MODELS_DIR`` and is named ``cat_label_XX.cbm``, where
    ``XX`` is ``li`` zero-padded to at least two digits.
    """
    filename = f"cat_label_{li:02d}.cbm"
    return MODELS_DIR.joinpath(filename)

# Constructor arguments shared by every per-label CatBoostClassifier.
base_params = {
    # objective and reported metric
    "loss_function": "Logloss",
    "eval_metric": "AUC",
    # boosting schedule and tree shape
    "iterations": 300,
    "learning_rate": 0.1,
    "depth": 6,
    "l2_leaf_reg": 3.0,
    # seed handed to CatBoost
    "random_seed": 42,
    # row subsampling
    "bootstrap_type": "Bernoulli",
    "subsample": 0.8,
    # runtime settings
    "thread_count": -1,
    "verbose": False,
    "task_type": "CPU",
}

n_labels = Y_train.shape[1]
os.makedirs(MODELS_DIR, exist_ok=True)

# Negatives kept per positive when undersampling (1 positive : 2 negatives).
NEG_PER_POS = 2
# Seed for the undersampling RNG. The CatBoost seed is reused so that a single
# number controls the whole run; before, the sampling used the unseeded global
# np.random state and therefore changed on every execution.
SAMPLING_SEED = base_params["random_seed"]

print(f"Training {n_labels} label-wise CatBoost models with balanced undersampling...")

train_stats = []
total_start = time.time()
trained = 0
skipped = 0

for li in range(n_labels):
    label_name = label_names[li] if li < len(label_names) else f"label_{li}"
    y_col = Y_train[:, li].astype(int)
    cls = np.unique(y_col)

    # A binary model needs both classes; skip the label otherwise.
    if len(cls) < 2:
        print(f"[label {li:02d}] '{label_name}': only class {cls[0]} -> skip")
        skipped += 1
        continue

    # Undersampling: keep all positives and at most NEG_PER_POS negatives each.
    pos_idx = np.where(y_col == 1)[0]
    neg_idx = np.where(y_col == 0)[0]
    n_pos = len(pos_idx)

    # Guard for columns that have two distinct values but none equal to 1.
    if n_pos == 0:
        print(f"[label {li:02d}] '{label_name}': no positives -> skip")
        skipped += 1
        continue

    # One generator per label, seeded from (seed, label index): the sample
    # drawn for a label does not depend on which other labels were processed
    # or skipped before it.
    rng = np.random.default_rng([SAMPLING_SEED, li])

    neg_sample_size = min(len(neg_idx), n_pos * NEG_PER_POS)
    neg_sample_idx = rng.choice(neg_idx, size=neg_sample_size, replace=False)

    sel_idx = np.concatenate([pos_idx, neg_sample_idx])
    rng.shuffle(sel_idx)

    X_sub = X_train[sel_idx]
    y_sub = y_col[sel_idx]

    counts = Counter(y_sub)
    print(f"\n[label {li:02d}] '{label_name}' balanced counts: {counts} "
          f"(train on {len(sel_idx)} samples)")

    model = CatBoostClassifier(**base_params)
    t0 = time.time()
    model.fit(X_sub, y_sub)
    dt = time.time() - t0

    # Save the model; the path is passed as a string, matching how the
    # evaluation cell calls load_model.
    mpath = cat_model_path(li)
    model.save_model(str(mpath))
    print(f"    saved -> {mpath.name} ({dt:.1f}s)")
    trained += 1

    train_stats.append({
        "label_index": li,
        "label_name": label_name,
        "n_pos_total": int((y_col == 1).sum()),
        "n_neg_total": int((y_col == 0).sum()),
        "n_train_used": int(len(sel_idx)),
        "train_time_sec": float(dt),
    })

total_time = time.time() - total_start
print(f"\n Trained {trained} models, skipped {skipped}.")
print(f"Total CatBoost training time: {total_time:.1f} sec")

train_stats_df = pd.DataFrame(train_stats)
train_stats_df.to_csv(RESULTS_DIR / "catboost_training_stats.csv", index=False)
print("Saved training stats -> catboost_training_stats.csv")


Training 64 label-wise CatBoost models with balanced undersampling...

[label 00] 'adware' balanced counts: Counter({np.int64(0): 145064, np.int64(1): 72532}) (train on 217596 samples)


KeyboardInterrupt: 

**Cell 4: Evaluate Test set**

In [None]:
from catboost import CatBoostClassifier
from sklearn.metrics import precision_recall_fscore_support, roc_auc_score
import numpy as np
import pandas as pd
import json
import os

n_labels = Y_train.shape[1]


def model_path_for(li: int):
    """Return the path of the saved CatBoost model file for label index ``li``.

    The file name is ``cat_label_XX.cbm`` inside ``MODELS_DIR``, with ``li``
    zero-padded to at least two digits.
    """
    return MODELS_DIR.joinpath(f"cat_label_{li:02d}.cbm")


# Set to True to apply per-label thresholds saved by a CatBoost tuning run.
# Labels without an entry in thr_map fall back to 0.5 further down.
USE_THRESHOLDS = False

thr_map = {}
if USE_THRESHOLDS:
    THRESH_PATH = RESULTS_DIR / "catboost_label_thresholds.json"
    with open(THRESH_PATH) as fh:
        thr_info = json.load(fh)
    # JSON object keys are strings; convert them back to integer label indices.
    for raw_index, raw_thr in thr_info["thresholds"].items():
        thr_map[int(raw_index)] = float(raw_thr)

# Load every saved model once, up front. The model files do not change between
# test parts, so reloading them inside the part loop (labels x parts loads)
# was pure overhead.
loaded_models = {}
for li in range(n_labels):
    mpath = model_path_for(li)
    if not mpath.exists():
        continue
    model = CatBoostClassifier()
    model.load_model(str(mpath))
    loaded_models[li] = model

# Labels without a model keep probability 0 and prediction 0 in every row.
# Report them so the effect on the metrics is not silent.
missing_models = [li for li in range(n_labels) if li not in loaded_models]
if missing_models:
    print(f"[test] no saved model for {len(missing_models)} label(s) {missing_models}; "
          f"their probabilities and predictions stay 0")

y_true_all, y_prob_all, y_pred_all = [], [], []

for i, (Xt, Yt) in enumerate(zip(X_test_parts, Y_test_parts), start=1):
    Xt = Xt.tocsr()
    Yt = Yt.astype(np.int8)

    # Same shape as the label array: one column per label.
    probs = np.zeros_like(Yt, dtype=np.float32)
    preds = np.zeros_like(Yt, dtype=np.int8)

    for li, model in loaded_models.items():
        # Column 1 of predict_proba is taken as the positive-class probability.
        p = model.predict_proba(Xt)[:, 1]
        probs[:, li] = p

        # Per-label threshold if one was loaded, otherwise 0.5.
        thr = thr_map.get(li, 0.5)
        preds[:, li] = (p >= thr).astype(np.int8)

    y_true_all.append(Yt)
    y_prob_all.append(probs)
    y_pred_all.append(preds)
    print(f"[test] part {i}/{len(X_test_parts)} done")

# np.vstack on an empty list raises an unhelpful error; state the cause instead.
assert y_true_all, "No test parts were evaluated (X_test_parts / Y_test_parts are empty)"

y_true = np.vstack(y_true_all)
y_prob = np.vstack(y_prob_all)
y_pred = np.vstack(y_pred_all)

# --- Overall metrics at the thresholds used for y_pred ---

def _prf_scores(average):
    """Precision, recall and F1 of ``y_pred`` vs ``y_true`` for one averaging mode."""
    precision, recall, f1, _support = precision_recall_fscore_support(
        y_true, y_pred, average=average, zero_division=0
    )
    return precision, recall, f1


def _auc_or_nan(average):
    """ROC AUC of ``y_prob`` vs ``y_true`` for one averaging mode; NaN on ValueError."""
    try:
        return roc_auc_score(y_true, y_prob, average=average)
    except ValueError:
        return np.nan


p_micro, r_micro, f1_micro = _prf_scores("micro")
p_macro, r_macro, f1_macro = _prf_scores("macro")
p_w, r_w, f1_w = _prf_scores("weighted")

auc_micro = _auc_or_nan("micro")

# Macro AUC is averaged by hand over the labels whose true column holds
# exactly two distinct values.
auc_per_label = []
for li in range(n_labels):
    y, s = y_true[:, li], y_prob[:, li]
    if np.unique(y).size != 2:
        continue
    try:
        auc_per_label.append(roc_auc_score(y, s))
    except ValueError:
        pass
auc_macro = float(np.nanmean(auc_per_label)) if auc_per_label else np.nan

auc_weighted = _auc_or_nan("weighted")

# Rows are metrics, columns are averaging modes.
summary = pd.DataFrame(
    [
        [p_micro, p_macro, p_w],
        [r_micro, r_macro, r_w],
        [f1_micro, f1_macro, f1_w],
        [auc_micro, auc_macro, auc_weighted],
    ],
    index=["Precision", "Recall", "F1-score", "AUC"],
    columns=["Micro", "Macro", "Weighted"],
).round(4)

display(summary)

# Persist the table as both CSV and JSON.
OUT_PATH_CSV = RESULTS_DIR / "overall_metrics_catboost_capped_multi_label.csv"
OUT_PATH_JSON = RESULTS_DIR / "overall_metrics_catboost_capped_multi_label.json"
summary.to_csv(OUT_PATH_CSV)
summary.to_json(OUT_PATH_JSON, orient="index", indent=2)
print("Saved CatBoost test summary ->", OUT_PATH_CSV, OUT_PATH_JSON)


[test] part 1/45 done
[test] part 2/45 done
[test] part 3/45 done
[test] part 4/45 done
[test] part 5/45 done
[test] part 6/45 done
[test] part 7/45 done
[test] part 8/45 done
[test] part 9/45 done
[test] part 10/45 done
[test] part 11/45 done
[test] part 12/45 done
[test] part 13/45 done
[test] part 14/45 done
[test] part 15/45 done
[test] part 16/45 done
[test] part 17/45 done
[test] part 18/45 done
[test] part 19/45 done
[test] part 20/45 done
[test] part 21/45 done
[test] part 22/45 done
[test] part 23/45 done
[test] part 24/45 done
[test] part 25/45 done
[test] part 26/45 done
[test] part 27/45 done
[test] part 28/45 done
[test] part 29/45 done
[test] part 30/45 done
[test] part 31/45 done
[test] part 32/45 done
[test] part 33/45 done
[test] part 34/45 done
[test] part 35/45 done
[test] part 36/45 done
[test] part 37/45 done
[test] part 38/45 done
[test] part 39/45 done
[test] part 40/45 done
[test] part 41/45 done
[test] part 42/45 done
[test] part 43/45 done
[test] part 44/45 do



Unnamed: 0,Micro,Macro,Weighted
Precision,0.156,0.2296,0.2794
Recall,0.4438,0.4369,0.4438
F1-score,0.2309,0.2535,0.2955
AUC,0.832,0.8357,0.7898


Saved CatBoost test summary -> /Users/georgektenas/Desktop/Malware Project/data/behavior_vectors_paper/results_catboost_capped_eval/overall_metrics_catboost_capped_multi_label.csv /Users/georgektenas/Desktop/Malware Project/data/behavior_vectors_paper/results_catboost_capped_eval/overall_metrics_catboost_capped_multi_label.json


**Cell 5: Per-label β-tuning (thresholds) + tuned global metrics**

In [20]:
import numpy as np
import pandas as pd
import json
from sklearn.metrics import precision_recall_fscore_support, roc_auc_score

# y_true and y_prob must already exist (they are produced by Cell 4).
assert "y_true" in globals() and "y_prob" in globals(), "Run Cell 4 first."

# NOTE(review): y_true / y_prob are the test-set labels and probabilities from
# Cell 4, so the thresholds below are selected on the same data that is scored
# afterwards. The "tuned" metrics are therefore optimistic; tune on a held-out
# validation split if an unbiased estimate is needed.

n_labels = y_true.shape[1]
# Candidate thresholds 0.05, 0.06, ..., 0.95 (91 values). linspace + round
# gives an exact point count and clean two-decimal values; np.arange with a
# float step produced entries such as 0.15000000000000002 that ended up in the
# saved JSON.
beta_grid = np.round(np.linspace(0.05, 0.95, 91), 2)

best_thr = {}

print("Searching best threshold per label (maximize per-label F1)...")

for li in range(n_labels):
    y_true_li = y_true[:, li]

    # Skip labels whose true column holds a single value.
    if y_true_li.max() == y_true_li.min():
        continue

    probs_li = y_prob[:, li]

    best_f1 = -1.0
    best_b = 0.5

    for b in beta_grid:
        y_pred_li = (probs_li >= b).astype(int)
        _, _, f1_li, _ = precision_recall_fscore_support(
            y_true_li, y_pred_li, average="binary", zero_division=0
        )
        # Strict ">" keeps the lowest threshold among ties.
        if f1_li > best_f1:
            best_f1 = f1_li
            best_b = float(b)

    best_thr[li] = best_b

print(f"Computed thresholds for {len(best_thr)} labels.")

# --- Save thresholds to JSON ---

thr_out = {
    "description": "Per-label thresholds for CatBoost (capped, multi-label), tuned to maximize per-label F1.",
    "thresholds": {str(k): float(v) for k, v in best_thr.items()},
}
thr_path = RESULTS_DIR / "catboost_label_thresholds.json"
with open(thr_path, "w") as f:
    json.dump(thr_out, f, indent=2)
print("Saved thresholds ->", thr_path)

# --- Global metrics with the per-label thresholds applied ---

# Threshold vector: 0.5 everywhere, overwritten where a tuned value exists.
thr_vec = np.array(
    [best_thr[li] if li in best_thr else 0.5 for li in range(n_labels)],
    dtype=float,
)

# Broadcast the per-label thresholds across all rows.
y_pred_tuned = np.greater_equal(y_prob, thr_vec[None, :]).astype(int)

# Precision / recall / F1 for each averaging mode.
prf_by_average = {}
for avg_mode in ("micro", "macro", "weighted"):
    prf_by_average[avg_mode] = precision_recall_fscore_support(
        y_true, y_pred_tuned, average=avg_mode, zero_division=0
    )[:3]

p_micro, r_micro, f1_micro = prf_by_average["micro"]
p_macro, r_macro, f1_macro = prf_by_average["macro"]
p_w, r_w, f1_w = prf_by_average["weighted"]

# AUC is computed from the probabilities. Micro and weighted fall back to NaN
# when sklearn raises ValueError.
auc_by_average = {}
for avg_mode in ("micro", "weighted"):
    try:
        auc_by_average[avg_mode] = roc_auc_score(y_true, y_prob, average=avg_mode)
    except ValueError:
        auc_by_average[avg_mode] = np.nan

auc_micro = auc_by_average["micro"]
auc_w = auc_by_average["weighted"]

# Macro AUC: plain mean over labels whose true column has two distinct values.
auc_labels = []
for li in range(n_labels):
    y_li = y_true[:, li]
    if y_li.max() != y_li.min():
        try:
            auc_labels.append(roc_auc_score(y_li, y_prob[:, li]))
        except ValueError:
            pass

auc_macro = float(np.mean(auc_labels)) if auc_labels else np.nan

# Rows are metrics, columns are averaging modes.
tuned_summary = pd.DataFrame(
    [
        [p_micro, p_macro, p_w],
        [r_micro, r_macro, r_w],
        [f1_micro, f1_macro, f1_w],
        [auc_micro, auc_macro, auc_w],
    ],
    index=["Precision", "Recall", "F1-score", "AUC"],
    columns=["Micro", "Macro", "Weighted"],
).round(4)

print("\n=== CatBoost Tuned (per-label β) ===")
display(tuned_summary)

# Persist the tuned table as CSV and JSON.
tuned_csv_path = RESULTS_DIR / "overall_metrics_catboost_capped_tuned_beta.csv"
tuned_json_path = RESULTS_DIR / "overall_metrics_catboost_capped_tuned_beta.json"
tuned_summary.to_csv(tuned_csv_path)
tuned_summary.to_json(tuned_json_path, orient="index", indent=2)

print("Saved tuned summary -> overall_metrics_catboost_capped_tuned_beta.(csv/json)")


Searching best threshold per label (maximize per-label F1)...
Computed thresholds for 55 labels.
Saved thresholds -> /Users/georgektenas/Desktop/Malware Project/data/behavior_vectors_paper/results_catboost_capped_eval/catboost_label_thresholds.json

=== CatBoost Tuned (per-label β) ===




Unnamed: 0,Micro,Macro,Weighted
Precision,0.2389,0.4404,0.4697
Recall,0.4522,0.45,0.4522
F1-score,0.3127,0.3851,0.4104
AUC,0.832,0.8357,0.7898


Saved tuned summary -> overall_metrics_catboost_capped_tuned_beta.(csv/json)
