### Imports

In [1]:
import json
import yaml
import string
import itertools
from tqdm import trange

import numpy as np
import pandas as pd

import sys
sys.path.append("..")

from simulation.simulation_metrics import svm_discrimination, prepare_det_data, lstm_discrimination
from utils import ts_to_lagged

from sklearn.metrics import roc_auc_score
from sklearn.svm import SVC

import matplotlib.pyplot as plt
import seaborn as sns

rng = np.random.default_rng()

def prind(di): print(json.dumps(di, sort_keys=False, indent=4))

COL_NAMES = list(string.ascii_uppercase) + ["".join(a) for a in list(itertools.permutations(list(string.ascii_uppercase), r=2))]

### P=Q

In [None]:
classifiers = {
    "svc": svm_discrimination, 
    "lstm": lstm_discrimination
}
aucs = {
    "svc": [], 
    "lstm": [], 
}
for n in trange(20):

    """ ____________ Data ____________ """

    # triginoetric data
    index = np.linspace(0, 500, 502)
    col_1 = np.sin(index) + np.random.rand(len(index))*0.2 
    col_2 = []
    for i, x1 in zip(index[1:], col_1[1:]):
        col_2.append(1 + np.cos(i) + np.cos(x1)*0.4 + 0.5 + np.random.rand()*0.2)
    col_3 = []
    for i, x1, x2 in zip(index[2:], col_1[2:], col_2[1:]):
        col_3.append(2 + np.tanh(i) + np.sin(x1) + np.cos(x2)*0.3 + 0.5 + np.random.rand()*0.2)
    index = np.array(index[2:])
    col_1 = np.array(col_1[2:])
    col_2 = np.array(col_2[1:])
    col_3 = np.array(col_3[:])

    # real
    data = np.stack([col_1, 
                    col_2, 
                    col_3], axis=1)
    real = pd.DataFrame(data=data, columns=COL_NAMES[:data.shape[1]])

    # simulated
    synthetic = real.copy()

    """ ____________ C2ST ____________ """

    for clf_name, clf in classifiers.items():

        auc, probs, ys = clf(real=real, synthetic=synthetic)

        aucs[clf_name].append(auc)

for k, v in aucs.items():
    print(f"{k} : {np.mean(v).round(3)} +- {np.var(v).round(3)}")

100%|██████████| 20/20 [00:21<00:00,  1.08s/it]

svc : 0.622 +- 0.0
lstm : 0.47 +- 0.0





### Added Noise

In [4]:
""" ____________ Data ____________ """

# triginoetric functions
index = np.linspace(0, 500, 502)
col_1 = np.sin(index) + np.random.rand(len(index))*0.2 
col_2 = []
for i, x1 in zip(index[1:], col_1[1:]):
    col_2.append(1 + np.cos(i) + np.cos(x1)*0.4 + 0.5 + np.random.rand()*0.2)
col_3 = []
for i, x1, x2 in zip(index[2:], col_1[2:], col_2[1:]):
    col_3.append(2 + np.tanh(i) + np.sin(x1) + np.cos(x2)*0.3 + 0.5 + np.random.rand()*0.2)
index = np.array(index[2:])
col_1 = np.array(col_1[2:])
col_2 = np.array(col_2[1:])
col_3 = np.array(col_3[:])

# real
data = np.stack([col_1, 
                 col_2, 
                 col_3], axis=1)
real = pd.DataFrame(data=data, columns=COL_NAMES[:data.shape[1]])

for loc, scale in [(0.0, 1), (0.5, 1), (1.0, 1), (1.5, 1), (2.0, 1), (2.5, 1), (3.0, 1)]:

    print(f" ____________ loc={loc} | scale={scale} ____________ ")

    # simulated
    data = np.stack([col_1 + np.random.normal(loc=loc, scale=scale, size=len(col_1)), 
                    col_2 + np.random.normal(loc=loc, scale=scale, size=len(col_1)), 
                    col_3 + np.random.normal(loc=loc, scale=scale, size=len(col_1))], axis=1)
    synthetic = pd.DataFrame(data=data, columns=COL_NAMES[:data.shape[1]])


    """ ____________ C2ST ____________ """

    classifiers = {
        "svc": svm_discrimination, 
        "lstm": lstm_discrimination
    }
    aucs = {
        "svc": [], 
        "lstm": []
    }
    for n in range(20):

        for clf_name, clf in classifiers.items():

            auc, probs, ys = clf(real=real, synthetic=synthetic)

            aucs[clf_name].append(auc)

    for k, v in aucs.items():
        print(f"{k} : {np.mean(v).round(2)} +- {np.var(v).round(2)}")

 ____________ loc=0.0 | scale=1 ____________ 
svc : 1.0 +- 0.0
lstm : 0.72 +- 0.04
 ____________ loc=0.5 | scale=1 ____________ 
svc : 1.0 +- 0.0
lstm : 0.97 +- 0.01
 ____________ loc=1.0 | scale=1 ____________ 
