<a href="https://colab.research.google.com/github/kareemullah123456789/cybersecurity_ML/blob/main/cyber_workshop_3_xgboost.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive; the data and model paths used in this notebook live under it.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline



from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score

# -------------------------------
# 2. Load and Merge Data
# -------------------------------
import os

base_path = '/content/drive/MyDrive/cybersecurity_data/data'

# Read the three CSV tables in the same order as before: users, devices, traffic.
users, devices, traffic = (
    pd.read_csv(os.path.join(base_path, csv_name))
    for csv_name in ("users.csv", "devices.csv", "network_traffic.csv")
)

# Left joins keep every traffic row; user and device attributes are attached
# by their respective id columns.
df = (
    traffic
    .merge(users, on="user_id", how="left")
    .merge(devices, on="device_id", how="left")
)


In [None]:
# Show the first two merged rows as a quick sanity check of the join.
df.iloc[:2]

Unnamed: 0,log_id,user_id,device_id,timestamp,src_ip,dst_ip,protocol,port,bytes_sent,bytes_received,duration,is_anomaly,username,department,role,device_type,os
0,T00001,USR_EMP0028,D0403,2022-02-19 08:39:32,192.168.215.198,10.0.241.5,SSH,22.0,6762.0,4230.0,23.31,1,e_tifcs28@company.com,HR,Recruiter,Laptop,Ubuntu 22.04
1,T00002,USR_EMP0055,D0282,2022-01-27 01:57:06,192.168.8.133,10.0.126.225,SMTP,25.0,2749.0,521.0,32.8,0,e_hodhj55@company.com,Marketing,SEO Specialist,Desktop,Ubuntu 20.04


In [None]:
# -------------------------------
# 3. Drop unnecessary columns
# -------------------------------
# Identifier-like fields (log/user/device ids, username, raw IP addresses) are
# removed before the feature groups are defined.
drop_cols = ["log_id", "user_id", "device_id", "username", "src_ip", "dst_ip"]

# errors="ignore" makes the cell idempotent: re-running it after the columns
# have already been removed no longer raises KeyError.
df = df.drop(columns=drop_cols, errors="ignore")
df.columns

Index(['timestamp', 'protocol', 'port', 'bytes_sent', 'bytes_received',
       'duration', 'is_anomaly', 'department', 'role', 'device_type', 'os'],
      dtype='object')

In [None]:

# -------------------------------
# 4. Extract hour from timestamp
# -------------------------------
# errors="coerce" turns unparseable timestamps into NaT, so their hour is NaN.
parsed_timestamps = pd.to_datetime(df["timestamp"], errors="coerce")
df["timestamp"] = parsed_timestamps
df["hour"] = parsed_timestamps.dt.hour

In [None]:
# -------------------------------
# 5. Define feature groups
# -------------------------------
# Columns routed to the numeric branch of the preprocessor.
num_cols = [
    "bytes_sent",
    "bytes_received",
    "duration",
    "hour",
]

# Columns routed to the categorical (one-hot) branch. Note that "port" is
# listed here, i.e. it is treated as a category rather than a magnitude.
cat_cols = [
    "protocol",
    "port",
    "department",
    "role",
    "device_type",
    "os",
]

In [None]:
# Define X and y
target_col = "is_anomaly"
X = df.drop(columns=[target_col, "timestamp"])  # raw timestamp is replaced by "hour"
y = df[target_col].astype(int)

# Train-test split (70/30, stratified on the label, fixed seed)
split_parts = train_test_split(
    X, y,
    test_size=0.3,
    stratify=y,
    random_state=42,
)
X_train, X_test, y_train, y_test = split_parts

print("Train shape:", X_train.shape, " Test shape:", X_test.shape)


Train shape: (7000, 10)  Test shape: (3000, 10)


In [None]:
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer

# Numeric branch: fill missing values with the column median (no scaling step).
numeric_steps = [
    ("imputer", SimpleImputer(strategy="median")),
]
numeric_transformer = Pipeline(steps=numeric_steps)

# Categorical branch: fill missing values with the most frequent value, then
# one-hot encode; categories unseen during fit are ignored at transform time.
categorical_steps = [
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
]
categorical_transformer = Pipeline(steps=categorical_steps)

# Route each feature group to its branch by column name.
column_routes = [
    ("num", numeric_transformer, num_cols),
    ("cat", categorical_transformer, cat_cols),
]
preprocessor = ColumnTransformer(transformers=column_routes)


