# ML Studies

## Imports

In [None]:
from components.data_manager import Data_Manager
import components.network_data as nwd

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import LinearSVC
from sklearn.linear_model import SGDClassifier
from sklearn.naive_bayes import GaussianNB, BernoulliNB
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.model_selection import train_test_split, cross_validate
from sklearn.feature_selection import mutual_info_classif

from os import cpu_count

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import math
import time

## Constants

In [None]:
CSV_DEFAULT_SIM_PATH: str = "data/default_simulation/default_simulation_dataset.csv"
TABLE_PATH: str = "graphics/tables"
RANDOM_SEED: int = 1137

tmp_cores: int | None = cpu_count()
CORES: int = 4 if tmp_cores is None else tmp_cores

# strings to choice the section of dataset: "all", "no_target" or "with_target"
DATASET_PARTITION: str = "all"

## Collect Data

In [None]:
# Load the default-simulation CSV through the project's Data_Manager.
dm: Data_Manager = Data_Manager()
dm.load_csv(CSV_DEFAULT_SIM_PATH)

# presumably converts the CSV loaded above into a DataFrame — confirm against Data_Manager
df: pd.DataFrame = dm.csv_to_dataframe()

## Select the dataset

In [None]:
# Normalize the selector so the comparisons below are case-insensitive.
DATASET_PARTITION = DATASET_PARTITION.lower()

# name of the results table; the partition-specific suffix is appended below
TABLE_NAME: str = f"{TABLE_PATH}/default_simulations-table-"

# all dataset: keep every row
if DATASET_PARTITION == "all":
    TABLE_NAME += "all_dataset.csv"

# without targets
elif DATASET_PARTITION == "no_target":
    df = df.loc[df[nwd.TARGETS_PER_BLACK_HOLE] == 0]
    TABLE_NAME += "no_target.csv"

# with targets: rows with one target per black hole, plus rows with no black holes
elif DATASET_PARTITION == "with_target":
    df = df.loc[
        (df[nwd.TARGETS_PER_BLACK_HOLE] == 1) | (df[nwd.NUMBER_OF_BLACK_HOLES] == 0)
    ]
    TABLE_NAME += "with_target.csv"

else:
    # ValueError (still an Exception subclass) that names the offending value
    # and the accepted ones, instead of a generic "nothing chosen" message.
    raise ValueError(
        f"Unknown DATASET_PARTITION {DATASET_PARTITION!r}; "
        'expected "all", "no_target" or "with_target".'
    )

# to show the number of simulations in the selected dataset
len(df)

## Remove constant columns and answer columns

In [None]:
# Columns removed before modelling, grouped by the reason for removal.

# string column
string_columns = [nwd.INTENSITY]

# answer columns
answer_columns = [
    nwd.NUMBER_OF_BLACK_HOLES,
    nwd.BLACK_HOLE_SWAP_PROB,
    nwd.TARGETS_PER_BLACK_HOLE,
]

# constant columns
constant_columns = [
    nwd.REQUESTS,
    nwd.PARAMETER,
    nwd.TOPOLOGY,
    nwd.TOTAL_NO_PATHS,
    nwd.NUMBER_OF_NODES,
]

# redundant features
redundant_columns = [
    nwd.TOTAL_REQUEST_FAILS,   # original note: TOTAL_REQUEST_SUCCESS
    # original note names TOTAL_SWAPPING_FAILS itself; presumably the
    # matching success column was meant — TODO confirm
    nwd.TOTAL_SWAPPING_FAILS,
]

# feature not considered in this study (original note: "feature don't worry")
ignored_columns = [nwd.SIMULATION_TIME]

df = df.drop(
    columns=(
        string_columns
        + answer_columns
        + constant_columns
        + redundant_columns
        + ignored_columns
    )
)

## Function to create models

