In [None]:
from ucimlrepo import fetch_ucirepo 
import pandas as pd
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import copy
import torch.nn.functional as F

In [None]:
# TODO: should the training subsets be generated randomly, sampling with replacement?

In [None]:
# Fetch UCI repository dataset id=94 and attach its targets to the feature frame
# as a "target" column.
# NOTE: the following cell rebinds X and y to a different dataset, so when the
# notebook is run top to bottom this frame is not the one used downstream.
spambase = fetch_ucirepo(id=94)
spambase_data = spambase.data

X, y = spambase_data.features, spambase_data.targets
X["target"] = y.values

In [None]:
# Fetch UCI repository dataset id=52, recode its 'g'/'b' labels to 1/0 and attach
# them to the feature frame as a "target" column.
ionosphere = fetch_ucirepo(id=52)
label_codes = {'g': 1, 'b': 0}

X = ionosphere.data.features
y = ionosphere.data.targets.replace(label_codes)
X["target"] = y.values

In [None]:
# Decide what counts as normal and what counts as an anomaly:
# rows with target == 1 are "normal", rows with target == 0 are "anomaly".
X_normal = X[X["target"] == 1].drop(columns="target")
X_anomaly = X[X["target"] == 0].drop(columns="target")

print(f"NORMAL SIZE: {len(X_normal)}")
print(f"ANOMALY SIZE: {len(X_anomaly)}")

# Slightly contaminated "normal" training set: all normal rows plus a random
# 16% of the anomalies. The sample is seeded so that every run of the notebook
# trains and evaluates on the same contaminated set.
NOISE_FRACTION = 0.16
NOISE_SEED = 42
anomaly_noise = X_anomaly.sample(frac=NOISE_FRACTION, random_state=NOISE_SEED)
X_normal_noise = pd.concat([X_normal, anomaly_noise])

print(f"NORMAL NOISED SIZE: {len(X_normal_noise)}")

# Isolation Forest

In [None]:
# Fit an Isolation Forest (fixed seed) on the contaminated "normal" set.
# fit() returns the estimator itself, so the trailing expression shows the same
# fitted-estimator repr as before.
clf = IsolationForest(random_state=42).fit(X_normal_noise.values)
clf

In [None]:
# Score the clean normal rows: remap the detector's -1 label to 0 (anomaly) and
# every other label to 1 (normal), then report the counts.
raw_labels = clf.predict(X_normal.values)
y_pred = np.where(raw_labels == -1, 0, 1)
n_total, n_normal = len(y_pred), np.sum(y_pred)
print(f"Liczba normalnych: {n_normal} wśród ogółu {n_total} ANOMALIE={n_total-n_normal}")

In [None]:
# Score the anomaly rows: remap the detector's -1 label to 0 (anomaly) and
# every other label to 1 (normal), then report the counts.
raw_labels = clf.predict(X_anomaly.values)
y_pred = np.where(raw_labels == -1, 0, 1)
n_total, n_normal = len(y_pred), np.sum(y_pred)
print(f"Liczba normalnych: {n_normal} wśród ogółu {n_total} ANOMALIE={n_total-n_normal}")

# OneClassSVM 

In [None]:
# Fit an RBF-kernel One-Class SVM on the contaminated "normal" set.
# `clf` is rebound here, replacing the previously fitted detector.
clf = OneClassSVM(kernel="rbf", gamma='auto')
clf = clf.fit(X_normal_noise.values)

In [None]:
# Score the clean normal rows: remap the detector's -1 label to 0 (anomaly) and
# every other label to 1 (normal), then report the counts.
raw_labels = clf.predict(X_normal.values)
y_pred = np.where(raw_labels == -1, 0, 1)
n_total, n_normal = len(y_pred), np.sum(y_pred)
print(f"Liczba normalnych: {n_normal} wśród ogółu {n_total} ANOMALIE={n_total-n_normal}")

In [None]:
# Score the anomaly rows: remap the detector's -1 label to 0 (anomaly) and
# every other label to 1 (normal), then report the counts.
raw_labels = clf.predict(X_anomaly.values)
y_pred = np.where(raw_labels == -1, 0, 1)
n_total, n_normal = len(y_pred), np.sum(y_pred)
print(f"Liczba normalnych: {n_normal} wśród ogółu {n_total} ANOMALIE={n_total-n_normal}")

