In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("sparkIDS").getOrCreate()
sc = spark.sparkContext
spark

In [2]:
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.stat import Correlation
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType
import pyarrow as pa
from pyspark.sql.types import DataType
from pyspark.sql.functions import col
from pyspark.sql.functions import split
from pyspark.sql.dataframe import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression

In [3]:
df_col = spark.read.csv('NUSW-NB15_features.csv', header="true", inferSchema=False)
#resolving the columns containing features to a list in the order they appear to use as headers for the dataframe
distinct_names = df_col.select(col("`No.`").cast("int"), "Name").distinct()
ordered_dataset_names = [row.Name for row in distinct_names.orderBy("`No.`").collect()]

In [4]:
dfs = []
csv_files = ["C:/users/python/UNSW-NB15_1.csv", "C:/users/python/UNSW-NB15_2.csv", "C:/users/python/UNSW-NB15_3.csv", "C:/users/python/UNSW-NB15_4.csv"]

for csv_file in csv_files:
    df = spark.read.option('header', False).csv(csv_file)
    renamed_df = df.toDF(*ordered_dataset_names)
    dfs.append(renamed_df)

def union_all(*dfss):
    return reduce(DataFrame.unionAll, dfss)


dataframe = union_all(*dfs)

In [5]:
dataframe = dataframe.na.fill('normal', ['attack_cat'])
dataframe = dataframe.na.fill('0', ['ct_flw_http_mthd'])
dataframe = dataframe.na.fill('0', ['is_ftp_login'])
dataframe = dataframe.withColumn("is_ftp_login", F.when(F.col("is_ftp_login") > 1, 1).otherwise(F.col("is_ftp_login")))
dataframe = dataframe.withColumn("service", F.when(F.col("service") == "-", None).otherwise(F.col("service")))
dataframe = dataframe.fillna("unknown", ["service"])
dataframe = dataframe.fillna("0", ["sport"])
dataframe = dataframe.fillna("0", ["dsport"])

In [6]:
#resolving the ip address columns from string to numeric type while preserving the value
dataframe = dataframe.withColumn("srcip_int",split(col("srcip"),"\.")[0]*16777216 +split(col("srcip"),"\.")[1]*65536+ split(col("srcip"),"\.")[2]*256 + split(col("srcip"),"\.")[3])
dataframe = dataframe.withColumn("dstip_int",split(col("dstip"),"\.")[0]*16777216 +split(col("dstip"),"\.")[1]*65536+ split(col("dstip"),"\.")[2]*256 + split(col("dstip"),"\.")[3])

In [7]:
columns_to_convert = ["dur", "dload", "sload", "sjit", "djit", "Sintpkt", "Dintpkt", "tcprtt", "synack", "ackdat"]
for col_name in columns_to_convert:
    dataframe = dataframe.withColumn(col_name, dataframe[col_name].cast("double"))

In [8]:
#specifying the columns to exclude from the transformation of dataframe from string type to double type
#'srcip','dstip' will be dropped so no need to transform that, 'srcip_int' and 'dstip_int' have been transformed to int type, the rest are nominal datatypes that will be transformed later user stringindexer
columns_to_exclude = ["srcip", "dstip", "srcip_int","dstip_int","proto", "state", "service","attack_cat", "dur", "dload", "sload", "sjit", "djit", "Sintpkt" , "Dintpkt","tcprtt", "synack", "ackdat"]

In [9]:
#performing stringindexing of the nominal datatypes
for col_name, col_type in dataframe.dtypes:
    if col_name not in columns_to_exclude:
        dataframe = dataframe.withColumn(col_name, dataframe[col_name].cast("int"))
columns_to_convert = ["proto", "state", "service", "attack_cat"]
indexers = [StringIndexer(inputCol="proto", outputCol="proto_index", handleInvalid="skip"), StringIndexer(inputCol="state", outputCol="state_index", handleInvalid="skip"),StringIndexer(inputCol="service", outputCol="service_index",  handleInvalid="skip"),StringIndexer(inputCol="attack_cat", outputCol="attackcat_index",  handleInvalid="skip")]
pipeline = Pipeline(stages=indexers)
dataframe_r = pipeline.fit(dataframe).transform(dataframe)

In [10]:
#columns to drop after transformation, ct_ftp_cmd was dropped due to having about half null values, label is for binary classification
cols = ("srcip", "dstip", "proto","state","service", "Label","attack_cat", "ct_ftp_cmd")

In [11]:
dataframe_d = dataframe_r.drop(*cols)

