In [None]:
#imports
import os
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn
import sklearn

from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.metrics import confusion_matrix, make_scorer
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline


In [None]:
def get_feature_combos():
    a =  [
    ['FS-I'],
    ['FS-II'],
    ['FS-III'],
    ['FS-IV'],
    ['FS-I', 'FS-II'],
    ['FS-I', 'FS-III'],
    ['FS-I', 'FS-IV'],
    ['FS-II', 'FS-III'],
    ['FS-II', 'FS-IV'],
    ['FS-III', 'FS-IV'],
    ['FS-I', 'FS-II', 'FS-III'],
    ['FS-I', 'FS-II', 'FS-IV'],
    ['FS-I', 'FS-III', 'FS-IV'],
    ['FS-II', 'FS-III', 'FS-IV'],
    ['FS-I', 'FS-II', 'FS-III', 'FS-IV']
    ]
    
    return a

# Hyperparametric tuning
# after we find the best feature set from feature_experiment()
def tuning(best_fs, data_path='.', period="data-2010-15", k=5, seed=123):
   
    # Define parameter grid (params to test)
    param_grid = {
        "kernel": ["rbf", "linear", "poly"],
        "gamma": ["scale", 0.1, 0.05, 0.01, 0.005, 0.001],
        "C": [0.1, 0.5, 1, 5, 10, 30],
    }

    # Load and preprocess data (only removing NaNs; scaling handled in CV)
    ds = my_svm(data_path=data_path, period=period)
    X, y = ds.preprocess(best_fs)

    # fold data
    folds = StratifiedKFold(n_splits=k, shuffle=True, random_state=seed)

    # Including a sclara in pipeline to avoid leakage during tuning
    pipe = Pipeline([
        ("scaler", StandardScaler()),
        ("svc", SVC())
    ])

    # SCORER FUNCTION for GridSearch
    def tss_score(y_true, y_pred):
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
        tpr = tp / max(tp + fn, 1)
        fpr = fp / max(fp + tn, 1)
        return tpr - fpr
    scorer = make_scorer(tss_score)

    # Perform grid search
    grid = GridSearchCV(
        estimator=pipe,
        param_grid={
            "svc__C": param_grid["C"],
            "svc__gamma": param_grid["gamma"],
            "svc__kernel": param_grid["kernel"],
        },
        scoring=scorer, cv=folds, n_jobs=None, refit=True
    )

    print("\nPerforming GridSearchCV hyperparameter tuning...")
    grid.fit(X, y)

    print("\nBest Parameters found:")
    print(grid.best_params_)
    print(f"Best CV TSS Score: {grid.best_score_:.4f}")

    # Extract SVC parameters (remove 'svc__' prefix)
    best_params = {
        "C": grid.best_params_["svc__C"],
        "gamma": grid.best_params_["svc__gamma"],
        "kernel": grid.best_params_["svc__kernel"]
    }

    return best_params



