# Phase 4: Advanced Machine Learning Models


This notebook orchestrates advanced supervised models for cybersecurity attack classification and severity prediction. The workflow follows the Phase 4 implementation plan and builds on the cleaned feature sets produced during Phase 3.

## Objectives & Scope


- Configure a reproducible environment for advanced classifiers and regressors.
- Load standardized train, validation, and test splits with consistent data types.
- Prepare shared preprocessing pipelines (imputation, scaling, encoding) for downstream models.
- Persist dataset summaries and metadata for reporting and audit trails.

## Reproducibility & Artifacts



- Random seeds are fixed to ensure deterministic behaviour across executions.

- All data loading happens from the processed datasets directory without mutating source files.

- Derived summaries and figures are exported to the `reports/` folder, and the metrics table to `models/`, for later phases.


In [None]:
# Imports and global configuration
import warnings
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import Markdown, display
from pandas.api.types import (
    is_categorical_dtype,
    is_numeric_dtype,
    is_string_dtype,
)
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    PrecisionRecallDisplay,
    RocCurveDisplay,
    accuracy_score,
    classification_report,
    precision_recall_fscore_support,
    roc_auc_score,
)
from sklearn.model_selection import (
    RandomizedSearchCV,
    StratifiedKFold,
    cross_validate,
)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.svm import SVC
from xgboost import XGBClassifier

# Matplotlib and seaborn configuration for consistent visuals.
# Six blue shades (light to dark) followed by three greys.
COLOR_PALETTE: List[str] = [
    "#c6d4e1",
    "#9bbcd4",
    "#6fa3c7",
    "#4a8ab8",
    "#2f6fa1",
    "#1f4f75",
    "#d3d3d3",
    "#a9a9a9",
    "#696969",
]

# Register the palette with seaborn's theme and with matplotlib's property
# cycle so plain matplotlib calls pick up the same colours.
sns.set_theme(style="whitegrid", palette=COLOR_PALETTE)
mpl.rcParams["axes.prop_cycle"] = mpl.cycler(color=COLOR_PALETTE)

# Single seed reused by NumPy's global RNG and by every estimator, CV splitter
# and randomized search defined further down.
RANDOM_STATE: int = 42
np.random.seed(RANDOM_STATE)
# NOTE(review): this silences every UserWarning process-wide, including ones
# raised by third-party libraries; narrow the filter if warnings go missing.
warnings.filterwarnings("ignore", category=UserWarning)

# Input location (read-only) and output artifact locations, relative to the
# notebook's working directory.
DATA_DIR = Path("../cybersecurity_attacks_data/processed")
ARTIFACT_METRICS = Path("../models/phase4_metrics.csv")
ARTIFACT_FIGURES = Path("../reports/visualizations/phase4")
ARTIFACT_SUMMARY = Path("../reports/phase4_data_snapshot.md")

# Create every output directory up front. The metrics directory is included so
# the first CSV write does not fail on a checkout without a ``models/`` folder.
ARTIFACT_METRICS.parent.mkdir(parents=True, exist_ok=True)
ARTIFACT_FIGURES.mkdir(parents=True, exist_ok=True)
ARTIFACT_SUMMARY.parent.mkdir(parents=True, exist_ok=True)


## Data Loading & Preparation
This section loads the preprocessed splits and performs lightweight validation before model training. We reuse the Phase 3 engineered features and extend them with model-specific augmentations when necessary.

In [None]:
def load_split(name: str) -> pd.DataFrame:
    """Read one processed split, stored as ``<name>_data.csv`` under ``DATA_DIR``.

    Args:
        name: Split identifier used as the file-name prefix.

    Returns:
        The split parsed with ``pd.read_csv`` using default options.

    Raises:
        FileNotFoundError: If the expected CSV file does not exist.
    """
    csv_path = DATA_DIR.joinpath(f"{name}_data.csv")
    if csv_path.exists():
        return pd.read_csv(csv_path)
    raise FileNotFoundError(f"Split '{name}' not found at {csv_path}")


