# 1. Loading data

### Unpacking the data (on first run)

In [None]:
import os
import zipfile

# Directory the CSV files are read from, and the archive it is restored from.
DATA_PATH = 'data'
ZIP_PATH = 'data.zip'

# Extract the archive only when the data directory is missing, so re-running
# this cell after a successful extraction does nothing.
if not os.path.isdir(DATA_PATH):
    print(f"'{DATA_PATH}' directory not found. Trying to unzip '{ZIP_PATH}'...")

    if not os.path.isfile(ZIP_PATH):
        raise FileNotFoundError(f"Neither '{DATA_PATH}' folder nor '{ZIP_PATH}' found.")

    with zipfile.ZipFile(ZIP_PATH, 'r') as zip_ref:
        zip_ref.extractall('.')

    # The archive is extracted into the working directory, so it has to contain
    # the data folder itself. Fail here with a clear message instead of later,
    # when the loader tries to list a directory that does not exist.
    if not os.path.isdir(DATA_PATH):
        raise FileNotFoundError(
            f"'{ZIP_PATH}' was extracted but did not create the '{DATA_PATH}' folder."
        )

    print(f"Unzipped '{ZIP_PATH}'")



'data' directory not found. Trying to unzip 'data.zip'...
Unzipped 'data.zip' into 'data/'


In [None]:
import pandas as pd


def load_data(dataset: str, data_path=None) -> pd.DataFrame:
    """Read ``<data_path>/<dataset>.csv`` into a DataFrame.

    Args:
        dataset: Dataset name, i.e. the CSV file name without the ``.csv`` suffix.
        data_path: Directory that holds the CSV files. When omitted, the
            notebook-level ``DATA_PATH`` is used.

    Returns:
        The parsed CSV as a DataFrame.

    Raises:
        FileNotFoundError: If ``<dataset>.csv`` does not exist in the directory.
    """
    directory = DATA_PATH if data_path is None else data_path
    csv_path = os.path.join(directory, f"{dataset}.csv")

    # Test for the exact file that is about to be read. Matching on the text
    # before the first dot of every directory entry accepted non-CSV files
    # with the same stem and rejected dataset names that contain a dot.
    if not os.path.isfile(csv_path):
        raise FileNotFoundError(f"Dataset {dataset} not available.")

    return pd.read_csv(csv_path)


In [9]:
# Name of the dataset to analyse; load_data reads '<DATA_PATH>/<DATASET>.csv'.
DATASET = 'creditcard'

# Raw, unprocessed frame as read from disk.
df = load_data(DATASET)

## Data info

In [5]:
# Preview the first five rows of the loaded frame.
print(df.head(5))

   Time        V1        V2        V3        V4        V5        V6        V7  \
0   0.0 -1.359807 -0.072781  2.536347  1.378155 -0.338321  0.462388  0.239599   
1   0.0  1.191857  0.266151  0.166480  0.448154  0.060018 -0.082361 -0.078803   
2   1.0 -1.358354 -1.340163  1.773209  0.379780 -0.503198  1.800499  0.791461   
3   1.0 -0.966272 -0.185226  1.792993 -0.863291 -0.010309  1.247203  0.237609   
4   2.0 -1.158233  0.877737  1.548718  0.403034 -0.407193  0.095921  0.592941   

         V8        V9  ...       V21       V22       V23       V24       V25  \
0  0.098698  0.363787  ... -0.018307  0.277838 -0.110474  0.066928  0.128539   
1  0.085102 -0.255425  ... -0.225775 -0.638672  0.101288 -0.339846  0.167170   
2  0.247676 -1.514654  ...  0.247998  0.771679  0.909412 -0.689281 -0.327642   
3  0.377436 -1.387024  ... -0.108300  0.005274 -0.190321 -1.175575  0.647376   
4 -0.270533  0.817739  ... -0.009431  0.798278 -0.137458  0.141267 -0.206010   

        V26       V27       V28 

## Preprocessing


In [6]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Columns removed from each dataset before preprocessing, keyed by dataset name.
DROP_COLUMNS = {
    "creditcard": ['Time']
}
# Dataset-specific preprocessing step, keyed by dataset name. The lambda defers
# the lookup of preprocess_creditcard, which is defined further down in this cell.
PREPROCESS_DATA = {
    "creditcard": lambda df: preprocess_creditcard(df),
}
# random_state passed to train_test_split.
SEED = 42
# Fraction of rows held out as the test set.
TEST_SIZE = 0.2


