In [None]:
import numpy as np
import os
import pandas as pd
import sys
import torch

from matplotlib import pyplot as plt
from tqdm import tqdm


# Put the parent of the current working directory on sys.path so that the
# project-local packages imported below (`experiments`) can presumably be
# resolved when the notebook is started from its own sub-folder.
# The path is not normalised, so the membership test is a plain string match.
project_dir = os.path.join(os.getcwd(),'..')
if project_dir not in sys.path:
    sys.path.append(project_dir)

from experiments.MNIST import ExperimentSVM
from experiments.utils import generate_roc_df

In [None]:
from itertools import chain
def generate_roc_df(roc_list:list) -> pd.DataFrame:
    '''
        Stack the ROC curves of several iterations into one DataFrame.
        Args:
        -----
            roc_list: list
                One entry per iteration. Each entry is a pair (fpr, tpr)
                holding the false positive rates and the true positive
                rates of that iteration's ROC curve.
        Returns:
        --------
            roc_df: pd.DataFrame
                One row per rate. Rows are labelled with a two-level
                MultiIndex ('It <k>', 'FPR' | 'TPR'), with k counted from 1
                in the order of roc_list.
    '''
    # Row labels: ('It 1', 'FPR'), ('It 1', 'TPR'), ('It 2', 'FPR'), ...
    row_labels = [
        ('It {}'.format(iteration), rate_name)
        for iteration in range(1, len(roc_list) + 1)
        for rate_name in ('FPR', 'TPR')
    ]
    row_index = pd.MultiIndex.from_tuples(row_labels)

    # Flatten [(fpr, tpr), ...] into [fpr, tpr, fpr, tpr, ...] to line up with the labels.
    stacked_rates = chain.from_iterable(roc_list)
    return pd.DataFrame(stacked_rates, index=row_index)

def generate_multi_df(data:list, index_names:list) -> pd.DataFrame:
    '''
        Stack per-iteration groups of sequences into one DataFrame.
        Args:
        -----
            data: list
                One entry per iteration. Each entry is an iterable with as
                many sequences as there are names in index_names.
            index_names: list
                Labels given, in order, to the sequences of every iteration.
        Returns:
        --------
            pd.DataFrame
                One row per sequence. Rows are labelled with a two-level
                MultiIndex ('It <k>', <name>), with k counted from 1 in the
                order of data.
    '''
    # Row labels: every iteration repeats the full list of names.
    labelled_rows = [
        ('It {}'.format(iteration), name)
        for iteration in range(1, len(data) + 1)
        for name in index_names
    ]
    row_index = pd.MultiIndex.from_tuples(labelled_rows)

    # Flatten the per-iteration groups so the rows line up with the labels.
    return pd.DataFrame(chain.from_iterable(data), index=row_index)

def save_result(roc:list, scores:list, metrics:np.ndarray, config:dict) -> tuple:
    '''
        Save the results of the experiment
        Args:
        -----
            roc: list
                List of roc curves, one (fpr, tpr) pair per iteration.
            scores: list
                List of sample scores, one (normal, anomaly) pair per
                iteration.
            metrics: np.ndarray
                One row per iteration with the columns, in order:
                Accuracy, Precision, Recall, F1, AUC.
            config: dict
                Configuration of the experiment. Only the key
                'save_result_dir' (output directory) is read here.

        Returns:
        --------
            roc_df: pd.DataFrame
                ROC curve DataFrame whose columns are a multiindex
                ('It <k>', 'FPR' | 'TPR').
            scores_df: pd.DataFrame
                Sample score DataFrame whose columns are a multiindex
                ('It <k>', 'Normal' | 'Anomaly').
            metrics_df: pd.DataFrame
                DataFrame with the classification metrics of every iteration.
    '''
    save_dir = config['save_result_dir']
    # Make sure the target directory exists before writing. An empty string
    # means "current directory" for os.path.join, and os.makedirs('') would
    # raise, so it is skipped in that case.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    # The helpers build one row per sequence; transpose so that every
    # (iteration, name) pair becomes a column.
    roc_df = generate_multi_df(roc, ['FPR', 'TPR']).T
    scores_df = generate_multi_df(scores, ['Normal', 'Anomaly']).T
    metrics_df = pd.DataFrame(metrics, columns=['Accuracy', 'Precision', 'Recall', 'F1', 'AUC'])

    roc_df.to_pickle(os.path.join(save_dir, 'roc.pkl'))
    scores_df.to_pickle(os.path.join(save_dir, 'sample_score.pkl'))
    metrics_df.to_csv(os.path.join(save_dir, 'metrics.csv'))

    return roc_df, scores_df, metrics_df