In [12]:
training, test = dataframe_d.randomSplit([0.8, 0.2], seed=123)

In [13]:
#highly correlated features,srcip,dstip,sport, and dsport will ideally not be available in training data
columns_to_drop = ('ct_srv_dst', 'Ltime', 'ct_dst_src_ltm', 'ct_src_ltm', 'ct_src_dport_ltm', 'dloss', 'Dpkts', 'dwin', 'dtcpb', 'srcip', 'dstip', 'sport', 'dsport', 'srcip_int', 'dstip_int')

In [14]:
training_d = training.drop(*map(str, columns_to_drop))

In [15]:
#standardizing the dataset and excluding the label
columns = training_d.columns
column_to_exclude = 'attackcat_index'

In [16]:
feature_columns = [col for col in columns if col != column_to_exclude]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features', handleInvalid='skip')
temptraining = assembler.transform(training_d)
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
training = scaler.fit(temptraining).transform(temptraining)

In [17]:
test_d = test.drop(*map(str, columns_to_drop))
feature_columns = [col for col in columns if col != column_to_exclude]
temptest = assembler.transform(test_d)
test = scaler.fit(temptest).transform(temptest)

In [18]:
from pyspark.ml.classification import GBTClassifier


In [19]:
training = training.withColumnRenamed('attackcat_index', 'label')
test = test.withColumnRenamed('attackcat_index', 'label')

In [20]:
from pyspark.ml.classification import DecisionTreeClassifier


In [21]:
dt = DecisionTreeClassifier(labelCol='label', featuresCol='scaled_features')
dt_param_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .build()

tvs_dt = TrainValidationSplit(estimator=dt,
                              estimatorParamMaps=dt_param_grid,
                              evaluator=MulticlassClassificationEvaluator(),
                              trainRatio=0.8)
model_dt = tvs_dt.fit(training)
best_model_dt = model_dt.bestModel
predictions_gbt = best_model_dt.transform(test)


In [22]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol= 'label')

In [23]:
f1_score = evaluator.evaluate(predictions_gbt, {evaluator.metricName: "f1"})
print("F1 Score:", f1_score)
far = 1.0 - evaluator.evaluate(predictions_gbt, {evaluator.metricName: "weightedPrecision"})
print("False Alarm Rate:", far)

F1 Score: 0.848809872077377
False Alarm Rate: 0.11313360586306831


In [24]:
exclude_columns = ['features', 'scaled_features', 'label']
feature_names = [col for col in training.columns if col not in exclude_columns]


In [25]:
#using decision trees for feature importance
decision_tree_model = best_model_dt
feature_importances = decision_tree_model.featureImportances
feature_importance_dict = {}
for feature_name, importance in zip(feature_names, feature_importances):
    feature_importance_dict[feature_name] = importance

sorted_feature_importance = sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True)
print("Feature Importances:")
for feature_name, importance in sorted_feature_importance:
    print(f"{feature_name}: {importance}")

Feature Importances:
sttl: 0.7320935596824775
service_index: 0.1370838452072771
dttl: 0.031000957989280056
sbytes: 0.023752517744943507
proto_index: 0.02268114955074515
ct_srv_src: 0.009636370650814737
Stime: 0.008530330772837918
smeansz: 0.008026258421197034
dbytes: 0.006063649398807711
dmeansz: 0.0051253892265957736
state_index: 0.002352810767078016
ct_state_ttl: 0.0019349453048696937
dur: 0.001648214829367059
Spkts: 0.001352389724986748
sjit: 0.001082460432448507
sloss: 0.0009740812747460256
res_bdy_len: 0.0009246278538369211
stcpb: 0.0008680793182108881
ct_src_ ltm: 0.000723822859696204
Dintpkt: 0.0007116762644687317
Sintpkt: 0.000658882505671274
sload: 0.0005787191990122492
ct_dst_sport_ltm: 0.0005103463492835337
ct_dst_ltm: 0.0005084745234971394
djit: 0.00046996171764433276
dload: 0.00024587426891590645
trans_depth: 0.00024503521978918403
swin: 0.00014580618879300112
ct_flw_http_mthd: 2.7815107688340067e-05
is_ftp_login: 2.6000304953373218e-05
tcprtt: 1.5947340066563632e-05
synac

In [26]:
rf = RandomForestClassifier(labelCol='label', featuresCol='scaled_features')
rf_param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 50, 100]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

tvs_rf = TrainValidationSplit(estimator=rf,
                              estimatorParamMaps=rf_param_grid,
                              evaluator=MulticlassClassificationEvaluator(),
                              trainRatio=0.8)

