<a href="https://colab.research.google.com/github/deepali17043/NetworkIntrusionDetection/blob/main/Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports/Misc

Note: If running on the small dataset in Google Colab, uncomment the following three cells.

In [None]:
# !pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=18e439c8a295d75b3db8d50c99817d485e6f5b5ce74677f2b647ee0865781055
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# %cd /path-to-project/           # replace path-to-project with your working directory that also has the small dataset.

/content/drive/MyDrive/Summer24/BigData/Project


In [None]:
# !ls                             # confirm that the input file is present

 generate_small_data.ipynb		    small_dataset
 NF_UQ_NIDS_v2.csv.bz2			    small_NF_UQ_NIDS_v2.csv
 Project.ipynb				    spark-3.1.1-bin-hadoop3.2
'Screenshot 2024-08-03 at 4.07.31 PM.png'   spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
import sys
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.stat import Correlation
from pyspark.ml import Pipeline

In [None]:
# Start (or reuse, via getOrCreate) the SparkSession used by every cell below.
spark = (
    SparkSession.builder
    .appName('NetworkIntrusionDetection')
    .getOrCreate()
)

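Uncomment the following two lines if running on the small dataset in Google Colab. Inside a notebook, `sys.argv` holds the kernel's own launch arguments, so the input file and output directory must be set by hand.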

In [None]:
# sys.argv[1] = 'small_NF_UQ_NIDS_v2.csv'
# sys.argv[2] = 'project_output_dir'

In [None]:
# Positional arguments: <input_file> <output_dir> (e.g. when run via spark-submit).
# Fail with a usage message instead of a bare IndexError when they are missing.
if len(sys.argv) < 3:
    raise SystemExit(f'Usage: {sys.argv[0] if sys.argv else "Project.py"} <input_file> <output_dir>')
input_file = sys.argv[1]
output_dir = sys.argv[2]

In [None]:
# Messages collected during the run; written under <output_dir>/output_logs at the end.
output_log = list()

# Read Data and EDA

In [None]:
# Load the flow records: the first CSV row holds the column names, and Spark infers
# column types from the values (inferSchema costs an extra pass over the file).
data = spark.read.options(header=True, inferSchema=True).csv(input_file)

In [None]:
# Record the schema Spark inferred from the CSV (StructType repr) and show it as a tree.
data_schema = data.schema
log = 'Data Read successful, schema infered:\n{}'.format(data_schema)
output_log.append(log)
data.printSchema()

Data Read successful, schema infered:
StructType([StructField('IPV4_SRC_ADDR', StringType(), True), StructField('L4_SRC_PORT', IntegerType(), True), StructField('IPV4_DST_ADDR', StringType(), True), StructField('L4_DST_PORT', IntegerType(), True), StructField('PROTOCOL', IntegerType(), True), StructField('L7_PROTO', DoubleType(), True), StructField('IN_BYTES', IntegerType(), True), StructField('IN_PKTS', IntegerType(), True), StructField('OUT_BYTES', IntegerType(), True), StructField('OUT_PKTS', IntegerType(), True), StructField('TCP_FLAGS', IntegerType(), True), StructField('CLIENT_TCP_FLAGS', IntegerType(), True), StructField('SERVER_TCP_FLAGS', IntegerType(), True), StructField('FLOW_DURATION_MILLISECONDS', IntegerType(), True), StructField('DURATION_IN', IntegerType(), True), StructField('DURATION_OUT', IntegerType(), True), StructField('MIN_TTL', IntegerType(), True), StructField('MAX_TTL', IntegerType(), True), StructField('LONGEST_FLOW_PKT', IntegerType(), True), StructField('SH

In [None]:
# 'Dataset' is used neither as a feature nor as a target, so remove it before encoding.
unused_columns = ['Dataset']
data = data.drop(*unused_columns)

In [None]:
# Prediction targets: 'Label' for the binary task and 'Attack_index' (the
# StringIndexer-encoded 'Attack' column) for the multi-class task.
output_label, output_attack = 'Label', 'Attack_index'
output_columns = [output_label, output_attack]

In [None]:
# Every column Spark typed as a string; these are indexed/encoded below.
categorical_columns = [
    column_name
    for column_name, spark_type in data.dtypes
    if spark_type == "string"
]

In [None]:
# Summary statistics (count, mean, stddev, min, max) for every column.
# Interpolating the DataFrame into the log would only record its repr (column
# names and types), not the numbers. describe() also runs a full pass over the
# data, so collect its small result once and reuse it for both the notebook
# output and the persisted log.
data_desc = data.describe()
desc_rows = data_desc.collect()
desc_lines = [
    f"{column}: " + ', '.join(f"{row['summary']}={row[column]}" for row in desc_rows)
    for column in data_desc.columns
    if column != 'summary'
]
log = 'Data Description:\n' + '\n'.join(desc_lines)
print(log)
output_log.append(log)

Data Description:
DataFrame[summary: string, IPV4_SRC_ADDR: string, L4_SRC_PORT: string, IPV4_DST_ADDR: string, L4_DST_PORT: string, PROTOCOL: string, L7_PROTO: string, IN_BYTES: string, IN_PKTS: string, OUT_BYTES: string, OUT_PKTS: string, TCP_FLAGS: string, CLIENT_TCP_FLAGS: string, SERVER_TCP_FLAGS: string, FLOW_DURATION_MILLISECONDS: string, DURATION_IN: string, DURATION_OUT: string, MIN_TTL: string, MAX_TTL: string, LONGEST_FLOW_PKT: string, SHORTEST_FLOW_PKT: string, MIN_IP_PKT_LEN: string, MAX_IP_PKT_LEN: string, SRC_TO_DST_SECOND_BYTES: string, DST_TO_SRC_SECOND_BYTES: string, RETRANSMITTED_IN_BYTES: string, RETRANSMITTED_IN_PKTS: string, RETRANSMITTED_OUT_BYTES: string, RETRANSMITTED_OUT_PKTS: string, SRC_TO_DST_AVG_THROUGHPUT: string, DST_TO_SRC_AVG_THROUGHPUT: string, NUM_PKTS_UP_TO_128_BYTES: string, NUM_PKTS_128_TO_256_BYTES: string, NUM_PKTS_256_TO_512_BYTES: string, NUM_PKTS_512_TO_1024_BYTES: string, NUM_PKTS_1024_TO_1514_BYTES: string, TCP_WIN_MAX_IN: string, TCP_WIN_M

In [None]:
# Encode the string columns. StringIndexer maps each one to a numeric
# "<name>_index" column (this is how the 'Attack' target becomes 'Attack_index'),
# and every non-target index is then one-hot encoded into "<name>_encoded".
# NOTE(review): the *_encoded outputs are vector columns, so the dtype-based
# feature selection later in the notebook never picks them up. Confirm whether
# they were meant to be model inputs.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index")
    for name in categorical_columns
]
encoders = [
    OneHotEncoder(inputCol=f"{name}_index", outputCol=f"{name}_encoded")
    for name in categorical_columns
    if name != 'Attack'
]
pipeline = Pipeline(stages=[*indexers, *encoders])
data = pipeline.fit(data).transform(data)

In [None]:
# Remove the raw string columns and the intermediate *_index columns, keeping
# 'Attack_index' because it is the multi-class target.
indexed_cols = [f"{name}_index" for name in categorical_columns if name != 'Attack']
data = data.drop(*categorical_columns, *indexed_cols)

In [None]:
# Log the post-encoding schema (StructType repr) and show the tree view inline.
updated_schema = data.schema
log = 'Data Schema after indexing and encoding:\n{}'.format(updated_schema)
output_log.append(log)
data.printSchema()

Data Schema after indexing and encoding:
StructType([StructField('L4_SRC_PORT', IntegerType(), True), StructField('L4_DST_PORT', IntegerType(), True), StructField('PROTOCOL', IntegerType(), True), StructField('L7_PROTO', DoubleType(), True), StructField('IN_BYTES', IntegerType(), True), StructField('IN_PKTS', IntegerType(), True), StructField('OUT_BYTES', IntegerType(), True), StructField('OUT_PKTS', IntegerType(), True), StructField('TCP_FLAGS', IntegerType(), True), StructField('CLIENT_TCP_FLAGS', IntegerType(), True), StructField('SERVER_TCP_FLAGS', IntegerType(), True), StructField('FLOW_DURATION_MILLISECONDS', IntegerType(), True), StructField('DURATION_IN', IntegerType(), True), StructField('DURATION_OUT', IntegerType(), True), StructField('MIN_TTL', IntegerType(), True), StructField('MAX_TTL', IntegerType(), True), StructField('LONGEST_FLOW_PKT', IntegerType(), True), StructField('SHORTEST_FLOW_PKT', IntegerType(), True), StructField('MIN_IP_PKT_LEN', IntegerType(), True), Struc

In [None]:
# Model inputs: every scalar numeric column except the two targets. Vector
# columns (the *_encoded one-hot outputs) and strings are skipped.
# Spark reports numeric columns under several dtype names. CSV schema inference
# may choose 'bigint' for counters that exceed the int range (likely on the full
# dataset), so accept all integral/fractional types rather than only 'int'/'double'.
NUMERIC_DTYPES = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}
feature_columns = [
    field
    for field, dataType in data.dtypes
    if (dataType in NUMERIC_DTYPES or dataType.startswith('decimal'))
    and field not in output_columns
]

#### Compute correlations between features and the two output columns

In [None]:
# Pearson correlation of each feature with the binary label. All correlations are
# computed as aggregates of a single select, i.e. one pass over `data`, instead of
# one Spark job (and one re-evaluation of the whole read/encode plan) per feature
# via data.stat.corr. A value may be None where the correlation is undefined
# (e.g. a constant column).
label_corr_row = data.select(
    [F.corr(col, output_label).alias(col) for col in feature_columns]
).first()
label_corr = [(col, label_corr_row[col]) for col in feature_columns]
log = f'Correlation with Label:\n{label_corr}'
print(log)
output_log.append(log)

Correlation with Label:
[('L4_SRC_PORT', -0.2970770297137139), ('L4_DST_PORT', -0.1089288607847205), ('PROTOCOL', 0.0451885017479017), ('L7_PROTO', 0.38027546073261886), ('IN_BYTES', -0.01764480370422351), ('IN_PKTS', -0.006272327213910631), ('OUT_BYTES', -0.013547652959048442), ('OUT_PKTS', -0.021268306535884865), ('TCP_FLAGS', -0.27784372849679206), ('CLIENT_TCP_FLAGS', -0.28024965645780775), ('SERVER_TCP_FLAGS', -0.2981561685129924), ('FLOW_DURATION_MILLISECONDS', 0.6806141381425527), ('DURATION_IN', 0.45103167322641247), ('DURATION_OUT', 0.04236432854125922), ('MIN_TTL', 0.07248870805888227), ('MAX_TTL', 0.06955699057327304), ('LONGEST_FLOW_PKT', -0.2794073348405015), ('SHORTEST_FLOW_PKT', 0.14903897197160654), ('MIN_IP_PKT_LEN', -0.5118122523644874), ('MAX_IP_PKT_LEN', -0.2794073348405015), ('SRC_TO_DST_SECOND_BYTES', -0.008349061706112266), ('DST_TO_SRC_SECOND_BYTES', -0.008243838134781837), ('RETRANSMITTED_IN_BYTES', -0.013277961336664524), ('RETRANSMITTED_IN_PKTS', -0.018561883

In [None]:
# Pearson correlation of each feature with the indexed attack type. As with the
# label correlations, all pairs come from one aggregate select (a single pass over
# `data`) rather than one data.stat.corr job per feature. A value may be None where
# the correlation is undefined (e.g. a constant column).
attack_corr_row = data.select(
    [F.corr(col, output_attack).alias(col) for col in feature_columns]
).first()
attack_corr = [(col, attack_corr_row[col]) for col in feature_columns]
log = f'Correlation with Attack_index:\n{attack_corr}'
print(log)
output_log.append(log)

Correlation with Attack_index:
[('L4_SRC_PORT', -0.055796459671425026), ('L4_DST_PORT', 0.04631251035486179), ('PROTOCOL', -0.20482416917688845), ('L7_PROTO', -0.14018942007757668), ('IN_BYTES', -0.0064398311456250815), ('IN_PKTS', -0.007437149834683559), ('OUT_BYTES', -0.007472378715848047), ('OUT_PKTS', -0.009538090444948924), ('TCP_FLAGS', -0.127414313594926), ('CLIENT_TCP_FLAGS', -0.144057405614664), ('SERVER_TCP_FLAGS', -0.016426324437646828), ('FLOW_DURATION_MILLISECONDS', 0.10710531109507221), ('DURATION_IN', 0.08406499500166001), ('DURATION_OUT', 0.09964026936668045), ('MIN_TTL', -0.06458189207019564), ('MAX_TTL', -0.06603751288669163), ('LONGEST_FLOW_PKT', 0.03835057754871984), ('SHORTEST_FLOW_PKT', 0.147227907457668), ('MIN_IP_PKT_LEN', -0.02696167929558926), ('MAX_IP_PKT_LEN', 0.03835057754871984), ('SRC_TO_DST_SECOND_BYTES', -0.005122078373183468), ('DST_TO_SRC_SECOND_BYTES', -0.005055813751691971), ('RETRANSMITTED_IN_BYTES', -0.0005881511260798234), ('RETRANSMITTED_IN_PKTS

#### Assemble features and create train-test split

In [None]:
# Pack the selected numeric columns into the single "features" vector Spark ML expects.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# One (features, target) view per task: attack type first, then attack-or-not.
data_multi, data_binary = (
    data.select("features", target) for target in ("Attack_index", "Label")
)

# Reproducible 70/30 train/test split for each task (same weights and seed).
Train_multi, Test_multi = data_multi.randomSplit([0.7, 0.3], seed=42)
Train_binary, Test_binary = data_binary.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Milestone: features assembled and train/test splits defined.
log = 'EDA Completed & Data Splits created'
output_log.append(log)
print(output_log[-1])

EDA Completed


# Random Forest Classifier

### Binary Classification - Attack (1) or not (0)

In [None]:
# Milestone: start of the binary (attack vs. benign) experiments.
log = 'Binary Classification - Attack (1) or not (0) - starting'
output_log.append(log)
print(log)

Binary Classification - Attack (1) or not (0) - starting


In [None]:
# Baseline binary RandomForest with 10 trees. seed=42 (the same seed as the
# train/test split) makes bootstrap sampling and per-node feature subsetting
# reproducible. PySpark's default seed is derived from Python's hash() of the
# class name, which changes between processes unless PYTHONHASHSEED is fixed.
rf = RandomForestClassifier(labelCol="Label", featuresCol="features", numTrees=10, seed=42)

# Train model
model = rf.fit(Train_binary)

# Make predictions on the held-out split
predictions = model.transform(Test_binary)

In [None]:
# Score the baseline binary model by area under the ROC curve on the test split.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="Label")
areaUnderROC = evaluator.evaluate(predictions)
log = "Test areaUnderROC - with standard RF model: {}".format(areaUnderROC)
output_log.append(log)
print(log)

Test areaUnderROC - with standard RF model: 0.9931896753892805


In [None]:
# Hyperparameter tuning: choose numTrees by 3-fold cross-validation on the
# training split, scored with the areaUnderROC evaluator defined above.
# seed=42 fixes the random fold assignment. PySpark's default seed comes from
# Python's per-process hash(), so without it the selected model could vary
# between runs.
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20, 30]).build()
crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3, seed=42)
cvModel_bin = crossval.fit(Train_binary)

# Make predictions with the best model
predictions = cvModel_bin.transform(Test_binary)
areaUnderROC = evaluator.evaluate(predictions)
log = f"Test areaUnderROC - with tuned RF model (using 3-fold CrossValidation): {areaUnderROC}"
print(log)
output_log.append(log)

Test areaUnderROC - with tuned RF model (using 3-fold CrossValidation): 0.993306228874039


In [None]:
# Persist the CV-selected binary model. Plain save() raises if the path already
# exists, which breaks re-running the notebook; write().overwrite() replaces a
# model left by a previous run.
best_model_bin_path = f'{output_dir}/best_model_bin'
cvModel_bin.bestModel.write().overwrite().save(best_model_bin_path)
log = f'Best Model for binary classification saved to {best_model_bin_path}'
print(log)
print(f'Best Model: {cvModel_bin.bestModel}')
output_log.append(log)

Best Model for binary classification saved to project_output_dir/best_model_bin


### Multi-class classification - Attack type

In [None]:
# Milestone: start of the multi-class (attack type) experiments.
log = 'Multi-class classification - Attack type - starting'
for emit in (print, output_log.append):
    emit(log)

Multi-class classification - Attack type - starting


In [None]:
# Baseline multi-class RandomForest (10 trees) predicting the indexed attack type.
# seed=42 (the same seed as the train/test split) makes bootstrap sampling and
# per-node feature subsetting reproducible. PySpark's default seed is derived from
# Python's hash() of the class name, which changes between processes unless
# PYTHONHASHSEED is fixed.
rf = RandomForestClassifier(labelCol="Attack_index", featuresCol="features", numTrees=10, seed=42)

# Train model
model = rf.fit(Train_multi)

# Make predictions on the held-out split
predictions = model.transform(Test_multi)

In [None]:
# Score the baseline multi-class model by accuracy on the held-out split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Attack_index",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
log = "Test Accuracy - with standard RF model: {}".format(accuracy)
output_log.append(log)
print(log)

Test Accuracy - with standard RF model: 0.8805421153499084


In [None]:
# Hyperparameter tuning: choose numTrees by 3-fold cross-validation on the
# training split, scored with the accuracy evaluator defined above.
# seed=42 fixes the random fold assignment. PySpark's default seed comes from
# Python's per-process hash(), so without it the selected model could vary
# between runs.
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20, 30]).build()
crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3, seed=42)
cvModel_multi = crossval.fit(Train_multi)

# Make predictions with the best model
predictions = cvModel_multi.transform(Test_multi)
accuracy = evaluator.evaluate(predictions)
log = f"Test Accuracy - with tuned RF model (using 3-fold CrossValidation): {accuracy}"
print(log)
output_log.append(log)

Exception ignored in: <function JavaWrapper.__del__ at 0x7bf23c26da20>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pyspark/ml/wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'BinaryClassificationEvaluator' object has no attribute '_java_obj'


Test Accuracy - with tuned RF model (using 3-fold CrossValidation): 0.8739418523706804


In [None]:
# Persist the CV-selected multi-class model. Plain save() raises if the path
# already exists, which breaks re-running the notebook; write().overwrite()
# replaces a model left by a previous run.
best_model_multi_path = f'{output_dir}/best_model_multi'
cvModel_multi.bestModel.write().overwrite().save(best_model_multi_path)
log = f'Best Model for multi-class classification saved to {best_model_multi_path}'
print(log)
print(f'Best Model: {cvModel_multi.bestModel}')
output_log.append(log)

Best Model for multi-class classification saved to project_output_dir/best_model_multi
Best Model: RandomForestClassificationModel: uid=RandomForestClassifier_b324af8cc291, numTrees=30, numClasses=19, numFeatures=39


# Write output to output file and conclude

In [None]:
# output_dir = 'output_dir'

In [None]:
# Persist the accumulated log messages as text under <output_dir>/output_logs,
# one message per record. RDD.saveAsTextFile cannot overwrite and fails when the
# directory already exists (e.g. on a re-run); the DataFrame writer with
# mode('overwrite') replaces it. coalesce(1) keeps the log in a single part file.
log_df = spark.createDataFrame([(entry,) for entry in output_log], 'value string')
log_df.coalesce(1).write.mode('overwrite').text(f'{output_dir}/output_logs')

In [None]:
# Shut down the SparkSession (and its underlying SparkContext) now that all outputs are written.
spark.stop()