In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
import re
from sklearn import svm, datasets
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
from sklearn.multiclass import OneVsRestClassifier
from snowflake.snowpark.context import get_active_session

In [None]:
# Grab the currently active Snowpark session; it is used below to read the source table.
session = get_active_session()

In [None]:
def fetch_from_table_data(session,table_name):
    """Look up ``table_name`` through ``session`` and return the table object.

    Parameters
    ----------
    session : object exposing ``table(name)`` (the notebook passes the active
        Snowpark session).
    table_name : name forwarded unchanged to ``session.table``.

    Returns
    -------
    Whatever ``session.table(table_name)`` returns.
    """
    return session.table(table_name)

# Table handle for "H1", obtained through the active session.
data_h1=fetch_from_table_data(session,"H1")

In [None]:
def create_dataframe(data):
    """Materialise ``data`` locally by calling its ``to_pandas()`` method.

    Parameters
    ----------
    data : object exposing ``to_pandas()`` (the notebook passes the table
        handle returned by ``fetch_from_table_data``).

    Returns
    -------
    The value produced by ``data.to_pandas()``.
    """
    return data.to_pandas()

# Convert the "H1" table handle into a local pandas object for the steps that follow.
df_H1=create_dataframe(data_h1)

In [None]:
def preprocess(df, label_encoder=None, fit_label_encoder=True, scaler=None, fit_scaler=True):
    """Derive, encode and scale the feature columns of ``df``.

    Steps, in order:

    1. ``REQUEST_TYPE`` is set to the first parenthesised fragment of ``INFO``
       (or a fixed placeholder when there is none).
    2. ``SOURCE_DESTINATION`` is set to ``SOURCE + '_' + DESTINATION``.
    3. ``PROTOCOL``, ``REQUEST_TYPE`` and ``SOURCE_DESTINATION`` are
       label-encoded to integers.
    4. ``NO``, ``TYPE``, ``INFO``, ``SOURCE`` and ``DESTINATION`` are dropped.
    5. Every remaining column is passed through ``scaler``.

    The input frame is never modified; all work happens on a copy, so the
    function can be re-run on the same input.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns named above; every column that survives
        step 4 must be numeric for the scaler to accept it.
    label_encoder : dict[str, encoder] or None
        One encoder per categorical column. When ``None`` a fresh
        ``LabelEncoder`` is created for each column.
    fit_label_encoder : bool
        ``True`` fits the encoders on ``df``. ``False`` reuses already-fitted
        encoders; values not present in an encoder's ``classes_`` are mapped
        to ``-1`` instead of raising.
    scaler : scaler object or None
        When ``None`` a fresh ``StandardScaler`` is created.
    fit_scaler : bool
        ``True`` calls ``fit_transform``; ``False`` calls ``transform``.

    Returns
    -------
    (pandas.DataFrame, dict, scaler)
        The processed copy (same index as ``df``), the encoder dict and the
        scaler, so the fitted objects can be passed back in for another split.

    Raises
    ------
    ValueError
        If ``fit_label_encoder`` is ``False`` but no encoders were supplied.
    KeyError
        If a required column is missing.
    """
    no_request_type = "No specific request type"
    categorical_columns = ['PROTOCOL', 'REQUEST_TYPE', 'SOURCE_DESTINATION']
    unseen_code = -1  # code assigned to categories the fitted encoder never saw

    def extract_request_type(text):
        # Missing or non-text INFO cells (e.g. NaN) cannot be searched.
        if not isinstance(text, str):
            return no_request_type
        match = re.search(r'\((.*?)\)', text)
        return match.group(1) if match else no_request_type

    # Work on a copy so the caller's frame keeps its raw columns.
    df = df.copy()

    df['REQUEST_TYPE'] = df['INFO'].apply(extract_request_type)

    # Interaction feature combining both endpoints.
    df['SOURCE_DESTINATION'] = df['SOURCE'] + '_' + df['DESTINATION']

    if label_encoder is None:
        if not fit_label_encoder:
            raise ValueError(
                "fit_label_encoder=False requires already-fitted encoders in label_encoder"
            )
        label_encoder = {column: LabelEncoder() for column in categorical_columns}

    for column in categorical_columns:
        encoder = label_encoder[column]
        if fit_label_encoder:
            df[column] = encoder.fit_transform(df[column])
        else:
            # Only rows whose value the encoder knows are sent to transform();
            # the rest keep the sentinel so new categories do not abort the run.
            known = df[column].isin(encoder.classes_).to_numpy()
            codes = np.full(len(df), unseen_code, dtype=np.int64)
            if known.any():
                codes[known] = encoder.transform(df.loc[known, column])
            df[column] = codes

    df = df.drop(columns=['NO', 'TYPE', 'INFO', 'SOURCE', 'DESTINATION'])

    # Feature scaling
    if scaler is None:
        scaler = StandardScaler()

    scaled = scaler.fit_transform(df) if fit_scaler else scaler.transform(df)
    # Rebuild the frame so column names and the original index are kept.
    df = pd.DataFrame(scaled, columns=df.columns, index=df.index)

    return df, label_encoder, scaler