def summarize_split(df: pd.DataFrame, name: str) -> None:
    """Render a short Markdown summary of a split, then show its first rows.

    The summary lists the frame's shape and the columns whose share of missing
    values exceeds 0.01%, ordered from most to least missing.

    Args:
        df: Split to describe.
        name: Label for the heading; it is title-cased before display.
    """
    n_rows, n_cols = df.shape
    missing_share = df.isna().mean().sort_values(ascending=False)
    # 0.0001 as a fraction corresponds to the 0.01% quoted in the summary text.
    flagged = missing_share[missing_share > 0.0001].index.tolist()
    preview = df.head(3)
    summary = (
        f"### {name.title()} Split\n"
        f"- Shape: {n_rows:,} rows × {n_cols:,} columns\n"
        f"- Missing features (>0.01%): {flagged}"
    )
    display(Markdown(summary))
    display(preview)


# Load the three processed splits up front so a missing file fails fast,
# before any summary is rendered.
train_df, val_df, test_df = (load_split(split) for split in ("train", "val", "test"))

summarize_split(df=train_df, name="train")
summarize_split(df=val_df, name="validation")
summarize_split(df=test_df, name="test")

# Every column of the training frame other than the target is used as a feature.
TARGET_COL = "attackSuccessful"
FEATURE_COLS = [column for column in train_df.columns if column != TARGET_COL]


## Feature Engineering & Pipelines
The preprocessing pipeline mirrors Phase 3 encoders while allowing advanced estimators to toggle optional feature transformations. This modular design supports experimentation with imbalance handling and interaction effects.

In [None]:
def infer_feature_types(
    df: pd.DataFrame,
    feature_cols: Optional[List[str]] = None,
) -> Tuple[List[str], List[str]]:
    """Split feature columns into numeric and categorical groups by dtype.

    Args:
        df: Frame whose column dtypes are inspected.
        feature_cols: Columns to classify. Defaults to the module-level
            ``FEATURE_COLS`` so existing single-argument calls keep working.

    Returns:
        ``(numeric_cols, categorical_cols)`` in that order. Each list preserves
        the order of the inspected columns.
    """
    columns = FEATURE_COLS if feature_cols is None else feature_cols
    numeric_cols: List[str] = []
    categorical_cols: List[str] = []
    for col in columns:
        # Every non-numeric dtype (strings, categoricals, anything else) is
        # treated as categorical, so a single numeric check is sufficient and
        # the deprecated ``is_categorical_dtype`` helper is not needed.
        bucket = numeric_cols if is_numeric_dtype(df[col]) else categorical_cols
        bucket.append(col)
    return numeric_cols, categorical_cols


def build_preprocessor(numeric_cols: List[str], categorical_cols: List[str]) -> ColumnTransformer:
    """Assemble the column-wise preprocessing transformer shared by all models.

    Args:
        numeric_cols: Columns routed through median imputation followed by
            standard scaling.
        categorical_cols: Columns routed through most-frequent imputation
            followed by dense one-hot encoding (``handle_unknown="ignore"``).

    Returns:
        An unfitted ``ColumnTransformer`` with a ``"num"`` and a ``"cat"`` branch.
    """
    numeric_steps = [
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]
    categorical_steps = [
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
    ]
    branches = [
        ("num", Pipeline(steps=numeric_steps), numeric_cols),
        ("cat", Pipeline(steps=categorical_steps), categorical_cols),
    ]
    return ColumnTransformer(transformers=branches)


# Column typing is inferred from the training frame only and reused for every split.
NUMERIC_FEATURES, CATEGORICAL_FEATURES = infer_feature_types(train_df)
PREPROCESSOR = build_preprocessor(
    numeric_cols=NUMERIC_FEATURES,
    categorical_cols=CATEGORICAL_FEATURES,
)