def preprocess_creditcard(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with the ``Amount`` column standardised.

    ``Amount`` is rescaled with a freshly fitted ``StandardScaler``; every other
    column is passed through unchanged. The input frame is not modified.

    Args:
        df: Frame that contains an ``Amount`` column.

    Returns:
        A new DataFrame with the same columns, in the same order.
    """
    # NOTE(review): the scaler is fitted on every row passed in. preprocess_data
    # calls this before train_test_split, so test rows contribute to the scaling
    # statistics. Confirm whether that is acceptable for the evaluation.
    scaler = StandardScaler()
    scaled_amount = scaler.fit_transform(df[['Amount']]).ravel()

    # assign() builds a new frame rather than writing into the caller's object.
    return df.assign(Amount=scaled_amount)


def preprocess_data(df: pd.DataFrame, dataset_name: str, return_unsplit=False) -> list:
    """Prepare a raw dataset and optionally split it into train and test parts.

    The columns listed in ``DROP_COLUMNS[dataset_name]`` are removed, the
    dataset-specific step from ``PREPROCESS_DATA`` is applied, and the ``Class``
    column is separated from the features.

    Args:
        df: Raw frame that contains a ``Class`` column.
        dataset_name: Key into ``DROP_COLUMNS`` and ``PREPROCESS_DATA``.
        return_unsplit: When true, return ``(X, y)`` without splitting.

    Returns:
        ``X, y`` when ``return_unsplit`` is true; otherwise the result of
        ``train_test_split`` (X_train, X_test, y_train, y_test), stratified on
        ``y`` with ``TEST_SIZE`` and ``SEED``.
    """
    trimmed = df.drop(columns=DROP_COLUMNS[dataset_name])
    prepared = PREPROCESS_DATA[dataset_name](trimmed)

    features = prepared.drop('Class', axis=1)
    target = prepared['Class']

    if return_unsplit:
        return features, target

    return train_test_split(
        features,
        target,
        test_size=TEST_SIZE,
        random_state=SEED,
        stratify=target,
    )


In [7]:
from sklearn.utils.validation import check_array

X_train, X_test, y_train, y_test = preprocess_data(df, DATASET)

# check_array validates the feature frames and hands them back as NumPy arrays.
X_train_np, X_test_np = check_array(X_train), check_array(X_test)
y_test_np = y_test.values

# Outliers percentage
true_contamination = y_train.mean()

print(
    "\nTraining set info:",
    f"Shape: {X_train.shape}",
    f"Fraud percentage: {true_contamination*100:.4f}%",
    sep="\n",
)
print(
    "\nTest set info:",
    f"Shape: {X_test.shape}",
    f"Fraud percentage: {y_test.mean()*100:.4f}%",
    sep="\n",
)




Training set info:
Shape: (227845, 29)
Fraud percentage: 0.1729%

Test set info:
Shape: (56962, 29)
Fraud percentage: 0.1720%


## Anomaly Detectors

In [8]:
from abc import ABC, abstractmethod
import numpy as np


class BaseAnomalyDetector(ABC):
    """Shared interface for the anomaly detectors in this notebook.

    Subclasses provide ``fit`` and ``score_samples``; ``predict`` converts the
    scores into 0/1 labels by flagging the highest-scoring samples.
    """

    def __init__(self, contamination=0.01, n_neighbors=20, metric='euclidean'):
        """Store the hyper-parameters; nothing is fitted here.

        Args:
            contamination: Fraction of samples that ``predict`` labels as 1.
            n_neighbors: Neighbourhood size for neighbour-based subclasses.
            metric: Distance metric name for neighbour-based subclasses.
        """
        self.contamination = contamination
        self.n_neighbors = n_neighbors
        self.metric = metric

    @abstractmethod
    def fit(self, X):
        """Fit the detector on ``X``."""

    @abstractmethod
    def score_samples(self, X):
        """Return one score per row of ``X``; ``predict`` treats higher as more anomalous."""

    def predict(self, X):
        """Label the rows of ``X`` as anomalous (1) or normal (0).

        The cut-off is the ``(1 - contamination)`` percentile of the scores of
        ``X`` itself, so it depends on the batch being predicted.
        """
        anomaly_scores = self.score_samples(check_array(X))
        cutoff = np.percentile(anomaly_scores, 100 * (1 - self.contamination))
        return np.where(anomaly_scores >= cutoff, 1, 0)


In [9]:
from sklearn.neighbors import LocalOutlierFactor, NearestNeighbors
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.utils.validation import check_array


class LocalAnomalyDetector(BaseAnomalyDetector):
    """Detector backed by scikit-learn's ``LocalOutlierFactor`` in novelty mode."""

    def fit(self, X):
        """Validate ``X``, keep it as ``X_train_`` and fit the LOF model on it."""
        self.X_train_ = check_array(X)
        lof_params = dict(
            n_neighbors=self.n_neighbors,
            metric=self.metric,
            contamination=self.contamination,
            novelty=True,
        )
        self.lof_ = LocalOutlierFactor(**lof_params)
        self.lof_.fit(self.X_train_)
        return self

    def score_samples(self, X):
        """Return the negated LOF ``score_samples`` for each row of ``X``."""
        validated = check_array(X)
        lof_scores = self.lof_.score_samples(validated)
        return -lof_scores


class GlobalAnomalyDetector(BaseAnomalyDetector):
    """Detector that scores a sample by its mean distance to its nearest training rows."""

    def fit(self, X):
        """Validate ``X``, keep it as ``X_train_`` and index it for neighbour queries."""
        self.X_train_ = check_array(X)
        index_params = dict(n_neighbors=self.n_neighbors, metric=self.metric)
        self.nn_ = NearestNeighbors(**index_params)
        self.nn_.fit(self.X_train_)
        return self

    def score_samples(self, X):
        """Return, per row of ``X``, the mean distance to its ``n_neighbors`` nearest training rows."""
        queries = check_array(X)
        neighbor_distances = self.nn_.kneighbors(queries)[0]
        return neighbor_distances.mean(axis=1)


class BaseIsolationForest(BaseAnomalyDetector):
    """Isolation Forest detector with a threshold learned from the training scores.

    Scores are the negated ``IsolationForest.score_samples`` values, so a larger
    score means a more anomalous sample.
    """

    def __init__(self, contamination=0.01, n_estimators=100, max_samples='auto',
                 random_state=None):
        """Create the underlying forest.

        Args:
            contamination: Expected fraction of anomalies; also sets the threshold.
            n_estimators: Number of trees in the forest.
            max_samples: Sub-sample size per tree, forwarded to ``IsolationForest``.
            random_state: Seed forwarded to ``IsolationForest`` for reproducible
                fits. ``None`` keeps the unseeded behaviour.
        """
        self.contamination = contamination
        self.n_estimators = n_estimators
        self.max_samples = max_samples
        self.random_state = random_state
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=n_estimators,
            max_samples=max_samples,
            random_state=random_state,
        )
        # Set by fit(); None until then.
        self.threshold_ = None

    def fit(self, X):
        """Fit the forest and derive the decision threshold from the training scores."""
        X = check_array(X)
        self.model.fit(X)
        scores = self.score_samples(X)
        # Higher score = more anomalous, and predict() flags scores >= threshold.
        # The threshold is therefore the (1 - contamination) percentile, so that
        # about a `contamination` fraction of the training rows is flagged. Using
        # the `contamination` percentile would flag nearly every row.
        self.threshold_ = np.percentile(scores, 100 * (1 - self.contamination))
        return self

    def score_samples(self, X):
        """Return the negated forest scores for each row of ``X``."""
        X = check_array(X)
        return -self.model.score_samples(X)

    def predict(self, X):
        """Label rows whose score reaches the fitted threshold as 1, all others as 0."""
        scores = self.score_samples(check_array(X))
        return (scores >= self.threshold_).astype(int)