# Our Solution

In [None]:
class Perceptron(nn.Module):
    """Small fully connected binary classifier.

    Architecture: input_dim -> 256 -> 64 -> 1, where each hidden layer is
    Linear + BatchNorm1d + ReLU and the single output unit is passed through a
    sigmoid, so the forward pass yields one value in [0, 1] per input row.

    Args:
        input_dim: number of input features per sample.
    """

    def __init__(self, input_dim):
        super().__init__()
        first_width, second_width = 256, 64
        # Layers are created in the same order as they are applied, so the
        # Sequential indices (and state_dict keys) follow the data flow.
        stack = [
            *self._hidden_block(input_dim, first_width),
            *self._hidden_block(first_width, second_width),
            nn.Linear(second_width, 1),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*stack)

    @staticmethod
    def _hidden_block(n_in, n_out):
        """Return the [Linear, BatchNorm1d, ReLU] triple for one hidden layer."""
        return [nn.Linear(n_in, n_out), nn.BatchNorm1d(n_out), nn.ReLU()]

    def forward(self, x):
        """Apply the layer stack to ``x`` and return the sigmoid outputs."""
        return self.model(x)

class OneClassClassifier():
    """Ensemble of binary models trained on artificial "one chunk vs. the rest" labels.

    The training frame is cut into ``n_of_splits`` consecutive chunks. For
    model ``i`` every row of chunk ``i`` is labelled 0 and every other row is
    labelled 1; the model is then fitted on the whole relabelled frame.

    Args:
        df: pandas DataFrame of training rows. Every column is used as a
            feature; a column named "target" would be overwritten by the
            artificial label.
        n_of_splits: number of chunks, and therefore of ensemble members.
        model_type: "nn" (one ``Perceptron`` per chunk) or "rf" (one
            ``RandomForestClassifier`` per chunk, created in ``train_models``).
        batch_size: mini-batch size used for the "nn" data loaders.

    Raises:
        ValueError: if ``model_type`` is neither "nn" nor "rf".
    """

    def __init__(self, df, n_of_splits, model_type="nn", batch_size=32):
        # Validate first; raising a plain string is itself a TypeError in Python 3.
        if model_type not in ("nn", "rf"):
            raise ValueError("Invalid model param!")

        self.n_of_splits = n_of_splits
        self.df = df
        # Same partition as np.array_split(df, n_of_splits), but computed on row
        # positions so that no array-splitting routine is applied to the frame itself.
        row_chunks = np.array_split(np.arange(len(df)), n_of_splits)
        self.subsets = [df.iloc[rows] for rows in row_chunks]
        self.input_dim = df.shape[1]
        self.batch_size = batch_size
        self.model_type = model_type

        if model_type == "nn":
            self.models = [Perceptron(self.input_dim) for _ in range(n_of_splits)]
        else:
            # Random forests are instantiated and fitted in train_models().
            self.models = []

    def _labelled_frame(self, idx):
        """Return all chunks concatenated, with target=0 for chunk ``idx`` and 1 elsewhere."""
        labelled = [
            subset.assign(target=0 if i == idx else 1)  # assign() leaves self.subsets untouched
            for i, subset in enumerate(self.subsets)
        ]
        return pd.concat(labelled, ignore_index=True)

    def prepare_data_nn(self, idx):
        """Build a shuffled DataLoader of (features, target) pairs for ensemble member ``idx``."""
        combined_df = self._labelled_frame(idx)

        features = torch.tensor(combined_df.drop(columns="target").values, dtype=torch.float32)
        targets = torch.tensor(combined_df["target"].values, dtype=torch.float32)

        dataset = TensorDataset(features, targets)
        # BatchNorm1d cannot normalise a single-sample batch in training mode, so a
        # trailing batch of exactly one row is dropped instead of crashing training.
        lone_tail = len(dataset) > self.batch_size and len(dataset) % self.batch_size == 1
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=True, drop_last=lone_tail)

    def prepare_data_rf(self, idx):
        """Return ``(features, targets)`` for ensemble member ``idx``.

        ``features`` is a NumPy array and ``targets`` a float32 torch tensor.
        """
        combined_df = self._labelled_frame(idx)
        features = combined_df.drop(columns="target").values
        targets = torch.tensor(combined_df["target"].values, dtype=torch.float32)
        return features, targets

    def train_models(self, verbose=False, epochs=10, lr=3e-4):
        """Fit every ensemble member on its own relabelled copy of the data.

        Args:
            verbose: print progress (per epoch for "nn", per model for "rf").
            epochs: number of passes over the data for each "nn" model.
            lr: Adam learning rate for the "nn" models.

        For "rf" the model list is rebuilt from scratch, so calling this method
        repeatedly always leaves exactly ``n_of_splits`` forests.
        """
        if self.model_type == "nn":
            criterion = nn.BCELoss()
            for idx, model in enumerate(self.models):
                model.train()
                dataloader = self.prepare_data_nn(idx)
                optimizer = torch.optim.Adam(model.parameters(), lr=lr)
                for epoch in range(epochs):
                    for inputs, labels in dataloader:
                        optimizer.zero_grad()
                        outputs = model(inputs)
                        # Model output is (batch, 1); give the labels the same shape.
                        loss = criterion(outputs, labels.unsqueeze(1))
                        loss.backward()
                        optimizer.step()
                    if verbose:
                        # Reported value is the loss of the last mini-batch of the epoch.
                        print(f'Model {idx+1}: Epoch {epoch+1}, Loss: {loss.item():.4f}')
        elif self.model_type == "rf":
            self.models = []
            for idx in range(self.n_of_splits):
                X, y = self.prepare_data_rf(idx)
                clf = RandomForestClassifier(max_depth=4, random_state=0)
                clf.fit(X, y.numpy())
                self.models.append(clf)
                if verbose:
                    print(f"Model {idx+1} is ready (RandomForest)")

    def predict_nn(self, input_sample, threshold=0.5):
        """Average the outputs of all "nn" models for a single sample.

        Args:
            input_sample: tensor holding exactly one sample (e.g. shape
                ``(1, input_dim)``); the class is extracted with ``.item()``,
                which only works for a single-element result.
            threshold: the averaged output must exceed this value for class 1.

        Returns:
            ``(average_prediction, predicted_class)`` where the first item is
            the averaged output tensor and the second a Python int (0 or 1).
        """
        for model in self.models:
            model.eval()
        # Pure inference: skip building the autograd graph.
        with torch.no_grad():
            predictions = [model(input_sample) for model in self.models]
            average_prediction = torch.mean(torch.stack(predictions), dim=0)
        predicted_class = (average_prediction > threshold).int()

        return average_prediction, predicted_class.item()