# Feature matrix / target vector pairs for each split.
train_X = train_df[FEATURE_COLS]
train_y = train_df[TARGET_COL]
val_X = val_df[FEATURE_COLS]
val_y = val_df[TARGET_COL]
test_X = test_df[FEATURE_COLS]
test_y = test_df[TARGET_COL]


## Experiment Utilities
Helper functions standardize training, hyperparameter search, and evaluation output across models. Metrics are written to `models/phase4_metrics.csv` to align with the project plan.

In [None]:
# Alias for a mapping of metric name to score.
# NOTE(review): not referenced by the helpers visible in this notebook, which
# annotate with Dict[str, float] directly — confirm whether it is still needed.
MetricRow = Dict[str, float]


def ensure_metrics_store() -> pd.DataFrame:
    """Return the persisted metrics table, or an empty one with the standard columns.

    Returns:
        The contents of ``ARTIFACT_METRICS`` when that file exists; otherwise a
        zero-row frame holding the ``model`` column plus the validation and
        test metric columns.
    """
    if not ARTIFACT_METRICS.exists():
        default_columns = [
            "model",
            "validation_accuracy",
            "validation_precision",
            "validation_recall",
            "validation_f1",
            "validation_roc_auc",
            "test_accuracy",
            "test_precision",
            "test_recall",
            "test_f1",
            "test_roc_auc",
        ]
        return pd.DataFrame(columns=default_columns)
    return pd.read_csv(ARTIFACT_METRICS)


def log_metrics(model_name: str, metrics: Dict[str, float]) -> None:
    """Append one row of metrics for ``model_name`` to the metrics CSV.

    Rows accumulate across calls: running the same model twice leaves two rows
    in the file.

    Args:
        model_name: Value written to the ``model`` column.
        metrics: Metric name to score. Keys that are not yet columns of the
            stored table become new columns.
    """
    store = ensure_metrics_store()
    new_row = pd.DataFrame([{"model": model_name, **metrics}])
    if store.empty:
        # Concatenating onto a zero-row frame is deprecated in pandas and can
        # degrade dtypes. Start from the new row instead, keeping the store's
        # column order and appending any extra metric columns after it.
        extra = [col for col in new_row.columns if col not in store.columns]
        store = new_row.reindex(columns=list(store.columns) + extra)
    else:
        store = pd.concat([store, new_row], ignore_index=True)
    # to_csv does not create missing parent directories.
    ARTIFACT_METRICS.parent.mkdir(parents=True, exist_ok=True)
    store.to_csv(ARTIFACT_METRICS, index=False)


def evaluate_model(
    model_pipeline: Pipeline,
    model_name: str,
    train_X: pd.DataFrame,
    train_y: pd.Series,
    val_X: pd.DataFrame,
    val_y: pd.Series,
    test_X: pd.DataFrame,
    test_y: pd.Series,
    k_folds: int = 5,
) -> Dict[str, float]:
    """Cross-validate, fit, and score a binary classification pipeline.

    Steps:
        1. Stratified k-fold cross-validation on the training split.
        2. Fit ``model_pipeline`` in place on the training split only.
        3. Score the fitted pipeline on the validation and test splits.
        4. Log the metrics via ``log_metrics`` and display a validation
           classification report.

    Args:
        model_pipeline: Estimator exposing ``fit``, ``predict`` and
            ``predict_proba``. It is left fitted on the training split.
        model_name: Label used for logging and for the displayed heading.
        train_X, train_y: Training features and target.
        val_X, val_y: Validation features and target (never used for fitting).
        test_X, test_y: Test features and target.
        k_folds: Number of stratified folds for cross-validation.

    Returns:
        Mapping holding ``validation_*`` and ``test_*`` scores (accuracy,
        precision, recall, f1, roc_auc) followed by the ``cv_*`` fold means.
    """
    cv = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=RANDOM_STATE)
    cv_results = cross_validate(
        model_pipeline,
        train_X,
        train_y,
        cv=cv,
        scoring=["accuracy", "f1", "precision", "recall", "roc_auc"],
        n_jobs=-1,
    )
    # cross_validate also reports timings; keep only the per-metric fold means.
    cv_summary = {
        f"cv_{metric.replace('test_', '')}": np.mean(scores)
        for metric, scores in cv_results.items()
        if metric.startswith("test_")
    }

    # Fit on the training split only. Including the validation rows here would
    # make the validation scores in-sample and therefore optimistic.
    model_pipeline.fit(train_X, train_y)
    val_pred = model_pipeline.predict(val_X)
    test_pred = model_pipeline.predict(test_X)
    # Column 1 of predict_proba is the score used for ROC-AUC.
    val_proba = model_pipeline.predict_proba(val_X)[:, 1]
    test_proba = model_pipeline.predict_proba(test_X)[:, 1]

    metrics = {
        **_split_metrics("validation", val_y, val_pred, val_proba),
        **_split_metrics("test", test_y, test_pred, test_proba),
    }
    metrics.update(cv_summary)
    log_metrics(model_name, metrics)
    display(Markdown(f"### {model_name}"))
    # The report is a whitespace-aligned text table; a fenced block keeps the
    # columns aligned when rendered as Markdown.
    report = classification_report(val_y, val_pred, output_dict=False)
    display(Markdown(f"```text\n{report}\n```"))
    return metrics


