In [1]:
# Install pyspark module for xgboost.spark to work in GoogleColab
# Change the path directory of the dataset according to your own environment
# Not sure whether if you run xgboost.spark in jupyter notebook will work although you have invoked this code
# To install modules, remove the '#'

# "%" makes you to install pyspark module to the current kernel only. (Install only to this ipynb file)
# %pip install pyspark

#other required modules
# !pip install xgboost
# !pip install scikit-learn

In [2]:
#generate spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Incident Response Automation")
    .config("spark.streaming.stopGracefullyOnShutdown",True)
    .config("spark.sql.shuffle.partitions", "200")
    .master("local[*]")
    .getOrCreate()
    )

spark

#for real network testing
# spark.conf.set("spark.sql.shuffle.partitions",8)

In [3]:
streaming_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/home/jovyan/sparkTEST/dataset/KDDCup99.csv")
    # .load("KDDCup99.csv")
)

In [4]:
streaming_df.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- lnum_compromised: integer (nullable = true)
 |-- lroot_shell: integer (nullable = true)
 |-- lsu_attempted: integer (nullable = true)
 |-- lnum_root: integer (nullable = true)
 |-- lnum_file_creations: integer (nullable = true)
 |-- lnum_shells: integer (nullable = true)
 |-- lnum_access_files: integer (nullable = true)
 |-- lnum_outbound_cmds: integer (nullable = true)
 |-- is_host_login: integer (nullable = true)
 |-- is_guest_login: integer (nullable = true)
 |-- count: integer (nullabl

In [5]:
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
import pandas as pd
# exploded_df = streaming_df.withColumn("")

# df_words = streaming_df.withColumn("words",split("value",","))
# df_explode = df_words.withColumn("word",explode("words")).drop("value")
# df_explode = df_words.withColumn("word",explode("words")).drop("value","words")

df = pd.read_csv("/home/jovyan/sparkTEST/dataset/KDDCup99.csv")
print("Read {} rows.".format(len(df)))
df.dropna(inplace=True,axis=1)
# pd.set_option('display.max_columns', 5)
# pd.set_option('display.max_rows', 5)
df

Read 494020 rows.


Unnamed: 0,duration,protocol_type,service,flag,src_bytes,dst_bytes,land,wrong_fragment,urgent,hot,...,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate,label
0,0,tcp,http,SF,181,5450,0,0,0,0,...,9,1.0,0.0,0.11,0.00,0.00,0.00,0.0,0.0,normal
1,0,tcp,http,SF,239,486,0,0,0,0,...,19,1.0,0.0,0.05,0.00,0.00,0.00,0.0,0.0,normal
2,0,tcp,http,SF,235,1337,0,0,0,0,...,29,1.0,0.0,0.03,0.00,0.00,0.00,0.0,0.0,normal
3,0,tcp,http,SF,219,1337,0,0,0,0,...,39,1.0,0.0,0.03,0.00,0.00,0.00,0.0,0.0,normal
4,0,tcp,http,SF,217,2032,0,0,0,0,...,49,1.0,0.0,0.02,0.00,0.00,0.00,0.0,0.0,normal
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
494015,0,tcp,http,SF,310,1881,0,0,0,0,...,255,1.0,0.0,0.01,0.05,0.00,0.01,0.0,0.0,normal
494016,0,tcp,http,SF,282,2286,0,0,0,0,...,255,1.0,0.0,0.17,0.05,0.00,0.01,0.0,0.0,normal
494017,0,tcp,http,SF,203,1200,0,0,0,0,...,255,1.0,0.0,0.06,0.05,0.06,0.01,0.0,0.0,normal
494018,0,tcp,http,SF,291,1200,0,0,0,0,...,255,1.0,0.0,0.04,0.05,0.04,0.01,0.0,0.0,normal


In [6]:
#analyze the dataset
import pandas as pd
import os
import numpy as np
from sklearn import metrics
from scipy.stats import zscore

def expand_categories(values):
    result = []
    s = values.value_counts()
    t = float(len(values))
    for v in s.index:
        result.append("{}:{}%".format(v,round(100*(s[v]/t),2)))
    return "[{}]".format(",".join(result))

def analyze(df):
    print()
    cols = df.columns.values
    total = float(len(df))

    print("{} rows".format(int(total)))
    for col in cols:
        uniques = df[col].unique()
        unique_count = len(uniques)
        if unique_count>100:
            print("** {}:{} ({}%)".format(col,unique_count,\
                int(((unique_count)/total)*100)))
        else:
            print("** {}:{}".format(col,expand_categories(df[col])))
            expand_categories(df[col])

In [7]:
analyze(df) #analyze dataset for summarization


494020 rows
** duration:2495 (0%)
** protocol_type:[icmp:57.41%,tcp:38.47%,udp:4.12%]
** service:[ecr_i:56.96%,private:22.45%,http:13.01%,smtp:1.97%,other:1.46%,domain_u:1.19%,ftp_data:0.96%,eco_i:0.33%,ftp:0.16%,finger:0.14%,urp_i:0.11%,telnet:0.1%,ntp_u:0.08%,auth:0.07%,pop_3:0.04%,time:0.03%,csnet_ns:0.03%,remote_job:0.02%,gopher:0.02%,imap4:0.02%,discard:0.02%,domain:0.02%,iso_tsap:0.02%,systat:0.02%,shell:0.02%,echo:0.02%,rje:0.02%,whois:0.02%,sql_net:0.02%,printer:0.02%,nntp:0.02%,courier:0.02%,sunrpc:0.02%,netbios_ssn:0.02%,mtp:0.02%,vmnet:0.02%,uucp_path:0.02%,uucp:0.02%,klogin:0.02%,bgp:0.02%,ssh:0.02%,supdup:0.02%,nnsp:0.02%,login:0.02%,hostnames:0.02%,efs:0.02%,daytime:0.02%,link:0.02%,netbios_ns:0.02%,pop_2:0.02%,ldap:0.02%,netbios_dgm:0.02%,exec:0.02%,http_443:0.02%,kshell:0.02%,name:0.02%,ctf:0.02%,netstat:0.02%,Z39_50:0.02%,IRC:0.01%,urh_i:0.0%,X11:0.0%,tim_i:0.0%,pm_dump:0.0%,tftp_u:0.0%,red_i:0.0%]
** flag:[SF:76.6%,S0:17.61%,REJ:5.44%,RSTR:0.18%,RSTO:0.12%,SH:0.02%,S

In [8]:
# df
# df_words = streaming_df.withColumn("words",split("value",","))
# df_explode = df_words.withColumn("word",explode("words")).drop("value")
# df_explode = df_words.withColumn("word",explode("words")).drop("value","words")

In [9]:
from pyspark.sql.functions import col, when

# Import dataset
df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema","true") \
        .load("/home/jovyan/sparkTEST/dataset/KDDCup99.csv") 
        # .load("KDDCup99.csv")

# Further encode each label into fewer labels
attack_types = ['normal', 'dos', 'u2r', 'r2l', 'probe']
df = df.withColumn(
    'label',
    when(col('label').isin(['back', 'land', 'neptune', 'pod', 'smurf', 'teardrop']), 'dos')
    .when(col('label').isin(['buffer_overflow', 'loadmodule', 'perl', 'rootkit']), 'u2r')
    .when(col('label').isin(['ftp_write', 'guess_passwd', 'imap', 'multihop', 'phf', 'spy', 'warezclient', 'warezmaster']), 'r2l')
    .when(col('label').isin(['ipsweep', 'nmap', 'portsweep', 'satan']), 'probe')
    .otherwise('normal')
)

# Map the string labels to index values
df = df.withColumn(
    'label',
    when(col('label') == 'dos', attack_types.index('dos'))
    .when(col('label') == 'u2r', attack_types.index('u2r'))
    .when(col('label') == 'r2l', attack_types.index('r2l'))
    .when(col('label') == 'probe', attack_types.index('probe'))
    .otherwise(attack_types.index('normal'))
)

df = df.select(["duration", "protocol_type", "src_bytes", "srv_count", "dst_host_same_src_port_rate", "label"])

In [10]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Define StringIndexers for categorical columns
indexer_protocol_type = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_index")

df = indexer_protocol_type.fit(df).transform(df)

# Define OneHotEncoders for categorical columns
encoder_protocol_type = OneHotEncoder(inputCol="protocol_type_index", outputCol="protocol_type_oh")

df = encoder_protocol_type.fit(df).transform(df)

# Assemble features into a single vector column
feature_columns = [col for col in df.columns if col not in ["label", "protocol_type", "protocol_type_index"]]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df)

df_assembled = data.select(col("features"), col("label"))

In [11]:
# Data Scaling

from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(df_assembled)
scaled_data = scaler_model.transform(df_assembled)

In [12]:
# Oversampling

def oversample_class(df, label, target_count):
    class_df = df.filter(col("label") == label)
    num_to_add = target_count - class_df.count()

    if num_to_add > 0:
        additional_df = class_df.sample(withReplacement=True, fraction=num_to_add / class_df.count())
        return df.union(additional_df)
    else:
        return df

class_counts = df.groupBy("label").count().collect()
class_counts_dict = {row['label']: row['count'] for row in class_counts}
max_count = max(class_counts_dict.values())

oversampled_df = scaled_data
for label, count in class_counts_dict.items():
    oversampled_df = oversample_class(oversampled_df, label, max_count)

oversampled_df.groupBy("label").count().show()

+-----+------+
|label| count|
+-----+------+
|    1|391458|
|    3|390073|
|    4|391761|
|    2|391322|
|    0|392224|
+-----+------+



In [13]:
# Split train and test dataset
train_df, test_df = oversampled_df.randomSplit([0.7, 0.3], seed=7)

In [14]:
# The model with the highest accuracy, XGB
from xgboost.spark import SparkXGBClassifier
xgb_classifier = SparkXGBClassifier(
  features_col='scaledFeatures',
  label_col='label'
)

xgb_model = xgb_classifier.fit(train_df)
predictions = xgb_model.transform(test_df)

2024-08-23 07:04:49,433 INFO XGBoost-PySpark: _fit Running xgboost-2.1.1 on 1 workers with
	booster params: {'objective': 'multi:softprob', 'device': 'cpu', 'num_class': 5, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-08-23 07:06:21,823 INFO XGBoost-PySpark: _fit Finished xgboost training!


In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize the evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol='prediction', metricName='accuracy')

# Compute the accuracy of the model
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")

# Optionally, evaluate using other metrics like F1 score or confusion matrix
f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol='prediction', metricName='f1')
f1_score = f1_evaluator.evaluate(predictions)
print(f"Test F1 Score: {f1_score}")