In [None]:
def get_models_dict() -> dict:
    """Build a fresh, unfitted instance of every classifier under study.

    Every estimator that accepts a ``random_state`` is seeded with
    ``RANDOM_SEED`` so repeated runs give the same results.

    Returns:
        Mapping from the model's display name to a new scikit-learn
        estimator. The names are used as labels in prints, plots and the
        saved results table, so they are kept exactly as they were.
    """
    return {
        "Random Forest": RandomForestClassifier(
            n_estimators=62, n_jobs=CORES, random_state=RANDOM_SEED
        ),
        "Gradient Boost": GradientBoostingClassifier(random_state=RANDOM_SEED),
        "SGD Classifier": SGDClassifier(random_state=RANDOM_SEED),
        "Benoulli NB": BernoulliNB(),
        "MLP": MLPClassifier(
            hidden_layer_sizes=(20, 30, 30),
            early_stopping=True,
            random_state=RANDOM_SEED,
        ),
        "Linear SVC": LinearSVC(random_state=RANDOM_SEED),
        "Gaussian NB": GaussianNB(),
    }

## Select Features

In [None]:
# Separate the features from the label column.
X: pd.DataFrame = df.drop(columns=nwd.ATTACK_TYPE)
y: pd.Series = df[nwd.ATTACK_TYPE]

# Mutual information between each feature and the label. The estimator is
# randomized, so it is seeded to keep the ranking stable between runs.
mi_scores = mutual_info_classif(X, y, random_state=RANDOM_SEED)

# Rank the features from most to least informative.
features_names = X.columns
scores_series = pd.Series(mi_scores, index=features_names)
scores_series = scores_series.sort_values(ascending=False)

display(scores_series)

## Cleaning the X

In [None]:
# removing the features with lower mi score
X = X.drop(columns=[
    nwd.CONSUMED_EPRS,
    nwd.TOTAL_ROUTE_LENGTH,
    nwd.TOTAL_ENTANGLEMENT_ATTEMPTS,
    nwd.TOTAL_SWAPPING_ATTEMPTS,
])

## Split Data

In [None]:
# Hold out 20% of the rows for testing; the split is seeded for repeatability.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=RANDOM_SEED,
)

## Function to fit all algorithms

In [None]:
def fit_algorithms(models: dict, X_train: pd.DataFrame, y_train: pd.Series) -> None:
    """Fit every model in place and print how long each fit took.

    Args:
        models: Mapping from display name to an object exposing ``fit(X, y)``.
        X_train: Training features passed to each ``fit`` call.
        y_train: Training labels passed to each ``fit`` call.
    """
    for label in models:
        estimator = models[label]

        started_at = time.time()
        estimator.fit(X_train, y_train)
        elapsed = time.time() - started_at

        print(f"{label} fit time: {elapsed}")

## Fit all algorithms

In [None]:
# Build the estimators, then fit each of them on the training split.
models: dict = get_models_dict()
fit_algorithms(models=models, X_train=X_train, y_train=y_train)

## Collect Scores without Cross Validation

In [None]:
def specificity_score(y_true: pd.Series | np.ndarray, y_pred: pd.Series | np.ndarray) -> float:
    """Return the specificity (true-negative rate): TN / (TN + FP).

    The counts are read from scikit-learn's confusion matrix, whose first
    row/column is the first label in sorted order; that label is treated as
    the negative class.

    Args:
        y_true: Ground-truth binary labels.
        y_pred: Predicted binary labels.

    Returns:
        The fraction of actual negatives predicted as negative, or 0.0 when
        there are no actual negatives.

    Raises:
        ValueError: If the labels do not produce a 2x2 confusion matrix
            (for example a single class or more than two classes).
    """
    cm: np.ndarray = confusion_matrix(y_true, y_pred)

    # Unpacking four cells only makes sense for a binary problem; fail with
    # an explicit message instead of a cryptic unpacking error.
    if cm.shape != (2, 2):
        raise ValueError(
            f"specificity_score expects binary labels; got a confusion matrix of shape {cm.shape}"
        )

    tn, fp, _fn, _tp = cm.ravel()
    negatives = tn + fp
    return float(tn / negatives) if negatives != 0 else 0.0