In [None]:
class my_svm():
    def __init__(self, data_path='.', period="data-2010-15"):
        self.data_dir = os.path.join(data_path, period)
        self.neg_main  = os.path.join(self.data_dir, "neg_features_main_timechange.npy")  # FS-I(0:18), FS-II(18:90)
        self.pos_main  = os.path.join(self.data_dir, "pos_features_main_timechange.npy")
        self.neg_hist  = os.path.join(self.data_dir, "neg_features_historical.npy")       # FS-III(0:1)
        self.pos_hist  = os.path.join(self.data_dir, "pos_features_historical.npy")
        self.neg_mm    = os.path.join(self.data_dir, "neg_features_maxmin.npy")           # FS-IV(0:18)
        self.pos_mm    = os.path.join(self.data_dir, "pos_features_maxmin.npy")
        self.order_fn  = os.path.join(self.data_dir, "data_order.npy")
        
        # class sizes
        self.n_neg = None
        self.n_pos = None

        self.hyperparams = {"C": 1.0, "kernel": "rbf", "gamma": "scale"}
        pass

    def preprocess(self,fs_value):
        X_raw = self.feature_creation(fs_value)
        
        #apply numpy ordering
        order =np.load(self.order_fn).astype(int).ravel()
        X = X_raw[order, :]

        #labels (0 for NEG, 1 for POS)
        y_labels = np.concatenate([np.zeros(self.n_neg, dtype=int),
                                 np.ones(self.n_pos, dtype=int)])
        y = y_labels[order]

        # drop rows with invalid values
        finite = np.isfinite(X)
        row_ok = finite.all(axis=1)
        X = X[row_ok, :]
        y = y[row_ok]

        # Normalization handled in cross_validation
        # Compute μ/σ on the training set once, store them, and reuse those for val/test.

        return X, y

    def feature_creation(self, fs_value):
         
        fs_value = [s.upper() for s in fs_value]
        
        neg_main = np.load(self.neg_main)
        pos_main = np.load(self.pos_main)
        neg_hist = np.load(self.neg_hist)
        pos_hist = np.load(self.pos_hist)
        neg_mm   = np.load(self.neg_mm)
        pos_mm   = np.load(self.pos_mm)

        fs_i_neg  = neg_main[:, 0:18]   if "FS-I"  in fs_value else None
        fs_i_pos  = pos_main[:, 0:18]   if "FS-I"  in fs_value else None
        fs_ii_neg = neg_main[:, 18:90]  if "FS-II" in fs_value else None
        fs_ii_pos = pos_main[:, 18:90]  if "FS-II" in fs_value else None

        fs_iii_neg = neg_hist[:, 0:1]   if "FS-III" in fs_value else None
        fs_iii_pos = pos_hist[:, 0:1]   if "FS-III" in fs_value else None

        fs_iv_neg  = neg_mm[:, 0:18]    if "FS-IV" in fs_value else None
        fs_iv_pos  = pos_mm[:, 0:18]    if "FS-IV" in fs_value else None

        neg_parts = []
        pos_parts = []

        for part in [fs_i_neg, fs_ii_neg, fs_iii_neg, fs_iv_neg]:
            if part is not None:
                neg_parts.append(part)

        for part in [fs_i_pos, fs_ii_pos, fs_iii_pos, fs_iv_pos]:
            if part is not None:
                pos_parts.append(part)

        if len(neg_parts) == 1:
            neg_block = neg_parts[0]
        else:
            neg_block = np.hstack(neg_parts)

        if len(pos_parts) == 1:
            pos_block = pos_parts[0]
        else:
            pos_block = np.hstack(pos_parts)

        self.n_neg = neg_block.shape[0]
        self.n_pos = pos_block.shape[0]

        # Stack NEG first, then POS
        X_raw = np.vstack([neg_block, pos_block])
        return X_raw    

    def cross_validation(self,fs_value):
         
        seed = 123
        k=5 #  k = 5 not 10
        
        # get the ordered & processed data:
        X, Y = self.preprocess(fs_value)

        # instatiate the folds, using stratfied K Fold for class balance
        fold = StratifiedKFold(n_splits=k, shuffle=True, random_state=seed)
        tss_list = []
        cm = np.array([[0, 0],[0, 0]], dtype=int)  # [[TN,FP],[FN,TP]]

        #fold.split returns the 5 splits of train,test indice tuples
        for train, test in fold.split(X,Y):

            # seperate data into folds 
            X_train, X_test = X[train], X[test]
            y_train, y_test = Y[train], Y[test]

            # normalization (fit on train, apply to both train + test to avoid data leakage)
            mu = X_train.mean(axis=0, keepdims=True)
            sd = X_train.std(axis=0, ddof=0, keepdims=True); sd[sd == 0] = 1.0
            X_train = (X_train - mu) / sd
            X_test = (X_test - mu) / sd

            # Train & fit
            # train on trainign set,
            model = self.training(X_train, y_train)
            # and test on test set
            y_hat = model.predict(X_test)

            # update tss, and data for confusion matrix
            tss_val = self.tss(y_test, y_hat)
            tss_list.append(tss_val)
            cm += confusion_matrix(y_test, y_hat, labels=[0,1])

        # return the required metrics
        self.metrics = {
            "tss"          : tss_list,
            "mean_tss"     : float(np.mean(tss_list)),
            "agg_cm"        : cm
        }

        return self.metrics

    def training(self, X, Y):
         
        C = self.hyperparams.get("C")
        gamma = self.hyperparams.get("gamma")
        kernel = self.hyperparams.get("kernel")


        svm = SVC(C=C, gamma = gamma, kernel=kernel) # gamma = 'scale'
        svm.fit(X, Y)
        return svm

    def tss(self,y_true, y_pred):
        tp = np.sum((y_true == 1) & (y_pred == 1))
        tn = np.sum((y_true == 0) & (y_pred == 0))
        fp = np.sum((y_true == 0) & (y_pred == 1))
        fn = np.sum((y_true == 1) & (y_pred == 0))

        tpr = tp / max(tp + fn, 1)
        fpr = fp / max(fp + tn, 1)
        return (tpr - fpr)