In [None]:
def data_split_train_test(df, test_size=0.2, random_state=42, stratify=False):
    """Split ``df`` into train/test features and targets.

    The target is the ``TYPE_OF_ATTACK`` column; every other column is a
    feature.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``TYPE_OF_ATTACK`` column.
    test_size : float or int, default 0.2
        Forwarded to ``train_test_split``.
    random_state : int or None, default 42
        Forwarded to ``train_test_split`` so the split is repeatable.
    stratify : bool, default False
        When ``True`` the split is stratified on the target so each class
        keeps its share in both parts. Off by default, matching the earlier
        behaviour of this helper.

    Returns
    -------
    (x_train, x_test, y_train, y_test)
    """
    features = df.drop(columns=['TYPE_OF_ATTACK'])
    target = df['TYPE_OF_ATTACK']
    x_train, x_test, y_train, y_test = train_test_split(
        features,
        target,
        test_size=test_size,
        random_state=random_state,
        stratify=target if stratify else None,
    )
    return x_train, x_test, y_train, y_test

# Hold out the test split before anything is fitted.
x_train_raw,x_test_raw,y_train_raw,y_test_raw=data_split_train_test(df_H1)


# Fit the categorical encoders and the scaler on the training split only ...
processed_x_train_df,label_encoder,scaler=preprocess(x_train_raw)
# ... and apply those same fitted objects to the test split. Calling
# preprocess() with its defaults here would fit a second, unrelated set of
# encoders and a second scaler, so test features would no longer share the
# code/scale space the model is trained on. How categories absent from the
# training split are treated is decided by preprocess's transform branch.
processed_x_test_df, _, _ = preprocess(
    x_test_raw,
    label_encoder=label_encoder,
    fit_label_encoder=False,
    scaler=scaler,
    fit_scaler=False,
)

# Re-attach the targets; concat aligns rows on the index kept by the split.
train_df=pd.concat([processed_x_train_df,y_train_raw],axis=1)
test_df=pd.concat([processed_x_test_df,y_test_raw],axis=1)

In [None]:
# prepare data for modelling

def prepare_data_for_modeling(processed_df, label_encoder=None, fit_label_encoder=True):
    """Separate features from the target and encode the target as integers.

    Parameters
    ----------
    processed_df : pandas.DataFrame
        Must contain a ``TYPE_OF_ATTACK`` column; all other columns are
        returned as features.
    label_encoder : LabelEncoder-like or None
        Encoder for the target. When ``None`` a fresh ``LabelEncoder`` is
        created and fitted on this frame (the original behaviour). Pass the
        same instance for every split so a given class always maps to the
        same integer.
    fit_label_encoder : bool, default True
        ``True`` fits ``label_encoder`` on this frame's target; ``False``
        reuses an already-fitted encoder via ``transform``.

    Returns
    -------
    (x, y)
        ``x`` is the feature frame, ``y`` the integer-encoded target array.

    Raises
    ------
    ValueError
        If ``fit_label_encoder`` is ``False`` and no encoder was supplied.
    """
    x = processed_df.drop(columns=['TYPE_OF_ATTACK'])
    y = processed_df['TYPE_OF_ATTACK']

    if label_encoder is None:
        if not fit_label_encoder:
            raise ValueError(
                "fit_label_encoder=False requires an already-fitted label_encoder"
            )
        label_encoder = LabelEncoder()

    # Encode the output as integer labels
    if fit_label_encoder:
        y = label_encoder.fit_transform(y)
    else:
        y = label_encoder.transform(y)
    return x, y

