In [1]:
# Import necessary libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from xgboost import XGBClassifier
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix


In [2]:
# Ordered labels for the three quantile bins (lowest anomaly scores first).
classes = ['Low', 'Medium', 'High']

# Read the transformed records, derive the tercile label from the raw anomaly
# score, then discard the score itself together with the columns that are not
# used as model features.
df = (
    pd.read_csv('../Data Warehousing ETL/Transformed_Data/individuals_cyber_attacks_europe.csv')
    .assign(Anomaly_Class=lambda frame: pd.qcut(frame['Anomaly_Scores'], q=3, labels=classes))
    .drop(columns=['Anomaly_Scores', 'User_Information', 'Payload_Data', 'Timestamp'])
)


In [3]:
# Integer-encode every object-dtype column (values are cast to str first) and
# keep each fitted encoder, keyed by column name, so the mapping stays available.
label_encoders = {}
for col_name in df.select_dtypes(include='object'):
    encoder = LabelEncoder()
    as_text = df[col_name].astype(str)
    df[col_name] = encoder.fit_transform(as_text)
    label_encoders[col_name] = encoder


In [4]:
# Separate the binned anomaly label (target) from the remaining columns (features).
y = df['Anomaly_Class']
X = df.drop(columns='Anomaly_Class')

# Turn the class names into integer codes; the fitted encoder is kept so that
# reports can map the codes back to the original names.
le_target = LabelEncoder()
y_encoded = le_target.fit_transform(y)  # integer codes used for training

# Hold out 30% of the rows for testing, with a fixed seed for repeatability.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y_encoded,
    test_size=0.3,
    random_state=42,
)


In [5]:
# Rank the features with a random forest fitted on the training split.
rf = RandomForestClassifier(random_state=42).fit(X_train, y_train)

# Order the feature positions from most to least important, keep the first ten
# and look up their column names.
importances = rf.feature_importances_
indices = importances.argsort()[::-1]
top_10_indices = indices[:10]
top_10_features = X.columns[top_10_indices]
print(top_10_features)

# Restrict both splits to the selected columns for the model comparison.
X_train_top_10 = X_train.loc[:, top_10_features]
X_test_top_10 = X_test.loc[:, top_10_features]


Index(['Year', 'Attack_Signature', 'Source_IP_Address', 'Destination_Port',
       'Packet_Length', 'Destination_IP_Address', 'Source_Port', 'Day',
       'Attack_Type', 'Hour_of_Day'],
      dtype='object')


In [6]:
# Define classifiers with default parameters
# Candidate models, all left at their library defaults apart from a shared seed.
classifiers = dict(
    RandomForest=RandomForestClassifier(random_state=42),
    GradientBoosting=GradientBoostingClassifier(random_state=42),
    XGBoost=XGBClassifier(random_state=42),
)


In [7]:
def evaluate_model(model, X_test, y_test, use_scaling=False, scaler=None):
    """Print test-set metrics for a fitted classifier and return its weighted F1.

    Parameters
    ----------
    model : fitted estimator
        Any object exposing ``predict``.
    X_test : array-like or DataFrame
        Features to predict on.
    y_test : array-like
        True labels, expected to be the integer codes produced by the
        module-level ``le_target`` encoder (its ``classes_`` are used as the
        display names in the report).
    use_scaling : bool, default False
        Standardise ``X_test`` before predicting.
    scaler : fitted transformer, optional
        Scaler that was fitted on the training features. When given together
        with ``use_scaling=True`` it is applied with ``transform`` so the test
        data is scaled with the training statistics. When omitted, the previous
        behaviour (fitting a new ``StandardScaler`` on ``X_test``) is kept for
        backward compatibility, but a warning is emitted.

    Returns
    -------
    float
        Weighted-average F1 score, used by the caller to rank models.
    """
    if use_scaling:
        if scaler is not None:
            # Reuse the training-time statistics; never refit on test data.
            X_test = scaler.transform(X_test)
        else:
            import warnings

            warnings.warn(
                "use_scaling=True without a fitted `scaler`: fitting StandardScaler on "
                "the test data, which does not match the scaling seen during training.",
                UserWarning,
                stacklevel=2,
            )
            X_test = StandardScaler().fit_transform(X_test)

    # Predict the labels for the test set
    y_pred = model.predict(X_test)

    # Overall accuracy plus support-weighted precision, recall and F1
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')

    # Pin the label set to the encoder's classes so the report and the matrix
    # keep one row per class even if a class is absent from this split
    # (without `labels`, classification_report raises when the number of
    # observed classes differs from len(target_names)).
    class_labels = np.arange(len(le_target.classes_))
    cm = confusion_matrix(y_test, y_pred, labels=class_labels)

    # Print the evaluation results
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, labels=class_labels,
                                target_names=le_target.classes_))  # original class names
    print("\nConfusion Matrix:")
    print(cm)

    # Return F1 score for selecting the best model
    return f1


In [8]:
# Fit every candidate on the selected features and keep the one with the
# highest weighted F1 on the held-out split.
# NOTE(review): the same test split is used to pick the winner and to report
# its score, so the reported figure is optimistic — consider a validation split.
best_f1 = float('-inf')  # below any achievable F1, so a model is always selected
best_model_name = None
best_model = None

for name, model in classifiers.items():
    print(f"\nTraining and evaluating {name} with default parameters:")

    # Fit the model on the training data
    model.fit(X_train_top_10, y_train)

    # None of these tree-based models needs scaled inputs, so scaling is
    # switched off explicitly rather than through an unused local flag.
    f1 = evaluate_model(model, X_test_top_10, y_test, use_scaling=False)

    # Track the best model based on F1 score
    if f1 > best_f1:
        best_f1 = f1
        best_model_name = name
        best_model = model