In [None]:
from xgboost import XGBClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.model_selection import RandomizedSearchCV
import numpy as np
import pandas as pd

# -------------------------------
# Default XGBoost Model
# -------------------------------
# Baseline pipeline: the shared preprocessor followed by an XGBoost classifier
# with hand-picked hyperparameters.
# `use_label_encoder` is intentionally NOT passed: the installed XGBoost does
# not use it and prints 'Parameters: { "use_label_encoder" } are not used.'
# on every fit (including each fit of the later hyperparameter search).
clf_xgb = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("classifier", XGBClassifier(
        n_estimators=200,
        max_depth=8,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
        eval_metric="logloss"
    ))
])

clf_xgb.fit(X_train, y_train)
y_pred = clf_xgb.predict(X_test)                # hard class labels
y_proba = clf_xgb.predict_proba(X_test)[:,1]    # probability column for class index 1

# Collect the hold-out metrics in one row so they can be compared with the
# tuned model's row later.
default_results = {
    "Model": "XGBoost (Default)",
    "Accuracy": accuracy_score(y_test, y_pred),
    "Precision": precision_score(y_test, y_pred),
    "Recall": recall_score(y_test, y_pred),
    "F1": f1_score(y_test, y_pred),
    "ROC-AUC": roc_auc_score(y_test, y_proba)
}
print(pd.DataFrame([default_results]))

Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


               Model  Accuracy  Precision    Recall       F1   ROC-AUC
0  XGBoost (Default)  0.899333   0.933121  0.510453  0.65991  0.751592


In [None]:
# -------------------------------
# Hyperparameter Tuning
# -------------------------------
# Search space; the "classifier__" prefix addresses the XGBClassifier step of
# the clf_xgb pipeline.
param_dist = {
    "classifier__n_estimators": [100, 200, 300],
    "classifier__max_depth": [5, 8, 10, 12],
    "classifier__learning_rate": [0.01, 0.05, 0.1, 0.2],
    "classifier__subsample": [0.8, 1.0],
    "classifier__colsample_bytree": [0.8, 1.0]
}

# 15 random candidates x 3 folds, ranked by F1.
search = RandomizedSearchCV(
    clf_xgb,
    param_distributions=param_dist,
    n_iter=15,
    scoring="f1",
    cv=3,
    n_jobs=-1,
    verbose=2,
    random_state=42,
)
search.fit(X_train, y_train)

print("Best params:", search.best_params_)
print("Best CV F1:", search.best_score_)

best_model = search.best_estimator_

# -------------------------------
# Evaluate Tuned Model
# -------------------------------
y_pred_tuned = best_model.predict(X_test)
y_proba_tuned = best_model.predict_proba(X_test)[:,1]

# Label-based metrics are computed from the hard predictions; ROC-AUC is
# computed from the class-1 probabilities.
tuned_results = {"Model": "XGBoost (Tuned)"}
for metric_name, metric_fn in (
    ("Accuracy", accuracy_score),
    ("Precision", precision_score),
    ("Recall", recall_score),
    ("F1", f1_score),
):
    tuned_results[metric_name] = metric_fn(y_test, y_pred_tuned)
tuned_results["ROC-AUC"] = roc_auc_score(y_test, y_proba_tuned)

print(pd.DataFrame([tuned_results]))

Fitting 3 folds for each of 15 candidates, totalling 45 fits


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


Best params: {'classifier__subsample': 0.8, 'classifier__n_estimators': 300, 'classifier__max_depth': 10, 'classifier__learning_rate': 0.01, 'classifier__colsample_bytree': 1.0}
Best CV F1: 0.6811864680143355
             Model  Accuracy  Precision    Recall        F1   ROC-AUC
0  XGBoost (Tuned)  0.903667   0.986348  0.503484  0.666667  0.741182


In [None]:
# -------------------------------
# Find Best Threshold (for tuned model)
# -------------------------------
# The decision threshold is a hyperparameter, so it is selected on
# out-of-fold predictions from the TRAINING data. Selecting it on
# X_test / y_test would leak the hold-out set into model selection and make
# any test metric reported at that threshold optimistic.
from sklearn.model_selection import cross_val_predict

# Each training row is scored by a clone of best_model that did not see that
# row during fitting; column 1 is the probability for class index 1.
y_proba_oof = cross_val_predict(
    best_model, X_train, y_train, cv=3, method="predict_proba"
)[:, 1]

thresholds = np.linspace(0.1, 0.9, 9)
threshold_results = []