def _split_metrics(prefix: str, y_true, y_pred, y_proba) -> Dict[str, float]:
    """Compute accuracy, precision, recall, F1 and ROC-AUC keyed by ``prefix``."""
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average="binary", zero_division=0
    )
    return {
        f"{prefix}_accuracy": accuracy_score(y_true, y_pred),
        f"{prefix}_precision": precision,
        f"{prefix}_recall": recall,
        f"{prefix}_f1": f1,
        f"{prefix}_roc_auc": roc_auc_score(y_true, y_proba),
    }


In [None]:
def plot_diagnostics(model, X: pd.DataFrame, y: pd.Series, model_name: str, suffix: str) -> None:
    """Save a three-panel diagnostic figure for a fitted classifier.

    The panels are, left to right, the confusion matrix, the ROC curve and the
    precision-recall curve. The figure is written as a PNG under
    ``ARTIFACT_FIGURES`` and then closed.

    Args:
        model: Fitted estimator passed to the scikit-learn display helpers.
        X: Features of the split being plotted.
        y: True labels of the split being plotted.
        model_name: Used in panel titles and, lower-cased with spaces replaced
            by underscores, in the output file name.
        suffix: Split label used in panel titles and the output file name.
    """
    fig, (ax_confusion, ax_roc, ax_pr) = plt.subplots(1, 3, figsize=(18, 5))

    ConfusionMatrixDisplay.from_estimator(model, X, y, ax=ax_confusion, colorbar=False)
    ax_confusion.set_title(f"{model_name} Confusion Matrix ({suffix})")

    RocCurveDisplay.from_estimator(model, X, y, ax=ax_roc)
    ax_roc.set_title(f"{model_name} ROC Curve ({suffix})")

    PrecisionRecallDisplay.from_estimator(model, X, y, ax=ax_pr)
    ax_pr.set_title(f"{model_name} Precision-Recall ({suffix})")

    plt.tight_layout()
    file_stem = model_name.lower().replace(" ", "_")
    fig.savefig(ARTIFACT_FIGURES / f"{file_stem}_{suffix}.png", dpi=200)
    plt.close(fig)


In [None]:
def append_summary_entry(model_name: str, metrics: Dict[str, float]) -> None:
    """Append a Markdown section for one model to the summary file.

    Writes a ``## <model_name>`` heading, one bullet per metric formatted to
    four decimals, and a trailing blank line. Metrics whose key starts with
    ``cv_`` are left out.

    Args:
        model_name: Heading text for the section.
        metrics: Metric name to score.
    """
    bullets = [
        f"- {key.replace('_', ' ').title()}: {value:.4f}\n"
        for key, value in metrics.items()
        if not key.startswith("cv_")
    ]
    section = [f"## {model_name}\n", *bullets, "\n"]
    with open(ARTIFACT_SUMMARY, "a", encoding="utf-8") as handle:
        handle.writelines(section)


