In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pandas.api.types import is_numeric_dtype
import warnings
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn import tree
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
import optuna
from tabulate import tabulate
from sklearn.feature_selection import RFE
import itertools
from sklearn.preprocessing import RobustScaler
from sklearn.decomposition import PCA
from sklearn import svm
from sklearn import metrics
# Lift pandas' column-display limit so wide frames are shown without truncation.
pd.set_option('display.max_columns',None)
 

In [None]:
# Column labels are assigned in a later cell, so the file is read without
# treating any row as a header. NOTE(review): this assumes KDDTest-21.txt has
# no header line (NSL-KDD files are normally distributed that way); without
# header=None the first record would be swallowed as column names.
data_train=pd.read_csv('KDDTest-21.txt', header=None)

In [None]:
# Column labels for the loaded frame, listed in file order.
# The final two entries ('outcome', 'level') are the label columns used later.
columns = [
    'duration', 'protocol_type', 'service', 'flag',
    'src_bytes', 'dst_bytes', 'land', 'wrong_fragment', 'urgent', 'hot',
    'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell',
    'su_attempted', 'num_root', 'num_file_creations', 'num_shells',
    'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login',
    'count', 'srv_count', 'serror_rate', 'srv_serror_rate',
    'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate',
    'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count',
    'dst_host_same_srv_rate', 'dst_host_diff_srv_rate',
    'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate',
    'dst_host_serror_rate', 'dst_host_srv_serror_rate',
    'dst_host_rerror_rate', 'dst_host_srv_rerror_rate',
    'outcome', 'level',
]

In [None]:
# Replace the frame's column labels with the names in `columns`
# (the list length must match the number of columns in the file).
data_train.columns = columns

In [None]:
# Quick sanity check: dtypes, non-null counts and memory footprint per column.
data_train.info()

In [None]:
# Collapse the label to two classes: rows already tagged 'normal' are kept,
# every other outcome value is rewritten to 'attack'.
is_normal = data_train['outcome'] == 'normal'
data_train['outcome'] = data_train['outcome'].where(is_normal, 'attack')

In [None]:
def Scaling(df_num, cols):
    """Robust-scale a block of numeric columns.

    A fresh ``RobustScaler`` is fitted on ``df_num`` and the transformed
    values are returned as a new DataFrame.

    Parameters
    ----------
    df_num : pandas.DataFrame or array-like
        Numeric data to scale.
    cols : sequence of labels
        Column labels for the returned frame.

    Returns
    -------
    pandas.DataFrame
        Scaled values labelled with ``cols``. When ``df_num`` carries an
        index it is preserved, so the result can be assigned back to the
        source frame without misaligning rows; otherwise a default index
        is used.
    """
    scaler = RobustScaler()
    scaled_values = scaler.fit_transform(df_num)
    # Array-like input has no index; fall back to the default RangeIndex then.
    row_index = getattr(df_num, "index", None)
    return pd.DataFrame(scaled_values, columns=cols, index=row_index)

In [None]:
# Columns excluded from scaling: categorical/binary features plus the two label columns.
cat_cols = ['is_host_login','protocol_type','service','flag','land', 'logged_in','is_guest_login', 'level', 'outcome']
def preprocess(dataframe):
    """Scale numeric features, binarise the label and one-hot encode categoricals.

    Steps:
      1. every column not listed in ``cat_cols`` is scaled via ``Scaling``;
      2. ``outcome`` becomes an integer flag (0 for 'normal', 1 otherwise);
      3. ``protocol_type``, ``service`` and ``flag`` are one-hot encoded.

    The input frame is left untouched; a new frame is returned. Column order
    matches the previous implementation: unscaled columns first, then the
    scaled numeric columns, then the dummy columns.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Frame containing every column named in ``cat_cols``.

    Returns
    -------
    pandas.DataFrame
        The processed copy.
    """
    df_num = dataframe.drop(columns=cat_cols)
    num_cols = df_num.columns
    scaled_df = Scaling(df_num, num_cols)
    # Rows are in the same order as the input, so adopt its index explicitly;
    # this keeps the assignment below positional even for a non-default index.
    scaled_df.index = dataframe.index

    # drop() returns a new frame, so the caller's data is not modified.
    processed = dataframe.drop(columns=num_cols)
    processed[num_cols] = scaled_df[num_cols]

    # 'normal' (or an already-encoded 0) -> 0, anything else -> 1, as a real int column.
    is_normal = processed['outcome'].isin(['normal', 0])
    processed['outcome'] = (~is_normal).astype(int)

    return pd.get_dummies(processed, columns = ['protocol_type', 'service', 'flag'])