In [None]:
# Repeat the SVM experiment n_iter times, each time with its own seed
# (42, 44, 46, ... for consecutive iterations).
n_iter = 3
seed = 2*np.arange(n_iter, dtype=int) + 42
# Progress bar over the iterations; the AUC postfix starts at the placeholder
# -1 and is refreshed after every run.
iterator = tqdm(
            range(n_iter),
            leave=True,
            unit="It.",
            postfix={"AUC": "%.3f" % -1},
        )

# Per-iteration accumulators handed to save_result once the loop is done.
roc, scores = [], []
metrics = np.empty((n_iter, 5)) # one row per iteration: acc, prec, rec, f1, auc

for it in iterator:
    exp = ExperimentSVM(known_anomalies=.1, pollution=0.1, seed=int(seed[it]))
    # The configuration is read and saved for the first iteration only;
    # `config` is therefore bound only if the loop runs at least once.
    if it == 0:
        config = exp.config()
        exp.save_config()
    
    
    # The value returned by run() is the one displayed as AUC in the progress bar.
    auc_score = exp.run(verbose=0)
    iterator.set_postfix({"AUC": "%.3f" % auc_score})

    fpr, tpr, roc_auc = exp.test()
    normal_scores, anomaly_scores = exp.test_score_samples()
    acc, prec, rec, f1 = exp.test_classification_metrics()
    
    roc.append((fpr, tpr))   
    scores.append((normal_scores, anomaly_scores))
    metrics[it] = [acc, prec, rec, f1, roc_auc]
    
# NOTE: `exp` from the last iteration stays bound and is read by later cells.
roc_df, scores_df, metrics_df = save_result(roc, scores, metrics, config)

In [None]:
# Write the three result frames to config['save_result_dir'].
# NOTE(review): save_result (called in the previous cell) already writes these
# same files to the same paths, so this cell only repeats the writes.
roc_df.to_pickle(os.path.join(config['save_result_dir'], 'roc.pkl'))
scores_df.to_pickle(os.path.join(config['save_result_dir'], 'sample_score.pkl'))
metrics_df.to_csv(os.path.join(config['save_result_dir'], 'metrics.csv'))

In [None]:
# Plot the score distributions of the last iteration. The iteration label is
# read from the frame itself (first column level, last entry) instead of being
# hard-coded, so the cell keeps working when n_iter is changed.
last_iteration = scores_df.columns.get_level_values(0)[-1]

# Columns of scores_df share one length; pandas fills the shorter ones with
# NaN when the numbers of normal and anomalous samples differ, so the padding
# is dropped before plotting.
normal_score = scores_df[last_iteration, 'Normal'].dropna().values
anomaly_scores = scores_df[last_iteration, 'Anomaly'].dropna().values

plt.figure(figsize=(6, 4))
plt.hist(normal_score, bins=10, alpha=0.5, label='Normal')
plt.hist(anomaly_scores, bins=10, alpha=0.5, label='Anomaly')
plt.xlabel('Score')
plt.ylabel('Count')
plt.title('Sample scores ({})'.format(last_iteration))
plt.legend()
plt.show()



# Different number

In [None]:
from torchvision.transforms import Normalize,ToTensor, Compose
from torchvision.datasets import MNIST
from torch.utils.data import Subset

## Keep only the images of digit `number` (here 2) from the MNIST test split
number = 2
transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
test2_dataset = MNIST('data/', train = False, download = True, transform=transform)
test2_idx = torch.where((test2_dataset.targets == number))[0]
test2_dataset = Subset(test2_dataset, test2_idx)

# Materialise the subset: images flattened to 28*28 vectors, labels to a 1-D tensor.
X, y = zip(*test2_dataset)
X = torch.stack(X).reshape(-1, 28*28)
y = torch.tensor(y).flatten()
# Score the digit with the model of the last experiment (`exp` comes from the
# experiment loop cell).
y_score = exp.model.score_samples(X)
# Disabled leftovers from an earlier variant of this cell:
# y_pred = np.zeros_like(y_score, dtype=np.int)
# y_pred[y_score > 3] = 1
# y_score = model(X).detach()[:,1]

# Histogram of the scores; the label is set but no legend is drawn.
plt.hist(y_score, bins=10, alpha=.3, label='normal')
plt.show()

In [None]:
# Hard predictions of the experiment's model for the digit images in X,
# shown as a 3-bin histogram.
# NOTE(review): presumably +1 (inlier) / -1 (outlier) if exp.model follows the
# scikit-learn OneClassSVM API — confirm.
x_hat = exp.model.predict(X)
plt.hist(x_hat, bins=3)
plt.show()