# One encoder for both splits: it learns the class -> integer mapping from the
# training targets and the test targets are encoded with that same mapping.
# Fitting a separate encoder per split can assign different integers to the
# same class whenever the two splits do not contain identical class sets.
target_encoder = LabelEncoder()
x1_train, y1_train = prepare_data_for_modeling(train_df, target_encoder)
x1_test, y1_test = prepare_data_for_modeling(test_df, target_encoder, fit_label_encoder=False)

In [None]:
# One-vs-rest wrapper around a linear-kernel SVC (one binary problem per class).
classifier = OneVsRestClassifier(
    estimator=svm.SVC(probability=True, kernel='linear')
)
classifier.fit(x1_train, y1_train)

# Decision-function scores for the held-out rows; the ROC and metric cells read these.
y_score = classifier.decision_function(x1_test)

In [None]:
classes = np.unique(y1_train)
n_classes = len(classes)

# Binarize the output for ROC computation (one indicator column per class)
y1_test_bin = label_binarize(y1_test, classes=classes)

# Compute ROC curve and ROC area for each class
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y1_test_bin[:, i], y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Micro-average: pool every (sample, class) decision into one binary problem.
# This is the quantity obtained by ravelling the indicator and score matrices.
fpr["micro"], tpr["micro"], _ = roc_curve(y1_test_bin.ravel(), y_score.ravel())
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

# Macro-average: give every class equal weight by averaging the per-class
# curves. The curves are sampled at different FPR values, so each one is first
# interpolated onto the union of all FPR points and then the TPRs are averaged.
all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))
mean_tpr = np.zeros_like(all_fpr)
for i in range(n_classes):
    mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
mean_tpr /= n_classes

fpr["macro"] = all_fpr
tpr["macro"] = mean_tpr
roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

In [None]:
import os

# Point Matplotlib's configuration directory at a temporary location.
os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib_config'

fig, ax = plt.subplots()

# Averaged curve first so it appears at the top of the legend.
macro_label = 'macro-average ROC curve (area = {0:0.2f})'.format(roc_auc["macro"])
ax.plot(fpr["macro"], tpr["macro"], label=macro_label)

# One curve per class, in class-index order.
for class_idx in range(len(classes)):
    class_label = 'ROC curve of class {0} (area = {1:0.2f})'.format(
        class_idx, roc_auc[class_idx]
    )
    ax.plot(fpr[class_idx], tpr[class_idx], label=class_label)

# Dashed diagonal as the chance-level reference.
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Extension of Receiver operating characteristic to multi-class')
ax.legend(loc="lower right")
plt.show()

In [None]:
from sklearn.metrics import precision_score,confusion_matrix, recall_score, accuracy_score, f1_score
import seaborn as sns

# argmax gives the *column* of the winning score; translate it to the label
# the fitted classifier associates with that column instead of assuming the
# labels happen to be 0..k-1.
y_pred_classes = classifier.classes_[np.argmax(y_score, axis=1)]

# Calculate evaluation metrics
accuracy = accuracy_score(y1_test, y_pred_classes)
precision = precision_score(y1_test, y_pred_classes, average='weighted')
recall = recall_score(y1_test, y_pred_classes, average='weighted')
f1 = f1_score(y1_test, y_pred_classes, average='weighted')

# Print evaluation metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

# Compute confusion matrix over every label that occurs in either the truth
# or the predictions, and reuse that exact list for the axis ticks. Taking the
# ticks from the true labels alone mislabels the axes whenever the model
# predicts a class that is absent from the test targets.
matrix_labels = np.unique(np.concatenate([np.asarray(y1_test), y_pred_classes]))
conf_matrix = confusion_matrix(y1_test, y_pred_classes, labels=matrix_labels)
print("Confusion Matrix:")
print(conf_matrix)

# Plot confusion matrix
plt.figure(figsize=(6, 4))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=matrix_labels, yticklabels=matrix_labels)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()