In [None]:
# Run the preprocessing pipeline on the loaded data and keep the returned frame.
scaled_train = preprocess(data_train)

In [None]:
# Feature matrix plus the two targets: binary outcome and the 'level' column.
# An explicit float dtype avoids an object array when the frame mixes
# float, int and bool (dummy) columns.
x = scaled_train.drop(['outcome', 'level'] , axis = 1).to_numpy(dtype=float)
y = scaled_train['outcome'].to_numpy().astype('int')
y_reg = scaled_train['level'].values

# A single call splits all arrays with the same shuffled row selection, so the
# classification and 'level' targets stay aligned with the same feature rows.
(x_train, x_test,
 y_train, y_test,
 y_train_reg, y_test_reg) = train_test_split(x, y, y_reg, test_size=0.2, random_state=42)
x_train_reg, x_test_reg = x_train, x_test

# Fit PCA on the training rows only: fitting on all of `x` would let the
# held-out rows influence the projection (test-set leakage).
pca = PCA(n_components=20, random_state=42)
pca = pca.fit(x_train)
x_reduced = pca.transform(x)
x_train_reduced = pca.transform(x_train)
x_test_reduced = pca.transform(x_test)
y_train_reduced, y_test_reduced = y_train, y_test
print("Number of original features is {} and of reduced features is {}".format(x.shape[1], x_reduced.shape[1]))

In [None]:
# Collected scores per model name:
# [train_acc, test_acc, train_precision, test_precision, train_recall, test_recall, train_f1, test_f1]
kernal_evals = dict()
def evaluate_classification(model, name, X_train, X_test, y_train, y_test):
    """Score a fitted binary classifier and plot its test confusion matrix.

    Accuracy, precision, recall and F1 are computed on both splits, printed
    as percentages, and stored in ``kernal_evals[str(name)]`` in the order
    documented above that dict.

    Parameters
    ----------
    model : fitted estimator exposing ``predict``
    name : label used in the printed lines, the plot title and as the
        ``kernal_evals`` key
    X_train, X_test : feature matrices
    y_train, y_test : true labels, encoded 0 (normal) / 1 (attack)

    Returns
    -------
    None
    """
    # Predict once per split; the original re-ran predict for every metric.
    train_pred = model.predict(X_train)
    test_pred = model.predict(X_test)

    scorers = [
        ("Accuracy", metrics.accuracy_score),
        ("Precision", metrics.precision_score),
        ("Recall", metrics.recall_score),
        ("F1 Score", metrics.f1_score),
    ]
    scores = []
    for label, scorer in scorers:
        train_score = scorer(y_train, train_pred)
        test_score = scorer(y_test, test_pred)
        scores.extend([train_score, test_score])
        print(f"Training {label} {name} {train_score*100}  Test {label} {name} {test_score*100}")
    kernal_evals[str(name)] = scores

    # Pin the label order so the display labels stay correct even when one
    # class is absent from y_test or the predictions.
    confusion_matrix = metrics.confusion_matrix(y_test, test_pred, labels=[0, 1])
    cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = ['normal', 'attack'])

    fig, ax = plt.subplots(figsize=(10,10))
    ax.grid(False)
    cm_display.plot(ax=ax)
    ax.set_title(f'Confusion Matrix for {name}')
    plt.show()
    

In [None]:
# Baseline k-nearest-neighbours classifier with library-default hyperparameters.
knn = KNeighborsClassifier()
knn.fit(x_train, y_train)
evaluate_classification(
    model=knn,
    name="K Neighbors Classifier",
    X_train=x_train,
    X_test=x_test,
    y_train=y_train,
    y_test=y_test,
)