In [None]:
def feature_experiment(data_path='.', period="data-2010-15"):
     
    k = 5
    combos = get_feature_combos()
    confusion_matrices = [] 

    ## Data for plotting
    rows = []               # for DataFrame
    tss_allfolds = []       # for the single overlay chart
    labels = []   # legend labels
       

    for fs_list in combos:
        # init svm class
        ds = my_svm(data_path=data_path, period=period)
        metrics = ds.cross_validation(fs_list)  # get the metrics for this feature set

        tss_per_fold = np.array(metrics["tss"], dtype=float)
        mean_tss = float(np.mean(tss_per_fold))
        std_tss  = float(np.std(tss_per_fold, ddof=1)) if tss_per_fold.size > 1 else 0.0

        #confusion matrix
        cm = metrics["agg_cm"]
        tn, fp, fn, tp = cm[0,0], cm[0,1], cm[1,0], cm[1,1]

        # Print metrics for each feature set combo
        print(f"\nFeature Set: {', '.join(fs_list)}")
        print(f"Mean TSS: {mean_tss:.4f}, Std Dev: {std_tss:.4f}")
        # Report TP, FP, TN, FN 
        print("Confusion Matrix (aggregated over 5 folds)")
        print(f"TP={tp}  FP={fp}  TN={tn}  FN={fn}")

        # Store table row
        rows.append({
            "feature_set": " + ".join(fs_list),
            "mean_tSS": mean_tss,
            "std_tSS": std_tss,
            "TP": tp, "FP": fp, "TN": tn, "FN": fn
        })

        # Keep for overlay chart
        tss_allfolds.append(tss_per_fold) ## append the TSS for this combination
        labels.append(" + ".join(fs_list)) ## add the label
        confusion_matrices.append((fs_list, cm)) ## add the CM values

        # Plot this combo’s confusion matrix (one for each combination)
    
        plt.figure(figsize=(3.8, 3.2))
        seaborn.heatmap(cm, annot=True, fmt="d", cbar=True, xticklabels=["Pred 0","Pred 1"], yticklabels=["True 0","True 1"])
        plt.title(f"Confusion {', '.join(fs_list)}")
        plt.tight_layout()
        plt.show()

    # plot fold-wise TSS for all 15 combos
    plt.figure(figsize=(7.5, 4.8))
    x = np.arange(1, k + 1)
    for tss_series, label in zip(tss_allfolds, labels):
        # plot a line for each combination
        plt.plot(x, tss_series, marker='o', linewidth=1.0, alpha=0.8, label=label)

    plt.xlabel("Fold")
    plt.ylabel("TSS")
    plt.title("Fold-wise TSS for All Feature Set Combinations (2010–2015, k=5)")
    # clegend
    plt.legend(loc="center left", bbox_to_anchor=(1.02, 0.5), fontsize=8, ncol=1)
    plt.tight_layout()
    plt.show()

    # Pick best by mean TSS
    # IDMAX finds the index with the max value
    df = pd.DataFrame(rows)
    best_row = df.loc[df["mean_tSS"].idxmax()]
    best_name = best_row["feature_set"]

    # worst
    worst_row = df.loc[df["mean_tSS"].idxmin()] # fs-III to get second to worst add a +1 to idxmin()
    worst_name = worst_row["feature_set"]   

    print("\n************************************")
    print(f"Best FeatureSet Combination by TSS:\n {best_name}  had  Mean TSS: {best_row['mean_tSS']:.4f}, Std Dev: {best_row['std_tSS']:.4f}")
    print("************************************")
    print("\n************************************")
    print(f"Worst FeatureSet Combination by TSS:\n {worst_name}  had  Mean TSS: {worst_row['mean_tSS']:.4f}, Std Dev: {worst_row['std_tSS']:.4f}")
    print("************************************")

    ## best feature set used for tuning
    return df, best_name