# Compute precision and recall for each class
precision_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol='prediction', metricName='precisionByLabel')
recall_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol='prediction', metricName='recallByLabel')

# Precision and Recall
precision = precision_evaluator.evaluate(predictions)
recall = recall_evaluator.evaluate(predictions)

print(f"Test Precision: {precision}")
print(f"Test Recall: {recall}")

# Optionally, compute the confusion matrix manually
from pyspark.sql.functions import col

confusion_matrix = predictions.groupBy("label", "prediction").count().toPandas()
print(f"Confusion Matrix:\n{confusion_matrix}")


Test Accuracy: 0.9727705829669713
Test F1 Score: 0.9730737536705101
Test Precision: 0.9902351534475887
Test Recall: 0.9712088379009985
Confusion Matrix:
    label  prediction   count
0       1         0.0      62
1       0         2.0     281
2       4         4.0  115992
3       0         3.0     432
4       0         0.0  114287
5       0         4.0    2630
6       4         0.0     850
7       3         3.0  116325
8       2         2.0  108614
9       1         1.0  116096
10      0         1.0      45
11      3         4.0     541
12      1         4.0    1633
13      1         2.0       4
14      4         1.0      74
15      1         3.0       2
16      3         0.0     215
17      3         2.0     104
18      4         3.0      33
19      2         4.0    9086