In [None]:
# Fixed seeds make both baseline trees reproducible across kernel restarts.
# `dt` is a depth-3 tree that is fitted but not evaluated in this cell.
dt = DecisionTreeClassifier(max_depth=3, random_state=42).fit(x_train, y_train)
# `tdt` is the unconstrained tree used for the baseline evaluation.
tdt = DecisionTreeClassifier(random_state=42).fit(x_train, y_train)
evaluate_classification(tdt, "DecisionTreeClassifier", x_train, x_test, y_train, y_test)



In [None]:
# Baseline random forest with default hyperparameters; the seed fixes the
# bootstrap samples and feature subsets so the reported scores are reproducible.
rf = RandomForestClassifier(random_state=42).fit(x_train, y_train)
evaluate_classification(rf, "RandomForestClassifier", x_train, x_test, y_train, y_test)

# KNN

In [None]:
def objective(trial):
    """Optuna objective for k-NN: validation accuracy for a sampled ``n_neighbors``.

    The candidate is fitted on 80% of the training split and scored on the
    remaining 20%. The test split is deliberately not used here: selecting
    hyperparameters on ``x_test`` would make the final test score optimistic.

    Parameters
    ----------
    trial : optuna.trial.Trial
        Supplies ``KNN_n_neighbors`` in the range [2, 16].

    Returns
    -------
    float
        Accuracy on the validation part of the training split.
    """
    n_neighbors = trial.suggest_int('KNN_n_neighbors', 2, 16, log=False)
    # Fixed seed: every trial is compared on the same validation rows.
    x_fit, x_val, y_fit, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=42)
    classifier_obj = KNeighborsClassifier(n_neighbors=n_neighbors)
    classifier_obj.fit(x_fit, y_fit)
    accuracy = classifier_obj.score(x_val, y_val)
    return accuracy

In [None]:
# Seeded sampler -> the same sequence of suggestions on every run.
study_KNN = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=42))
# The search space holds 15 values (2..16); a single trial would just pick one
# at random, so give the sampler enough trials to actually compare candidates.
study_KNN.optimize(objective, n_trials=15)
print(study_KNN.best_trial)

In [None]:
# Refit k-NN on the full training split with the best n_neighbors from the study.
KNN_model = KNeighborsClassifier(n_neighbors=study_KNN.best_trial.params['KNN_n_neighbors'])
KNN_model.fit(x_train, y_train)

# Called without X, kneighbors() queries the training points themselves and
# does not count a point as its own neighbour. Passing x_train explicitly
# would return a zero self-distance for every row and understate isolation.
distances, indices = KNN_model.kneighbors()

# Rows whose farthest of the k neighbours lies beyond this distance are flagged.
distance_threshold = 2.0

kth_neighbor_distance = distances.max(axis=1)
anomaly_indices_knn = np.flatnonzero(kth_neighbor_distance > distance_threshold).tolist()

# Report a count and a short preview instead of dumping every index.
print(f"Flagged {len(anomaly_indices_knn)} of {len(distances)} training rows")
print("Anomaly indices:", anomaly_indices_knn[:20], "..." if len(anomaly_indices_knn) > 20 else "")

KNN_train, KNN_test = KNN_model.score(x_train, y_train), KNN_model.score(x_test, y_test)

print(f"Train Score: {KNN_train}")
print(f"Test Score: {KNN_test}")

In [None]:
# Histogram of all neighbour distances, with the anomaly cut-off drawn as a dashed line.
fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(distances.ravel(), bins=50, color='skyblue', edgecolor='black')
ax.set_title('Distances to K Nearest Neighbors')
ax.set_xlabel('Distance')
ax.set_ylabel('Frequency')
ax.axvline(x=distance_threshold, color='red', linestyle='--', label='Threshold')
ax.legend()
plt.show()



# Decision Tree Classifier

