In [2]:
import pandas as pd
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, label_binarize
from sklearn.metrics import accuracy_score, precision_recall_curve, f1_score
from imblearn.under_sampling import RandomUnderSampler
from sklearn.multiclass import OneVsRestClassifier

In [4]:
# Read the NetFlow export from disk, then discard every row that has at least
# one missing value so the later cells only see complete records.
df = pd.read_csv("APPRAISE_NETFLOW.csv")
df = df.dropna()

# Column names, dtypes and memory footprint of the cleaned frame.
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 7780805 entries, 0 to 7780804
Data columns (total 19 columns):
 #   Column            Dtype  
---  ------            -----  
 0   FLOW_ID           int64  
 1   IPV4_SRC_ADDR     object 
 2   IPV4_DST_ADDR     object 
 3   IN_PKTS           int64  
 4   IN_BYTES          int64  
 5   OUT_PKTS          int64  
 6   OUT_BYTES         int64  
 7   FIRST_SWITCHED    int64  
 8   LAST_SWITCHED     float64
 9   L4_SRC_PORT       float64
 10  L4_DST_PORT       float64
 11  TCP_FLAGS         float64
 12  PROTOCOL          float64
 13  PROTOCOL_MAP      object 
 14  TOTAL_FLOWS_EXP   float64
 15  L7_PROTO          float64
 16  L7_PROTO_NAME     object 
 17  ANOMALY_CATEGORY  object 
 18  ANOMALY           float64
dtypes: float64(8), int64(6), object(5)
memory usage: 1.2+ GB


In [None]:
def change_forth_octet(ip_addr):
    """Return ``ip_addr`` with its fourth dot-separated field replaced by ``'0'``.

    The string is split on ``'.'``, the field at index 3 is overwritten and the
    pieces are joined back together; every other field is kept unchanged.

    Raises:
        IndexError: if ``ip_addr`` has fewer than four dot-separated fields.
    """
    octets = ip_addr.split('.')
    octets[3] = '0'  # index 3 is the last field of a dotted-quad address
    return '.'.join(octets)

# Zero the last octet of both endpoint addresses, one column at a time.
for address_column in ('IPV4_SRC_ADDR', 'IPV4_DST_ADDR'):
    df[address_column] = df[address_column].apply(change_forth_octet)

# print(df.head())

In [None]:
def ip_to_float(ip):
    """Pack a dotted-quad string into one integer, most significant field first.

    Despite the function name, the value returned is a Python ``int``:
    ``a.b.c.d`` becomes ``a * 2**24 + b * 2**16 + c * 2**8 + d``.

    Raises:
        ValueError: if ``ip`` does not split into exactly four fields or a
            field cannot be parsed by ``int``.
    """
    first, second, third, fourth = (int(field) for field in ip.split('.'))
    # Horner form of the weighted sum; multiplying by 256 equals shifting by 8 bits.
    return ((first * 256 + second) * 256 + third) * 256 + fourth

# Replace each (already masked) address string by its packed integer value.
for address_column in ('IPV4_SRC_ADDR', 'IPV4_DST_ADDR'):
    df[address_column] = df[address_column].apply(ip_to_float)

In [None]:
# Plain-text preview of the first five rows after the address transformations.
print(df.head(n=5))

In [None]:
# Numeric class id for every value expected in ANOMALY_CATEGORY.
# NOTE(review): '-' presumably marks flows without an anomaly -- confirm
# against the data set description.
mapping = {
    '-' : 0.0,
    'Reconnaissance' : 1.0,
    'BruteForce' : 2.0
}

df['ANOMALY_CLASS'] = df['ANOMALY_CATEGORY'].map(mapping)

# Series.map yields NaN for any category that is not a key of `mapping`.
# Stop here with the offending values instead of letting NaN labels reach the
# stratified split and the classifiers further down.
unmapped = df.loc[df['ANOMALY_CLASS'].isna(), 'ANOMALY_CATEGORY'].unique()
if len(unmapped) > 0:
    raise ValueError(
        f"ANOMALY_CATEGORY contains values without a class id: {list(unmapped)}"
    )

df.info()

In [None]:
# Integer-encode the two string-valued protocol columns. Each column gets its
# own encoder: refitting a single encoder for the second column would discard
# the label <-> code mapping learned for the first, which is needed to decode
# the column later (encoders[column].inverse_transform).
encoders = {}
for column in ('L7_PROTO_NAME', 'PROTOCOL_MAP'):
    encoders[column] = LabelEncoder()
    df[column] = encoders[column].fit_transform(df[column])

# Kept for backward compatibility: `le` used to end up fitted on PROTOCOL_MAP.
le = encoders['PROTOCOL_MAP']

# Labels for the classifiers.
target = df['ANOMALY_CLASS']

df.info()

In [None]:
# Remove every column whose name starts with 'ANOMALY', except ANOMALY_CLASS.
# ANOMALY_CLASS is deliberately still present in `drop_cols` after this cell.
anomaly_columns = [
    name
    for name in df.columns
    if name != 'ANOMALY_CLASS' and name.startswith('ANOMALY')
]
drop_cols = df.drop(columns=anomaly_columns)
drop_cols.info()