In [16]:
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import StringType

# Mapping of indexed labels to their corresponding attack types
label_mapping = {
    0: "Normal",
    1: "DOS",
    2: "R2L",
    3: "U2R",
    4: "Probe"
}

# UDF to map the label to its attack type
def label_to_attack_type(label):
    return label_mapping.get(label, "Unknown")

# Register the UDF
label_to_attack_udf = udf(label_to_attack_type, StringType())

# Add a new column 'attack_type' to the DataFrame by mapping the 'indexedLabel' to attack names
mapped_predictions = predictions.withColumn("connection_type", label_to_attack_udf(col("label")))

# Create a custom order column to enforce the order: Normal, DOS, R2L, U2R, Probe
ordered_summary = mapped_predictions.withColumn(
    "order",
    when(col("connection_type") == "Normal", 1)
    .when(col("connection_type") == "DOS", 2)
    .when(col("connection_type") == "R2L", 3)
    .when(col("connection_type") == "U2R", 4)
    .when(col("connection_type") == "Probe", 5)
)

# Group by 'attack_type' and 'order', then count and sort by the custom order
attack_summary = ordered_summary.groupBy("connection_type", "order").count().orderBy("order")

# Show the results
attack_summary.select("connection_type", "count").show()


+---------------+------+
|connection_type| count|
+---------------+------+
|         Normal|117675|
|            DOS|117797|
|            R2L|117700|
|            U2R|117185|
|          Probe|116949|
+---------------+------+



In [17]:
from pyspark.sql import Row

# Collect the metrics into a DataFrame
metrics = spark.createDataFrame([
    Row(Metric="Accuracy", Value=accuracy),
    Row(Metric="F1 Score", Value=f1_score),
    Row(Metric="Precision", Value=precision),
    Row(Metric="Recall", Value=recall)
])

metrics.write.mode("overwrite").csv('metrics.csv', header=True)
attack_summary.write.mode("overwrite").csv('connection summary', header=True)