# Centralized Log Management System for Threat Monitoring (UNSW-NB15)

**Author:** Your Name Here  
**Date:** 2025-08-13

---

This notebook implements a memory-efficient, production-minded **Centralized Log Management System (CLMS)** for threat monitoring using the **UNSW-NB15** dataset. It covers:

- Automated dataset fetch + robust failure handling
- Chunked ingestion with dtype optimization (designed to fit within 8 GB of RAM)
- Preprocessing & feature engineering (categoricals + numeric scaling)
- EDA plots: class distribution, attack categories, correlations, and feature importance
- Modeling: **Baseline** (Incremental Logistic Regression via `SGDClassifier`) and **Optimized** (Scikit-learn `HistGradientBoostingClassifier`; optional XGBoost if installed)
- Full evaluation: Accuracy, Precision, Recall, F1, Confusion Matrix, ROC & AUC
- **Centralized log pipeline simulation** for near-real-time scoring and alerting
- Clear, modular code for integration into production

> **Dataset**: [UNSW-NB15](https://research.unsw.edu.au/projects/unsw-nb15-dataset)


## 1. Introduction

Modern security operations require collecting logs from multiple sources (network, system, application), centralizing them, and applying analytics to detect threats in real time or near real time.
This notebook provides an end-to-end, memory-conscious blueprint you can adapt to your environment.


## 2. Setup & Configuration

- Paths, plus the chunk sizes and row caps that bound memory use
- Download mirrors for UNSW-NB15 (graceful handling when offline)
- Lightweight dependencies by default (pandas, NumPy, scikit-learn, matplotlib, seaborn); XGBoost is used only if installed


In [11]:

# === Configuration ===
from pathlib import Path
import os, shutil, warnings
from typing import Iterator, Tuple, Dict, Any, List, Optional

DATA_DIR = Path("./data_unsw_nb15")
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Known filenames for the split CSVs commonly distributed
TRAIN_CSV = DATA_DIR / "UNSW_NB15_training-set.csv"
TEST_CSV  = DATA_DIR / "UNSW_NB15_testing-set.csv"

# Chunk size for memory-conscious loading
CHUNK_SIZE = 100_000  # adjust if you have more/less memory

# For EDA sampling to keep plots snappy
EDA_SAMPLE_N = 100_000

# For full-model training; you can reduce to cap memory/time
MAX_TRAIN_ROWS = 500_000  # cap for the optimized models; None = all rows (the official training split has 175,341 rows)

RANDOM_STATE = 42
N_JOBS = os.cpu_count() or 2

warnings.filterwarnings("ignore")
print("Config OK.")


Config OK.


## 3. Dataset Overview & Download

The **UNSW-NB15** dataset contains realistic network traffic with labeled normal and attack events across multiple categories.
We'll attempt to download the **training** and **testing** CSVs if they're not already present. The code gracefully handles:

- Network failures and HTTP errors (e.g., a mirror that returns 404)
- Missing files (only files that are absent are downloaded)

> If auto-download fails (e.g., offline), place the CSVs in `./data_unsw_nb15/` and re-run the notebook.
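Whether the files were fetched automatically or copied in by hand, a cheap header check can catch a truncated file or an HTML error page saved under a `.csv` name before the heavier cells run. This is a minimal sketch; the required column set below is an assumption based on the two columns this notebook depends on (`label`, `attack_cat`), and `missing_columns` is a hypothetical helper name.

```python
import csv
from pathlib import Path
from typing import Iterable, List

def missing_columns(path: Path, required: Iterable[str] = ("label", "attack_cat")) -> List[str]:
    """Return the required column names absent from the CSV header ([] means OK)."""
    with open(path, newline="", encoding="utf-8", errors="replace") as f:
        header = next(csv.reader(f), [])  # [] for an empty file
    present = {name.strip() for name in header}
    return [c for c in required if c not in present]

# Hypothetical usage with the notebook's paths:
# for p in (TRAIN_CSV, TEST_CSV):
#     if p.exists() and missing_columns(p):
#         print(f"[WARN] {p} looks incomplete or is not the expected CSV")
```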


In [19]:

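import shutil
import tempfile
import urllib.request

def try_download(url: str, dest: Path, timeout: int = 60) -> bool:
    """Best-effort file download with graceful failure.

    Data goes to a temporary file next to `dest` and is renamed only after the
    transfer completes, so a failed download never leaves a partial CSV behind.
    """
    tmp_path = None
    try:
        print(f"Attempting download: {url}")
        with urllib.request.urlopen(url, timeout=timeout) as r, \
             tempfile.NamedTemporaryFile(dir=dest.parent, suffix=".part", delete=False) as f:
            tmp_path = Path(f.name)
            shutil.copyfileobj(r, f)
        tmp_path.replace(dest)  # rename within the same directory
        print(f"Downloaded -> {dest}")
        return True
    except Exception as e:
        print(f"[WARN] Download failed from {url}: {e}")
        if tmp_path is not None and tmp_path.exists():
            tmp_path.unlink()  # discard the partial file
        return False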

def ensure_unsw_nb15_present() -> None:
    """Ensure UNSW NB15 CSVs exist; attempt multiple mirrors if needed."""
    if TRAIN_CSV.exists() and TEST_CSV.exists():
        print("UNSW-NB15 CSVs already present.")
        return

    mirrors = [
        # Direct CSV links change over time; add or replace mirrors as needed,
        # or place the files manually. (In the recorded run below, this mirror returned HTTP 404.)
        "https://raw.githubusercontent.com/duangenquan/UNSW-NB15/master/UNSW_NB15_training-set.csv",
        "https://raw.githubusercontent.com/duangenquan/UNSW-NB15/master/UNSW_NB15_testing-set.csv",
    ]
    ok = False
    # For each mirror base (in list order), fetch whichever of train/test is still missing.
    for base in dict.fromkeys(m.rsplit('/', 1)[0] for m in mirrors):
        train_url = base + "/UNSW_NB15_training-set.csv"
        test_url  = base + "/UNSW_NB15_testing-set.csv"
        got_train = TRAIN_CSV.exists() or try_download(train_url, TRAIN_CSV)
        got_test  = TEST_CSV.exists()  or try_download(test_url, TEST_CSV)
        if got_train and got_test:
            ok = True
            break

    if not ok:
        print("\n[NOTICE] Could not auto-download UNSW-NB15 CSVs.\n"
              "Please download the following files and place them in ./data_unsw_nb15/:\n"
              "  - UNSW_NB15_training-set.csv\n  - UNSW_NB15_testing-set.csv\n"
              "Source: https://research.unsw.edu.au/projects/unsw-nb15-dataset\n")

ensure_unsw_nb15_present()


Attempting download: https://raw.githubusercontent.com/duangenquan/UNSW-NB15/master/UNSW_NB15_training-set.csv
[WARN] Download failed from https://raw.githubusercontent.com/duangenquan/UNSW-NB15/master/UNSW_NB15_training-set.csv: HTTP Error 404: Not Found
Attempting download: https://raw.githubusercontent.com/duangenquan/UNSW-NB15/master/UNSW_NB15_testing-set.csv
[WARN] Download failed from https://raw.githubusercontent.com/duangenquan/UNSW-NB15/master/UNSW_NB15_testing-set.csv: HTTP Error 404: Not Found

[NOTICE] Could not auto-download UNSW-NB15 CSVs.
Please download the following files and place them in ./data_unsw_nb15/:
  - UNSW_NB15_training-set.csv
  - UNSW_NB15_testing-set.csv
Source: https://research.unsw.edu.au/projects/unsw-nb15-dataset



## 4. Data Preprocessing

We will:

- Load CSVs using **dtype optimization** and **chunked reading** to minimize memory use.
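- Impute missing values (median for numeric columns, most frequent value for categoricals).
- Encode categorical features with an **OrdinalEncoder**: it is compact, and its category-to-integer mapping is fixed once fitted, so every streamed chunk is encoded consistently (unseen categories map to `-1`).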
- Standardize numeric features for models that benefit from scaling.

We keep preprocessing modular and production-friendly.
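The streaming-stability point is easiest to see on a toy frame: once the encoder is fitted, a category that first appears in a later chunk is mapped to `-1` instead of raising. This sketch mirrors the notebook's numeric and categorical pipelines; the two columns and their values are illustrative stand-ins, not real UNSW-NB15 records.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OrdinalEncoder, StandardScaler

# Toy "first chunk" with a missing numeric value and a missing category.
train = pd.DataFrame({"dur": [0.1, 0.5, np.nan, 2.0],
                      "proto": ["tcp", "udp", "tcp", np.nan]})
pre = ColumnTransformer([
    ("num", Pipeline([("imputer", SimpleImputer(strategy="median")),
                      ("scaler", StandardScaler())]), ["dur"]),
    ("cat", Pipeline([("imputer", SimpleImputer(strategy="most_frequent")),
                      ("encoder", OrdinalEncoder(handle_unknown="use_encoded_value",
                                                 unknown_value=-1))]), ["proto"]),
])
pre.fit(train)  # categories are learned once: tcp -> 0, udp -> 1

# A later chunk containing a protocol never seen during fit.
later = pd.DataFrame({"dur": [1.0], "proto": ["sctp"]})
print(pre.transform(later))  # second column is -1.0
```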


In [13]:

import pandas as pd
import numpy as np
from sklearn.preprocessing import OrdinalEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer

LABEL_COL = 'label'       # 0=normal, 1=attack in UNSW split CSVs
ATTACK_CAT = 'attack_cat' # high-level attack category
ID_COL = 'id'             # row index in the split CSVs (if present); not a feature

def infer_dtypes(path: Path, sample_rows: int = 10000) -> Dict[str, str]:
    """Infer low-memory read dtypes from a small sample.

    Floats are downcast to float32. Integer columns are left to pandas' own
    inference (int64): narrowing them from a sample (e.g. to uint8) can overflow
    or fail on later chunks that contain larger values or NaNs. The label is not
    pinned here; callers coerce it to int8 after reading.
    """
    sample = pd.read_csv(path, nrows=sample_rows)
    dtypes = {}
    for col in sample.columns:
        if col == LABEL_COL:
            continue
        if col == ATTACK_CAT or not pd.api.types.is_numeric_dtype(sample[col]):
            dtypes[col] = 'object'
        elif pd.api.types.is_float_dtype(sample[col]):
            dtypes[col] = 'float32'  # downcast
    return dtypes

def get_columns(path: Path) -> List[str]:
    return list(pd.read_csv(path, nrows=1).columns)

def build_preprocessor(df_sample: pd.DataFrame) -> Tuple[ColumnTransformer, List[str], List[str]]:
    """Create a ColumnTransformer: median imputation + scaling for numeric columns,
    most-frequent imputation + ordinal encoding for categorical columns."""
    X_cols = [c for c in df_sample.columns if c not in (LABEL_COL, ATTACK_CAT, ID_COL)]
    numeric_cols = [c for c in X_cols if pd.api.types.is_numeric_dtype(df_sample[c])]
    cat_cols = [c for c in X_cols if c not in numeric_cols]

    numeric_pipe = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler(with_mean=True, with_std=True))
    ])

    cat_pipe = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1))
    ])

    # Columns are selected by name; anything else (e.g. `id`) falls into remainder='drop'.
    # n_jobs is left at its default: parallelizing two small transformers on every
    # streamed batch usually costs more in process overhead than it saves.
    preprocessor = ColumnTransformer(transformers=[
        ('num', numeric_pipe, numeric_cols),
        ('cat', cat_pipe, cat_cols)
    ], remainder='drop')

    return preprocessor, numeric_cols, cat_cols

print("Preprocessing utilities ready.")


Preprocessing utilities ready.


## 5. Exploratory Data Analysis (EDA)

We explore:

- **Class distribution** (normal vs. attack)
- **Attack category distribution**
- **Feature correlations** (numeric subset)
- **Feature importance** via a quick RandomForest fit on the sample (impurity-based, so treat it as a rough ranking)

> Plots are produced on a sample rather than the full file to keep memory usage low.
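A sample drawn from the top of a file is only representative if the rows are not ordered. This sketch compares head-of-file sampling with drawing the same fraction from every chunk, on a synthetic CSV that is deliberately sorted by class; whether the UNSW-NB15 split files are ordered this way is not verified here, and `uniform_chunk_sample` is a hypothetical helper.

```python
import io
import numpy as np
import pandas as pd

def uniform_chunk_sample(csv_source, total_rows: int, n: int, chunksize: int, seed: int = 42) -> pd.DataFrame:
    """Draw about n rows by sampling the same fraction from every chunk."""
    frac = min(1.0, n / max(total_rows, 1))
    rs = np.random.RandomState(seed)
    parts = [chunk.sample(frac=frac, random_state=rs)
             for chunk in pd.read_csv(csv_source, chunksize=chunksize)]
    return pd.concat(parts, ignore_index=True)

# Synthetic file sorted by class: 6,000 normal rows followed by 4,000 attack rows.
labels = np.r_[np.zeros(6000, dtype=int), np.ones(4000, dtype=int)]
csv_text = pd.DataFrame({"x": np.arange(10_000), "label": labels}).to_csv(index=False)

head = pd.read_csv(io.StringIO(csv_text), nrows=5000)          # first 5,000 rows only
uniform = uniform_chunk_sample(io.StringIO(csv_text), total_rows=10_000, n=5000, chunksize=1000)
print(head["label"].mean(), uniform["label"].mean())          # 0.0 vs. 0.4 (true rate 0.4)
```

The head sample contains no attacks at all, while per-chunk sampling recovers the true 40% attack rate at the same memory cost.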


In [14]:

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier

# Non-feature columns; 'id' is the row index in the split CSVs (dropped only if present)
EDA_DROP = [LABEL_COL, ATTACK_CAT, 'id']

def load_sample_for_eda(path: Path, n: int = EDA_SAMPLE_N, chunksize: int = 50_000) -> pd.DataFrame:
    """Uniform random sample of about `n` rows drawn from every chunk.

    Taking only the first `n` rows is unrepresentative if the file is ordered
    (e.g. grouped by class), so the same fraction is sampled from each chunk.
    """
    dtypes = infer_dtypes(path)
    with open(path, 'rb') as f:
        total_rows = max(sum(1 for _ in f) - 1, 1)  # data rows, excluding the header
    frac = min(1.0, n / total_rows)
    rs = np.random.RandomState(RANDOM_STATE)
    chunks = [chunk.sample(frac=frac, random_state=rs)
              for chunk in pd.read_csv(path, dtype=dtypes, chunksize=chunksize)]
    df = pd.concat(chunks, ignore_index=True)
    # ensure label is int
    if LABEL_COL in df.columns:
        df[LABEL_COL] = pd.to_numeric(df[LABEL_COL], errors='coerce').fillna(0).astype('int8')
    return df

if TRAIN_CSV.exists():
    eda_df = load_sample_for_eda(TRAIN_CSV, EDA_SAMPLE_N)
    print("Sample for EDA:", eda_df.shape)

    # Class distribution
    plt.figure(figsize=(6,4))
    sns.countplot(x=LABEL_COL, data=eda_df)
    plt.title("Class Distribution (0=Normal, 1=Attack)")
    plt.xlabel("Label"); plt.ylabel("Count")
    plt.show()

    # Attack category distribution (if present)
    if ATTACK_CAT in eda_df.columns:
        plt.figure(figsize=(10,4))
        eda_df[ATTACK_CAT] = eda_df[ATTACK_CAT].astype('category')
        eda_df[ATTACK_CAT].value_counts().head(15).plot(kind='bar')
        plt.title("Top Attack Categories (Sample)")
        plt.xlabel("attack_cat"); plt.ylabel("Count")
        plt.tight_layout()
        plt.show()

    # Correlations for numeric features (row index excluded)
    num_cols = [c for c in eda_df.columns if c not in EDA_DROP and pd.api.types.is_numeric_dtype(eda_df[c])]
    corr = eda_df[num_cols + [LABEL_COL]].corr(numeric_only=True)
    plt.figure(figsize=(10,8))
    sns.heatmap(corr, cmap='coolwarm', center=0)
    plt.title("Feature Correlations (numeric subset)")
    plt.show()

    # Quick feature importance using RandomForest on the sample.
    # Impurity-based importances favor high-cardinality features: treat as a rough ranking.
    X_sample = eda_df.drop(columns=EDA_DROP, errors='ignore')
    y_sample = eda_df[LABEL_COL]
    # Coerce non-numeric to category codes just for quick importance (NaN -> -1)
    for c in X_sample.columns:
        if not pd.api.types.is_numeric_dtype(X_sample[c]):
            X_sample[c] = X_sample[c].astype('category').cat.codes
    X_sample = X_sample.fillna(X_sample.median())  # older scikit-learn forests reject NaN
    rf = RandomForestClassifier(n_estimators=100, n_jobs=N_JOBS, random_state=RANDOM_STATE)
    rf.fit(X_sample, y_sample)
    importances = pd.Series(rf.feature_importances_, index=X_sample.columns).sort_values(ascending=False).head(20)
    plt.figure(figsize=(10,4))
    importances.plot(kind='bar')
    plt.title("Top 20 Feature Importances (RF on sample)")
    plt.ylabel("Importance")
    plt.tight_layout()
    plt.show()
else:
    print("Training CSV not found; EDA will run after files are available.")


Training CSV not found; EDA will run after files are available.
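Because impurity-based RandomForest importances can overrate high-cardinality or noisy columns, permutation importance on held-out rows is a common cross-check. This sketch uses `sklearn.inspection.permutation_importance` on synthetic data (a stand-in for the EDA sample) where columns 0 to 2 are known to be informative and 3 to 7 are noise, so the ranking can be checked against ground truth.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance
from sklearn.model_selection import train_test_split

# With shuffle=False the 3 informative features are columns 0-2; columns 3-7 are noise.
X, y = make_classification(n_samples=2000, n_features=8, n_informative=3, n_redundant=0,
                           shuffle=False, random_state=42)
X_tr, X_val, y_tr, y_val = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

rf = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1).fit(X_tr, y_tr)
# How much does held-out accuracy drop when a single column is shuffled?
result = permutation_importance(rf, X_val, y_val, n_repeats=5, random_state=42)
for i in np.argsort(result.importances_mean)[::-1]:
    print(f"feature {i}: {result.importances_mean[i]:.4f} +/- {result.importances_std[i]:.4f}")
```

Applied to the notebook, `rf`, `X_sample` and `y_sample` would take the place of the synthetic objects, ideally scored on rows not used for fitting.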


## 6. Model Building

We implement two models:

1. **Baseline:** Incremental Logistic Regression via `SGDClassifier(loss='log_loss')`
   - Trained with **partial_fit** over chunks, so only one chunk is in memory at a time. Each `partial_fit` call performs a single SGD epoch over the chunk it receives.
2. **Optimized:** `HistGradientBoostingClassifier` (and optional XGBoost if installed)
   - Trained on a capped number of rows for efficiency.
   - Hyperparameters are kept at or near library defaults here; tune them via grid/random search as needed.

All models consume the same fitted preprocessing `ColumnTransformer`.


In [15]:

from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix, roc_auc_score, roc_curve, classification_report
from sklearn.ensemble import HistGradientBoostingClassifier

def stream_dataframe(path: Path, dtypes: Dict[str,str], chunksize: int = CHUNK_SIZE) -> Iterator[pd.DataFrame]:
    for chunk in pd.read_csv(path, dtype=dtypes, chunksize=chunksize):
        # Clean label
        if LABEL_COL in chunk.columns:
            chunk[LABEL_COL] = pd.to_numeric(chunk[LABEL_COL], errors='coerce').fillna(0).astype('int8')
        yield chunk

def fit_preprocessor_on_sample(path: Path, sample_rows: int = 100_000):
    """Fit the preprocessor on the first `sample_rows` rows of `path`.
    Categories that do not occur in this sample are encoded as -1 later on."""
    dtypes = infer_dtypes(path)
    # gather sample
    frames = []
    total = 0
    for chunk in stream_dataframe(path, dtypes, chunksize=50_000):
        frames.append(chunk)
        total += len(chunk)
        if total >= sample_rows:
            break
    df_sample = pd.concat(frames, ignore_index=True).iloc[:sample_rows]
    pre, num_cols, cat_cols = build_preprocessor(df_sample)
    # Fit on X sample only
    Xs = df_sample.drop(columns=[LABEL_COL, ATTACK_CAT], errors='ignore')
    pre.fit(Xs)
    return pre, dtypes, num_cols, cat_cols

def train_incremental_logreg(train_path: Path, preprocessor: ColumnTransformer, dtypes: Dict[str,str],
                             n_epochs: int = 5) -> SGDClassifier:
    """Logistic regression trained chunk by chunk with SGD.

    `partial_fit` runs exactly one epoch over the data it is given (max_iter is
    not used), so full passes over the file are controlled by `n_epochs`. Rows
    are shuffled within each chunk because SGD is sensitive to data order; if the
    CSV is sorted by class, shuffle the file itself as well.
    """
    clf = SGDClassifier(loss='log_loss', learning_rate='optimal', random_state=RANDOM_STATE)
    classes_ = np.array([0, 1], dtype='int8')
    rng = np.random.RandomState(RANDOM_STATE)
    for _ in range(n_epochs):
        for chunk in stream_dataframe(train_path, dtypes):
            X_trans = preprocessor.transform(chunk.drop(columns=[LABEL_COL, ATTACK_CAT], errors='ignore'))
            y_chunk = chunk[LABEL_COL].to_numpy(dtype='int8')
            order = rng.permutation(len(y_chunk))
            # `classes` is required on the first call and harmless (if unchanged) afterwards
            clf.partial_fit(X_trans[order], y_chunk[order], classes=classes_)
            # free memory before the next chunk
            del X_trans, y_chunk, chunk
    return clf

def load_cap(path: Path, dtypes: Dict[str,str], max_rows: Optional[int] = None) -> Tuple[pd.DataFrame, pd.Series]:
    """Load up to `max_rows` rows (all rows if None) and split into (X, y)."""
    frames = []
    total = 0
    for chunk in stream_dataframe(path, dtypes, chunksize=CHUNK_SIZE):
        frames.append(chunk)
        total += len(chunk)
        if max_rows is not None and total >= max_rows:
            break
    df = pd.concat(frames, ignore_index=True)
    if max_rows is not None:
        df = df.iloc[:max_rows]  # the last chunk may overshoot the cap
    y = df[LABEL_COL].astype('int8')
    X = df.drop(columns=[LABEL_COL, ATTACK_CAT], errors='ignore')
    return X, y

def train_hgb(train_path: Path, preprocessor: ColumnTransformer, dtypes: Dict[str,str], max_rows: Optional[int] = MAX_TRAIN_ROWS) -> HistGradientBoostingClassifier:
    X_raw, y = load_cap(train_path, dtypes, max_rows=max_rows)
    X = preprocessor.transform(X_raw)
    # These values equal scikit-learn's defaults; tune them for a genuinely optimized model.
    hgb = HistGradientBoostingClassifier(max_depth=None, learning_rate=0.1, l2_regularization=0.0,
                                         max_bins=255, random_state=RANDOM_STATE)
    hgb.fit(X, y)
    del X_raw, X, y
    return hgb

def try_train_xgboost(train_path: Path, preprocessor: ColumnTransformer, dtypes: Dict[str,str], max_rows: Optional[int] = MAX_TRAIN_ROWS):
    try:
        import xgboost as xgb
    except Exception:
        print("XGBoost not installed; skipping optimized XGB model.")
        return None
    X_raw, y = load_cap(train_path, dtypes, max_rows=max_rows)
    X = preprocessor.transform(X_raw)
    dtrain = xgb.DMatrix(X, label=y)
    params = {
        'objective': 'binary:logistic',
        'eval_metric': 'auc',
        'eta': 0.1,
        'max_depth': 6,
        'subsample': 0.8,
        'colsample_bytree': 0.8,
        'seed': RANDOM_STATE
    }
    bst = xgb.train(params, dtrain, num_boost_round=200)
    del X_raw, X, y, dtrain
    return bst

print("Model utilities ready.")


Model utilities ready.


## 7. Evaluation

We evaluate each model on the official UNSW-NB15 **test** split with:

- Accuracy, Precision, Recall, F1-score
- Confusion Matrix
- ROC Curve & AUC

Test data is scored chunk by chunk; only the labels and predicted probabilities are kept in memory.
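The reported metrics all derive from the four confusion-matrix counts. This worked example on ten hand-made labels recomputes them directly and checks them against scikit-learn, which lays out a binary confusion matrix as `[[TN, FP], [FN, TP]]`. The false-positive rate is included because it is the x-axis of the ROC curve and the quantity that drives analyst alert fatigue.

```python
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix, precision_recall_fscore_support

y_true = np.array([0, 0, 0, 0, 1, 1, 1, 1, 1, 1])  # 4 normal events, 6 attacks
y_pred = np.array([0, 0, 0, 1, 1, 1, 1, 1, 0, 0])

tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()  # 3, 1, 2, 4
accuracy  = (tp + tn) / (tp + tn + fp + fn)                # 7/10
precision = tp / (tp + fp)                                 # 4/5: share of alerts that were attacks
recall    = tp / (tp + fn)                                 # 4/6: share of attacks that were flagged
f1        = 2 * precision * recall / (precision + recall)  # 8/11
fpr       = fp / (fp + tn)                                 # 1/4: false-alarm rate on normal traffic

p, r, f, _ = precision_recall_fscore_support(y_true, y_pred, average="binary")
print(tn, fp, fn, tp, accuracy, precision, recall, f1, fpr)
```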


In [16]:

import matplotlib.pyplot as plt
import seaborn as sns
from scipy.special import expit
from sklearn.metrics import (accuracy_score, precision_recall_fscore_support, confusion_matrix,
                             roc_auc_score, roc_curve, classification_report)

def _positive_scores(model, X_trans) -> np.ndarray:
    """Attack-class probability; falls back to a sigmoid of the margin, then to hard labels."""
    if hasattr(model, "predict_proba"):
        return model.predict_proba(X_trans)[:, 1]
    if hasattr(model, "decision_function"):
        return expit(model.decision_function(X_trans))
    return model.predict(X_trans).astype(float)  # ROC/AUC degenerate to a single point

def _report(y_true: np.ndarray, y_prob: np.ndarray, model_name: str, threshold: float = 0.5) -> Dict[str, Any]:
    """Print metrics, then plot the confusion matrix and ROC curve side by side."""
    y_pred = (y_prob >= threshold).astype('int8')
    acc = accuracy_score(y_true, y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary', zero_division=0)
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    try:
        auc = roc_auc_score(y_true, y_prob)
    except ValueError:  # only one class present in y_true
        auc = float('nan')
    print(f"\n=== {model_name} ===")
    print(f"Accuracy: {acc:.4f} | Precision: {prec:.4f} | Recall: {rec:.4f} | F1: {f1:.4f} | AUC: {auc:.4f}")
    print("\nClassification Report:\n", classification_report(y_true, y_pred, digits=4, zero_division=0))
    fig, (ax_cm, ax_roc) = plt.subplots(1, 2, figsize=(10, 4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax_cm)
    ax_cm.set_title(f"Confusion Matrix - {model_name}"); ax_cm.set_xlabel("Predicted"); ax_cm.set_ylabel("True")
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    ax_roc.plot(fpr, tpr, label=f"ROC (AUC={auc:.3f})")
    ax_roc.plot([0, 1], [0, 1], 'k--')
    ax_roc.set_xlabel("False Positive Rate"); ax_roc.set_ylabel("True Positive Rate")
    ax_roc.set_title(f"ROC - {model_name}"); ax_roc.legend()
    plt.tight_layout()
    plt.show()
    return {"accuracy": acc, "precision": prec, "recall": rec, "f1": f1, "auc": auc, "cm": cm}

def evaluate_sklearn_model(model, preprocessor, test_path: Path, dtypes: Dict[str,str], model_name: str = "Model") -> Dict[str, Any]:
    y_true_all, y_prob_all = [], []
    for chunk in stream_dataframe(test_path, dtypes, chunksize=CHUNK_SIZE):
        X_trans = preprocessor.transform(chunk.drop(columns=[LABEL_COL, ATTACK_CAT], errors='ignore'))
        y_true_all.append(chunk[LABEL_COL].to_numpy(dtype='int8'))
        y_prob_all.append(_positive_scores(model, X_trans))
        del X_trans, chunk
    return _report(np.concatenate(y_true_all), np.concatenate(y_prob_all), model_name)

def evaluate_xgboost(bst, preprocessor, test_path: Path, dtypes: Dict[str,str], model_name: str = "XGBoost (Optimized)"):
    if bst is None:
        return None
    import xgboost as xgb
    y_true_all, y_prob_all = [], []
    for chunk in stream_dataframe(test_path, dtypes, chunksize=CHUNK_SIZE):
        X_trans = preprocessor.transform(chunk.drop(columns=[LABEL_COL, ATTACK_CAT], errors='ignore'))
        y_true_all.append(chunk[LABEL_COL].to_numpy(dtype='int8'))
        y_prob_all.append(bst.predict(xgb.DMatrix(X_trans)))  # binary:logistic -> probabilities
        del X_trans, chunk
    return _report(np.concatenate(y_true_all), np.concatenate(y_prob_all), model_name)


### 7.1 Train & Evaluate

We fit the preprocessing on a sample of the training split (its first 150,000 rows), then:

- Train **incremental logistic regression** over chunks
- Train **HistGradientBoosting** on a capped number of rows
- Optionally train **XGBoost** if available
- Evaluate all on the **test** split


In [17]:

if TRAIN_CSV.exists() and TEST_CSV.exists():
    preprocessor, dtypes, num_cols, cat_cols = fit_preprocessor_on_sample(TRAIN_CSV, sample_rows=150_000)

    # Baseline: Incremental Logistic Regression
    sgd_clf = train_incremental_logreg(TRAIN_CSV, preprocessor, dtypes)
    results_sgd = evaluate_sklearn_model(sgd_clf, preprocessor, TEST_CSV, dtypes, model_name="Baseline SGD-LogReg")

    # Optimized: HistGradientBoosting
    hgb_clf = train_hgb(TRAIN_CSV, preprocessor, dtypes, max_rows=MAX_TRAIN_ROWS)
    results_hgb = evaluate_sklearn_model(hgb_clf, preprocessor, TEST_CSV, dtypes, model_name="Optimized HistGB")

    # Optional: XGBoost
    bst = try_train_xgboost(TRAIN_CSV, preprocessor, dtypes, max_rows=MAX_TRAIN_ROWS)
    results_xgb = evaluate_xgboost(bst, preprocessor, TEST_CSV, dtypes)
else:
    print("Training/testing CSVs not found. Place them in ./data_unsw_nb15 and re-run.")


Training/testing CSVs not found. Place them in ./data_unsw_nb15 and re-run.


## 8. Centralized Log Management Simulation

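We simulate logs being ingested from multiple sources (e.g., **system**, **application**, **network**) into one pipeline.
In this demo, batches from the UNSW-NB15 test split are tagged with synthetic source names (`syslog`, `app`, `netflow`) to stand in for separate feeds. The model scores each incoming event and emits an alert when its predicted attack probability meets a configurable threshold.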

This section demonstrates:

- A unified **log collector** that yields records from sources
- Preprocessing + model inference in streaming fashion
- Thresholded **alerting** with latency-friendly batching
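The demo below hard-codes `alert_threshold=0.9`. One way to choose that value is to fix a tolerable false-positive rate on labeled validation scores and take the lowest threshold that respects it, which maximizes recall under that constraint. This is a sketch: `threshold_for_fpr` is a hypothetical helper, and the toy scores stand in for model probabilities on a held-out split. Alerts fire when `score >= threshold`, matching both `roc_curve` and the scoring loop.

```python
import numpy as np
from sklearn.metrics import roc_curve

def threshold_for_fpr(y_true, scores, max_fpr=0.01):
    """Lowest threshold whose false-positive rate on labeled data is <= max_fpr.

    Returns (threshold, fpr, tpr). fpr is non-decreasing along the curve, so the
    last admissible point has the highest TPR. The first point may be inf (no alerts).
    """
    fpr, tpr, thresholds = roc_curve(y_true, scores, drop_intermediate=False)
    i = np.where(fpr <= max_fpr)[0][-1]
    return float(thresholds[i]), float(fpr[i]), float(tpr[i])

y_val = np.array([0, 0, 0, 0, 1, 1, 1, 1])
s_val = np.array([0.1, 0.2, 0.3, 0.8, 0.4, 0.7, 0.9, 0.95])
print(threshold_for_fpr(y_val, s_val, max_fpr=0.0))   # (0.9, 0.0, 0.5)
print(threshold_for_fpr(y_val, s_val, max_fpr=0.25))  # (0.4, 0.25, 1.0)
```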


In [18]:

import time
from itertools import cycle
from scipy.special import expit

def synthetic_source_names() -> List[str]:
    return ["syslog", "app", "netflow"]

def multi_source_stream(paths: List[Path], dtypes: Dict[str,str], batch_size: int = 5000) -> Iterator[pd.DataFrame]:
    """Round-robin across paths until every file is exhausted, tagging each batch
    with the next synthetic source name to simulate centralized ingestion."""
    readers = [pd.read_csv(p, dtype=dtypes, chunksize=batch_size) for p in paths]
    names = cycle(synthetic_source_names())
    while readers:
        for reader in list(readers):  # iterate over a copy so exhausted readers can be removed
            try:
                chunk = next(reader)
            except StopIteration:
                reader.close()
                readers.remove(reader)
                continue
            chunk["_source"] = next(names)
            yield chunk

def realtime_scoring(pipeline_pre, model, stream_it: Iterator[pd.DataFrame], alert_threshold: float = 0.8, max_batches: int = 10):
    alerts = []
    for i, batch in enumerate(stream_it, start=1):
        # Drop the label, category and routing metadata before inference
        Xb = batch.drop(columns=[LABEL_COL, ATTACK_CAT, "_source"], errors='ignore')
        Xb_trans = pipeline_pre.transform(Xb)
        # attack-class probability
        if hasattr(model, "predict_proba"):
            prob = model.predict_proba(Xb_trans)[:, 1]
        else:
            prob = expit(model.decision_function(Xb_trans))
        mask = prob >= alert_threshold
        n_flagged = int(mask.sum())
        sources = batch["_source"].to_numpy()[mask] if "_source" in batch.columns else ["unknown"] * n_flagged
        ts = time.time()  # one ingestion timestamp per batch
        alerts.extend({"ts": ts, "source": src, "score": float(score), "summary": "High-risk event detected"}
                      for src, score in zip(sources, prob[mask]))
        print(f"Batch {i}: processed {len(batch)} events, alerts: {n_flagged}")
        if i >= max_batches:
            break
    return alerts

# Demo (only runs if files exist and a trained model is available)
if TRAIN_CSV.exists() and TEST_CSV.exists():
    try:
        alerts = realtime_scoring(preprocessor, hgb_clf, multi_source_stream([TEST_CSV], dtypes, batch_size=10_000), alert_threshold=0.9, max_batches=5)
        print(f"Total alerts emitted: {len(alerts)}\nSample:")
        print(alerts[:5])
    except NameError:
        print("Train models first (previous cell) to run the real-time simulation.")
else:
    print("CSV files not present; simulation will run once data is available.")


CSV files not present; simulation will run once data is available.


## 9. Conclusion & Future Work

We built a memory-conscious, production-minded CLMS pipeline on **UNSW-NB15** featuring:

- Robust ingestion and dtype-optimized preprocessing with chunked loading
- Clear EDA and interpretability touchpoints (correlations, feature importance)
- Two core detection models, **incremental logistic regression** and a **histogram-based GBM**, plus optional XGBoost
- Full evaluation (Accuracy/Precision/Recall/F1/AUC, confusion matrices, ROC curves)
- Near-real-time simulation with centralized ingestion and alerting

**Future enhancements:**

- Enrich features with time-window aggregations and entity profiling (e.g., per host/user)
- Add drift detection and online calibration for thresholds
- Integrate with message queues (Kafka), monitoring (Prometheus), and SIEM systems
- Expand to multi-class detection over `attack_cat` or hierarchical detectors
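As a starting point for the first enhancement, a per-entity time-window feature can be computed with a sorted search per group. This sketch counts, for each event, how many events from the same entity fall in the trailing window `(ts - window, ts]`. The `host` and `ts` column names and the numeric epoch-second timestamps are assumptions, since the UNSW-NB15 split CSVs used above do not carry them, and `trailing_window_counts` is a hypothetical helper.

```python
import numpy as np
import pandas as pd

def trailing_window_counts(df: pd.DataFrame, entity_col: str, ts_col: str, window: float) -> pd.Series:
    """For each event, count events of the same entity in (ts - window, ts].

    `ts_col` holds numeric timestamps (e.g. epoch seconds); df must have a unique index.
    """
    counts = pd.Series(0, index=df.index, dtype="int64")
    for _, grp in df.groupby(entity_col, sort=False):
        g = grp.sort_values(ts_col, kind="stable")
        t = g[ts_col].to_numpy(dtype=float)
        # position of the first event strictly inside each event's window
        start = np.searchsorted(t, t - window, side="right")
        counts.loc[g.index] = np.arange(len(t)) - start + 1
    return counts

# Hypothetical usage: events per source host in the last 60 seconds.
events = pd.DataFrame({"host": ["A", "B", "A", "A"], "ts": [0.0, 10.0, 30.0, 100.0]})
events["host_events_60s"] = trailing_window_counts(events, "host", "ts", window=60.0)
print(events)
```

In a streaming deployment the same counts would typically be maintained incrementally per entity rather than recomputed over a full frame.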


### Appendix: Memory Tips

- Reduce `CHUNK_SIZE` if you run low on RAM.
- Lower `MAX_TRAIN_ROWS` for the optimized model.
- Lower `EDA_SAMPLE_N` for EDA, or rely on swap as a last resort.
- Ensure you're running a 64-bit Python with enough available memory.
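To see what a chunk actually occupies before adjusting `CHUNK_SIZE`, `DataFrame.memory_usage(deep=True)` reports per-column bytes (with `deep=True`, string payloads in object columns are counted too). This sketch also shows the effect of the float32 downcast; the column names echo UNSW-NB15 features but the values are random, and `frame_mb` is a hypothetical helper.

```python
import numpy as np
import pandas as pd

def frame_mb(df: pd.DataFrame) -> float:
    """Memory held by the DataFrame's columns (index excluded), in MiB."""
    return df.memory_usage(deep=True, index=False).sum() / 2**20

df64 = pd.DataFrame(np.random.RandomState(0).rand(100_000, 4),
                    columns=["dur", "rate", "sload", "dload"])
df32 = df64.astype("float32")  # the same downcast infer_dtypes applies to float columns
print(f"float64: {frame_mb(df64):.2f} MiB, float32: {frame_mb(df32):.2f} MiB")
```

For 100,000 rows and 4 float columns this is 3,200,000 bytes versus 1,600,000 bytes, i.e. the downcast halves numeric memory.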