In [None]:
# Threshold the scores at 1e-2 and display the distinct boolean outcomes
# together with their counts (bincount: index 0 = False, index 1 = True).
y_pred = y_score > 1e-2
np.unique(y_pred), np.bincount(y_pred)

In [None]:
import numpy as np
from scipy import stats

# NOTE(review): `score` and `_y_test` are only assigned further down in this
# notebook (MedMNIST section), so on a fresh top-to-bottom run this cell raises
# NameError. An identical cell exists after those definitions — confirm whether
# this copy should be removed.

# Scores split by label: 1 vs. -1
data1 = score[_y_test==1]
data2 = score[_y_test==-1]

# Run the two-sample t-test (SciPy defaults)
t_stat, p_value = stats.ttest_ind(data1, data2)

print(f"t-statistic: {t_stat}")
print(f"p-value: {p_value}")

# MedMNIST Test

In [None]:
import os, sys
from torchvision import transforms
# Same sys.path setup as in the first cell of the notebook: add the parent of
# the working directory so the project-local `dataset` package can be imported.
project_dir = os.path.join(os.getcwd(),'..')
if project_dir not in sys.path:
    sys.path.append(project_dir)

from dataset.medmnist import AnomalyPneumoniaMNIST

In [None]:
from dataset import AnomalyPneumoniaMNIST
from matplotlib import pyplot as plt
from torchvision import transforms
import random
# Convert images to tensors and normalise with mean 0.5 / std 0.5.
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[.5], std=[.5]),
])

# Load the dataset with a fixed seed, which is passed both to the dataset
# constructor and to the montage call below.
# NOTE: this rebinds `seed`, which held the array of per-iteration seeds in
# the SVM experiment cell.
seed = 128
train_dataset = AnomalyPneumoniaMNIST('data/', download=True, transform=data_transform, n_normal_samples=-1, known_anomalies=0.2, pollution=0.0, seed=seed)
print(train_dataset)

# Preview a 5 x 5 montage of the dataset.
train_dataset.montage(5, 5, seed)
plt.show()

In [None]:
from sklearn.metrics import roc_curve, auc
# OneClassSVM was used below without ever being imported, which raised a
# NameError on a fresh kernel.
from sklearn.svm import OneClassSVM

# Materialise the dataset as one image tensor and one label tensor.
x_train, y_train = zip(*[(_x, _y) for _x, _y in train_dataset])
x_train, y_train = torch.stack(x_train), torch.tensor(y_train)

# Fit on the samples labelled 0 only, flattened to 28*28 vectors.
_x_train = x_train[y_train==0].view(-1, 28*28).numpy()
_y_train = y_train[y_train==0].numpy()

model = OneClassSVM(kernel='rbf', gamma=1e-2, nu=1e-3)
model.fit(_x_train)

# Predict
# NOTE(review): this iterates train_dataset again, so the model is evaluated
# on the data it was fitted on — confirm that this is intended.
x_test, y_test = zip(*[(_x, _y) for _x, _y in train_dataset])
x_test, y_test = torch.stack(x_test), torch.tensor(y_test)

# Relabel to the +1 / -1 convention: 1 -> -1 first, then 0 -> 1.
# Tensor.numpy() shares memory with the tensor, so the in-place assignments
# below also rewrite `y_test`; later cells compare y_test against 1 / -1 and
# rely on that, so the aliasing is kept on purpose.
_y_test = y_test.numpy()
_y_test[_y_test == 1] = -1
_y_test[_y_test == 0] = 1

x_test = x_test.reshape(-1, 28*28)
_y_pred = model.score_samples(x_test)

# Evaluate
fpr, tpr, _ = roc_curve(_y_test, _y_pred)
roc_auc = auc(fpr, tpr)

# Plot
# Matplotlib 3.6 renamed the bundled seaborn styles to "seaborn-v0_8-*" and
# later releases dropped the old names, so pick whichever one is installed and
# fall back to the default style otherwise.
roc_style = next(
    (
        style_name
        for style_name in ("seaborn-v0_8-colorblind", "seaborn-colorblind")
        if style_name in plt.style.available
    ),
    "default",
)
with plt.style.context(roc_style):
    fig = plt.figure(figsize=(6, 4))
    plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], 'k--')  # chance diagonal
    plt.xlabel('False Positive Rate', fontsize='x-large')
    plt.ylabel('True Positive Rate', fontsize='x-large')
    plt.legend(loc="lower right")

    plt.tick_params(axis='both', which='major', labelsize='large')

    plt.show()

In [None]:
# Histogram (25 bins) of the model scores for every evaluated sample.
score = model.score_samples(x_test)
plt.hist(score, 25)
plt.show()