class OneClassSVMDetector(BaseAnomalyDetector):
    """One-Class SVM detector with a threshold learned from the training scores.

    Scores are the negated ``OneClassSVM.score_samples`` values, so a larger
    score means a more anomalous sample.
    """

    def __init__(self, contamination=0.01, kernel='rbf', nu='auto', gamma='scale'):
        """Store the hyper-parameters; the SVM itself is created in ``fit``.

        Args:
            contamination: Expected fraction of anomalies; also sets the threshold.
            kernel: Kernel name forwarded to ``OneClassSVM``.
            nu: ``OneClassSVM`` ``nu`` value, or ``'auto'`` to use ``contamination``.
            gamma: Kernel coefficient forwarded to ``OneClassSVM``.
        """
        self.kernel = kernel
        self.gamma = gamma
        self.nu = nu
        self.contamination = contamination
        # Both are set by fit(); None until then.
        self.model = None
        self.threshold_ = None

    def fit(self, X):
        """Fit the SVM and derive the decision threshold from the training scores."""
        X = check_array(X)

        # 'auto' means "use the contamination rate as nu". The previous code first
        # assigned a per-feature mean vector here, which is not a valid nu.
        nu_val = self.contamination if self.nu == 'auto' else self.nu

        self.model = OneClassSVM(kernel=self.kernel, gamma=self.gamma, nu=nu_val)
        self.model.fit(X)

        scores = self.score_samples(X)
        # Higher score = more anomalous, and predict() flags scores >= threshold.
        # The threshold is therefore the (1 - contamination) percentile; the
        # `contamination` percentile would flag nearly every row.
        self.threshold_ = np.percentile(scores, 100 * (1 - self.contamination))
        return self

    def score_samples(self, X):
        """Return the negated SVM scores for each row of ``X``."""
        X = check_array(X)
        return -self.model.score_samples(X)

    def predict(self, X):
        """Label rows whose score reaches the fitted threshold as 1, all others as 0."""
        scores = self.score_samples(check_array(X))
        return (scores >= self.threshold_).astype(int)
        

