In [None]:
import warnings #suppress warnings
import torch
import random
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from scipy.io import arff
from pyod.models.anogan import AnoGAN
from pyod.models.mo_gaal import MO_GAAL
from pyod.models.lof import LOF
from pyod.models.gmm import GMM
from pyod.models.knn import KNN
from pyod.models.so_gaal import SO_GAAL
import sklearn.metrics as sk

# Install a global "ignore" filter: from here on, warnings of every category
# are silenced for the whole kernel session.
warnings.simplefilter("ignore")

In [None]:
class CustomDataset(Dataset):
    """Torch ``Dataset`` exposing the records of an ARFF file as float rows.

    The ``outlier`` column is replaced by integer codes assigned in sorted
    order of its distinct values (so for ``no``/``yes`` labels: 0 = normal,
    1 = outlier). The complete table, label code included, is available as

    * ``data_tensor`` - float tensor, one row per record
    * ``data_numpy``  - NumPy array holding the same values
    """

    def __init__(self, path):
        """Read the ARFF file at ``path`` and build both array views."""
        records, _meta = arff.loadarff(path)
        frame = pd.DataFrame(records)

        # sort=True makes the code assignment depend only on the label
        # values, not on the order in which they appear in the file.
        label_codes, _uniques = pd.factorize(frame["outlier"], sort=True)
        frame["outlier"] = label_codes

        table = frame.to_numpy()
        self.data_numpy = table
        # torch.tensor() copies its input, so the tensor never aliases
        # data_numpy.
        self.data_tensor = torch.tensor(table).float()

    def __len__(self):
        """Number of records (rows) in the dataset."""
        return self.data_tensor.shape[0]

    def __getitem__(self, i):
        """Return row ``i`` of ``data_tensor`` (features followed by label code)."""
        return self.data_tensor[i]

In [None]:
# Dataset locations, relative to the notebook's working directory.
# Author's note on InternetAds: its outlier/normal labels have to be inverted.
internet_ads_path = "./Resources/Datasets/InternetAds_withoutdupl_norm_02_v01.arff"
arrythmia_path = "./Resources/Datasets/Arrhythmia_withoutdupl_norm_02_v01.arff"
wave_path = "./Resources/Datasets/Waveform_withoutdupl_norm_v01.arff"

# Seed the global RNGs so that a Restart & Run All gives repeatable results
# for the stochastic (GAN-based) detectors trained further down.
# NOTE(review): this covers Python's `random` and torch only; a detector that
# draws from another RNG (e.g. NumPy) needs its own seeding - confirm against
# the installed PyOD version.
SEED = 777
torch.manual_seed(SEED)
random.seed(SEED)

# Currently unused DataLoader / device settings, kept for reference:
#num_workers = 2
#batch_size = 128
#number of used GPUs
#gpu = 0
#usedDevice = torch.device("cpu" if gpu == 0 else "cuda")

dataset = CustomDataset(arrythmia_path)

# Feature matrix: every column except the last two.
# NOTE(review): assumes the file ends with a record-id column followed by the
# outlier label - TODO confirm against the ARFF header.
data_no_label = dataset.data_numpy[:, :-2]
# Ground-truth label codes: the last column (the factorized "outlier" column).
data_label = dataset.data_numpy[:, -1]

# Open idea: a train/eval/test split (is this a botch because of NumPy?), possibly with one DataLoader per split.
#train_set, eval_set, test_set = torch.utils.data.random_split(dataset.data_numpy[:,:-1], [0.6,0.2,0.2])
#dataloader = DataLoader(dataset=dataset.data_tensor, batch_size = batch_size, shuffle=True, num_workers=num_workers)