svc : 1.0 +- 0.0
lstm : 1.0 +- 0.0
 ____________ loc=1.5 | scale=1 ____________ 
svc : 1.0 +- 0.0
lstm : 1.0 +- 0.0
 ____________ loc=2.0 | scale=1 ____________ 
svc : 1.0 +- 0.0
lstm : 1.0 +- 0.0
 ____________ loc=2.5 | scale=1 ____________ 
svc : 1.0 +- 0.0
lstm : 1.0 +- 0.0
 ____________ loc=3.0 | scale=1 ____________ 
svc : 1.0 +- 0.0
lstm : 1.0 +- 0.0


### Bootstrap Bias Correction

In [12]:
# bootstrap bias correction
def bbc(
        probs: list,      
        labels: list,   
        loss: callable,       
        NB: int,         
        NBS: int | None = None
):
    """
    Docstring for bbc

    Arguments
    ---    
    probs (list) : a nested list or array-like of the probabilities of each classifier on the test set 
    labels (list): a list or array-like of the test label
    loss (callable) : the loss/scoring function
    NB (int) : number of bootstrap iterations
    NBS (int) : number of permuted samples in each bootstrap iteration

    Return
    ---
    res (dict) : a dictionary with the bbc-loss `bbc` and the confidence intervals `ci`
    """
    if NBS is None:
        NBS = len(labels) 

    # placeholders
    scores_bbc = []

    # BBC loop
    for b in range(NB):
        b_indices = np.random.choice(a=np.arange(len(probs[0])), size=NBS, replace=True)    # sample with replacement
        c_indices = np.array([ind for ind in np.arange(len(probs[0])) if ind not in b_indices])    # non-sampled indices
        probs_b = [np.array(probs)[b_indices] for probs in probs]
        probs_c = [np.array(probs)[c_indices] for probs in probs]
        scores_b = [loss(y_true=np.array(labels)[b_indices], y_score=np.array(y_prob)[b_indices]) for y_prob in probs]
        i = np.argmin([np.abs(0.5-x) for x in scores_b])   # css on the bootstrapped out-ofsample predictions
        # print(f"LOG: DEV: chosen config for boot {b} : {i}")
        score_ci = loss(y_true=np.array(labels)[c_indices], y_score=probs_c[i])    # error of i on complement c
        scores_bbc.append(score_ci)
    bbc = np.mean(scores_bbc)    # mean bbc loss
    sorted_scores_bbc = sorted(scores_bbc)
    ci = (sorted_scores_bbc[int(0.025*NB)], sorted_scores_bbc[int(0.975*NB)])

    return {"bbc": bbc, "ci": ci}


# data
index = np.linspace(0, 500, 502)
col_1 = np.sin(index) + np.random.rand(len(index))*0.2 
col_2 = []
for i, x1 in zip(index[1:], col_1[1:]):
    col_2.append(1 + np.cos(i) + np.cos(x1)*0.4 + 0.5 + np.random.rand()*0.2)
col_3 = []
for i, x1, x2 in zip(index[2:], col_1[2:], col_2[1:]):
    col_3.append(2 + np.tanh(i) + np.sin(x1) + np.cos(x2)*0.3 + 0.5 + np.random.rand()*0.2)
index = np.array(index[2:])
col_1 = np.array(col_1[2:])
col_2 = np.array(col_2[1:])
col_3 = np.array(col_3[:])
# real
data = np.stack([col_1, 
                 col_2, 
                 col_3], axis=1)
real = pd.DataFrame(data=data, columns=COL_NAMES[:data.shape[1]])
 # simulated
data = np.stack([col_1 + np.random.normal(loc=0, scale=1.25, size=len(col_1)), 
                col_2 + np.random.normal(loc=0, scale=1.25, size=len(col_1)), 
                col_3 + np.random.normal(loc=0, scale=1.25, size=len(col_1))], axis=1)
synthetic = pd.DataFrame(data=data, columns=COL_NAMES[:data.shape[1]])
real_lagged = ts_to_lagged(data=real, lagged_feats=None, lags=2, contemporaneous=True)
synthetic_lagged = ts_to_lagged(data=synthetic, lagged_feats=None, lags=2, contemporaneous=True)
train_X, train_Y, test_X, test_Y = prepare_det_data(real=real_lagged, synthetic=synthetic_lagged)


# discriminators
svm_search_space = json.load(open("../configs/discrimination/svc_configs.json", "r"))
keys = [k for k in svm_search_space.keys() if k!="lags"]
values = [v for k, v in svm_search_space.items() if k!="lags"]
configs = [dict(zip(keys, config)) for config in list(itertools.product(*values))]
results = {
    "config": [], 
    "probs": [],
    "labels": [],
    "auc": [] 
}
for config in configs:
    clf = SVC(
        **config,
        probability=True
    )
    # Fit the SVC model
    clf.fit(X=train_X, y=train_Y)
    # Predicted probabilities
    preds_test = clf.predict_proba(X=test_X)[:, 1]
    # Calculate ROC-AUC
    auc = roc_auc_score(y_true=test_Y, y_score=preds_test)
    # partial(detection, **config)
    results["config"].append(config)
    results["probs"].append(preds_test)
    results["labels"].append(test_Y)
    results["auc"].append(auc)
    print(f"{config} - {auc}")
    print()