## Test comparison on fixed values

In [13]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import (
    classification_report,
    roc_auc_score,
    precision_recall_curve,
    average_precision_score,
    roc_curve
)
from sklearn.utils.validation import check_array
from typing import Dict, Any


def compute_threshold(scores: np.ndarray, contamination: float) -> float:
    """Return the score at the ``(1 - contamination)`` percentile of ``scores``.

    Args:
        scores: Anomaly scores, higher meaning more anomalous.
        contamination: Fraction of samples that should end up at or above the
            returned value.
    """
    percentile_rank = 100 * (1 - contamination)
    return np.percentile(scores, percentile_rank)


def get_predictions(scores: np.ndarray, threshold: float) -> np.ndarray:
    """Return 1 where a score reaches ``threshold`` and 0 elsewhere, as integers."""
    flagged = scores >= threshold
    return flagged.astype(int)


def recall_at_fixed_fpr(y_true: np.ndarray, scores: np.ndarray, fpr_threshold: float) -> float:
    """Return the recall at the last ROC point whose false-positive rate is within the budget.

    Args:
        y_true: Ground-truth binary labels.
        scores: Anomaly scores, higher meaning more anomalous.
        fpr_threshold: Largest false-positive rate that is still acceptable.

    Returns:
        The true-positive rate of the last ROC point with ``fpr <= fpr_threshold``,
        or 0 when no point qualifies.
    """
    fpr, tpr, _ = roc_curve(y_true, scores)
    within_budget = fpr <= fpr_threshold
    if not any(within_budget):
        return 0
    return tpr[within_budget][-1]


def calculate_metrics(
    y_true: np.ndarray,
    preds: np.ndarray,
    scores: np.ndarray
) -> Dict[str, float]:
    """Collect threshold-based and ranking-based metrics for the positive class.

    Args:
        y_true: Ground-truth binary labels.
        preds: Predicted 0/1 labels.
        scores: Anomaly scores, higher meaning more anomalous.

    Returns:
        Dict with the keys ``Precision``, ``Recall``, ``F1`` (taken from the
        ``'1'`` entry of the classification report), ``AUROC``, ``AUPRC`` and
        ``Recall@5%FP``.
    """
    positive_class = classification_report(
        y_true, preds, output_dict=True, zero_division=0
    )['1']

    return {
        'Precision': positive_class['precision'],
        'Recall': positive_class['recall'],
        'F1': positive_class['f1-score'],
        'AUROC': roc_auc_score(y_true, scores),
        'AUPRC': average_precision_score(y_true, scores),
        'Recall@5%FP': recall_at_fixed_fpr(y_true, scores, 0.05),
    }


