In [None]:
from os import cpu_count, makedirs
from time import time
from typing import Any

import numpy as np
import pandas as pd
from numpy.random import Generator

from load import (file_exists, RAW_FILE, FILE_NAMES, load_dataset, split_dataset)
from util.data_keys import Datakeys as dk

In [None]:
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import LinearSVC
from sklearn.linear_model import SGDClassifier
from sklearn.naive_bayes import GaussianNB, BernoulliNB
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.preprocessing import StandardScaler

In [None]:
tmp_cores: int | None = cpu_count()
CORES: int = 4 if tmp_cores is None else tmp_cores
RANDOM_SEED: int = 1137
RNG: Generator = np.random.default_rng(RANDOM_SEED)
TRAIN_SIZE: float = 0.8

ML_FILE_PATH: str = "data/ml"
MANIPULATED_PATH: str = "data/manipulated"

In [None]:
# First-run bootstrap: the first entry of FILE_NAMES is used as the probe for
# "has the split already been done?". What split_dataset() writes is not
# visible here -- presumably the files listed in FILE_NAMES; TODO confirm.
first_split_present = file_exists(FILE_NAMES[0])
if not first_split_present:
    split_dataset()

In [None]:
def get_models_dict() -> dict[str, Any]:
    """Return the classifiers to benchmark, keyed by their display name.

    The values are the estimator classes themselves (not instances), so the
    caller builds a fresh, unfitted model by calling the value with no
    arguments. The keys are plain labels for reporting.

    Note:
        The Bernoulli naive Bayes entry used to be keyed "Benoulli NB"
        (misspelled). Result files produced before the correction carry the
        old label.

    Returns:
        A new dict mapping display name to estimator class.
    """
    return {
        "Random Forest": RandomForestClassifier,
        "Gradient Boost": GradientBoostingClassifier,
        "SGD Classifier": SGDClassifier,
        "Bernoulli NB": BernoulliNB,
        "Linear SVC": LinearSVC,
        "Gaussian NB": GaussianNB,
    }

In [None]:
def normalize_df(df: pd.DataFrame) -> pd.DataFrame:
    """Standardise every feature column of ``df`` to zero mean and unit spread.

    Each column except the attack-type label column is replaced by
    ``(value - column mean) / column standard deviation``, using pandas'
    ``Series.std`` (sample standard deviation).

    A column whose standard deviation is zero (constant column) or undefined
    (e.g. a single-row frame) is only centred, which turns its values into
    0.0. Dividing by that deviation would instead fill the column with NaN.

    Args:
        df: Frame to normalise. It is modified in place; the label column,
            if present, is left untouched.

    Returns:
        The same ``df`` object, after normalisation.
    """
    label_column = dk.ATTACK_TYPE.value

    for column in df.columns:
        if column == label_column:
            continue

        centered: pd.Series = df[column] - df[column].mean()
        spread = df[column].std()

        if pd.isna(spread) or spread == 0:
            # No usable spread: keep the centred values rather than produce
            # 0/0 = NaN for the whole column.
            df[column] = centered
        else:
            df[column] = centered / spread

    return df