In [None]:
def data_experiment(best_params, best_fs, data_path='.',):
     
    print("*"*90)
    print("*"*90)
    print('\t DATA EXPERIMENT\n\n')

    print(f"\nUsing best FS from feature_experiment(): {', '.join(best_fs)}")
    print(f"\nUsing tuned SVC params: {best_params}")


    out = {}
    # 2) Run on both datasets
    for period in ["data-2010-15", "data-2020-24"]:
        print("\n" + "="*40)
        print(f"Dataset: {period}")
        print("="*40)

        #build the model with ideal parameters
        # use hte data from the given period (2010, 2020)
        ds = my_svm(data_path=data_path, period=period)
        ds.hyperparams.update(best_params)
        # get metrics of CV
        metrics = ds.cross_validation(best_fs)

        # Mean / std TSS
        tss_per_fold = np.array(ds.metrics["tss"], dtype=float)
        mean_tss = float(np.mean(tss_per_fold))
        std_tss  = float(np.std(tss_per_fold, ddof=1)) if tss_per_fold.size > 1 else 0.0
        print(f"Mean TSS: {mean_tss:.4f}, Std Dev: {std_tss:.4f}")

        # Confusion matrix (aggregated over folds)
        cm = ds.metrics["agg_cm"]
        tn, fp, fn, tp = cm[0,0], cm[0,1], cm[1,0], cm[1,1]
        print(f"Confusion Matrix (aggregated over 5 folds):")
        print(f"TN={tn}  FP={fp}\nFN={fn}  TP={tp}")

        # Heatmap of confusion matrix
        plt.figure(figsize=(3.8, 3.2))
        seaborn.heatmap(cm, annot=True, fmt="d", cbar=True,
                        xticklabels=["Pred 0","Pred 1"], yticklabels=["True 0","True 1"])
        plt.title(f"Confusion Matrix — {period}")
        plt.tight_layout()
        plt.show()

        # Bar chart of TSS per fold
        plt.figure(figsize=(5.2, 3.0))
        x = np.arange(1, tss_per_fold.size + 1)
        plt.bar(x, tss_per_fold)
        plt.xlabel("Fold")
        plt.ylabel("TSS")
        plt.title(f"TSS per Fold — {period}")
        plt.xticks(x)
        plt.tight_layout()
        plt.show()
    return out


In [None]:
## run the feature experiment for all combinations
_, feat = feature_experiment()

## get the best feature set
best_fs = [s.strip() for s in feat.split('+')]

## tune hyperparameters
params = tuning(best_fs, data_path='.', period="data-2010-15", k=3, seed=123)

## Run data experiment with best feature set/hyperparameters on both datasets
result = data_experiment(params, best_fs = best_fs)
