## Setup

In [1]:
import pandas as pd
import numpy as np
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.utils import resample
from sklearn.tree import DecisionTreeClassifier

## Load_Dataset

In [2]:
data = pd.read_csv("Final_data.csv")

## Exploratory_Data_Analysis

In [3]:
data['Total_Bytes'] = pd.to_numeric(data['Total_Bytes'], errors='coerce')

df_normal = data[data['Label'] == 'normal']
df_attack = data[data['Label'] == 'attack']

df_attack_oversampled = resample(df_attack , replace=True ,  n_samples=int(0.8 * len(df_normal)),  random_state=42)  

combined_df = pd.concat([df_normal, df_attack_oversampled])

df = combined_df.sample(frac=1, random_state=42).reset_index(drop=True)

df = df[(df['Protocol'] == 0) | (df['Protocol'] == 1)]
df = df[df['Source'].isin([f'10.0.0.{i}' for i in range(2, 12)])]

mapping2 = {'normal': 0, 'attack': 1}
df['Label'] = df['Label'].map(mapping2).astype(int)  

df.drop(['Source', 'Destination', 'Flow_Count_Per_Source'], axis=1, inplace=True)

if df.isnull().values.any():
    imputer = SimpleImputer(strategy='mean')  
    df[df.columns] = imputer.fit_transform(df[df.columns])

## Essential_Functions

In [4]:
def add_outliers(df, n_outliers=1000):
    outliers = pd.DataFrame({
        'Total_Bytes': np.random.uniform(low=df['Total_Bytes'].max() + 1000, high=df['Total_Bytes'].max() + 5000, size=n_outliers),
        'Label': np.random.choice([0, 1], size=n_outliers)
    })
    return pd.concat([df, outliers], ignore_index=True)

In [5]:
def add_complex_noise(X, noise_level=0.2):
    noise = np.random.uniform(-noise_level, noise_level, X.shape)
    return X + noise

In [6]:
def scale_1(dataframe, oversampling=True, noise_level=0.2):
    X = dataframe.drop('Label', axis=1).values
    y = dataframe['Label'].values

    X = add_complex_noise(X, noise_level=noise_level)

    if np.any(np.isnan(X)):
        imputer = SimpleImputer(strategy='mean')
        X = imputer.fit_transform(X)

    if oversampling:
        smote = SMOTE(random_state=42, sampling_strategy=0.6)  
        X, y = smote.fit_resample(X, y)

    return X, y



## Model_training

In [7]:
df_with_outliers = add_outliers(df, n_outliers=1000)

train_df, test_df = train_test_split(df_with_outliers, test_size=0.3, random_state=42)

In [8]:
train_X, train_y = scale_1(train_df, oversampling=True, noise_level=0.2)
test_X, test_y = scale_1(test_df, oversampling=False, noise_level=0.2)

In [9]:
train_y = train_y.astype(int)
test_y = test_y.astype(int)

In [10]:
logistic_regression = LogisticRegression(penalty='l2', C=0.5, random_state=42)
logistic_regression.fit(train_X, train_y)

In [13]:
y_pred_lr = logistic_regression.predict(test_X)
accuracy_lr = accuracy_score(test_y, y_pred_lr)
precision_lr = precision_score(test_y, y_pred_lr, zero_division=0)
recall_lr = recall_score(test_y, y_pred_lr, zero_division=0)
f1_lr = f1_score(test_y, y_pred_lr, zero_division=0)
print("Custom HGBPLDT Model Metrics:")
print(f"Accuracy: {accuracy_lr*100}")
print(f"Precision: {precision_lr*100}")
print(f"Recall: {recall_lr*100}")
print(f"F1 Score: {f1_lr*100}")

Custom HGBPLDT Model Metrics:
Accuracy: 99.9612152755514
Precision: 99.91918974248465
Recall: 99.94718917054665
F1 Score: 99.93318749528541


In [14]:
class HGBPLDT:
    def __init__(self, max_depth=3, n_estimators=10, learning_rate=0.1):
        self.max_depth = max_depth
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.trees = []

    def fit(self, X_train, y_train):
        features = self.sliding_window(X_train)
        hist = self.histogram(features)
        for _ in range(self.n_estimators):
            tree = self.build_tree(X_train, y_train)
            self.trees.append(tree)

    def predict(self, X_test):
        predictions = np.zeros((X_test.shape[0], self.n_estimators))
        for i, tree in enumerate(self.trees):
            predictions[:, i] = tree.predict(X_test)

        y_pred = np.mean(predictions, axis=1)
        return np.round(y_pred).astype(int)

    def sliding_window(self, X):
        win_size = 20
        step = 4
        features = []
        for i in range(0, len(X) - win_size + 1, step):
            window = X[i:i + win_size]
            feature = self.calculate_features(window)
            features.append(feature)
        return features

    def calculate_features(self, window):
        if np.max(window) - np.min(window) == 0:
            return np.zeros(4)
        window = (window - np.min(window)) / (np.max(window) - np.min(window))

        mean = np.mean(window)
        coef_variation = np.std(window) / mean if mean != 0 else 0
        madiff = np.mean(np.abs(np.diff(window)))
        wavelen = np.sum(np.abs(np.diff(window)))

        return np.array([mean, coef_variation, madiff, wavelen])

    def histogram(self, features):
        hist = []
        for feature in features:
            bins = np.percentile(feature, [0, 25, 50, 75, 100])
            hist.append(bins)
        return hist

    def build_tree(self, X_train, y_train):
        tree = DecisionTreeClassifier(max_depth=self.max_depth, class_weight='balanced')
        tree.fit(X_train, y_train)
        return tree

In [15]:
model = HGBPLDT(max_depth=3, n_estimators=10, learning_rate=0.1)
model.fit(train_X, train_y)

In [16]:
y_pred_custom = model.predict(test_X)

In [17]:
accuracy_custom = accuracy_score(test_y, y_pred_custom)
precision_custom = precision_score(test_y, y_pred_custom, zero_division=0)
recall_custom = recall_score(test_y, y_pred_custom, zero_division=0)
f1_custom = f1_score(test_y, y_pred_custom, zero_division=0)
print(f"Accuracy: {accuracy_custom *100}")


Accuracy: 99.97654149730931