def plot_curves(
    y_true: np.ndarray,
    scores: np.ndarray,
    model_name: str,
    auroc: float,
    auprc: float
) -> None:
    """Draw the ROC curve and the precision-recall curve side by side.

    Args:
        y_true: Ground-truth binary labels.
        scores: Anomaly scores, higher meaning more anomalous.
        model_name: Prefix used in both panel titles.
        auroc: Value shown in the ROC legend.
        auprc: Value shown in the precision-recall legend.
    """
    pr_precision, pr_recall, _ = precision_recall_curve(y_true, scores)
    roc_fpr, roc_tpr, _ = roc_curve(y_true, scores)

    fig, (roc_ax, pr_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: ROC curve with the chance diagonal for reference.
    roc_ax.plot(roc_fpr, roc_tpr, label=f"AUROC = {auroc:.3f}")
    roc_ax.plot([0, 1], [0, 1], 'k--')
    roc_ax.set_xlabel('False Positive Rate')
    roc_ax.set_ylabel('True Positive Rate')
    roc_ax.set_title(f'{model_name} ROC Curve')
    roc_ax.legend()

    # Right panel: precision-recall curve.
    pr_ax.plot(pr_recall, pr_precision, label=f"AUPRC = {auprc:.3f}")
    pr_ax.set_xlabel('Recall')
    pr_ax.set_ylabel('Precision')
    pr_ax.set_title(f'{model_name} Precision-Recall Curve')
    pr_ax.legend()

    fig.tight_layout()
    plt.show()


def evaluate_model(
    model: Any,
    X_test: np.ndarray,
    y_test: np.ndarray,
    model_name: str
) -> Dict[str, float]:
    """Score a fitted detector on the test set, plot its curves and return its metrics.

    The decision threshold is taken from the test scores themselves, at the
    ``(1 - model.contamination)`` percentile; the model's own ``predict`` is not
    called.

    Args:
        model: Fitted detector exposing ``score_samples`` and ``contamination``.
        X_test: Test feature matrix.
        y_test: Ground-truth binary test labels.
        model_name: Label used in the plots and stored under the ``Model`` key.

    Returns:
        The metrics from ``calculate_metrics`` plus a ``Model`` entry.
    """
    validated = check_array(X_test)
    anomaly_scores = model.score_samples(validated)

    predicted = get_predictions(
        anomaly_scores,
        compute_threshold(anomaly_scores, model.contamination),
    )

    summary = calculate_metrics(y_test, predicted, anomaly_scores)
    plot_curves(y_test, anomaly_scores, model_name, summary['AUROC'], summary['AUPRC'])

    summary['Model'] = model_name
    return summary


In [None]:
def compare_models(X_train, X_test, y_test, contamination, train_labels=None):
    """Fit the four detectors and evaluate each one on the test set.

    LOF and One-Class SVM are fitted on the normal (label 0) training rows only;
    the distance-based detector and the Isolation Forest are fitted on all
    training rows.

    Args:
        X_train: Training feature matrix.
        X_test: Test feature matrix.
        y_test: Ground-truth test labels.
        contamination: Expected anomaly fraction, forwarded to every detector.
        train_labels: Labels aligned row by row with ``X_train``. When omitted,
            the notebook-level ``y_train`` is used, which keeps existing
            four-argument calls working.

    Returns:
        DataFrame with one row of metrics per model, as produced by
        ``evaluate_model``.

    Raises:
        ValueError: If the training labels and ``X_train`` differ in length.
    """
    # Prefer the explicit argument; reading the global ties the result to
    # whatever value y_train happens to hold when the cell is run.
    labels = np.asarray(y_train if train_labels is None else train_labels)
    if len(labels) != len(X_train):
        raise ValueError(
            f"Training labels have {len(labels)} rows but X_train has {len(X_train)}."
        )
    X_train_normal = X_train[labels == 0]

    models = {
        'LOF (local)': LocalAnomalyDetector(
            contamination=contamination,
            n_neighbors=1000
        ).fit(X_train_normal),

        'Global Distance': GlobalAnomalyDetector(
            contamination=contamination,
            n_neighbors=20
        ).fit(X_train),

        'Base (Isolation Forest)': BaseIsolationForest(
            contamination=contamination,
        ).fit(X_train),

        'One-Class SVM': OneClassSVMDetector(
            contamination=contamination,
            kernel='rbf',
            nu='auto',
            gamma='scale'
        ).fit(X_train_normal)
    }

    results = []
    for name, model in models.items():
        print(f"\nScoring {name}...")
        results.append(evaluate_model(model, X_test, y_test, name))

    return pd.DataFrame(results)


print(f"Contamination: {true_contamination:.4f}")

# Comparison
results_df = compare_models(
    X_train_np, X_test_np, y_test_np, true_contamination,
    train_labels=y_train.to_numpy(),
)

print("\nFinal Comparison:")
display(results_df.round(4))

Contamination: 0.0017


## Cross Validation

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import (
    precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score
)
import numpy as np
import pandas as pd


def _fold_metrics(model, X_test, y_test):
    """Score one fitted detector on one held-out fold and return its metric row."""
    y_scores = model.score_samples(X_test)

    # Same rule as BaseAnomalyDetector.predict, applied to the scores computed
    # above so the neighbour search runs once per fold instead of twice.
    threshold = np.percentile(y_scores, 100 * (1 - model.contamination))
    y_pred = np.where(y_scores >= threshold, 1, 0)

    # The ranking metrics raise ValueError on some folds (for instance when a
    # fold contains a single class); record NaN so the fold mean skips them.
    try:
        auroc = roc_auc_score(y_test, y_scores)
    except ValueError:
        auroc = np.nan
    try:
        auprc = average_precision_score(y_test, y_scores)
    except ValueError:
        auprc = np.nan

    return {
        'Precision': precision_score(y_test, y_pred, zero_division=0),
        'Recall': recall_score(y_test, y_pred, zero_division=0),
        'F1': f1_score(y_test, y_pred, zero_division=0),
        'AUROC': auroc,
        'AUPRC': auprc,
        'Recall@5%FP': recall_at_fixed_fpr(y_test, y_scores, 0.05),
    }


def _mean_metrics(metric_rows):
    """Average a list of per-fold metric dicts column by column."""
    return pd.DataFrame(metric_rows).mean().to_dict()


def cross_validate_neighbors(X, y, contamination, neighbor_values, n_splits=5,
                             random_state=42):
    """Cross-validate the LOF and distance-based detectors over several ``n_neighbors`` values.

    For every value and every stratified fold, the LOF detector is fitted on the
    normal (label 0) training rows and the distance-based detector on all
    training rows; both are then scored on the held-out fold.

    Args:
        X: Feature matrix as a NumPy array.
        y: Binary labels as a NumPy array, aligned with ``X``.
        contamination: Expected anomaly fraction, forwarded to both detectors.
        neighbor_values: Iterable of ``n_neighbors`` values to try.
        n_splits: Number of stratified folds.
        random_state: Seed for the fold shuffling.

    Returns:
        DataFrame with one row per (``n_neighbors``, ``Model``) pair holding the
        fold-averaged metrics.
    """
    results = []

    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    for n in neighbor_values:
        print(f"\nCross-validating for n_neighbors = {n}...")

        metrics_lof = []
        metrics_global = []

        for train_idx, test_idx in skf.split(X, y):
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]

            X_train_normal = X_train[y_train == 0]

            lof_model = LocalAnomalyDetector(
                contamination=contamination,
                n_neighbors=n
            ).fit(X_train_normal)

            global_model = GlobalAnomalyDetector(
                contamination=contamination,
                n_neighbors=n
            ).fit(X_train)

            metrics_lof.append(_fold_metrics(lof_model, X_test, y_test))
            metrics_global.append(_fold_metrics(global_model, X_test, y_test))

        results.append({
            'n_neighbors': n,
            'Model': 'LOF',
            **_mean_metrics(metrics_lof)
        })
        results.append({
            'n_neighbors': n,
            'Model': 'Global',
            **_mean_metrics(metrics_global)
        })

    return pd.DataFrame(results)