In [None]:
def objective(trial):
    """Optuna objective for the decision tree: validation accuracy of one candidate.

    The candidate is fitted on 80% of the training split and scored on the
    remaining 20%, so the test split plays no part in hyperparameter
    selection. The estimator is seeded so a trial's score depends only on
    its hyperparameters.

    Parameters
    ----------
    trial : optuna.trial.Trial
        Supplies ``dt_max_depth`` in [2, 32] and ``dt_max_features`` in [2, 10].

    Returns
    -------
    float
        Accuracy on the validation part of the training split.
    """
    dt_max_depth = trial.suggest_int('dt_max_depth', 2, 32, log=False)
    dt_max_features = trial.suggest_int('dt_max_features', 2, 10, log=False)
    # Fixed seed: every trial is compared on the same validation rows.
    x_fit, x_val, y_fit, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=42)
    classifier_obj = DecisionTreeClassifier(max_features = dt_max_features, max_depth = dt_max_depth, random_state=42)
    classifier_obj.fit(x_fit, y_fit)
    accuracy = classifier_obj.score(x_val, y_val)
    return accuracy

In [None]:
# Seeded sampler -> the same sequence of suggestions on every run.
study_dt = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=42))
study_dt.optimize(objective, n_trials=30)
print(study_dt.best_trial)

In [None]:
# Refit the tree on the full training split with the best hyperparameters.
# max_features < n_features makes split selection random, so seed the tree;
# otherwise each run would grow a different tree from the same parameters.
dt = DecisionTreeClassifier(
    max_features = study_dt.best_trial.params['dt_max_features'],
    max_depth = study_dt.best_trial.params['dt_max_depth'],
    random_state=42,
)
dt.fit(x_train, y_train)

# Not used below; retained so the name stays available to later cells.
train_predictions = dt.predict(x_train)

# Leaf node id reached by each training row, and how many rows share each leaf.
leaf_indices = dt.apply(x_train)
leaf_counts = pd.Series(leaf_indices).value_counts()

# Leaves holding fewer than `threshold` training rows are treated as anomalous.
threshold = 10

anomaly_leaf_nodes = leaf_counts[leaf_counts < threshold].index

# Vectorised membership test instead of a per-row Python loop.
anomaly_indices_dt = np.flatnonzero(np.isin(leaf_indices, anomaly_leaf_nodes)).tolist()

# Report a count and a short preview instead of dumping every index.
print(f"Flagged {len(anomaly_indices_dt)} of {len(leaf_indices)} training rows")
print("Anomaly indices:", anomaly_indices_dt[:20], "..." if len(anomaly_indices_dt) > 20 else "")

dt_train, dt_test = dt.score(x_train, y_train), dt.score(x_test, y_test)

print(f"Train Score: {dt_train}")
print(f"Test Score: {dt_test}")

In [None]:
# Bar chart of training rows per leaf, with the anomaly threshold as a dashed line.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(leaf_counts.index, leaf_counts.values, color='skyblue', edgecolor='black')
ax.set_title('Occurrences of Leaf Nodes')
ax.set_xlabel('Leaf Node Index')
ax.set_ylabel('Occurrences')
ax.axhline(y=threshold, color='red', linestyle='--', label='Threshold')
ax.legend()
plt.show()


In [None]:
# Draw the fitted decision tree on a wide canvas; nodes are colour-filled.
fig, ax = plt.subplots(figsize=(30, 12))
tree.plot_tree(dt, filled=True, ax=ax);
plt.show()

# Random Forest 

In [None]:
def objective(trial):
    """Optuna objective for the random forest: validation accuracy of one candidate.

    The candidate is fitted on 80% of the training split and scored on the
    remaining 20%, so the test split plays no part in hyperparameter
    selection. The forest is seeded so a trial's score depends only on its
    hyperparameters.

    Parameters
    ----------
    trial : optuna.trial.Trial
        Supplies ``rf_max_depth`` in [2, 32], ``rf_max_features`` in [2, 10]
        and ``rf_n_estimators`` in [3, 20].

    Returns
    -------
    float
        Accuracy on the validation part of the training split.
    """
    rf_max_depth = trial.suggest_int('rf_max_depth', 2, 32, log=False)
    rf_max_features = trial.suggest_int('rf_max_features', 2, 10, log=False)
    rf_n_estimators = trial.suggest_int('rf_n_estimators', 3, 20, log=False)
    # Fixed seed: every trial is compared on the same validation rows.
    x_fit, x_val, y_fit, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=42)
    classifier_obj = RandomForestClassifier(
        max_features = rf_max_features,
        max_depth = rf_max_depth,
        n_estimators = rf_n_estimators,
        random_state=42,
    )
    classifier_obj.fit(x_fit, y_fit)
    accuracy = classifier_obj.score(x_val, y_val)
    return accuracy


