# **Loading data**

In [11]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, \
                                        LogisticRegressionModel, RandomForestClassificationModel, GBTClassificationModel
from time import time

spark = SparkSession.builder.appName('SIMARGL2021').getOrCreate()

# **Data preprocessing**

In [12]:
base_df = spark.read.csv("SIMARGL2021.csv", header=True, inferSchema=True)
base_df.show(2)

+----------------+---------+-----------------------+--------------+--------------+-------------------+--------------------------+--------------------------+---------------------+------------+--------+---------------------+-----------------------+--------------+------------+--------+-------+--------------+--------------+-----------+-----------+-------------+--------------+--------------+---------------+----------------+---------+--------+--------+------------+----------------------+---------------------+-----------------------+----------------------+-----------------------+---------+--------------+---------------+--------------+---------------+--------------+---------------+----------------+-----------------+-------+-------+--------------+-----------------+---------------+-----------+
|BIFLOW_DIRECTION|DIRECTION|DST_TO_SRC_SECOND_BYTES|FIREWALL_EVENT|FIRST_SWITCHED|FLOW_ACTIVE_TIMEOUT|FLOW_DURATION_MICROSECONDS|FLOW_DURATION_MILLISECONDS|FLOW_END_MILLISECONDS|FLOW_END_SEC| FLOW_ID|FLOW_

In [13]:
df = base_df
df.printSchema()

