In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import seaborn as sns; sns.set()

In [3]:
from pyspark.sql import SparkSession
from bigdl.util.common import *
from bigdl.nn.layer import *
from bigdl.nn.criterion import BinaryCrossEntropy
from bigdl.optim.optimizer import *
from bigdl.util.common import Sample
from bigdl.transform.vision.image import *

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, when

In [4]:
# Standalone-cluster master URL. Replace the placeholder host with the private
# IPv4 address of the master node; the value must be a string of the form
# "spark://<host>:7077" (an unquoted host:port is not valid Python).
SPARK_MASTER_URL = "spark://private_IPv4_addr_of_master:7077"

# Create (or reuse) the Spark session that every later cell runs against.
spark = (
    SparkSession.builder
    .appName("DS5110/CS5501: my awesome Spark program")
    .master(SPARK_MASTER_URL)
    .config("spark.executor.memory", "1024M")
    .getOrCreate()
)

sc = spark.sparkContext
# NOTE(review): BigDL's setup guide builds the Spark configuration with
# create_spark_conf() before the context is created — confirm whether that is
# needed for this deployment.
init_engine()  # Init BigDL Engine

In [6]:
# Load the two CSV exports (header row present, column types inferred by Spark).
data_normal = spark.read.csv('dataset/dataset_normal.csv', inferSchema=True, header=True)
data_attack = spark.read.csv('dataset/dataset_attack.csv', inferSchema=True, header=True)

# Stack both sets into one DataFrame and discard the three listed columns.
# NOTE(review): union() pairs columns by position, not by name — this assumes
# both files share the same column order; confirm against the datasets.
data = (
    data_normal
    .union(data_attack)
    .drop('ip.src', 'ip.dst', 'frame.protocols')
)
def label_transform(label):
    """Map a class name to a binary target value.

    Args:
        label: The class value of a record.

    Returns:
        1.0 when ``label`` equals the string 'normal', otherwise 0.0.
    """
    if label == 'normal':
        return 1.0
    return 0.0
# Binary target column: 1.0 where `class` is 'normal', 0.0 for everything else
# (including nulls). This is the same mapping as label_transform, written as a
# native Column expression because a Column cannot be passed through a plain
# Python function with `.apply(...)`.
data = data.withColumn('label', when(col('class') == 'normal', 1.0).otherwise(0.0))

# Input columns for the feature vector: every numeric column except the target.
# NOTE(review): no definition of `features` is visible in this notebook, so it
# is derived from the schema here — replace with an explicit column list if only
# a subset of the numeric columns is intended.
# NOTE(review): the dropped columns have dotted names ('ip.src', ...); if the
# remaining feature columns are dotted too, VectorAssembler may fail to resolve
# them — confirm, and rename the columns first if so.
features = [
    name
    for name, dtype in data.dtypes
    if name not in ('class', 'label')
    and (dtype in ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')
         or dtype.startswith('decimal'))
]

# Pack the selected columns into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=features, outputCol='features')
data = assembler.transform(data)

In [7]:
def row_to_sample(row):
    """Convert one DataFrame row into a BigDL ``Sample``.

    Args:
        row: A row exposing a ``features`` vector column and a ``label`` column.

    Returns:
        The ``Sample`` built by ``Sample.from_ndarray`` from the row's feature
        array and its label.
    """
    # Sample.from_ndarray expects numpy arrays, not Spark ML vectors;
    # toArray() is available on both dense and sparse vectors.
    features = row.features.toArray()
    label = float(row.label)
    return Sample.from_ndarray(features, label)

# Turn every DataFrame row into a BigDL Sample.
rdd_samples = data.rdd.map(row_to_sample)

# Divide training and predict datasets (80% / 20%). The fixed seed makes the
# split reproducible across kernel restarts, so reported metrics are comparable
# from run to run.
train_rdd, test_rdd = rdd_samples.randomSplit([0.8, 0.2], seed=42)

In [8]:
def build_model():
    """Assemble the classifier network.

    Returns:
        A ``Sequential`` container holding, in order:
        ``Bidirectional(LSTM(64, activation='tanh'))``,
        ``Dense(128, activation='relu')`` and
        ``Dense(1, activation='sigmoid')``.

    NOTE(review): ``Bidirectional`` and ``Dense`` are not imported by name in
    the visible cells; presumably they come from a star import or from a cell
    that is not shown — confirm this builds on a fresh kernel, and whether the
    first layer needs an explicit input shape.
    """
    network = Sequential()
    layer_stack = (
        Bidirectional(LSTM(64, activation='tanh')),
        Dense(128, activation='relu'),
        Dense(1, activation='sigmoid'),
    )
    for layer in layer_stack:
        network.add(layer)
    return network

In [9]:
# Fresh, untrained network for this run.
model = build_model()

# Training setup: Adam updates against a binary cross-entropy criterion,
# stopping at MaxEpoch(40), with mini-batches of 64 samples from train_rdd.
optimizer = Optimizer(
    batch_size=64,
    end_trigger=MaxEpoch(40),
    optim_method=Adam(),
    criterion=BinaryCrossEntropy(),
    training_rdd=train_rdd,
    model=model,
)

# Score Top1Accuracy on test_rdd at the EveryEpoch() trigger. The same split is
# reused for the final evaluation further down the notebook.
optimizer.set_validation(
    val_rdd=test_rdd,
    val_method=[Top1Accuracy()],
    trigger=EveryEpoch(),
    batch_size=64,
)

# Run the training loop; the returned model is what later cells predict with.
trained_model = optimizer.optimize()

In [27]:
# Model outputs for the held-out samples (one array per sample).
predictions = trained_model.predict(test_rdd)

# The prediction arrays carry no label, so the ground truth is read back from
# the samples themselves.
actual_labels = test_rdd.map(lambda sample: float(sample.label.to_ndarray().ravel()[0]))

# Pair every prediction with its true label as (predicted, actual).
# The network ends in a single sigmoid unit, so the class is decided by a 0.5
# threshold; argmax over a one-element output would always select index 0.
# NOTE(review): zip() assumes predict() preserves the partitioning and element
# order of test_rdd — confirm for the BigDL version in use.
predicted_labels = predictions.zip(actual_labels).map(
    lambda pair: (1.0 if float(pair[0].ravel()[0]) >= 0.5 else 0.0, pair[1])
)

# Confusion-matrix cell counts keyed by (predicted, actual).
counts_dict = dict(predicted_labels.countByValue())
tp = counts_dict.get((1.0, 1.0), 0)
tn = counts_dict.get((0.0, 0.0), 0)
fp = counts_dict.get((1.0, 0.0), 0)
fn = counts_dict.get((0.0, 1.0), 0)

# Summary metrics from the confusion-matrix counts. Each ratio falls back to 0
# when its denominator is zero (e.g. an empty test split) instead of raising
# ZeroDivisionError.
total = tp + fp + fn + tn
accuracy = (tp + tn) / total if total != 0 else 0
precision = tp / (tp + fp) if (tp + fp) != 0 else 0
recall = tp / (tp + fn) if (tp + fn) != 0 else 0

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
# Confusion matrix as a labelled table: rows are the actual class, columns the
# predicted class (tn/fp share the actual-Attack row, fn/tp the actual-Normal row).
to_heat_map = pd.DataFrame(
    [[tn, fp], [fn, tp]],
    index=["Attack", "Normal"],
    columns=["Attack", "Normal"],
)

# Explicit figure/axes so the title and axis labels are attached to this plot.
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(to_heat_map, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set_title('Confusion Matrix')
ax.set_xlabel('Predicted class')
ax.set_ylabel('Actual class')
plt.show()