## Model Experiments

### Balanced Logistic Regression
Revisits the linear baseline with tuned regularization and class weighting to establish a strong probabilistic reference.

In [None]:
# Search space: only C varies (8 log-spaced values from 1e-3 to 1e1);
# penalty and solver are pinned to a single option each.
logit_params = {
    "model__C": np.logspace(-3, 1, 8),
    "model__penalty": ["l2"],
    "model__solver": ["lbfgs"],
}

# Shared preprocessing followed by a class-weighted logistic regression.
logit_pipeline = Pipeline(
    steps=[
        ("preprocessor", PREPROCESSOR),
        (
            "model",
            LogisticRegression(
                class_weight="balanced",
                max_iter=200,
                random_state=RANDOM_STATE,
            ),
        ),
    ]
)

# NOTE(review): n_iter=10 exceeds the 8 candidate combinations defined above,
# so at most 8 distinct settings can be evaluated — confirm whether n_iter
# should be lowered or the search space widened.
logit_search = RandomizedSearchCV(
    estimator=logit_pipeline,
    param_distributions=logit_params,
    n_iter=10,
    scoring="roc_auc",
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE),
    n_jobs=-1,
    random_state=RANDOM_STATE,
)

# Tune on the training split only, then hand the best pipeline to the shared
# evaluation helper, which cross-validates, refits, scores and logs it.
logit_search.fit(train_X, train_y)
logit_best = logit_search.best_estimator_
logit_metrics = evaluate_model(
    logit_best,
    "Balanced Logistic Regression",
    train_X,
    train_y,
    val_X,
    val_y,
    test_X,
    test_y,
)
# Diagnostics use the pipeline as fitted inside evaluate_model.
plot_diagnostics(logit_best, val_X, val_y, "Balanced Logistic Regression", "validation")
plot_diagnostics(logit_best, test_X, test_y, "Balanced Logistic Regression", "test")
append_summary_entry("Balanced Logistic Regression", logit_metrics)


### Random Forest Ensemble
Targets non-linear interactions and feature importance analysis via tree-based ensembles.

In [None]:
# Search space over forest size, tree depth, split/leaf minimums and the
# number of features considered per split.
rf_params = {
    "model__n_estimators": [200, 400, 600],
    "model__max_depth": [None, 10, 20, 30],
    "model__min_samples_split": [2, 5, 10],
    "model__min_samples_leaf": [1, 2, 4],
    "model__max_features": ["sqrt", "log2", 0.5],
}

# Shared preprocessing followed by a class-weighted random forest.
# NOTE(review): the forest requests n_jobs=-1 and so does the search below;
# confirm the nested parallelism does not oversubscribe the machine.
rf_pipeline = Pipeline(
    steps=[
        ("preprocessor", PREPROCESSOR),
        (
            "model",
            RandomForestClassifier(
                class_weight="balanced_subsample",
                random_state=RANDOM_STATE,
                n_jobs=-1,
            ),
        ),
    ]
)

# 20 sampled configurations, each scored by 5-fold stratified ROC-AUC.
rf_search = RandomizedSearchCV(
    estimator=rf_pipeline,
    param_distributions=rf_params,
    n_iter=20,
    scoring="roc_auc",
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE),
    n_jobs=-1,
    random_state=RANDOM_STATE,
)

# Tune on the training split only, then hand the best pipeline to the shared
# evaluation helper, which cross-validates, refits, scores and logs it.
rf_search.fit(train_X, train_y)
rf_best = rf_search.best_estimator_
rf_metrics = evaluate_model(
    rf_best,
    "Random Forest Ensemble",
    train_X,
    train_y,
    val_X,
    val_y,
    test_X,
    test_y,
)
# Diagnostics use the pipeline as fitted inside evaluate_model.
plot_diagnostics(rf_best, val_X, val_y, "Random Forest Ensemble", "validation")
plot_diagnostics(rf_best, test_X, test_y, "Random Forest Ensemble", "test")
append_summary_entry("Random Forest Ensemble", rf_metrics)


