***Introduction:***

Apache Spark is an open-source distributed computing platform built for processing large datasets efficiently. Databricks, a cloud-based platform for running Spark applications, streamlines cluster setup and execution.

To leverage Databricks for Spark cluster creation and application execution, follow these steps:

1. Register and log in to Databricks.

2. Create a Spark cluster through the "Clusters" section.

3. Configure cluster settings as needed.

4. Initiate a notebook via the "Workspace."

5. Import and upload CSV files as tables.

6. Choose Python and link to the created cluster for the notebook.

7. Begin writing and executing Spark code for seamless data processing and analysis.
   
***Technologies used***

- Databricks
- Spark
- scikit-learn

In [124]:
import pyspark
from sklearn.tree import DecisionTreeClassifier, export_graphviz
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from graphviz import Source
import pandas as pd
from pyspark.sql import SparkSession


In [125]:
spark = SparkSession.builder.appName("decisiontree").getOrCreate()
df_Train_Set = spark.read.format("csv").option("header","true").option("inferSchema","true").load("UNSW_NB15_training-set.csv")
df_Test_Set = spark.read.format("csv").option("header","true").option("inferSchema","true").load("UNSW_NB15_testing-set.csv")

# ***Check the Statistics***

Confirm the class distribution against Table 6 ("A part of UNSW-NB15 data set distribution").
Compute the row counts per "Attack_cat" value in the training and test sets and display them.

In [126]:
stats_train_set = df_Train_Set.groupBy("Attack_cat").count()
stats_train_set.show()

stats_test_set = df_Test_Set.groupBy("Attack_cat").count()
stats_test_set.show()

Recorded output (training set only):

+--------------+-----+
|    Attack_cat|count|
+--------------+-----+
|         Worms|   44|
|     Shellcode|  378|
|       Fuzzers| 6062|
|      Analysis|  677|
|           DoS| 4089|
|Reconnaissance| 3496|
|      Backdoor|  583|
|      Exploits|11132|
|        Normal|37000|
|       Generic|18871|
+--------------+-----+
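The counts printed above can be turned into class shares to see how unevenly the categories are represented. The snippet below is a small sketch in plain Python that copies the counts from the table; the names `attack_counts` and `shares` are illustrative and not part of the original notebook.

```python
# Counts copied from the Attack_cat table shown above.
attack_counts = {
    "Worms": 44,
    "Shellcode": 378,
    "Fuzzers": 6062,
    "Analysis": 677,
    "DoS": 4089,
    "Reconnaissance": 3496,
    "Backdoor": 583,
    "Exploits": 11132,
    "Normal": 37000,
    "Generic": 18871,
}

total = sum(attack_counts.values())

# Share of each category among all rows in this table.
shares = {category: count / total for category, count in attack_counts.items()}

# Print categories from most to least frequent.
for category, share in sorted(shares.items(), key=lambda item: item[1], reverse=True):
    print(f"{category:<15}{share:7.2%}")
```

Categories such as Worms and Shellcode make up a very small fraction of the rows, which is worth keeping in mind when reading any per-class results.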



Merging CSV Files in Spark:

Use Spark's `union` method, which matches columns by position rather than by name, to merge the two dataframes. Then convert the merged Spark dataframe to a Pandas dataframe for easier manipulation and visualization:

In [127]:
# Merge the two dataframes
spark_df = df_Train_Set.union(df_Test_Set)
# Convert the Spark dataframe to a Pandas dataframe
data = spark_df.toPandas()

In [129]:
# Show the top 30 rows
data.head(30)

Unnamed: 0,id,dur,proto,service,state,spkts,dpkts,sbytes,dbytes,rate,...,ct_dst_sport_ltm,ct_dst_src_ltm,is_ftp_login,ct_ftp_cmd,ct_flw_http_mthd,ct_src_ltm,ct_srv_dst,is_sm_ips_ports,attack_cat,label
0,1,1.1e-05,udp,-,INT,2,0,496,0,90909.0902,...,1,2,0,0,0,1,2,0,Normal,0
1,2,8e-06,udp,-,INT,2,0,1762,0,125000.0003,...,1,2,0,0,0,1,2,0,Normal,0
2,3,5e-06,udp,-,INT,2,0,1068,0,200000.0051,...,1,3,0,0,0,1,3,0,Normal,0
3,4,6e-06,udp,-,INT,2,0,900,0,166666.6608,...,1,3,0,0,0,2,3,0,Normal,0
4,5,1e-05,udp,-,INT,2,0,2126,0,100000.0025,...,1,3,0,0,0,2,3,0,Normal,0
5,6,3e-06,udp,-,INT,2,0,784,0,333333.3215,...,1,2,0,0,0,2,2,0,Normal,0
6,7,6e-06,udp,-,INT,2,0,1960,0,166666.6608,...,1,2,0,0,0,2,2,0,Normal,0
7,8,2.8e-05,udp,-,INT,2,0,1384,0,35714.28522,...,1,3,0,0,0,1,3,0,Normal,0
8,9,0.0,arp,-,INT,1,0,46,0,0.0,...,2,2,0,0,0,2,2,1,Normal,0
9,10,0.0,arp,-,INT,1,0,46,0,0.0,...,2,2,0,0,0,2,2,1,Normal,0


# Data Preparation: #

Limit data to 12 columns, including the target 'label'. Then, convert categorical data to numerical using Pandas' `get_dummies` method for columns "proto" and "state".

In [130]:
data = data[["proto", "state", "swin", "dwin", "trans_depth", "ct_srv_src", "ct_state_ttl",
             "ct_dst_ltm", "ct_src_dport_ltm", "ct_ftp_cmd", "is_sm_ips_ports", "label"]]


In [131]:
# Show the top 30 rows
data.head(30)


Unnamed: 0,proto,state,swin,dwin,trans_depth,ct_srv_src,ct_state_ttl,ct_dst_ltm,ct_src_dport_ltm,ct_ftp_cmd,is_sm_ips_ports,label
0,udp,INT,0,0,0,2,2,1,1,0,0,0
1,udp,INT,0,0,0,2,2,1,1,0,0,0
2,udp,INT,0,0,0,3,2,1,1,0,0,0
3,udp,INT,0,0,0,3,2,2,2,0,0,0
4,udp,INT,0,0,0,3,2,2,2,0,0,0
5,udp,INT,0,0,0,2,2,2,2,0,0,0
6,udp,INT,0,0,0,2,2,2,2,0,0,0
7,udp,INT,0,0,0,3,2,1,1,0,0,0
8,arp,INT,0,0,0,2,2,2,2,0,1,0
9,arp,INT,0,0,0,2,2,2,2,0,1,0


In [132]:
# Let's find which variables are categorical

In [133]:
print(data.columns)


Index(['proto', 'state', 'swin', 'dwin', 'trans_depth', 'ct_srv_src',
       'ct_state_ttl', 'ct_dst_ltm', 'ct_src_dport_ltm', 'ct_ftp_cmd',
       'is_sm_ips_ports', 'label'],
      dtype='object')


In [134]:
print(data.dtypes)


proto               object
state               object
swin                 int32
dwin                 int32
trans_depth          int32
ct_srv_src           int32
ct_state_ttl         int32
ct_dst_ltm           int32
ct_src_dport_ltm     int32
ct_ftp_cmd           int32
is_sm_ips_ports      int32
label                int32
dtype: object


In [135]:
# Identify original boolean columns
#original_boolean_columns = [col for col in data.columns if data[col].dtype == bool]
#print(original_boolean_columns)


In [136]:
# Based on the output, two variables have type object: "proto" and "state"

# Convert the categorical data into numerical #



In [137]:
# One-hot encode the categorical columns "proto" and "state"
#data = pd.get_dummies(data, columns=original_boolean_columns)
data = pd.get_dummies(data, columns=["proto","state"])


In [138]:
data.head()

Unnamed: 0,swin,dwin,trans_depth,ct_srv_src,ct_state_ttl,ct_dst_ltm,ct_src_dport_ltm,ct_ftp_cmd,is_sm_ips_ports,label,...,state_CLO,state_CON,state_ECO,state_FIN,state_INT,state_PAR,state_REQ,state_RST,state_URN,state_no
0,0,0,0,2,2,1,1,0,0,0,...,False,False,False,False,True,False,False,False,False,False
1,0,0,0,2,2,1,1,0,0,0,...,False,False,False,False,True,False,False,False,False,False
2,0,0,0,3,2,1,1,0,0,0,...,False,False,False,False,True,False,False,False,False,False
3,0,0,0,3,2,2,2,0,0,0,...,False,False,False,False,True,False,False,False,False,False
4,0,0,0,3,2,2,2,0,0,0,...,False,False,False,False,True,False,False,False,False,False
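To make the encoding step concrete, here is a minimal sketch of `pd.get_dummies` on a toy frame. The values are hypothetical; only the column names mirror the notebook. The `dtype=int` argument is an optional choice made for this sketch, not something the original cell uses.

```python
import pandas as pd

# Toy frame with hypothetical values; only the column names mirror the notebook.
toy = pd.DataFrame({
    "proto": ["udp", "arp", "tcp"],
    "state": ["INT", "INT", "FIN"],
    "label": [0, 0, 1],
})

# One indicator column is created per distinct value of each listed column.
# dtype=int yields 0/1 columns instead of True/False.
encoded = pd.get_dummies(toy, columns=["proto", "state"], dtype=int)

print(list(encoded.columns))
print(encoded)
```

Note that the indicator columns are appended after the untouched columns, so `label` is no longer the last column once the frame has been encoded.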


# Splitting Data for Training and Testing: #

Utilize sklearn's `train_test_split` function to divide the dataset into training and testing subsets, allocating 20% for testing.

In [139]:
## Data splitting 
X_train, X_test, y_train, y_test = train_test_split(data.drop('label', axis=1), data['label'], test_size=0.2)
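The split above is random and unseeded, so each run produces a different partition. As a hedged sketch on hypothetical toy data, the example below shows two optional `train_test_split` arguments: `random_state` for a repeatable split and `stratify` to keep the class ratio the same in both subsets. The variable names are illustrative and chosen so they do not overwrite the notebook's own.

```python
import numpy as np
from sklearn.model_selection import train_test_split

# Hypothetical toy data: 100 rows, 30 of them labelled 1.
X_toy = np.arange(200).reshape(100, 2)
y_toy = np.array([0] * 70 + [1] * 30)

# random_state fixes the shuffle; stratify keeps the 70/30 class ratio
# in both the training and the testing subset.
X_tr, X_te, y_tr, y_te = train_test_split(
    X_toy, y_toy, test_size=0.2, random_state=42, stratify=y_toy
)

print(X_tr.shape, X_te.shape)
print("positives in test subset:", int(y_te.sum()))
```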


# ***Model Training***

Train a Decision Tree model using sklearn's `DecisionTreeClassifier`, with the Gini impurity as the criterion.

In [140]:
# Train the Decision Tree model using Gini impurity
model = DecisionTreeClassifier(criterion='gini')
model.fit(X_train, y_train)
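For reference, the Gini impurity of a node is one minus the sum of the squared class proportions in that node. The helper below is a plain-Python sketch of that formula; `gini_impurity` is a hypothetical name used for illustration and is not how scikit-learn exposes the computation.

```python
from collections import Counter


def gini_impurity(labels):
    """Return 1 - sum(p_k ** 2) over the class proportions p_k in `labels`."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())


print(gini_impurity([0, 0, 1, 1]))  # evenly mixed node -> 0.5
print(gini_impurity([0, 0, 0, 0]))  # pure node -> 0.0
```

A split is attractive to the tree when the weighted impurity of the child nodes is lower than the impurity of the parent.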

# ***Decision Tree Visualization***

Visualize the Decision Tree using sklearn's `export_graphviz` and graphviz's `Source`.

In [147]:
import os
from sklearn.tree import export_graphviz
from graphviz import Source

# Add the path to the 'dot' executable to the system's PATH
os.environ["PATH"] += os.pathsep + 'C:\\Program Files\\Graphviz\\bin'  

# Use the training columns so the feature names line up with the fitted model
# (after get_dummies, 'label' is no longer the last column of 'data')
dot_data = export_graphviz(model, out_file=None, feature_names=list(X_train.columns), class_names=['Normal', 'Attack'])
graph = Source(dot_data)

# Specify the file name for rendering and viewing
file_name = "decision-tree"

# Render the decision tree to a file
graph.render(file_name, format='pdf', engine='dot', cleanup=True)

# View the decision tree using the default viewer (e.g., PDF viewer)
graph.view()

ExecutableNotFound: failed to execute WindowsPath('dot'), make sure the Graphviz executables are on your systems' PATH
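When the Graphviz `dot` executable cannot be found, as in the error above, one alternative is scikit-learn's `export_text`, which prints the tree as indented text and needs no external binary. The sketch below uses a hypothetical toy model and made-up feature names; it is not the notebook's trained model.

```python
from sklearn.tree import DecisionTreeClassifier, export_text

# Hypothetical toy data: the label simply equals the first feature.
X_toy = [[0, 5], [0, 7], [1, 5], [1, 7]]
y_toy = [0, 0, 1, 1]

toy_model = DecisionTreeClassifier(criterion="gini", random_state=0)
toy_model.fit(X_toy, y_toy)

# Render the fitted tree as plain text; feature_names are illustrative.
tree_text = export_text(toy_model, feature_names=["is_flagged", "count"])
print(tree_text)
```

For the notebook's model, the same call would take `model` and `list(X_train.columns)`; the optional `max_depth` argument limits how many levels are printed.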

# ***Confusion Matrix Computation***

Compute the confusion matrix using sklearn's `confusion_matrix` to evaluate the model's performance.

In [148]:
# Compute and print the confusion matrix
y_pred = model.predict(X_test)
cm = confusion_matrix(y_test, y_pred)
print("Confusion matrix:")
print(cm)

Confusion matrix:
[[14388  4382]
 [  816 31949]]
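The matrix above can be summarized with a few standard metrics. The sketch below copies the four values from the printed output and assumes scikit-learn's layout: true labels on rows, predicted labels on columns, classes in sorted order (0 = Normal, 1 = Attack).

```python
# Values copied from the confusion matrix printed above.
# Row 0: true Normal, row 1: true Attack; columns follow the same order.
tn, fp = 14388, 4382
fn, tp = 816, 31949

total = tn + fp + fn + tp

accuracy = (tp + tn) / total           # share of all predictions that are correct
precision = tp / (tp + fp)             # share of predicted attacks that are attacks
recall = tp / (tp + fn)                # share of true attacks that were detected
false_positive_rate = fp / (fp + tn)   # share of normal traffic flagged as attack

print(f"accuracy={accuracy:.3f} precision={precision:.3f} "
      f"recall={recall:.3f} fpr={false_positive_rate:.3f}")
```

Read this way, the model misses few attacks but flags a noticeable share of normal traffic, which matters for an intrusion detection setting where false alarms carry a cost.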


# ***Conclusion***

Spark is a powerful tool for distributed data processing that scales to large datasets. In intrusion detection, this makes it practical to process extensive network traffic data quickly. In this notebook, Spark loaded and merged the UNSW-NB15 CSV files, while scikit-learn trained and evaluated the decision tree. For larger datasets, Spark's distributed and parallel execution, coupled with machine learning libraries such as MLlib, supports training and evaluating models across a cluster. Overall, Spark is valuable for intrusion detection systems because it handles large datasets efficiently and supports effective machine learning model training.