(1) Get the data \
(2) Discover and visualize the data \
(3) Prepare the data for machine learning algorithms \
(4) Select and train models \
(5) Fine-tune the models \
(6) Evaluate the outcomes

#Get the data

In [None]:
# Installing pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=a3bae2749507476f78189832e0c71549b83e84d614e68e6c471c2d288a0f682e
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
# Importing libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Create the Spark session for this notebook (or reuse one that already exists)
spark = (
    SparkSession.builder
    .appName('Multi-Classification')
    .getOrCreate()
)
spark

In [None]:
# Read the dataset
from google.colab import drive

drive.mount("/content/gdrive")

# Treat the first row of each CSV as the header and let Spark infer column types
csv_options = {'header': True, 'inferSchema': True}
df_training = spark.read.csv('/content/gdrive/My Drive/UNSW_NB15_training-set.csv', **csv_options)
df_testing = spark.read.csv('/content/gdrive/My Drive/UNSW_NB15_testing-set.csv', **csv_options)

# Preview each file and report how many records it holds
df_training.show(truncate=False)
train_size = df_training.count()
print(f'Train data set size: {train_size} records')
print('\n\n')
df_testing.show(truncate=False)
test_size = df_testing.count()
print(f'Test data set size: {test_size} records')

Mounted at /content/gdrive
+---+------+-----+-------+-----+-----+-----+------+------+-----------+----+----+-------------+-----+-----+-----+---------+------+----+----+----+-----+-----+----+------+------+------+-----+-----+-----------+-----------------+----------+------------+----------+----------------+----------------+--------------+------------+----------+----------------+----------+----------+---------------+----------+-----+
|id |dur   |proto|service|state|spkts|dpkts|sbytes|dbytes|rate       |sttl|dttl|sload        |dload|sloss|dloss|sinpkt   |dinpkt|sjit|djit|swin|stcpb|dtcpb|dwin|tcprtt|synack|ackdat|smean|dmean|trans_depth|response_body_len|ct_srv_src|ct_state_ttl|ct_dst_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|is_ftp_login|ct_ftp_cmd|ct_flw_http_mthd|ct_src_ltm|ct_srv_dst|is_sm_ips_ports|attack_cat|label|
+---+------+-----+-------+-----+-----+-----+------+------+-----------+----+----+-------------+-----+-----+-----+---------+------+----+----+----+-----+-----+----+--

In [None]:
# Merge the datasets
import functools

def unionAll(dfs):
    """Union several DataFrames row-wise into a single DataFrame.

    Each DataFrame after the first is reordered to the accumulated result's
    column order (via ``select``) before being appended with ``union``.

    Args:
        dfs: Iterable of DataFrames that share the same column names.

    Returns:
        The combined DataFrame. When only one DataFrame is supplied, that
        DataFrame is returned unchanged.

    Raises:
        ValueError: If ``dfs`` is empty.
    """
    # Materialise first so one-shot iterables work and emptiness can be checked
    frames = list(dfs)
    if not frames:
        raise ValueError("unionAll() requires at least one DataFrame")
    return functools.reduce(
        lambda merged, frame: merged.union(frame.select(merged.columns)),
        frames,
    )

# Combine both files into one DataFrame; unionAll aligns the testing columns
# to the training column order before appending the rows
df = unionAll([df_training, df_testing])

#Discover and Visualise the Data

In [None]:
# Preview the merged data and report its total number of records
df.show(truncate=False)
record_count = df.count()
print(f'Data set size: {record_count} records')

+---+------+-----+-------+-----+-----+-----+------+------+-----------+----+----+-------------+-----+-----+-----+---------+------+----+----+----+-----+-----+----+------+------+------+-----+-----+-----------+-----------------+----------+------------+----------+----------------+----------------+--------------+------------+----------+----------------+----------+----------+---------------+----------+-----+
|id |dur   |proto|service|state|spkts|dpkts|sbytes|dbytes|rate       |sttl|dttl|sload        |dload|sloss|dloss|sinpkt   |dinpkt|sjit|djit|swin|stcpb|dtcpb|dwin|tcprtt|synack|ackdat|smean|dmean|trans_depth|response_body_len|ct_srv_src|ct_state_ttl|ct_dst_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|is_ftp_login|ct_ftp_cmd|ct_flw_http_mthd|ct_src_ltm|ct_srv_dst|is_sm_ips_ports|attack_cat|label|
+---+------+-----+-------+-----+-----+-----+------+------+-----------+----+----+-------------+-----+-----+-----+---------+------+----+----+----+-----+-----+----+------+------+------+-----+--

In [None]:
# Print every column with its inferred type and nullability
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- dur: double (nullable = true)
 |-- proto: string (nullable = true)
 |-- service: string (nullable = true)
 |-- state: string (nullable = true)
 |-- spkts: integer (nullable = true)
 |-- dpkts: integer (nullable = true)
 |-- sbytes: integer (nullable = true)
 |-- dbytes: integer (nullable = true)
 |-- rate: double (nullable = true)
 |-- sttl: integer (nullable = true)
 |-- dttl: integer (nullable = true)
 |-- sload: double (nullable = true)
 |-- dload: double (nullable = true)
 |-- sloss: integer (nullable = true)
 |-- dloss: integer (nullable = true)
 |-- sinpkt: double (nullable = true)
 |-- dinpkt: double (nullable = true)
 |-- sjit: double (nullable = true)
 |-- djit: double (nullable = true)
 |-- swin: integer (nullable = true)
 |-- stcpb: long (nullable = true)
 |-- dtcpb: long (nullable = true)
 |-- dwin: integer (nullable = true)
 |-- tcprtt: double (nullable = true)
 |-- synack: double (nullable = true)
 |-- ackdat: double (nullable 

#Prepare the Data for Machine Learning Algorithms

In [None]:
# Checking for null values
from pyspark.sql.functions import col, count, when

# One aggregate per column: count only the rows where that column is null
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

# Results show that there are no null values within the dataset

+---+---+-----+-------+-----+-----+-----+------+------+----+----+----+-----+-----+-----+-----+------+------+----+----+----+-----+-----+----+------+------+------+-----+-----+-----------+-----------------+----------+------------+----------+----------------+----------------+--------------+------------+----------+----------------+----------+----------+---------------+----------+-----+
| id|dur|proto|service|state|spkts|dpkts|sbytes|dbytes|rate|sttl|dttl|sload|dload|sloss|dloss|sinpkt|dinpkt|sjit|djit|swin|stcpb|dtcpb|dwin|tcprtt|synack|ackdat|smean|dmean|trans_depth|response_body_len|ct_srv_src|ct_state_ttl|ct_dst_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|is_ftp_login|ct_ftp_cmd|ct_flw_http_mthd|ct_src_ltm|ct_srv_dst|is_sm_ips_ports|attack_cat|label|
+---+---+-----+-------+-----+-----+-----+------+------+----+----+----+-----+-----+-----+-----+------+------+----+----+----+-----+-----+----+------+------+------+-----+-----+-----------+-----------------+----------+------------+-----

In [None]:
# Study each column and their values
# (shows the row count per distinct value, one table per column)
for name in df.columns:
    value_counts = df.groupBy(name).count()
    value_counts.show()

+----+-----+
|  id|count|
+----+-----+
| 148|    2|
| 463|    2|
| 471|    2|
| 496|    2|
| 833|    2|
|1088|    2|
|1238|    2|
|1342|    2|
|1580|    2|
|1591|    2|
|1645|    2|
|1829|    2|
|1959|    2|
|2122|    2|
|2142|    2|
|2366|    2|
|2659|    2|
|2866|    2|
|3175|    2|
|3749|    2|
+----+-----+
only showing top 20 rows

+---------+-----+
|      dur|count|
+---------+-----+
|16.800188|    2|
| 1.673592|    1|
| 0.432617|    1|
| 0.675253|    1|
| 1.297416|    1|
| 1.568354|    1|
| 0.504508|    1|
| 0.281319|    1|
| 0.627159|    1|
| 0.452864|    1|
| 1.480129|    1|
| 0.249731|    1|
| 0.520387|    2|
| 0.997137|    1|
| 1.045879|    1|
| 0.822681|    1|
| 1.121988|    2|
| 0.750328|    1|
| 1.446851|    1|
| 1.617054|    2|
+---------+-----+
only showing top 20 rows

+----------+-----+
|     proto|count|
+----------+-----+
|      cphb|  132|
|nsfnet-igp|  132|
|      larp|  132|
|       dgp|  132|
|       tcf|  132|
|     crudp|  132|
|       nvp|  132|
|       igp|  

In [None]:
# Dropping two columns in a single call:
#   - id: cannot determine the target
#   - label: attack_cat is the target variable
df = df.drop('id', 'label')

In [None]:
# Converting categorical values to numeric values

# Every string-typed column is treated as categorical
catCols = [name for (name, dataType) in df.dtypes if dataType == 'string']

# Index all categorical columns with one multi-column StringIndexer, so the
# data goes through a single fit/transform round instead of one per column.
# Each output keeps the '<column>_index' naming used by the cells below.
if catCols:
    stringIndexer = StringIndexer(
        inputCols=catCols,
        outputCols=[name + '_index' for name in catCols],
    )
    df = stringIndexer.fit(df).transform(df)
df.show()

+------+-----+-------+-----+-----+-----+------+------+-----------+----+----+-------------+-----+-----+-----+---------+------+----+----+----+-----+-----+----+------+------+------+-----+-----+-----------+-----------------+----------+------------+----------+----------------+----------------+--------------+------------+----------+----------------+----------+----------+---------------+----------+-----------+-------------+-----------+----------------+
|   dur|proto|service|state|spkts|dpkts|sbytes|dbytes|       rate|sttl|dttl|        sload|dload|sloss|dloss|   sinpkt|dinpkt|sjit|djit|swin|stcpb|dtcpb|dwin|tcprtt|synack|ackdat|smean|dmean|trans_depth|response_body_len|ct_srv_src|ct_state_ttl|ct_dst_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|is_ftp_login|ct_ftp_cmd|ct_flw_http_mthd|ct_src_ltm|ct_srv_dst|is_sm_ips_ports|attack_cat|proto_index|service_index|state_index|attack_cat_index|
+------+-----+-------+-----+-----+-----+------+------+-----------+----+----+-------------+-----+----

In [None]:
# Change attack_cat_index column name to label

from functools import reduce

# Rename the indexed target column to 'label', the name the classifiers and
# the evaluator in the cells below are given as labelCol
oldColumns = ['attack_cat_index']
newColumns = ['label']
for old_name, new_name in zip(oldColumns, newColumns):
    df = df.withColumnRenamed(old_name, new_name)
df.show()

+------+-----+-------+-----+-----+-----+------+------+-----------+----+----+-------------+-----+-----+-----+---------+------+----+----+----+-----+-----+----+------+------+------+-----+-----+-----------+-----------------+----------+------------+----------+----------------+----------------+--------------+------------+----------+----------------+----------+----------+---------------+----------+-----------+-------------+-----------+-----+
|   dur|proto|service|state|spkts|dpkts|sbytes|dbytes|       rate|sttl|dttl|        sload|dload|sloss|dloss|   sinpkt|dinpkt|sjit|djit|swin|stcpb|dtcpb|dwin|tcprtt|synack|ackdat|smean|dmean|trans_depth|response_body_len|ct_srv_src|ct_state_ttl|ct_dst_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|is_ftp_login|ct_ftp_cmd|ct_flw_http_mthd|ct_src_ltm|ct_srv_dst|is_sm_ips_ports|attack_cat|proto_index|service_index|state_index|label|
+------+-----+-------+-----+-----+-----+------+------+-----------+----+----+-------------+-----+-----+-----+---------+----

In [None]:
# Apply OneHotEncoder to the indexed categorical columns

catCols = ['proto','service','state']

# Encode all index columns with one multi-column OneHotEncoder. Besides needing
# a single fit, this avoids a loop variable named `col`, which would overwrite
# the pyspark.sql.functions.col helper imported earlier in this notebook.
onehotEncoder = OneHotEncoder(
    inputCols=[name + '_index' for name in catCols],
    outputCols=[name + '_onehot' for name in catCols],
)
df = onehotEncoder.fit(df).transform(df)
df.show()

+------+-----+-------+-----+-----+-----+------+------+-----------+----+----+-------------+-----+-----+-----+---------+------+----+----+----+-----+-----+----+------+------+------+-----+-----+-----------+-----------------+----------+------------+----------+----------------+----------------+--------------+------------+----------+----------------+----------+----------+---------------+----------+-----------+-------------+-----------+-----+---------------+--------------+--------------+
|   dur|proto|service|state|spkts|dpkts|sbytes|dbytes|       rate|sttl|dttl|        sload|dload|sloss|dloss|   sinpkt|dinpkt|sjit|djit|swin|stcpb|dtcpb|dwin|tcprtt|synack|ackdat|smean|dmean|trans_depth|response_body_len|ct_srv_src|ct_state_ttl|ct_dst_ltm|ct_src_dport_ltm|ct_dst_sport_ltm|ct_dst_src_ltm|is_ftp_login|ct_ftp_cmd|ct_flw_http_mthd|ct_src_ltm|ct_srv_dst|is_sm_ips_ports|attack_cat|proto_index|service_index|state_index|label|   proto_onehot|service_onehot|  state_onehot|
+------+-----+-------+-----+--

In [None]:
# Numeric feature columns that go into the feature vector as-is
numericCols = [
    'dur', 'spkts', 'dpkts', 'sbytes', 'dbytes', 'rate', 'sttl', 'dttl',
    'sload', 'dload', 'sloss', 'dloss', 'sinpkt', 'dinpkt', 'sjit', 'djit',
    'swin', 'stcpb', 'dtcpb', 'dwin', 'tcprtt', 'synack', 'ackdat', 'smean',
    'dmean', 'trans_depth', 'response_body_len', 'ct_srv_src', 'ct_state_ttl',
    'ct_dst_ltm', 'ct_src_dport_ltm', 'ct_dst_sport_ltm', 'ct_dst_src_ltm',
    'is_ftp_login', 'ct_ftp_cmd', 'ct_flw_http_mthd', 'ct_src_ltm',
    'ct_srv_dst', 'is_sm_ips_ports',
]

# Combine the one-hot columns and the numeric columns into a single
# 'features' vector using VectorAssembler
assemblerInputs = [f"{c}_onehot" for c in catCols] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Apply standardization to the assembled vector using StandardScaler
standard_scaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Pipeline stages, in execution order
stages = [assembler, standard_scaler]

In [None]:
# Create a Pipeline which includes VectorAssembler and StandardScaler

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)

# NOTE(review): the pipeline (including the scaler) is fitted on the full
# dataset, i.e. before the train/test split performed in a later cell
pipelineModel = pipeline.fit(df)

# Keep only the columns the models need: the scaled features and the target
df = pipelineModel.transform(df).select('features_scaled', 'label')
df.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features_scaled                                                                                                                                                                                                                                                                                                                                                                                                                                                        |label|
+-----------------------------------------------------------------------

In [None]:
# Split the dataset into training and testing datasets (70% / 30%, fixed seed)
split_weights = [0.7, 0.3]
train, test = df.randomSplit(split_weights, seed=42)

# Preview the first 10 rows of each split
for subset in (train, test):
    subset.show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features_scaled                                          

#Select and train models / Fine-tune the models

##LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

In [None]:
# Import classification algorithms, evaluator, paramgridbuilder, crossvalidator
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Define the LogisticRegression, DecisionTreeClassifier, RandomForestClassifier models
# (all three read the same feature and label columns)
column_args = {'featuresCol': 'features_scaled', 'labelCol': 'label'}
dt = DecisionTreeClassifier(**column_args)
rf = RandomForestClassifier(**column_args)
lr = LogisticRegression(family="multinomial", **column_args)

# Fit each model with the training data (decision tree, random forest, logistic regression, in that order)
dtModel, rfModel, lrModel = (estimator.fit(train) for estimator in (dt, rf, lr))

# Accuracy evaluator shared by the evaluation and tuning cells below.
# NOTE(review): the name `eval` shadows Python's builtin eval(); it is kept
# because the later cells refer to it by this name.
eval = MulticlassClassificationEvaluator(metricName="accuracy", labelCol='label', predictionCol='prediction')

###Decision Tree Classifier

In [None]:
# Score the testing data with the trained Decision Tree model and report its accuracy
predTarget = dtModel.transform(test)
dt_accuracy = eval.evaluate(predTarget)
print('Decision Tree Accuracy', dt_accuracy)

Decision Tree Accuracy 0.720386837151944


Fine Tune Decision Tree

In [None]:
# Define a grid of hyperparameters to fine-tune the model
# (3 maxDepth values x 3 minInstancesPerNode values = 9 combinations)
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15])
    .addGrid(dt.minInstancesPerNode, [1, 10, 20])
    .build()
)

# Initialize CrossValidator for hyperparameter tuning.
# The seed pins the fold assignment so repeated runs tune on the same folds,
# in line with the seeded train/test split (seed=42).
crossvalDT = CrossValidator(estimator=dt,
                            estimatorParamMaps=paramGrid,
                            evaluator=eval,
                            numFolds=5,
                            seed=42)

# Train the model on the training data using CrossValidator
cvModelDT = crossvalDT.fit(train)

# Make predictions with the testing data
predTargetTuned = cvModelDT.transform(test)
print('Decision Tree Accuracy after fine-tuning', eval.evaluate(predTargetTuned))

Decision Tree Accuracy after fine-tuning 0.8167715973258908


###Random Forest Classifier

In [None]:
# Use the trained Random Forest Classifier model to make predictions with the testing data
# Score the testing data with the trained Random Forest model and report its accuracy
predTarget = rfModel.transform(test)
rf_accuracy = eval.evaluate(predTarget)
print('Random Forest Accuracy', rf_accuracy)

Random Forest Accuracy 0.7401570714610242


Fine Tune Random Forest

In [None]:
# Define a grid of hyperparameters to finetune the model
# Hyperparameter grid: 3 numTrees values x 3 maxDepth values = 9 combinations
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [5, 10, 15])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .build()
)

# Initialize CrossValidator for hyperparameter tuning.
# The seed pins the fold assignment so repeated runs tune on the same folds,
# in line with the seeded train/test split (seed=42).
crossvalRF = CrossValidator(estimator=rf,
                            estimatorParamMaps=paramGrid,
                            evaluator=eval,
                            numFolds=5,
                            seed=42)

# Train the model on the training data using CrossValidator
cvModelRF = crossvalRF.fit(train)

# Make predictions with the testing data
predTargetTuned = cvModelRF.transform(test)
print('Random Forest Accuracy after fine-tuning', eval.evaluate(predTargetTuned))

Random Forest Accuracy after fine-tuning 0.8093464009865645


###Logistic Regression

In [None]:
# Use the trained Logistic Regression model to make predictions with the testing data
# Score the testing data with the trained Logistic Regression model and report its accuracy
predTarget = lrModel.transform(test)
lr_accuracy = eval.evaluate(predTarget)
print('Logistic Regression Accuracy', lr_accuracy)

Logistic Regression Accuracy 0.7758421496722269


Fine Tune Logistic Regression

In [None]:
# Define a grid of hyperparameters to fine tune the model
# Hyperparameter grid: 4 regParam x 4 elasticNetParam x 4 maxIter values
# = 64 combinations. elasticNetParam values are all written as floats.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.01, 0.1, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2, 1.0])
    .addGrid(lr.maxIter, [10, 20, 50, 100])
    .build()
)

# Initialize CrossValidator for hyperparameter tuning.
# The seed pins the fold assignment so repeated runs tune on the same folds,
# in line with the seeded train/test split (seed=42).
crossvalLR = CrossValidator(estimator=lr,
                            estimatorParamMaps=paramGrid,
                            evaluator=eval,
                            numFolds=5,
                            seed=42)

# Train the model on the training data using CrossValidator
cvModelLR = crossvalLR.fit(train)

# Make predictions with the testing data
predTargetTuned = cvModelLR.transform(test)
print('Logistic Regression Accuracy after fine-tuning', eval.evaluate(predTargetTuned))

Logistic Regression Accuracy after fine-tuning 0.7758551307847082


#Evaluate the outcomes

(1) Decision Tree Accuracy after fine-tuning: 0.8167715973258908 \
(2) Random Forest Accuracy after fine-tuning: 0.8093464009865645 \
(3) Logistic Regression Accuracy after fine-tuning: 0.7758551307847082

Among the three fine-tuned classifiers, the Decision Tree performs best, followed by the Random Forest and then Logistic Regression.

#Similarities between Sklearn and PySpark
(1) Both libraries provide a wide range of machine learning algorithms, including classification, regression, clustering, and more. \
(2) Both libraries offer tools for data transformation, preprocessing, and feature engineering. This includes handling missing values, scaling, encoding categorical variables, and more. \
(3) Both libraries offer tools for cross-validation and evaluation metrics to assess model performance.

#Differences between Sklearn and PySpark
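- In PySpark, the feature columns must first be combined into a single vector column with VectorAssembler before a model can be fitted.

- In scikit-learn, estimators accept a 2-D feature matrix (e.g. a NumPy array or pandas DataFrame) directly, so no separate column-assembling step is needed.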