def predict_models_no_cross_validation(models: dict, X_test: pd.DataFrame, y_test: pd.Series) -> tuple[dict[str, dict], dict[str, np.ndarray]]:
    """Score already-fitted models on a held-out test set.

    Args:
        models: Mapping from display name to a fitted estimator exposing
            ``predict``.
        X_test: Test features.
        y_test: Test labels.

    Returns:
        A pair ``(data, confusion_matrices_models)``. ``data`` maps each
        model name to a dict holding "f1 score", "accuracy",
        "precision score", "recall score", "specificity" and
        "predict time" (seconds). ``confusion_matrices_models`` maps each
        model name to its confusion matrix.
    """
    data: dict = {}
    confusion_matrices_models: dict = {}
    for name, model in models.items():
        print(f"Model: {name}")

        # Time the prediction alone; stopping the clock after the metrics
        # would charge their computation to the model as well.
        start = time.perf_counter()
        y_pred = model.predict(X_test)
        predict_time = time.perf_counter() - start

        # zero_division=0 on every metric that supports it, so a model that
        # never predicts the positive class scores 0 without a warning.
        data[name] = {
            "f1 score": f1_score(y_test, y_pred, zero_division=0),
            "accuracy": accuracy_score(y_test, y_pred),
            "precision score": precision_score(y_test, y_pred, zero_division=0),
            "recall score": recall_score(y_test, y_pred, zero_division=0),
            "specificity": specificity_score(y_test, y_pred),
            "predict time": predict_time,
        }
        confusion_matrices_models[name] = confusion_matrix(y_test, y_pred)

    return data, confusion_matrices_models

In [None]:
# Evaluate every fitted model on the held-out split.
scores, confusion_matrices = predict_models_no_cross_validation(
    models=models, X_test=X_test, y_test=y_test
)

# One line of metrics per model.
print("Results")
for model_name, score in scores.items():
    summary_line = f"{model_name}: {score}"
    print(summary_line)

## Show confusion matrices

In [None]:
num_models = len(confusion_matrices)

# Grid layout: three heatmaps per row, as many rows as needed.
columns: int = 3
rows: int = math.ceil(num_models / columns)

fig, axes = plt.subplots(rows, columns, figsize=(5*columns, 4*rows))

# Flatten so the panels can be walked in order regardless of the grid shape.
axes = axes.flatten()

for ax, (model_name, cm) in zip(axes, confusion_matrices.items()):
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False, ax=ax)
    ax.set_title(model_name)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Real Value')

    # Outline this panel only. Re-styling every axis here would shadow the
    # loop variable and repeat the same work once per model.
    for spine in ax.spines.values():
        spine.set_visible(True)
        spine.set_linewidth(1.5)
        spine.set_color('black')

# Hide the grid cells left over when the model count is not a multiple of 3.
for unused_ax in axes[num_models:]:
    unused_ax.axis('off')


plt.tight_layout()
plt.show()

## Show result data

In [None]:
# One row per model, one column per metric; the model name becomes a column.
results_df: pd.DataFrame = pd.DataFrame(scores).T.reset_index()

# Set the timing column aside so only the score metrics are plotted.
results_df_fi_time = results_df['predict time']
results_df = results_df.drop(columns='predict time').rename(columns={'index': 'Model'})

# Long format: one (Model, Metrics, Value) row per bar.
results_df_long = results_df.melt(
    id_vars="Model",
    var_name="Metrics",
    value_name="Value",
)

plt.figure(figsize=(10, 9))
sns.barplot(data=results_df_long, x="Model", y="Value", hue="Metrics")
plt.title("Models' Performance")

# Scores lie in [0, 1]; tick every 0.1 with a light horizontal grid.
plt.ylim(0, 1)
plt.yticks(np.arange(0, 1.1, 0.1))
plt.grid(True, color="gray", linestyle="--", alpha=0.5, axis='y')

plt.legend(title="Metrics")
plt.show()

## Train with cross-validation

In [None]:
from sklearn.metrics import make_scorer