from sklearn.metrics import roc_curve

# ROC curve of the scores held in `score`. `auc` is not imported in this cell;
# it comes from the import in the previous cell. `y_test` is expected to carry
# the +1 / -1 labels produced by the in-place relabelling in that same cell.
fpr, tpr, thresholds = roc_curve(y_test, score)
auc_score = auc(fpr, tpr)

plt.plot(fpr, tpr)
plt.show()

In [None]:
# Overlay the score histograms of the samples labelled 1 ('normal') and
# -1 ('anomaly'). This relies on `y_test` already holding +1 / -1 labels.
# NOTE(review): assumes y_test is 1-D so that the squeezed argwhere result is
# a flat index into `score` — confirm.
plt.hist(score[torch.argwhere(y_test==1).squeeze()], bins=25, alpha=.5, label='normal')
plt.hist(score[torch.argwhere(y_test==-1).squeeze()], bins=25, alpha=.5, label='anomaly')
plt.legend()
plt.show()

In [None]:
# Difference between TPR and FPR at every threshold (Youden's J statistic)
differences = tpr - fpr

# Index of the threshold that maximises the difference
optimal_threshold_index = np.argmax(differences)

# Threshold at that index (`thresholds` comes from the roc_curve call above)
optimal_threshold = thresholds[optimal_threshold_index]

print("Optimal threshold:", optimal_threshold)

In [None]:
import numpy as np
from scipy import stats

# Scores split by label: 1 vs. -1
data1 = score[_y_test==1]
data2 = score[_y_test==-1]

# Run the two-sample t-test on the two score groups.
# NOTE(review): called with SciPy defaults; consider whether Welch's variant
# (equal_var=False) is more appropriate for groups of different size/variance.
t_stat, p_value = stats.ttest_ind(data1, data2)

print(f"t-statistic: {t_stat}")
print(f"p-value: {p_value}")

# Test

In [None]:
from matplotlib import pyplot as plt
# Materialise the test dataset of the last experiment (`exp`).
# NOTE: this rebinds x_test / y_test / score used in the MedMNIST section.
x_test, y_test = zip(*exp.test_dataset)
x_test, y_test = torch.stack(x_test), torch.tensor(y_test)
# Relabel: original label 0 -> 1, every other label -> -1.
y_test = np.where(y_test.numpy()==0, 1, -1)
x_test = x_test.reshape(-1, 28*28)

# Score histograms (raw counts) for the samples labelled 1 and -1.
score = exp.model.score_samples(x_test)
plt.hist(score[np.argwhere(y_test==1).squeeze()], bins=10, alpha=.3, label='normal', density=False)
plt.hist(score[np.argwhere(y_test==-1).squeeze()], bins=10, alpha=.3, label='anomaly', density=False)
plt.legend()
plt.show()

In [None]:
# Confusion matrix of the experiment model's hard predictions against the
# +1 / -1 labels built in the previous cell (true labels first, predictions
# second).
y_pred = exp.model.predict(x_test)
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_test, y_pred)
print(cm)

In [None]:
from torchvision.transforms import Normalize,ToTensor, Compose
from torchvision.datasets import MNIST
from torch.utils.data import Subset

## Keep only the images of digit `number` (here 0) from the MNIST test split
number = 0
transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
test2_dataset = MNIST('data/', train = False, download = True, transform=transform)
test2_idx = torch.where((test2_dataset.targets == number))[0]
test2_dataset = Subset(test2_dataset, test2_idx)

# Materialise the subset: images flattened to 28*28 vectors, labels to a 1-D tensor.
X, y = zip(*test2_dataset)
X = torch.stack(X).reshape(-1, 28*28)
y = torch.tensor(y).flatten()
# Score the digit with the model of the last experiment (`exp`).
y_score = exp.model.score_samples(X)
# Disabled leftovers from an earlier variant of this cell:
# y_pred = np.zeros_like(y_score, dtype=np.int)
# y_pred[y_score > 3] = 1
# y_score = model(X).detach()[:,1]

# Histogram of the scores; the label is set but no legend is drawn.
plt.hist(y_score, bins=10, alpha=.3, label='normal')
plt.show()

In [None]:
# Hard predictions of the experiment's model for the digit images in X,
# shown as a 3-bin histogram.
# NOTE(review): presumably +1 (inlier) / -1 (outlier) if exp.model follows the
# scikit-learn OneClassSVM API — confirm.
x_hat = exp.model.predict(X)
plt.hist(x_hat, bins=3)
plt.show()

In [None]:
# Display the training dataset of the last experiment (cell's last expression).
exp.train_dataset