In [None]:
# Seeded sampler -> the same sequence of suggestions on every run.
study_rf = optuna.create_study(direction='maximize', sampler=optuna.samplers.TPESampler(seed=42))
study_rf.optimize(objective, n_trials=30)
print(study_rf.best_trial)

In [None]:
# Refit the forest on the full training split with the best hyperparameters (seeded).
rf = RandomForestClassifier(
    max_features = study_rf.best_trial.params['rf_max_features'],
    max_depth = study_rf.best_trial.params['rf_max_depth'],
    n_estimators = study_rf.best_trial.params['rf_n_estimators'],
    random_state=42,
)
rf.fit(x_train, y_train)

# Leaf reached by every training row in every tree: shape (n_samples, n_estimators).
# The previous code called dt.apply here, which only repeated the single
# decision tree's leaves and never looked at the forest.
leaf_indices_rf = rf.apply(x_train)

# Node ids restart at 0 inside each tree. Shift each tree's ids by the total
# node count of the trees before it so a leaf id is unique across the forest.
node_counts = [estimator.tree_.node_count for estimator in rf.estimators_]
tree_offsets = np.cumsum([0] + node_counts[:-1])
leaf_indices_rf = leaf_indices_rf + tree_offsets
leaf_indices_rf_flat = leaf_indices_rf.flatten()

# Training rows per (forest-unique) leaf.
leaf_counts_rf = pd.Series(leaf_indices_rf_flat).value_counts()

# Leaves holding fewer than `threshold` training rows count as sparsely populated.
threshold = 10

anomaly_leaf_nodes_rf = leaf_counts_rf[leaf_counts_rf < threshold].index

# Work per sample rather than on the flattened array, whose positions are
# (sample, tree) pairs and not row numbers. A row is flagged when more than
# half of the trees route it to a sparsely populated leaf.
in_sparse_leaf = np.isin(leaf_indices_rf, anomaly_leaf_nodes_rf)
min_tree_fraction = 0.5
anomaly_indices_rf = np.flatnonzero(in_sparse_leaf.mean(axis=1) > min_tree_fraction).tolist()

# Report a count and a short preview instead of dumping every index.
print(f"Flagged {len(anomaly_indices_rf)} of {leaf_indices_rf.shape[0]} training rows")
print("Anomaly indices :", anomaly_indices_rf[:20], "..." if len(anomaly_indices_rf) > 20 else "")

rf_train, rf_test = rf.score(x_train, y_train), rf.score(x_test, y_test)

print(f"Train Score: {rf_train}")
print(f"Test Score: {rf_test}")

In [None]:
# Bar chart of the counts in leaf_counts_rf, with the anomaly threshold as a dashed line.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(leaf_counts_rf.index, leaf_counts_rf.values, color='skyblue', edgecolor='black')
ax.set_title('Occurrences of Leaf Nodes for Random Forest')
ax.set_xlabel('Leaf Node Index')
ax.set_ylabel('Occurrences')
ax.axhline(y=threshold, color='red', linestyle='--', label='Threshold')
ax.legend()
plt.show()


# Final Summary

In [None]:
# Side-by-side train/test accuracy of the three tuned models.
col_names = ["Model", "Train Accuracy", "Test Accuracy"]

data = [
    [model_label, train_score, test_score]
    for model_label, train_score, test_score in (
        ("KNN", KNN_train, KNN_test),
        ("Decision Tree", dt_train, dt_test),
        ("Random Forest", rf_train, rf_test),
    )
]

print(tabulate(data, headers=col_names, tablefmt="fancy_grid"))