print(f"\nBest model based on F1 score: {best_model_name} with F1 score: {best_f1}")



Training and evaluating RandomForest with default parameters:
Accuracy: 0.5325333333333333
Precision: 0.6082921086777036
Recall: 0.5325333333333333
F1 Score: 0.5517506668490825

Classification Report:
              precision    recall  f1-score   support

        High       0.42      0.51      0.46      5055
         Low       0.99      0.60      0.75      4971
      Medium       0.41      0.49      0.45      4974

    accuracy                           0.53     15000
   macro avg       0.61      0.53      0.55     15000
weighted avg       0.61      0.53      0.55     15000


Confusion Matrix:
[[2571   10 2474]
 [1009 2977  985]
 [2524   10 2440]]

Training and evaluating GradientBoosting with default parameters:
Accuracy: 0.5292666666666667
Precision: 0.6064931314455911
Recall: 0.5292666666666667
F1 Score: 0.5487156505917826

Classification Report:
              precision    recall  f1-score   support

        High       0.42      0.47      0.44      5055
         Low       1.00     

In [9]:
# Define hyperparameter grids for tuning the best model
# Candidate hyperparameter values for the randomized search, keyed by the same
# model names that are used in `classifiers`.
param_grid = dict(
    RandomForest=dict(
        n_estimators=[50, 100, 200],
        max_depth=[None, 10, 20],
        min_samples_split=[2, 5, 10],
    ),
    GradientBoosting=dict(
        n_estimators=[50, 100, 200],
        learning_rate=[0.01, 0.1, 0.2],
        max_depth=[3, 5, 7],
        subsample=[0.8, 1.0],
    ),
    XGBoost=dict(
        n_estimators=[50, 100, 200],
        learning_rate=[0.01, 0.1, 0.2],
        max_depth=[3, 5, 7],
        subsample=[0.8, 1.0],
        colsample_bytree=[0.8, 1.0],
    ),
)


In [10]:
from sklearn.model_selection import RandomizedSearchCV

def run_random_search(model, params, X_train, y_train, n_iter=10, cv=3,
                      scoring=None, random_state=42):
    """Tune ``model`` with a randomized hyperparameter search.

    Parameters
    ----------
    model : estimator
        Estimator to tune; it is passed to ``RandomizedSearchCV`` as the base
        estimator.
    params : dict
        Mapping of parameter name to the list of candidate values.
    X_train, y_train : array-like
        Training features and labels used for the cross-validated search.
    n_iter : int, default 10
        Number of parameter settings sampled.
    cv : int, default 3
        Number of cross-validation folds.
    scoring : str or callable, optional
        Metric optimised by the search. ``None`` keeps the estimator's default
        scorer, which is the behaviour of the original four-argument call.
    random_state : int, default 42
        Seed for the parameter sampling.

    Returns
    -------
    tuple
        ``(best_estimator_, best_params_)`` of the fitted search.
    """
    search = RandomizedSearchCV(
        model,
        param_distributions=params,
        n_iter=n_iter,
        cv=cv,
        scoring=scoring,
        random_state=random_state,
    )
    search.fit(X_train, y_train)

    return search.best_estimator_, search.best_params_

# Bind the result names up front so later cells can test them even when no
# model was selected (previously they were left undefined in that case).
best_model_tuned = None
best_params = None

# Perform hyperparameter tuning only for the best model
if best_model_name is not None:
    print(f"\nTuning hyperparameters for {best_model_name}...")
    # Optimise weighted F1, the same metric that was used to choose the model
    # and that evaluate_model reports, instead of the default accuracy.
    best_model_tuned, best_params = run_random_search(
        classifiers[best_model_name],
        param_grid[best_model_name],
        X_train_top_10,
        y_train,
        scoring='f1_weighted',
    )

    print(f"Best parameters for {best_model_name}: {best_params}")

    # Evaluate the tuned model
    print(f"\nEvaluating {best_model_name} after hyperparameter tuning:")
    evaluate_model(best_model_tuned, X_test_top_10, y_test)



Tuning hyperparameters for RandomForest...
Best parameters for RandomForest: {'n_estimators': 50, 'min_samples_split': 5, 'max_depth': 10}

Evaluating RandomForest after hyperparameter tuning:
Accuracy: 0.5290666666666667
Precision: 0.6078221080923382
Recall: 0.5290666666666667
F1 Score: 0.5475795407339454

Classification Report:
              precision    recall  f1-score   support

        High       0.42      0.43      0.42      5055
         Low       1.00      0.60      0.75      4971
      Medium       0.41      0.56      0.47      4974

    accuracy                           0.53     15000
   macro avg       0.61      0.53      0.55     15000
weighted avg       0.61      0.53      0.55     15000


Confusion Matrix:
[[2165    0 2890]
 [ 840 2971 1160]
 [2174    0 2800]]


In [11]:
import joblib

# Persist the tuned model. Compare against None explicitly instead of relying
# on the truthiness of the estimator object, which is not a "was it set" test.
if best_model_tuned is not None:
    joblib.dump(best_model_tuned, 'best_model.pkl')


In [12]:
import joblib

# Rebuild the selected feature names from their column positions and store
# them as a plain Python list (replacing the earlier pandas Index).
selected_columns = X.columns[top_10_indices]
top_10_features = selected_columns.tolist()

# Write the feature list to disk; the returned file list is the cell output.
joblib.dump(top_10_features, filename='top_10_features.pkl')

['top_10_features.pkl']