for t in thresholds:
    y_pred_thresh = (y_proba_oof >= t).astype(int)
    threshold_results.append({
        "Threshold": t,
        # zero_division=0: a threshold that flags no row scores 0 instead of
        # emitting an undefined-metric warning.
        "Precision": precision_score(y_train, y_pred_thresh, zero_division=0),
        "Recall": recall_score(y_train, y_pred_thresh),
        "F1": f1_score(y_train, y_pred_thresh, zero_division=0)
    })

threshold_df = pd.DataFrame(threshold_results)
print(threshold_df)

# idxmax returns the first row with the highest F1, i.e. the lowest threshold on ties.
best_thresh = threshold_df.loc[threshold_df["F1"].idxmax(), "Threshold"]
print("Best Threshold by F1:", best_thresh)

   Threshold  Precision    Recall        F1
0        0.1   0.285714  0.679443  0.402269
1        0.2   0.757500  0.527875  0.622177
2        0.3   0.930380  0.512195  0.660674
3        0.4   0.976510  0.506969  0.667431
4        0.5   0.986348  0.503484  0.666667
5        0.6   0.986207  0.498258  0.662037
6        0.7   0.989399  0.487805  0.653442
7        0.8   0.989209  0.479094  0.645540
8        0.9   0.994975  0.344948  0.512290
Best Threshold by F1: 0.4


In [None]:
import joblib
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from xgboost import XGBClassifier

# Custom wrapper with threshold
class ThresholdClassifier(BaseEstimator, ClassifierMixin):
    """Classifier wrapper that applies a custom probability threshold.

    Parameters
    ----------
    base_model : estimator
        Object exposing ``fit(X, y)`` and ``predict_proba(X)``; column 1 of
        its ``predict_proba`` output is compared against ``threshold``.
    threshold : float, default=0.4
        Rows whose column-1 probability is ``>= threshold`` are predicted as 1,
        all others as 0.

    Attributes
    ----------
    classes_ : array
        Set by ``fit``. Taken from ``base_model.classes_`` when available,
        otherwise the sorted unique values of ``y``.
    """

    def __init__(self, base_model, threshold=0.4):
        self.base_model = base_model
        self.threshold = threshold

    def fit(self, X, y):
        """Fit ``base_model`` in place on ``(X, y)`` and return ``self``."""
        self.base_model.fit(X, y)
        # scikit-learn expects a fitted classifier to expose `classes_`
        # (scorers and meta-estimators read it); mirror the wrapped model's.
        self.classes_ = getattr(self.base_model, "classes_", np.unique(y))
        return self

    def predict(self, X):
        """Return 0/1 integer labels using ``self.threshold`` as the cut-off."""
        proba = self.base_model.predict_proba(X)[:, 1]
        return (proba >= self.threshold).astype(int)

    def predict_proba(self, X):
        """Return the wrapped model's probabilities unchanged."""
        return self.base_model.predict_proba(X)  # keep raw probabilities

# Wrap the tuned pipeline (`best_model`, from RandomizedSearchCV) with the
# threshold selected by the F1 search (`best_thresh`). Using the variable
# instead of a literal 0.4 keeps the saved model in sync with the search.
wrapped_xgb = ThresholdClassifier(base_model=best_model, threshold=float(best_thresh))
# ThresholdClassifier.fit refits `best_model` in place on the training data.
wrapped_xgb.fit(X_train, y_train)

# Save to Drive
# NOTE: ThresholdClassifier is defined in this notebook, so any session that
# loads this file must define/import the same class before joblib.load.
joblib.dump(wrapped_xgb, "/content/drive/MyDrive/cybersecurity_data/models/xgb_wrapped.joblib")

print("✅ Wrapped XGBoost model saved with threshold baked in.")


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


✅ Wrapped XGBoost model saved with threshold baked in.


In [None]:
!rm /content/drive/MyDrive/cybersecurity_data/models/xgb_best_threshold.txt


In [None]:
!rm /content/drive/MyDrive/cybersecurity_data/models/xgb_tuned_model.joblib

In [None]:
ls /content/drive/MyDrive/cybersecurity_data/models/

rf_threshold_0_4.joblib  xgb_wrapped.joblib


In [None]:
import joblib

save_path = "/content/drive/MyDrive/cybersecurity_data/models/"

# Save tuned XGBoost
joblib.dump(best_model, save_path + "xgb_tuned_model.joblib")