In [None]:
# `drop_cols` still carries ANOMALY_CLASS, which is the very column used as
# `target`. Leaving it in the feature matrix leaks the label into the model and
# makes every score computed on this split meaningless, so drop it here.
features = drop_cols.drop(columns=['ANOMALY_CLASS'])

# 90/10 split, stratified so each class keeps its share in both parts.
split_results = train_test_split(
    features, target, test_size=0.1, random_state=42, stratify=target
)

# train_test_split returns [X_train, X_test, y_train, y_test] in that order.
X_train, X_test, Y_train, Y_test = split_results

In [None]:
# First model: one hidden layer of 1000 ReLU units optimised with Adam.
# With early_stopping=True scikit-learn holds out validation_fraction (10%) of
# the training data and stops once the validation score stops improving.
# NOTE(review): no feature scaling step is applied before this fit -- confirm
# that training on the raw column values is intended.
mlp = MLPClassifier(
    hidden_layer_sizes=(1000,),
    activation='relu',
    solver='adam',
    alpha=0.0001,
    early_stopping=True,
    validation_fraction=0.1,
    random_state=42,
    verbose=False,
)
mlp.fit(X_train, Y_train)

# Score the held-out split.
y_prediction = mlp.predict(X_test)
accuracy = accuracy_score(y_true=Y_test, y_pred=y_prediction)
print(accuracy)

In [None]:
# visualize
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Reuse the predictions already computed for the accuracy score instead of
# running a second, identical prediction pass over X_test. `y_pred` is kept
# as an alias so the name remains defined.
y_pred = y_prediction

# Rows are true classes, columns are predicted classes, both in the order of
# mlp.classes_.
cm = confusion_matrix(Y_test, y_prediction, labels=mlp.classes_)

# Display
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=mlp.classes_)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

In [None]:
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, roc_auc_score
# Receiver Operating Characteristic (ROC) graph

# One-vs-rest ground truth: column i is 1 where the true label equals classes[i].
classes = [0, 1, 2]
y_test_bin = label_binarize(Y_test, classes=classes)

# Class-membership probabilities predicted by the first model.
y_scores = mlp.predict_proba(X_test)

# Draw one ROC curve per class, each treated as its own binary problem.
fig, ax = plt.subplots()
for column, class_id in enumerate(classes):
    truth = y_test_bin[:, column]
    score = y_scores[:, column]
    fpr, tpr, _thresholds = roc_curve(truth, score)
    auc = roc_auc_score(truth, score)
    ax.plot(fpr, tpr, label=f'Class {class_id} (AUC={auc:.2f})')

# Dashed diagonal as the chance-level reference.
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Multiclass ROC Curve')
ax.legend()
plt.show()

In [None]:
# One precision-recall curve per class, pairing each column of the binarized
# labels with the matching column of predicted probabilities.
for class_label, truth, score in zip(classes, y_test_bin.T, y_scores.T):
    precision, recall, thresholds = precision_recall_curve(truth, score)
    plt.plot(recall, precision, label=f'Class {class_label}')

plt.title('Precision-Recall Curve (One-vs-Rest)')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.legend()
plt.show()

In [None]:
# The score is micro-averaged over three classes, not a binary F1, so label it
# accordingly. For single-label multiclass data micro-averaged F1 equals plain
# accuracy; the macro average (unweighted mean of the per-class F1 scores) is
# printed as well because it is sensitive to poor performance on rare classes.
f1 = f1_score(Y_test, y_prediction, average='micro')
print("F1 Score (micro):", f1)

f1_macro = f1_score(Y_test, y_prediction, average='macro')
print("F1 Score (macro):", f1_macro)

In [None]:
# Second model: a smaller single hidden layer (100 units), a higher initial
# learning rate and no early stopping, with per-iteration progress printed.
# Per the scikit-learn docs validation_fraction is only used when
# early_stopping is True, so the value 0 should have no effect here.
mlp_2 = MLPClassifier(
    hidden_layer_sizes=(100,),
    activation='relu',
    solver='adam',
    batch_size='auto',
    learning_rate_init=0.005,
    max_iter=200,
    early_stopping=False,
    validation_fraction=0,
    random_state=42,
    verbose=True,
)

In [None]:
# Train the second model on the current training split.
mlp_2.fit(X=X_train, y=Y_train)

In [None]:
# Held-out accuracy of the second model.
y_pred_2 = mlp_2.predict(X_test)
accuracy_2 = accuracy_score(y_true=Y_test, y_pred=y_pred_2)
print(accuracy_2)

In [None]:
# visualize
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt


# Confusion matrix of the second model; rows and columns follow mlp_2.classes_.
class_labels = mlp_2.classes_
cm = confusion_matrix(y_true=Y_test, y_pred=y_pred_2, labels=class_labels)

# Render the matrix as a blue heat map.
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

In [None]:
# Precision-recall curves for the second model. The probabilities have to come
# from mlp_2 itself: `y_scores` holds the first model's predictions, so
# plotting it here would just reproduce the first model's curves.
y_scores_2 = mlp_2.predict_proba(X_test)