In [None]:
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or NaN when the denominator is zero."""
    return numerator / denominator if denominator else float("nan")


def check_accuracy(decision_values, labels, true_labels=None):
    """Print confusion-matrix counts, precision, recall, accuracy and ROC AUC.

    The positive class is the anomaly (label 1).

    Args:
        decision_values: One outlier score per sample; handed unchanged to
            ``sk.roc_auc_score`` as the score vector.
        labels: Predicted binary labels, one per sample (1 = anomaly).
        true_labels: Ground-truth binary labels. When omitted, the
            notebook-level ``data_label`` array is used, so existing
            two-argument calls behave as before.

    Raises:
        ValueError: If ``labels`` and the ground truth differ in length.

    Precision, recall and accuracy are printed as ``nan`` when their
    denominator is zero (e.g. the detector flagged nothing) instead of
    aborting with ``ZeroDivisionError``.
    """
    truth = data_label if true_labels is None else true_labels
    if len(labels) != len(truth):
        raise ValueError(
            "labels and ground truth differ in length: "
            + str(len(labels)) + " vs " + str(len(truth))
        )

    tp = tn = fp = fn = 0
    # Iterate over the arguments themselves rather than over an unrelated
    # global array, so the loop bound always matches the data being scored.
    for actual, predicted in zip(truth, labels):
        correct = actual == predicted
        if predicted == 1:
            if correct:
                tp += 1
            else:
                fp += 1  # flagged as anomaly, but actually normal
        else:
            if correct:
                tn += 1
            else:
                fn += 1  # missed anomaly

    print("------------------------------------------------------------")
    print("TP: " + str(tp))
    print("FP: " + str(fp))
    print("TN: " + str(tn))
    print("FN: " + str(fn))
    print("------------------------------------------------------------")
    # When we declare a positive, how certain are we?
    print("Precision: " + str(_safe_ratio(tp, tp + fp)))
    # How good are we at detecting the positives?
    print("Recall: " + str(_safe_ratio(tp, tp + fn)))
    print("Accuracy: " + str(_safe_ratio(tp + tn, tp + tn + fp + fn)))
    print("AUC: " + str(sk.roc_auc_score(truth, decision_values)))

In [None]:
# Local Outlier Factor detector (pyod.models.lof) with default
# hyper-parameters, fitted unsupervised on the feature matrix.
lof_model = LOF()
lof_model.fit(data_no_label)

In [None]:
# Evaluate on the training data using the scores stored by fit().
# decision_function() is meant for unseen samples: when handed the training
# matrix again, every point counts itself among its own neighbours, so those
# scores differ from the ones labels_ was thresholded on. decision_scores_
# and labels_ both describe the fitted training data and stay consistent.
decision_values = lof_model.decision_scores_
check_accuracy(decision_values, lof_model.labels_)

In [None]:
#mogaal_model = MO_GAAL(lr_d = 0.01, momentum = 0.3)
#mogaal_model.fit(data_no_label)

In [None]:
#decision_values = mogaal_model.decision_function(data_no_label)
#check_accuracy(decision_values, mogaal_model.labels_)

In [None]:
# AnoGAN detector (pyod.models.anogan) with default hyper-parameters,
# fitted unsupervised on the feature matrix.
anogan_model = AnoGAN()
anogan_model.fit(data_no_label)

In [None]:
# Score AnoGAN with the attributes set during fit(): decision_scores_ as the
# outlier scores and labels_ as the predicted labels. The decision_function()
# variant below is intentionally left disabled.
#decision_values = anogan_model.decision_function(data_no_label)
check_accuracy(anogan_model.decision_scores_, anogan_model.labels_)

In [None]:
# k-nearest-neighbours detector (pyod.models.knn) with default
# hyper-parameters, fitted unsupervised on the feature matrix.
knn_model = KNN()
knn_model.fit(data_no_label)

In [None]:
# Evaluate on the training data using the scores stored by fit().
# decision_function() treats its input as new samples: for training points
# the nearest neighbour found is the point itself (distance 0), which shifts
# the k-NN distances away from the scores labels_ was thresholded on.
# decision_scores_ and labels_ both describe the fitted training data.
decision_values = knn_model.decision_scores_
check_accuracy(decision_values, knn_model.labels_)

In [None]:
# SO_GAAL detector (pyod.models.so_gaal) with default hyper-parameters,
# fitted unsupervised on the feature matrix.
sogaal_model = SO_GAAL()
sogaal_model.fit(data_no_label)

In [None]:
# Re-score the training matrix with the fitted SO_GAAL model and report the
# metrics against the ground truth.
# NOTE(review): labels_ was produced during fit(), while the scores are
# recomputed here via decision_function() - confirm both describe the same
# scoring (decision_scores_ would be the fit-time counterpart).
decision_values = sogaal_model.decision_function(data_no_label)
check_accuracy(decision_values, sogaal_model.labels_)