# Persist the selected decision threshold. The file previously received
# str(best_model) -- the pipeline's repr -- which is not a threshold and
# cannot be parsed back into one.
with open(save_path + "xgb_best_threshold.txt", "w") as f:
    f.write(str(float(best_thresh)))

print("✅ Models & thresholds saved in Google Drive:", save_path)

✅ Models & thresholds saved in Google Drive: /content/drive/MyDrive/cybersecurity_data/models/


In [None]:
ls /content/drive/MyDrive/cybersecurity_data/models/

rf_threshold_0_4.joblib  xgb_best_threshold.txt  xgb_tuned_model.joblib


In [None]:
import gradio as gr
import pandas as pd
import joblib
import os

# -------------------------------
# Load models from Drive
# -------------------------------
model_path = "/content/drive/MyDrive/cybersecurity_data/models"

# NOTE(review): joblib files are pickle-based -- only load artefacts you
# produced yourself.
rf_model, xgb_model = (
    joblib.load(os.path.join(model_path, artefact_name))
    for artefact_name in ("rf_threshold_0_4.joblib", "xgb_wrapped.joblib")
)

# -------------------------------
# Define dropdown options (from dataset categories)
# -------------------------------
# Hard-coded choice lists for the UI dropdowns.
protocol_options = [
    "SSH", "FTP", "SMTP", "HTTP", "HTTPS",
]
department_options = [
    "HR", "Finance", "Engineering", "Sales", "IT", "Marketing",
]
role_options = [
    "Analyst", "Manager", "Engineer", "Admin", "Recruiter", "SEO Specialist",
]
device_type_options = [
    "Laptop", "Desktop", "Server", "Firewall", "Router",
]
os_options = [
    "Ubuntu 20.04", "Ubuntu 22.04", "Windows 10", "Windows 11", "MacOS",
]

# -------------------------------
# Prediction function
# -------------------------------
def predict(model_choice, bytes_sent, bytes_received, duration, hour,
            protocol, port, department, role, device_type, os_input):
    """Score one traffic record with the selected model.

    Builds a one-row DataFrame from the arguments, then queries ``rf_model``
    when ``model_choice`` is exactly "Random Forest" and ``xgb_model`` for any
    other value.

    Returns a ``(label, probability)`` tuple: the label is the anomaly string
    when the model predicts 1 and the normal string otherwise; the probability
    is column 1 of the model's ``predict_proba`` output as a Python float.
    """
    # Build dataframe in same schema as training
    feature_row = {
        "bytes_sent": bytes_sent,
        "bytes_received": bytes_received,
        "duration": duration,
        "hour": hour,
        "protocol": protocol,
        "port": port,
        "department": department,
        "role": role,
        "device_type": device_type,
        "os": os_input,
    }
    input_data = pd.DataFrame([feature_row])

    # Select model (anything other than "Random Forest" falls back to XGBoost)
    chosen_model = rf_model if model_choice == "Random Forest" else xgb_model

    pred = chosen_model.predict(input_data)[0]
    proba = chosen_model.predict_proba(input_data)[:, 1][0]

    label = "⚠️ Anomaly" if pred == 1 else "✅ Normal"
    return label, float(proba)

# -------------------------------
# Build Gradio Interface
# -------------------------------
# Input widgets, in the same order as predict()'s parameters.
input_components = [
    gr.Dropdown(["Random Forest", "XGBoost"], label="Choose Model"),
    gr.Number(label="Bytes Sent"),
    gr.Number(label="Bytes Received"),
    gr.Number(label="Duration (seconds)"),
    gr.Slider(0, 23, step=1, label="Hour"),
    gr.Dropdown(protocol_options, label="Protocol"),
    gr.Number(label="Port"),
    gr.Dropdown(department_options, label="Department"),
    gr.Dropdown(role_options, label="Role"),
    gr.Dropdown(device_type_options, label="Device Type"),
    gr.Dropdown(os_options, label="Operating System"),
]

# Output widgets, matching predict()'s (label, probability) return tuple.
output_components = [
    gr.Label(label="Prediction"),
    gr.Number(label="Anomaly Probability"),
]

demo = gr.Interface(
    fn=predict,
    inputs=input_components,
    outputs=output_components,
    title="🔒 Cybersecurity Anomaly Detection",
    description="Predict anomalies using either Random Forest (threshold baked in) or tuned XGBoost (threshold baked in).",
)

# share=True requests a public share link for the running app.
demo.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://c7daca722abd634156.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