### Gradient Boosting
Leverages stage-wise additive modeling with shrinkage and depth control; hyperparameters are selected by cross-validated ROC-AUC.

In [None]:
# Search space over ensemble size, shrinkage, tree depth, row subsampling and
# the minimum samples required to split a node.
gb_params = {
    "model__n_estimators": [200, 400, 600],
    "model__learning_rate": [0.01, 0.05, 0.1],
    "model__max_depth": [2, 3, 4],
    "model__subsample": [0.7, 0.85, 1.0],
    "model__min_samples_split": [2, 4, 6],
}

# Shared preprocessing followed by scikit-learn's gradient boosting classifier.
# NOTE(review): unlike the logistic regression, random forest and SVM cells,
# no class weighting is configured here — confirm this is intended.
gb_pipeline = Pipeline(
    steps=[
        ("preprocessor", PREPROCESSOR),
        (
            "model",
            GradientBoostingClassifier(random_state=RANDOM_STATE),
        ),
    ]
)

# 20 sampled configurations, each scored by 5-fold stratified ROC-AUC.
gb_search = RandomizedSearchCV(
    estimator=gb_pipeline,
    param_distributions=gb_params,
    n_iter=20,
    scoring="roc_auc",
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE),
    n_jobs=-1,
    random_state=RANDOM_STATE,
)

# Tune on the training split only, then hand the best pipeline to the shared
# evaluation helper, which cross-validates, refits, scores and logs it.
gb_search.fit(train_X, train_y)
gb_best = gb_search.best_estimator_
gb_metrics = evaluate_model(
    gb_best,
    "Gradient Boosting",
    train_X,
    train_y,
    val_X,
    val_y,
    test_X,
    test_y,
)
# Diagnostics use the pipeline as fitted inside evaluate_model.
plot_diagnostics(gb_best, val_X, val_y, "Gradient Boosting", "validation")
plot_diagnostics(gb_best, test_X, test_y, "Gradient Boosting", "test")
append_summary_entry("Gradient Boosting", gb_metrics)


### XGBoost Gradient Boosted Trees
Explores gradient-boosting with second-order optimization and regularization controls.

In [None]:
# Search space over ensemble size, depth, shrinkage, row/column subsampling
# and the L1 / L2 / split-gain regularization terms.
xgb_params = {
    "model__n_estimators": [200, 400, 600],
    "model__max_depth": [3, 4, 5],
    "model__learning_rate": [0.01, 0.05, 0.1],
    "model__subsample": [0.6, 0.8, 1.0],
    "model__colsample_bytree": [0.6, 0.8, 1.0],
    "model__reg_alpha": [0.0, 0.1, 0.5],
    "model__reg_lambda": [0.5, 1.0, 2.0],
    "model__gamma": [0, 0.1, 0.3],
}

# Shared preprocessing followed by a histogram-based XGBoost classifier.
# NOTE(review): scale_pos_weight is left as None, so no class re-weighting is
# requested here, whereas the logistic regression, random forest and SVM cells
# use balanced class weights — confirm whether it should be set from the
# class ratio of train_y.
xgb_pipeline = Pipeline(
    steps=[
        ("preprocessor", PREPROCESSOR),
        (
            "model",
            XGBClassifier(
                objective="binary:logistic",
                eval_metric="logloss",
                random_state=RANDOM_STATE,
                tree_method="hist",
                scale_pos_weight=None,
            ),
        ),
    ]
)

# 25 sampled configurations, each scored by 5-fold stratified ROC-AUC.
xgb_search = RandomizedSearchCV(
    estimator=xgb_pipeline,
    param_distributions=xgb_params,
    n_iter=25,
    scoring="roc_auc",
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE),
    n_jobs=-1,
    random_state=RANDOM_STATE,
)