In [21]:
# Reload the raw frame and preprocess it without splitting: the
# cross-validation below builds its own folds from the full X and y.
df = load_data(DATASET)
X, y = preprocess_data(df, DATASET, return_unsplit=True)

In [23]:
# n_neighbors values compared for both detectors.
neighbor_vals = [5, 10, 50, 100]

# The function indexes X and y with integer fold indices, so pass NumPy arrays.
# contamination reuses the fraud rate measured on the earlier training split.
cv_df = cross_validate_neighbors(
    X=X.to_numpy(),
    y=y.to_numpy(),
    contamination=true_contamination,
    neighbor_values=neighbor_vals,
    n_splits=5
)

display(cv_df.round(4))



Cross-validating for n_neighbors = 5...

Cross-validating for n_neighbors = 10...

Cross-validating for n_neighbors = 50...

Cross-validating for n_neighbors = 100...


Unnamed: 0,n_neighbors,Model,Precision,Recall,F1,AUROC,AUPRC,Recall@5%FP
0,5,LOF,0.0017,1.0,0.0035,0.763,0.0084,0.3824
1,5,Global,0.0016,0.9025,0.0031,0.9451,0.0621,0.7987
2,10,LOF,0.0017,1.0,0.0035,0.6505,0.0046,0.2276
3,10,Global,0.0015,0.8882,0.0031,0.9562,0.0854,0.8618
4,50,LOF,0.0017,1.0,0.0035,0.9273,0.0288,0.7702
5,50,Global,0.0014,0.8294,0.0029,0.9584,0.13,0.8719
6,100,LOF,0.0013,0.7338,0.0025,0.9425,0.1847,0.8435
7,100,Global,0.0014,0.7826,0.0027,0.9591,0.1491,0.8719