res = bbc(probs=results["probs"], labels=test_Y, loss=roc_auc_score, NB=1000)
res

{'C': 1.0, 'kernel': 'linear', 'degree': 3, 'gamma': 'scale'} - 0.524

{'C': 1.0, 'kernel': 'rbf', 'degree': 3, 'gamma': 'scale'} - 1.0

{'C': 0.75, 'kernel': 'linear', 'degree': 3, 'gamma': 'scale'} - 0.44000000000000006

{'C': 0.75, 'kernel': 'rbf', 'degree': 3, 'gamma': 'scale'} - 1.0

{'C': 0.5, 'kernel': 'linear', 'degree': 3, 'gamma': 'scale'} - 0.45838709677419354

{'C': 0.5, 'kernel': 'rbf', 'degree': 3, 'gamma': 'scale'} - 1.0



{'bbc': 0.4828869010049457, 'ci': (0.35, 0.5930232558139534)}

### Aeons Classifiers

In [None]:
# from aeon.classification.deep_learning import LITETimeClassifier, IndividualLITEClassifier, RecurrentNetwork
# from sklearn.metrics import roc_auc_score

# def lite_detection_XY(
#         train_X : np.array, 
#         train_Y : np.array, 
#         test_X : np.array, 
#         test_Y : np.array, 
#         n_epochs: int = 50, 
# ):
#     """ 
#     Detection test w/ ResNet-based classifier for real & synthetic datasets. Based on the aeon's LITE implementation: 
#     https://www.aeon-toolkit.org/en/stable/api_reference/auto_generated/aeon.classification.deep_learning.LITETimeClassifier.html. 
#     No internal data preparation, thus **train_X**, **train_Y**, **test_X** and **test_Y** are requested as arguments. 

#     Args
#     ----
#     train_X (numpy.array) : the training data as a numpy array 
#     train_Y (numpy.array) : the training labels as a numpy array
#     test_X (numpy.array) : the testing data as a numpy array
#     test_Y (numpy.array) : the testing labels as a numpy array 
#     split (float) : the length of the training set as a percentage of the merged set length; (default = 0.75)
#     n_epochs (int) : number of training epochs for the classifier; check aeon's documentation 

#     Return
#     ------
#     auc (float) : the computed auc, also based on the sklearn implementation
#     probs (list) : the probabilites per sample predicted by the classifier
#     """
#     # Instantiate the SVC model
#     clf = IndividualLITEClassifier(
#         use_litemv=True, 
#         n_epochs = n_epochs,
#         # probability=True
#     )

#     # Fit the SVC model
#     clf.fit(X=train_X, y=train_Y)

#     # Predicted probabilities
#     preds_test = clf.predict_proba(X=test_X)[:, 1]

#     return roc_auc_score(y_true=test_Y, y_score=preds_test), preds_test, test_Y


# def rnn_detection_XY(
#         train_X : np.array, 
#         train_Y : np.array, 
#         test_X : np.array, 
#         test_Y : np.array, 
#         n_epochs: int = 50, 
# ):
#     """ 
#     Detection test w/ ResNet-based classifier for real & synthetic datasets. Based on the aeon's LITE implementation: 
#     https://www.aeon-toolkit.org/en/stable/api_reference/auto_generated/aeon.classification.deep_learning.LITETimeClassifier.html. 
#     No internal data preparation, thus **train_X**, **train_Y**, **test_X** and **test_Y** are requested as arguments. 

#     Args
#     ----
#     train_X (numpy.array) : the training data as a numpy array 
#     train_Y (numpy.array) : the training labels as a numpy array
#     test_X (numpy.array) : the testing data as a numpy array
#     test_Y (numpy.array) : the testing labels as a numpy array 
#     split (float) : the length of the training set as a percentage of the merged set length; (default = 0.75)
#     n_epochs (int) : number of training epochs for the classifier; check aeon's documentation 

#     Return
#     ------
#     auc (float) : the computed auc, also based on the sklearn implementation
#     probs (list) : the probabilites per sample predicted by the classifier
#     """
#     # Instantiate the SVC model
#     clf = IndividualLITEClassifier(
#         use_litemv=True, 
#         n_epochs = n_epochs,
#         # probability=True
#     )

#     # Fit the SVC model
#     clf.fit(X=train_X, y=train_Y)

#     # Predicted probabilities
#     preds_test = clf.predict_proba(X=test_X)[:, 1]

#     return roc_auc_score(y_true=test_Y, y_score=preds_test), preds_test, test_Y


# train_X, train_Y, test_X, test_Y = prepare_det_data(real=real, synthetic=synthetic)
# auc, probs, ys = lite_detection_XY(train_X, train_Y, test_X, test_Y)

# print(auc)