# Tune on the training split only, then hand the best pipeline to the shared
# evaluation helper, which cross-validates, refits, scores and logs it.
xgb_search.fit(train_X, train_y)
xgb_best = xgb_search.best_estimator_
xgb_metrics = evaluate_model(
    xgb_best,
    "XGBoost Gradient Boosted Trees",
    train_X,
    train_y,
    val_X,
    val_y,
    test_X,
    test_y,
)
# Diagnostics use the pipeline as fitted inside evaluate_model.
plot_diagnostics(xgb_best, val_X, val_y, "XGBoost Gradient Boosted Trees", "validation")
plot_diagnostics(xgb_best, test_X, test_y, "XGBoost Gradient Boosted Trees", "test")
append_summary_entry("XGBoost Gradient Boosted Trees", xgb_metrics)


### Support Vector Machine (RBF)
Captures complex decision boundaries through kernel methods with probabilistic calibration.

In [None]:
# Search space: 10 log-spaced values each for C (1e-2 to 1e2) and gamma
# (1e-4 to 1e0).
svm_params = {
    "model__C": np.logspace(-2, 2, 10),
    "model__gamma": np.logspace(-4, 0, 10),
}

# Shared preprocessing followed by a class-weighted SVC (kernel left at the
# library default). probability=True is required because evaluate_model calls
# predict_proba on the fitted pipeline.
svm_pipeline = Pipeline(
    steps=[
        ("preprocessor", PREPROCESSOR),
        (
            "model",
            SVC(
                probability=True,
                class_weight="balanced",
                random_state=RANDOM_STATE,
            ),
        ),
    ]
)

# 20 sampled configurations, each scored by 5-fold stratified ROC-AUC.
# NOTE(review): probability=True adds fitting cost to every candidate in the
# search — presumably acceptable for this dataset size; verify runtime.
svm_search = RandomizedSearchCV(
    estimator=svm_pipeline,
    param_distributions=svm_params,
    n_iter=20,
    scoring="roc_auc",
    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE),
    n_jobs=-1,
    random_state=RANDOM_STATE,
)

# Tune on the training split only, then hand the best pipeline to the shared
# evaluation helper, which cross-validates, refits, scores and logs it.
svm_search.fit(train_X, train_y)
svm_best = svm_search.best_estimator_
svm_metrics = evaluate_model(
    svm_best,
    "Support Vector Machine (RBF)",
    train_X,
    train_y,
    val_X,
    val_y,
    test_X,
    test_y,
)
# Diagnostics use the pipeline as fitted inside evaluate_model.
plot_diagnostics(svm_best, val_X, val_y, "Support Vector Machine (RBF)", "validation")
plot_diagnostics(svm_best, test_X, test_y, "Support Vector Machine (RBF)", "test")
append_summary_entry("Support Vector Machine (RBF)", svm_metrics)


## Consolidated Results & Next Steps
Loads the aggregated metrics table and outlines follow-up analyses for the final report.

In [None]:
# Show the accumulated metrics ranked by validation ROC-AUC, or a hint when
# no experiment has written the metrics file yet.
if not ARTIFACT_METRICS.exists():
    display(Markdown("No metrics logged yet. Run the experiments above to populate the table."))
else:
    phase4_metrics = pd.read_csv(ARTIFACT_METRICS)
    ranked_metrics = phase4_metrics.sort_values("validation_roc_auc", ascending=False)
    display(ranked_metrics)


### Reporting Checklist
- Integrate best-performing model metrics into `reports/phase4_advanced_models_report.md`.
- Refresh visualization gallery with ROC, PR, and feature importance plots.
- Update `plan/data-cybersecurity-attacks-analysis-1.md` TASK-036 to reflect experiment outcomes.
- Prepare deployment considerations if the gap between validation and test metrics is small.