root
 |-- BIFLOW_DIRECTION: integer (nullable = true)
 |-- DIRECTION: integer (nullable = true)
 |-- DST_TO_SRC_SECOND_BYTES: string (nullable = true)
 |-- FIREWALL_EVENT: integer (nullable = true)
 |-- FIRST_SWITCHED: integer (nullable = true)
 |-- FLOW_ACTIVE_TIMEOUT: integer (nullable = true)
 |-- FLOW_DURATION_MICROSECONDS: integer (nullable = true)
 |-- FLOW_DURATION_MILLISECONDS: integer (nullable = true)
 |-- FLOW_END_MILLISECONDS: long (nullable = true)
 |-- FLOW_END_SEC: integer (nullable = true)
 |-- FLOW_ID: integer (nullable = true)
 |-- FLOW_INACTIVE_TIMEOUT: integer (nullable = true)
 |-- FLOW_START_MILLISECONDS: long (nullable = true)
 |-- FLOW_START_SEC: integer (nullable = true)
 |-- FRAME_LENGTH: integer (nullable = true)
 |-- IN_BYTES: integer (nullable = true)
 |-- IN_PKTS: integer (nullable = true)
 |-- IPV4_DST_ADDR: string (nullable = true)
 |-- IPV4_SRC_ADDR: string (nullable = true)
 |-- L4_DST_PORT: integer (nullable = true)
 |-- L4_SRC_PORT: integer (nullable

In [14]:
df.count()

8637207

In [15]:
df2 = df.na.drop()
df2.count()

8637207

In [16]:
from pyspark.ml.feature import StringIndexer

si = StringIndexer(inputCols=["IPV4_SRC_ADDR", "IPV4_DST_ADDR", "PROTOCOL_MAP", "L7_PROTO_NAME", "LABEL"], outputCols=["IPV4_SRC_ADDR_Index", "IPV4_DST_ADDR_Index", "PROTOCOL_MAP_Index", "L7_PROTO_NAME_Index", "LABEL_Index"])
si_fit = si.fit(df)


df_indexed = si_fit.transform(df)

In [17]:
df = df_indexed.drop("LABEL", "IPV4_SRC_ADDR", "IPV4_DST_ADDR", "DST_TO_SRC_SECOND_BYTES", "PROTOCOL_MAP", "SRC_TO_DST_SECOND_BYTES", "L7_PROTO_NAME", "BIFLOW_DIRECTION", "DIRECTION", "FLOW_ACTIVE_TIMEOUT", "FIREWALL_EVENT", "FLOW_INACTIVE_TIMEOUT", "FRAME_LENGTH", "MAX_IP_PKT_LEN", "MIN_IP_PKT_LEN", "SAMPLING_INTERVAL", "FLOW_START_SEC", "FLOW_END_SEC", "FLOW_DURATION_MICROSECONDS", "FLOW_ID", "OOORDER_IN_PKTS", "OOORDER_OUT_PKTS")
df.printSchema()

root
 |-- FIRST_SWITCHED: integer (nullable = true)
 |-- FLOW_DURATION_MILLISECONDS: integer (nullable = true)
 |-- FLOW_END_MILLISECONDS: long (nullable = true)
 |-- FLOW_START_MILLISECONDS: long (nullable = true)
 |-- IN_BYTES: integer (nullable = true)
 |-- IN_PKTS: integer (nullable = true)
 |-- L4_DST_PORT: integer (nullable = true)
 |-- L4_SRC_PORT: integer (nullable = true)
 |-- LAST_SWITCHED: integer (nullable = true)
 |-- OUT_BYTES: long (nullable = true)
 |-- OUT_PKTS: integer (nullable = true)
 |-- PROTOCOL: integer (nullable = true)
 |-- RETRANSMITTED_IN_BYTES: integer (nullable = true)
 |-- RETRANSMITTED_IN_PKTS: integer (nullable = true)
 |-- RETRANSMITTED_OUT_BYTES: integer (nullable = true)
 |-- RETRANSMITTED_OUT_PKTS: integer (nullable = true)
 |-- TCP_FLAGS: integer (nullable = true)
 |-- TCP_WIN_MAX_IN: integer (nullable = true)
 |-- TCP_WIN_MAX_OUT: integer (nullable = true)
 |-- TCP_WIN_MIN_IN: integer (nullable = true)
 |-- TCP_WIN_MIN_OUT: integer (nullable = tru

# Correlation Matrix

In [18]:
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=df.columns, outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)

corr = Correlation.corr(df_vector, vector_col)

matrix = corr.collect()[0][0] 
corr_matrix = matrix.toArray().tolist() 
corr_matrix_df = pd.DataFrame(data=corr_matrix, columns = df.columns, index=df.columns) 
corr_matrix_df.style.background_gradient(cmap='coolwarm')

Unnamed: 0,FIRST_SWITCHED,FLOW_DURATION_MILLISECONDS,FLOW_END_MILLISECONDS,FLOW_START_MILLISECONDS,IN_BYTES,IN_PKTS,L4_DST_PORT,L4_SRC_PORT,LAST_SWITCHED,OUT_BYTES,OUT_PKTS,PROTOCOL,RETRANSMITTED_IN_BYTES,RETRANSMITTED_IN_PKTS,RETRANSMITTED_OUT_BYTES,RETRANSMITTED_OUT_PKTS,TCP_FLAGS,TCP_WIN_MAX_IN,TCP_WIN_MAX_OUT,TCP_WIN_MIN_IN,TCP_WIN_MIN_OUT,TCP_WIN_MSS_IN,TCP_WIN_MSS_OUT,TCP_WIN_SCALE_IN,TCP_WIN_SCALE_OUT,SRC_TOS,DST_TOS,TOTAL_FLOWS_EXP,IPV4_SRC_ADDR_Index,IPV4_DST_ADDR_Index,PROTOCOL_MAP_Index,L7_PROTO_NAME_Index,LABEL_Index
FIRST_SWITCHED,1.0,0.162694,0.999997,1.0,-0.006413,-0.004517,-0.051627,-0.04185,0.999997,-0.001023,-0.002079,-0.131243,-0.024126,0.010513,0.011986,0.012259,0.174129,0.082339,0.118535,0.082912,0.118995,0.062505,0.192049,0.051354,0.204915,-0.061782,-0.097314,0.950147,-0.048534,-0.020652,-0.173386,-0.080895,0.312277
FLOW_DURATION_MILLISECONDS,0.162694,1.0,0.165094,0.162694,0.014288,0.019655,-0.216952,-0.053719,0.165093,0.004375,0.007986,-0.302953,0.044909,0.384324,-0.011223,-0.011038,0.423331,0.000326,0.03994,0.001416,0.040517,-0.04316,0.099153,-0.027111,0.113693,-0.173051,-0.152401,0.122318,-0.099859,-0.123444,-0.362969,-0.181945,0.624942
FLOW_END_MILLISECONDS,0.999997,0.165094,1.0,0.999997,-0.006375,-0.004467,-0.052141,-0.041966,1.0,-0.001012,-0.002059,-0.131938,-0.024006,0.011456,0.011953,0.012227,0.175103,0.082307,0.118586,0.082883,0.119047,0.062373,0.192217,0.051267,0.205113,-0.062184,-0.097651,0.950065,-0.04876,-0.020948,-0.174211,-0.081311,0.313692
FLOW_START_MILLISECONDS,1.0,0.162694,0.999997,1.0,-0.006413,-0.004517,-0.051627,-0.04185,0.999997,-0.001023,-0.00208,-0.131243,-0.024126,0.010513,0.011986,0.012259,0.174129,0.082339,0.118535,0.082913,0.118995,0.062505,0.19205,0.051354,0.204915,-0.061782,-0.097314,0.950147,-0.048534,-0.020652,-0.173386,-0.080895,0.312277
IN_BYTES,-0.006413,0.014288,-0.006375,-0.006413,1.0,0.803119,0.006779,-0.002362,-0.006375,0.063159,0.210766,0.00386,0.034934,0.020215,0.004842,0.004839,0.00017,0.003532,0.016583,0.003457,0.016546,0.002821,0.00562,0.002721,0.005824,-0.003528,-0.000139,-0.006463,-0.000895,-0.001927,0.000653,0.027399,-0.008031
IN_PKTS,-0.004517,0.019655,-0.004467,-0.004517,0.803119,1.0,0.005942,-0.003057,-0.004467,0.571169,0.707349,0.002161,0.026904,0.016319,0.084243,0.084185,0.003884,0.006605,0.020426,0.006431,0.020365,0.005301,0.01004,0.005249,0.010244,-0.00487,-0.000363,-0.004553,-0.001497,-0.003171,-0.00215,0.030731,-0.007318
L4_DST_PORT,-0.051627,-0.216952,-0.052141,-0.051627,0.006779,0.005942,1.0,-0.054245,-0.052141,0.00119,0.001252,0.205233,-0.009005,-0.062357,-0.010545,-0.010313,-0.167271,-0.103364,-0.210682,-0.10225,-0.210528,-0.088296,-0.235629,-0.088218,-0.245189,0.049643,0.01446,-0.054896,0.09511,0.465629,0.080574,0.004016,-0.293805
L4_SRC_PORT,-0.04185,-0.053719,-0.041966,-0.04185,-0.002362,-0.003057,-0.054245,1.0,-0.041966,-0.00074,-0.001013,0.234348,0.004136,0.044342,-0.006856,-0.007472,0.028641,0.086947,0.022572,0.085792,0.022285,0.084763,-0.000466,0.095595,-0.008663,-0.215387,0.132924,-0.052349,-0.098551,0.105494,-0.247178,0.032835,-0.031599
LAST_SWITCHED,0.999997,0.165093,1.0,0.999997,-0.006375,-0.004467,-0.052141,-0.041966,1.0,-0.001012,-0.002059,-0.131937,-0.024006,0.011456,0.011953,0.012227,0.175102,0.082307,0.118586,0.082883,0.119047,0.062374,0.192217,0.051267,0.205113,-0.062184,-0.097651,0.950065,-0.04876,-0.020948,-0.174211,-0.081311,0.313692
OUT_BYTES,-0.001023,0.004375,-0.001012,-0.001023,0.063159,0.571169,0.00119,-0.00074,-0.001012,1.0,0.931187,-0.000981,0.001086,-0.000351,0.143005,0.143121,0.003783,0.006209,0.012077,0.005869,0.012055,0.005275,0.007564,0.004865,0.007477,-0.0019,-0.000317,-0.00083,-0.000441,-0.001051,-0.002562,0.010524,-0.005866


In [33]:
final_df = df.select(["FLOW_DURATION_MILLISECONDS", "TCP_FLAGS", "TCP_WIN_MAX_IN", "TCP_WIN_MIN_IN", "TCP_WIN_MSS_OUT", "TCP_WIN_SCALE_OUT", "LABEL_Index"])
final_df.show(5)

+--------------------------+---------+--------------+--------------+---------------+-----------------+-----------+
|FLOW_DURATION_MILLISECONDS|TCP_FLAGS|TCP_WIN_MAX_IN|TCP_WIN_MIN_IN|TCP_WIN_MSS_OUT|TCP_WIN_SCALE_OUT|LABEL_Index|
+--------------------------+---------+--------------+--------------+---------------+-----------------+-----------+
|                    104116|       19|         64240|          8192|           1460|                0|        0.0|
|                      9529|        0|             0|             0|              0|                0|        0.0|
|                      9529|        0|             0|             0|              0|                0|        0.0|
|                         0|        0|             0|             0|              0|                0|        0.0|
|                         0|        2|         65535|         65535|              0|                0|        0.0|
+--------------------------+---------+--------------+--------------+------------

In [36]:
final_df.repartition(1).write.option("header", True).csv("df")

## ML Algorithms

In [2]:
df = spark.read.csv("processedData.csv", header=True, inferSchema=True)
df.show(5)

+--------------------------+---------+--------------+--------------+---------------+-----------------+-----------+
|FLOW_DURATION_MILLISECONDS|TCP_FLAGS|TCP_WIN_MAX_IN|TCP_WIN_MIN_IN|TCP_WIN_MSS_OUT|TCP_WIN_SCALE_OUT|LABEL_Index|
+--------------------------+---------+--------------+--------------+---------------+-----------------+-----------+
|                    104116|       19|         64240|          8192|           1460|                0|        0.0|
|                      9529|        0|             0|             0|              0|                0|        0.0|
|                      9529|        0|             0|             0|              0|                0|        0.0|
|                         0|        0|             0|             0|              0|                0|        0.0|
|                         0|        2|         65535|         65535|              0|                0|        0.0|
+--------------------------+---------+--------------+--------------+------------

Assemble vector

In [3]:
df = df.limit(8_500_000)

input_cols = df.columns
input_cols.remove("LABEL_Index")

assembler = VectorAssembler(inputCols=input_cols, outputCol='features')
output = assembler.transform(df)

final_data = output.select(["features", "LABEL_Index"])

Training data and testing data split

In [4]:
trainData, testData = final_data.randomSplit([0.7, 0.3], seed = 16)

In [5]:
trainData.describe().show()
testData.describe().show()

+-------+-------------------+
|summary|        LABEL_Index|
+-------+-------------------+
|  count|            5950730|
|   mean|0.47126923923619457|
| stddev| 0.6726729031675439|
|    min|                0.0|
|    max|                2.0|
+-------+-------------------+

+-------+------------------+
|summary|       LABEL_Index|
+-------+------------------+
|  count|           2549270|
|   mean|0.4709814966637508|
| stddev|0.6726423069627532|
|    min|               0.0|
|    max|               2.0|
+-------+------------------+



# Logistic Regression

In [6]:
lr = LogisticRegression(labelCol='LABEL_Index')

startTime = time()

lr_model = lr.fit(trainData)

endTime = time()
print(f"{endTime - startTime} seconds")

65.35128283500671 seconds


In [8]:
lr_model.save("LR_Model")

In [9]:
lr_model = LogisticRegressionModel.load("LR_Model")

In [20]:
coefficients = lr_model.coefficients.toArray()
feature_names = ["FLOW_DURATION_MILLISECONDS", "TCP_FLAGS", "TCP_WIN_MAX_IN", "TCP_WIN_MIN_IN", "TCP_WIN_MSS_OUT", "TCP_WIN_SCALE_OUT", "LABEL_Index"]

feature_importances = pd.DataFrame({
    'feature': feature_names,
    'importance': np.abs(coefficients)
}).sort_values(by='importance', ascending=False)

plt.figure(figsize=(10, 6))
sns.barplot(x='importance', y='feature', data=feature_importances)
plt.title('Feature Importance for Logistic Regression')
plt.xlabel('Importance')
plt.ylabel('Feature')
plt.show()


Py4JJavaError: An error occurred while calling o201.coefficients.
: org.apache.spark.SparkException: Multinomial models contain a matrix of coefficients, use coefficientMatrix instead.
	at org.apache.spark.ml.classification.LogisticRegressionModel.coefficients(LogisticRegression.scala:1085)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