def predict_models_with_cross_validation(models: dict, X: pd.DataFrame, y: pd.Series) -> tuple[dict[str, dict], dict[str, float]]:
    """Evaluate every model with 5-fold cross-validation.

    Args:
        models: Mapping from display name to an unfitted estimator.
        X: Feature matrix.
        y: Labels.

    Returns:
        A pair ``(data, time_results)``. ``data`` maps each model name to
        the dict returned by ``cross_validate`` (per-fold arrays such as
        ``fit_time``, ``score_time`` and one ``test_<metric>`` entry per
        scorer). ``time_results`` maps each model name to the total time, in
        seconds, spent cross-validating it.
    """
    # The scorers do not depend on the model, so build them once.
    # NOTE(review): F1 is macro-averaged while precision and recall use the
    # default averaging — confirm this mix is intended.
    scoring = {
        'accuracy': 'accuracy',
        'f1_score': make_scorer(f1_score, average='macro'),
        'precision_score': make_scorer(precision_score, zero_division=0),
        'recall_score': make_scorer(recall_score),
    }

    data: dict = {}
    time_results: dict = {}

    for name, model in models.items():
        print(f"Model: {name}")

        # perf_counter is monotonic, so the interval cannot be skewed by
        # system clock adjustments.
        start = time.perf_counter()
        results: dict = cross_validate(model, X, y, cv=5, scoring=scoring)
        total_time = time.perf_counter() - start

        print(f"{name} cross time: {total_time}")

        time_results[name] = total_time
        data[name] = dict(results)

    return data, time_results

In [None]:
# Fresh, unfitted estimators for the cross-validation run.
models = get_models_dict()
results, time_results = predict_models_with_cross_validation(models=models, X=X, y=y)

In [None]:
# Collapse the per-fold arrays of every model into their means.
mean_score_dict: dict[str, dict] = {}

for model_name, tmp_result in results.items():
    result_mean: dict = {
        metric_name: fold_values.mean()
        for metric_name, fold_values in tmp_result.items()
    }

    print(f"{model_name}: {result_mean}")
    mean_score_dict[model_name] = result_mean

## Show graphics from cross validation

In [None]:
# One row per model with the mean of every cross-validation output.
results_df: pd.DataFrame = pd.DataFrame(mean_score_dict).T.reset_index()

# Set the timings aside; only the score metrics are plotted and saved.
results_df_fi_time = results_df["fit_time"]
results_df = results_df.drop(columns=["fit_time", "score_time"])

# Replace cross_validate's "test_*" keys with display names.
display_names = {
    "index": "Model",
    "test_accuracy": "Accuracy",
    "test_f1_score": "F1",
    "test_precision_score": "Precision",
    "test_recall_score": "Recall",
}
results_df = results_df.rename(columns=display_names)

# Long format: one (Model, Metrics, Value) row per bar.
results_df_long = results_df.melt(
    id_vars="Model",
    var_name="Metrics",
    value_name="Value",
)

plt.figure(figsize=(10, 8))
sns.barplot(data=results_df_long, x="Model", y="Value", hue="Metrics")
plt.title("Models' Performance")

# Scores lie in [0, 1]; tick every 0.1 with a light horizontal grid.
plt.ylim(0, 1)
plt.yticks(np.arange(0, 1.1, 0.1))
plt.grid(True, color="gray", linestyle="--", alpha=0.5, axis='y')

plt.legend(title="Metrics")
plt.show()

## Saving the cross-validation results table

In [None]:
from pathlib import Path

# to_csv does not create missing directories, so make sure the target
# folder exists before writing the cross-validation results table.
Path(TABLE_NAME).parent.mkdir(parents=True, exist_ok=True)
results_df.to_csv(TABLE_NAME)

## Time Graphic

In [None]:
# Keep `time_results` bound to the dict of timings. Rebinding it to a list
# would make this cell fail on a second run, since a list has no `.keys()`.
models_name = list(time_results.keys())
elapsed_seconds = list(time_results.values())

# One bar per model with its total cross-validation time.
plt.figure(figsize=(8, 5))
plt.bar(models_name, elapsed_seconds, color='skyblue')
plt.xlabel('Models')
plt.ylabel('Execution Time (s)')
plt.title('Execution Time per Model')
plt.xticks(rotation=45)  # tilt the labels so long model names do not overlap
plt.tight_layout()
plt.show()