for i, class_label in enumerate(classes):
    # Column i of both arrays refers to classes[i] (one-vs-rest).
    precision, recall, thresholds = precision_recall_curve(y_test_bin[:, i], y_scores_2[:, i])
    plt.plot(recall, precision, label=f'Class {class_label}')

plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve (One-vs-Rest)')
plt.legend()
plt.show()

In [None]:
# Separate the label column from the features.
y = drop_cols['ANOMALY_CLASS']
X = drop_cols.drop(columns=['ANOMALY_CLASS'])

# Randomly under-sample with a fixed seed to rebalance the class counts.
# NOTE(review): the sampler runs on the full data set, before any train/test
# split, so a later split of X_resampled evaluates on under-sampled data --
# confirm that this is intended.
resample = RandomUnderSampler(random_state=42, sampling_strategy='auto')
X_resampled, y_resampled = resample.fit_resample(X, y)

In [None]:
# Shapes of the resampled feature matrix and label vector, on one line.
print(f"{X_resampled.shape} {y_resampled.shape}")

In [None]:
# 95/5 stratified split of the resampled data. This rebinds the
# X_train / X_test / Y_train / Y_test names used by the earlier models.
split_results = train_test_split(
    X_resampled,
    y_resampled,
    test_size=0.05,
    random_state=42,
    stratify=y_resampled,
)

# Returned order: X_train, X_test, y_train, y_test.
X_train, X_test, Y_train, Y_test = split_results

In [None]:
# Third model: three hidden layers (128 -> 64 -> 32 ReLU units), otherwise the
# same optimiser settings as mlp_2.
# Per the scikit-learn docs validation_fraction is only used when
# early_stopping is True, so the value 0 should have no effect here.
mlp_3 = MLPClassifier(
    hidden_layer_sizes=(128, 64, 32),
    activation='relu',
    solver='adam',
    batch_size='auto',
    learning_rate_init=0.005,
    max_iter=200,
    early_stopping=False,
    validation_fraction=0,
    random_state=42,
    verbose=True,
)

In [None]:
# Train the third model on the current training split.
mlp_3.fit(X=X_train, y=Y_train)

In [None]:
# Held-out accuracy of the third model.
y_pred_3 = mlp_3.predict(X_test)
accuracy_3 = accuracy_score(y_true=Y_test, y_pred=y_pred_3)
print(accuracy_3)

In [None]:
# The score is micro-averaged over three classes, not a binary F1, so label it
# accordingly. For single-label multiclass data micro-averaged F1 equals plain
# accuracy; the macro average (unweighted mean of the per-class F1 scores) is
# printed alongside it for a per-class view.
f1 = f1_score(Y_test, y_pred_3, average='micro')
print("F1 Score (micro):", f1)

f1_macro = f1_score(Y_test, y_pred_3, average='macro')
print("F1 Score (macro):", f1_macro)

In [None]:
# visualize
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt


# Confusion matrix of the third model; rows and columns follow mlp_3.classes_.
class_labels = mlp_3.classes_
cm = confusion_matrix(y_true=Y_test, y_pred=y_pred_3, labels=class_labels)

# Render the matrix as a blue heat map.
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

In [None]:
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, roc_auc_score
# Receiver Operating Characteristic (ROC) graph

# One-vs-rest ground truth for the current test split: column i is 1 where the
# true label equals classes[i].
classes = [0, 1, 2]
y_test_bin = label_binarize(Y_test, classes=classes)

# Class-membership probabilities predicted by the third model.
y_scores = mlp_3.predict_proba(X_test)

# Draw one ROC curve per class, each treated as its own binary problem.
fig, ax = plt.subplots()
for column, class_id in enumerate(classes):
    truth = y_test_bin[:, column]
    score = y_scores[:, column]
    fpr, tpr, _thresholds = roc_curve(truth, score)
    auc = roc_auc_score(truth, score)
    ax.plot(fpr, tpr, label=f'Class {class_id} (AUC={auc:.2f})')

# Dashed diagonal as the chance-level reference.
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Multiclass ROC Curve')
ax.legend()
plt.show()

In [None]:
# Precision vs. recall curve for the multiclass problem (one-vs-rest setup).

# Indicator-matrix form of the training labels: one column per entry of `classes`.
y_train_binary = label_binarize(Y_train, classes=classes)

# One-vs-rest meta-classifier built around the mlp_3 estimator.
ovr = OneVsRestClassifier(estimator=mlp_3)

In [None]:
# Fit the one-vs-rest wrapper on the binarized labels (fit returns the fitted
# estimator), then score the test split with one probability column per class.
y_score = ovr.fit(X_train, y_train_binary).predict_proba(X_test)

In [None]:
# Precision-recall curve per class, using the one-vs-rest scores in `y_score`.
for column in range(len(classes)):
    class_label = classes[column]
    truth = y_test_bin[:, column]
    score = y_score[:, column]
    precision, recall, thresholds = precision_recall_curve(truth, score)
    plt.plot(recall, precision, label=f'Class {class_label}')

plt.title('Precision-Recall Curve (One-vs-Rest)')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.legend()
plt.show()