In [None]:
def clean_up_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return ``df`` without the columns that are excluded from the analysis.

    The input frame is not modified; ``DataFrame.drop`` returns a new frame.
    Every listed column must be present, otherwise ``drop`` raises
    ``KeyError``.

    Args:
        df: Frame containing all of the columns listed below.

    Returns:
        A new frame with the listed columns removed.
    """
    # String-valued column.
    text_columns = [dk.INTENSITY.value]

    # Labelled "answer columns" by the author -- presumably attack parameters
    # that would give the label away; TODO confirm.
    answer_columns = [
        dk.NUMBER_OF_BLACK_HOLES.value,
        dk.BLACK_HOLE_SWAP_PROB.value,
        dk.TARGETS_PER_BLACK_HOLE.value,
    ]

    # Columns described as constant across the dataset.
    constant_columns = [
        dk.REQUESTS.value,
        dk.PARAMETER.value,
        dk.TOPOLOGY.value,
        dk.TOTAL_NO_PATHS.value,
        dk.NUMBER_OF_NODES.value,
    ]

    # Features described as redundant -- presumably derivable from the
    # matching success counters; TODO confirm.
    redundant_columns = [
        dk.TOTAL_REQUEST_FAILS.value,
        dk.TOTAL_SWAPPING_FAILS.value,
    ]

    # Feature the author chose to leave out of the analysis.
    ignored_columns = [dk.SIMULATION_TIME.value]

    unwanted = (
        text_columns
        + answer_columns
        + constant_columns
        + redundant_columns
        + ignored_columns
    )
    return df.drop(columns=unwanted)

In [None]:
all_df: pd.DataFrame = load_dataset(RAW_FILE)

# Rows without any black hole form the attack-free ("normal") pool.
without_black_holes: pd.Series = all_df[dk.NUMBER_OF_BLACK_HOLES.value] == 0

# Draw a fixed, seeded sample of 10,000 such rows, then strip the columns that
# are excluded from the analysis.
all_normal_df: pd.DataFrame = clean_up_df(
    all_df.loc[without_black_holes].sample(10_000, random_state=RANDOM_SEED)
)

### My cross validation (repeated random train/test splits)

In [None]:
def _seed_estimator(model: Any, rng: Generator) -> Any:
    """Give ``model`` a seed drawn from ``rng`` when its ``random_state`` is unset."""
    get_params = getattr(model, "get_params", None)
    if get_params is None:
        return model

    params: dict = get_params()
    # Only fill in a missing seed; a seed chosen by the caller is respected.
    if "random_state" in params and params["random_state"] is None:
        model.set_params(random_state=int(rng.integers(0, 2**31 - 1)))

    return model


def my_cross_validation(models: dict[str, Any], X: pd.DataFrame, y: pd.Series, train_propotion: float, rng: Generator, rounds: int = 5) -> pd.DataFrame:
    """Score each model over repeated random train/test splits and average the scores.

    For every model, ``rounds`` independent shuffles of the row positions are
    drawn from ``rng``. The first ``int(len(X) * train_propotion)`` shuffled
    rows train a freshly constructed model, the remaining rows test it.
    Features are standardised with a scaler fitted on the training rows only,
    so no test statistics leak into training. Models exposing an unset
    ``random_state`` parameter are seeded from ``rng`` so that the whole run
    is reproducible for a given generator state.

    Args:
        models: Mapping from display name to a zero-argument callable that
            returns an unfitted estimator with ``fit`` and ``predict``.
        X: Feature frame, one row per sample.
        y: Labels, positionally aligned with the rows of ``X``.
        train_propotion: Fraction of the rows used for training (the
            parameter name is kept as is for backward compatibility).
        rng: Generator driving the shuffles and the estimator seeds.
        rounds: Number of random splits evaluated per model.

    Returns:
        One row per model with the columns ``Model``, ``Accuracy``, ``F1``,
        ``Precision`` and ``Recall``, each score being the mean over the
        rounds (precision, recall and F1 are macro-averaged). An empty frame
        with those columns is returned when there is nothing to evaluate.

    Raises:
        ValueError: If the split would leave the training or the test part
            without any row.
    """
    score_columns: list[str] = ["Model", "Accuracy", "F1", "Precision", "Recall"]

    # Nothing to evaluate: grouping an empty, column-less frame by "Model"
    # would raise KeyError, so answer with an empty table instead.
    if not models or rounds <= 0:
        return pd.DataFrame(columns=score_columns)

    n_samples: int = len(X)
    # The split point is the same for every round, so compute it once.
    limit_point: int = int(n_samples * train_propotion)
    if limit_point <= 0 or limit_point >= n_samples:
        raise ValueError(
            f"train_propotion={train_propotion} leaves an empty train or test "
            f"split for {n_samples} samples"
        )

    all_results: list[dict] = []

    for model_name, model_constructor in models.items():
        start: float = time()
        print(f"Cross validation of the model: {model_name}")

        for _ in range(rounds):
            # Shuffle the row positions and cut them into train / test.
            indices: np.ndarray = rng.permutation(n_samples)
            indices_train: np.ndarray = indices[:limit_point]
            indices_test: np.ndarray = indices[limit_point:]

            # Copies, because the scaler below is allowed to work in place.
            x_train: pd.DataFrame = X.iloc[indices_train].copy()
            y_train: pd.Series = y.iloc[indices_train].copy()

            x_test: pd.DataFrame = X.iloc[indices_test].copy()
            y_test: pd.Series = y.iloc[indices_test].copy()

            # Fit the scaler on the training rows only to avoid data leakage.
            scaler: StandardScaler = StandardScaler(copy=False)
            x_train = scaler.fit_transform(x_train) # type: ignore
            x_test = scaler.transform(x_test)       # type: ignore

            # Fresh model per round, seeded from rng when it is stochastic.
            model: Any = _seed_estimator(model_constructor(), rng)
            model.fit(x_train, y_train)

            y_pred: np.ndarray = model.predict(x_test)

            all_results.append({
                "Model": model_name,
                "Accuracy": accuracy_score(y_test, y_pred),
                "F1": f1_score(y_test, y_pred, average='macro', zero_division=0),
                "Precision": precision_score(y_test, y_pred, average='macro', zero_division=0),
                "Recall": recall_score(y_test, y_pred, average='macro', zero_division=0)
            })

        print(f"Model {model_name} finish: {time()-start}")

    df_results: pd.DataFrame = pd.DataFrame(all_results)

    # Average the per-round scores of each model.
    mean_result: pd.DataFrame = df_results.groupby("Model").mean(numeric_only=True).reset_index()

    return mean_result

In [None]:
# DataFrame.to_csv does not create missing parent directories, so make sure
# the output directory exists before the first result table is written.
makedirs(ML_FILE_PATH, exist_ok=True)

for file_name in FILE_NAMES:
    current_file_name: str = MANIPULATED_PATH + "/" + file_name
    print("-"*60)
    print(current_file_name)

    attack_df: pd.DataFrame = load_dataset(current_file_name)
    attack_df = clean_up_df(attack_df)

    # Draw as many rows from the normal pool as the loaded file has, so both
    # sources contribute equally. Sampling is without replacement and raises
    # ValueError if the file has more rows than the pool.
    normal_df: pd.DataFrame = all_normal_df.sample(len(attack_df), random_state=RNG)

    analyzis_df: pd.DataFrame = pd.concat(objs=[attack_df, normal_df])

    # Separate the label column from the features.
    X: pd.DataFrame = analyzis_df.drop(columns=dk.ATTACK_TYPE.value)
    y: pd.Series = analyzis_df[dk.ATTACK_TYPE.value]

    models: dict = get_models_dict()
    df_results: pd.DataFrame = my_cross_validation(models, X, y, TRAIN_SIZE, RNG)

    # One result table per input file, stored under the same file name.
    df_results.to_csv(ML_FILE_PATH + "/" + file_name, mode="w", index=False)