model_rf = tvs_rf.fit(training)
best_model_rf = model_rf.bestModel
predictions_rf = best_model_rf.transform(test)

In [27]:
f1_score = evaluator.evaluate(predictions_rf, {evaluator.metricName: "f1"})
print("F1 Score:", f1_score)
far = 1.0 - evaluator.evaluate(predictions_rf, {evaluator.metricName: "weightedPrecision"})
print("False Alarm Rate:", far)

F1 Score: 0.9728624880051581
False Alarm Rate: 0.02470204118508701


In [29]:
#dropping columns less than 0.0009
drop_columns = ('stcpb','ct_src_ ltm','Dintpkt','Sintpkt','sload','ct_dst_sport_ltm','ct_dst_ltm','djit','dload','trans_depth','swin','ct_flw_http_mthd','is_ftp_login','tcprtt','synack',
'ackdat','is_sm_ips_ports')


In [30]:
training_sf = training_d.drop(*map(str, drop_columns))

In [31]:
columns = training_sf.columns
column_to_exclude = 'attackcat_index'
feature_columns = [col for col in columns if col != column_to_exclude]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features', handleInvalid='skip')
temptraining = assembler.transform(training_sf)
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
training = scaler.fit(temptraining).transform(temptraining)

In [32]:
test_sf = test_d.drop(*map(str, drop_columns))

In [33]:
temptest = assembler.transform(test_sf)
test = scaler.fit(temptest).transform(temptest)

In [34]:
training = training.withColumnRenamed('attackcat_index', 'label')
test = test.withColumnRenamed('attackcat_index', 'label')

In [35]:
model_dt = tvs_dt.fit(training)
best_model_dt = model_dt.bestModel
predictions_dt = best_model_dt.transform(test)
f1_score = evaluator.evaluate(predictions_dt, {evaluator.metricName: "f1"})
print("F1 Score:", f1_score)
far = 1.0 - evaluator.evaluate(predictions_dt, {evaluator.metricName: "weightedPrecision"})
print("False Alarm Rate:", far)


F1 Score: 0.9736801465015182
False Alarm Rate: 0.023492641744311538


In [37]:
training_rf = training_d.drop(*map(str, drop_columns))

In [38]:
columns = training_rf.columns
column_to_exclude = 'attackcat_index'
feature_columns = [col for col in columns if col != column_to_exclude]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features', handleInvalid='skip')
temptraining = assembler.transform(training_rf)
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
training = scaler.fit(temptraining).transform(temptraining)

In [39]:
test_rf = test_d.drop(*map(str, drop_columns))

In [40]:
temptest = assembler.transform(test_rf)
test = scaler.fit(temptest).transform(temptest)

In [41]:
training = training.withColumnRenamed('attackcat_index', 'label')
test = test.withColumnRenamed('attackcat_index', 'label')

In [42]:
model_rf = tvs_rf.fit(training)
best_model_rf = model_rf.bestModel
predictions_rf = best_model_rf.transform(test)
f1_score = evaluator.evaluate(predictions_rf, {evaluator.metricName: "f1"})
print("F1 Score:", f1_score)
far = 1.0 - evaluator.evaluate(predictions_rf, {evaluator.metricName: "weightedPrecision"})
print("False Alarm Rate:", far)


F1 Score: 0.9762279644088143
False Alarm Rate: 0.021888466843271503


In [44]:
training_d.show(5)

+--------+------+------+----+----+-----+-----------+-----+-----+----+-----+-------+-------+-----------+-----------+-------+----+----------+----------+-------+------+------+------+---------------+------------+----------------+------------+----------+----------+-----------+----------------+-----------+-----------+-------------+---------------+
|     dur|sbytes|dbytes|sttl|dttl|sloss|      sload|dload|Spkts|swin|stcpb|smeansz|dmeansz|trans_depth|res_bdy_len|   sjit|djit|     Stime|   Sintpkt|Dintpkt|tcprtt|synack|ackdat|is_sm_ips_ports|ct_state_ttl|ct_flw_http_mthd|is_ftp_login|ct_srv_src|ct_dst_ltm|ct_src_ ltm|ct_dst_sport_ltm|proto_index|state_index|service_index|attackcat_index|
+--------+------+------+----+----+-----+-----------+-----+-----+----+-----+-------+-------+-----------+-----------+-------+----+----------+----------+-------+------+------+------+---------------+------------+----------------+------------+----------+----------+-----------+----------------+-----------+-----------