# Build a 3-member random-forest ensemble on the contaminated "normal" set and fit it.
one_class_classifier = OneClassClassifier(df=X_normal_noise, n_of_splits=3, model_type="rf")
one_class_classifier.train_models()

In [None]:
# NN ensemble: fraction of the clean normal rows classified as normal.
# 1 -> normal
# 0 -> anomaly
NN_THRESHOLD = 0.6  # the averaged output must exceed this to count as normal

results = []
probabilities = []
if one_class_classifier.model_type == "nn":
    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        for sample in X_normal.values:
            input_sample = torch.Tensor(sample).reshape(1, -1)
            average_prediction, predicted_class = one_class_classifier.predict_nn(
                input_sample, threshold=NN_THRESHOLD
            )
            results.append(predicted_class)
            probabilities.append(average_prediction.item())
else:
    # The ensemble members are not torch modules (no .eval(), not callable),
    # so running the NN evaluation on them would raise.
    print(f"Skipping NN evaluation: model_type={one_class_classifier.model_type!r}")

# nan when the evaluation was skipped (or there were no rows to score).
sum(results) / len(results) if results else float("nan")

In [None]:
# RF ensemble: fraction of the anomaly rows that the averaged forests still call normal.
threshold = 0.65
sample = X_anomaly.values

# One predict_proba matrix per ensemble member, averaged element-wise.
probs = [forest.predict_proba(sample) for forest in one_class_classifier.models]
average_probs = np.mean(probs, axis=0)

# Column index 1 is compared against the threshold — presumably the probability
# of label 1 (normal), since the forests are fitted on 0/1 targets; verify via classes_.
predictions = [int(p_normal >= threshold) for p_normal in average_probs[:, 1]]
sum(predictions) / len(predictions)

In [None]:
# Display the ensemble-averaged predict_proba output computed in the RF cell.
average_probs