# Install Spark

In [None]:
!pip3 install pyspark



# Donwload libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, MinMaxScaler, RobustScaler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Initialize SparkSession

In [None]:
# Initialize SparkSession
spark = SparkSession.builder \
                    .appName("NaiveBayesClassifierExample") \
                    .master("local[*]") \
                    .config("spark.executor.memory", "4g") \
                    .config("spark.driver.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
                    .getOrCreate()

spark

# Load and preprocess the data

In [None]:
# Load the compressed file as a text file
df = spark.read.csv("/content/kddcup.data_10_percent.gz", header = False)

In [None]:
df.show(5)

+---+---+----+---+---+----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|_c0|_c1| _c2|_c3|_c4| _c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|   _c41|
+---+---+----+---+---+----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|  0|tcp|http| SF|181|5450|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   8|   8|0.00|0.00|0.00|0.00|1.00|0.00|0.00|   9|   9|1.00|0.00|0.11|0.00|0.00|0.00|0.00|0.00|normal.|
|  0|tcp|http| SF|239| 486|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   8|   8|0.00|0.00|0.00|0.00|1.00|0.00|0.00|  19|  19|1.00|0.00|0.05

In [None]:
# more info
print(df.count())
print(df.rdd.getNumPartitions())


494021
1


In [None]:


df = df.repartition(2)
df.rdd.getNumPartitions()



2

# Add header

In [None]:
df = df.withColumnRenamed("_c0","duration") \
      .withColumnRenamed("_c1","protocol_type")\
      .withColumnRenamed("_c2","service")\
      .withColumnRenamed("_c3","flag")\
      .withColumnRenamed("_c4","src_bytes")\
      .withColumnRenamed("_c5","dst_bytes")\
      .withColumnRenamed("_c6","land")\
      .withColumnRenamed("_c7","wrong_fragment")\
      .withColumnRenamed("_c8","urgent")\
      .withColumnRenamed("_c9","host")\
      .withColumnRenamed("_c10","num_failed_logins")\
      .withColumnRenamed("_c11","logged_in")\
      .withColumnRenamed("_c12","num_compromised")\
      .withColumnRenamed("_c13","root_shell")\
      .withColumnRenamed("_c14","su_attempted")\
      .withColumnRenamed("_c15","num_root")\
      .withColumnRenamed("_c16","num_file_creations")\
      .withColumnRenamed("_c17","num_shells")\
      .withColumnRenamed("_c18","num_access_files")\
      .withColumnRenamed("_c19","num_outbound_cmds")\
      .withColumnRenamed("_c20","is_host_login")\
      .withColumnRenamed("_c21","is_guest_login")\
      .withColumnRenamed("_c22","count")\
      .withColumnRenamed("_c23","srv_count")\
      .withColumnRenamed("_c24","serror_rate")\
      .withColumnRenamed("_c25","srv_serror_rate")\
      .withColumnRenamed("_c26","rerror_rate")\
      .withColumnRenamed("_c27","srv_rerror_rate")\
      .withColumnRenamed("_c28","same_srv_rate")\
      .withColumnRenamed("_c29","diff_srv_rate")\
      .withColumnRenamed("_c30","srv_diff_host_rate")\
      .withColumnRenamed("_c31","dst_host_count")\
      .withColumnRenamed("_c32","dst_host_srv_count")\
      .withColumnRenamed("_c33","dst_host_same_srv_rate")\
      .withColumnRenamed("_c34","dst_host_diff_srv_rate")\
      .withColumnRenamed("_c35","dst_host_same_src_port_rate")\
      .withColumnRenamed("_c36","dst_host_srv_diff_host_rate")\
      .withColumnRenamed("_c37","dst_host_serror_rate")\
      .withColumnRenamed("_c38","dst_host_srv_serror_rate")\
      .withColumnRenamed("_c39","dst_host_rerror_rate")\
      .withColumnRenamed("_c40","dst_host_srv_rerror_rate")\
      .withColumnRenamed("_c41","connection_status")

df.show(5)

+--------+-------------+--------+----+---------+---------+----+--------------+------+----+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-----------------+
|duration|protocol_type| service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|host|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_hos


# Check the Multi-class labels (Multi-label classification)

In [None]:
# Check the Multi-class labels (Multi-label classification)
df.select("connection_status").distinct().show(30)



+-----------------+
|connection_status|
+-----------------+
|     warezmaster.|
|           smurf.|
|             pod.|
|            nmap.|
|            imap.|
|    guess_passwd.|
|         ipsweep.|
|       portsweep.|
|           satan.|
|            land.|
|      loadmodule.|
|       ftp_write.|
| buffer_overflow.|
|         rootkit.|
|     warezclient.|
|        teardrop.|
|            perl.|
|             phf.|
|        multihop.|
|         neptune.|
|            back.|
|             spy.|
|          normal.|
+-----------------+



# Count labels

In [None]:
# Count the labels
df.groupBy("connection_status").count().orderBy('count', ascending = False).show()



+-----------------+------+
|connection_status| count|
+-----------------+------+
|           smurf.|280790|
|         neptune.|107201|
|          normal.| 97278|
|            back.|  2203|
|           satan.|  1589|
|         ipsweep.|  1247|
|       portsweep.|  1040|
|     warezclient.|  1020|
|        teardrop.|   979|
|             pod.|   264|
|            nmap.|   231|
|    guess_passwd.|    53|
| buffer_overflow.|    30|
|            land.|    21|
|     warezmaster.|    20|
|            imap.|    12|
|         rootkit.|    10|
|      loadmodule.|     9|
|       ftp_write.|     8|
|        multihop.|     7|
+-----------------+------+
only showing top 20 rows



## StringIndexer for other features

In [None]:
# StringIndexer for other features
#to map labels into indices
indexer = StringIndexer(inputCol = 'connection_status', outputCol = 'label')
df = indexer.fit(df).transform(df)



In [None]:

df.groupBy("label").count().orderBy('count', ascending = False).show()

+-----+------+
|label| count|
+-----+------+
|  0.0|280790|
|  1.0|107201|
|  2.0| 97278|
|  3.0|  2203|
|  4.0|  1589|
|  5.0|  1247|
|  6.0|  1040|
|  7.0|  1020|
|  8.0|   979|
|  9.0|   264|
| 10.0|   231|
| 11.0|    53|
| 12.0|    30|
| 13.0|    21|
| 14.0|    20|
| 15.0|    12|
| 16.0|    10|
| 17.0|     9|
| 18.0|     8|
| 19.0|     7|
+-----+------+
only showing top 20 rows



# string indexer for other features that are categorical

In [None]:
columns_to_index = ["protocol_type", "service", "flag", "connection_status"]

for column in columns_to_index:
  indexer = StringIndexer(inputCol = column, outputCol = column + '_id')
  df = indexer.fit(df).transform(df)

df.show(5)


+--------+-------------+--------+----+---------+---------+----+--------------+------+----+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-----------------+-----+-----------------+-----------+---------------------+----------------+----------+-------+--------------------+
|duration|protocol_type| service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|host|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_log

In [None]:
# drop unnessary features
exclude_cols = ["protocol_type", "service", "flag", "connection_status"]
input_cols = [col for col in df.columns if col not in exclude_cols]
df = df[input_cols]

df.show(2)

+--------+---------+---------+----+--------------+------+----+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-----+-----------------+-----------+---------------------+----------------+----------+-------+--------------------+
|duration|src_bytes|dst_bytes|land|wrong_fragment|urgent|host|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate

# Vector Assembler

In [None]:
df.printSchema()

root
 |-- duration: string (nullable = true)
 |-- src_bytes: string (nullable = true)
 |-- dst_bytes: string (nullable = true)
 |-- land: string (nullable = true)
 |-- wrong_fragment: string (nullable = true)
 |-- urgent: string (nullable = true)
 |-- host: string (nullable = true)
 |-- num_failed_logins: string (nullable = true)
 |-- logged_in: string (nullable = true)
 |-- num_compromised: string (nullable = true)
 |-- root_shell: string (nullable = true)
 |-- su_attempted: string (nullable = true)
 |-- num_root: string (nullable = true)
 |-- num_file_creations: string (nullable = true)
 |-- num_shells: string (nullable = true)
 |-- num_access_files: string (nullable = true)
 |-- num_outbound_cmds: string (nullable = true)
 |-- is_host_login: string (nullable = true)
 |-- is_guest_login: string (nullable = true)
 |-- count: string (nullable = true)
 |-- srv_count: string (nullable = true)
 |-- serror_rate: string (nullable = true)
 |-- srv_serror_rate: string (nullable = true)
 |-- r

In [None]:
df.show(5)  # Show first 5 rows

+--------+---------+---------+----+--------------+------+----+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-----+-----------------+-----------+---------------------+----------------+----------+-------+--------------------+
|duration|src_bytes|dst_bytes|land|wrong_fragment|urgent|host|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate

In [None]:

from pyspark.sql.functions import col

for column in df.columns:
  df = df.withColumn(column, col(column).cast("double"))



In [None]:
df.printSchema()

root
 |-- duration: double (nullable = true)
 |-- src_bytes: double (nullable = true)
 |-- dst_bytes: double (nullable = true)
 |-- land: double (nullable = true)
 |-- wrong_fragment: double (nullable = true)
 |-- urgent: double (nullable = true)
 |-- host: double (nullable = true)
 |-- num_failed_logins: double (nullable = true)
 |-- logged_in: double (nullable = true)
 |-- num_compromised: double (nullable = true)
 |-- root_shell: double (nullable = true)
 |-- su_attempted: double (nullable = true)
 |-- num_root: double (nullable = true)
 |-- num_file_creations: double (nullable = true)
 |-- num_shells: double (nullable = true)
 |-- num_access_files: double (nullable = true)
 |-- num_outbound_cmds: double (nullable = true)
 |-- is_host_login: double (nullable = true)
 |-- is_guest_login: double (nullable = true)
 |-- count: double (nullable = true)
 |-- srv_count: double (nullable = true)
 |-- serror_rate: double (nullable = true)
 |-- srv_serror_rate: double (nullable = true)
 |-- r

In [None]:
features_cols = [col for col in df.columns if col != 'label']

# Create a vectorassembler with all input features
assembler = VectorAssembler(inputCols = features_cols, outputCol = 'features')
data = assembler.transform(df)

data = data.select('features', 'label')

data.show(5, truncate = False)

+------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                              |label|
+------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(45,[1,8,19,20,25,26,28,29,30,31,32,38,39,40,41,42,44],[7233.0,1.0,8.0,4.0,0.5,0.38,255.0,10.0,0.04,0.34,0.04,1.0,6.0,2.0,1.0,6.0,2.0])               |2.0  |
|(45,[19,20,23,24,25,26,28,29,30,31,36,37,38,39,40,41,42,43,44],[137.0,4.0,1.0,1.0,0.03,0.06,255.0,4.0,0.02,0.05,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0]) |1.0  |
|(45,[1,19,20,25,28,29,30,32],[1032.0,250.0,250.0,1.0,255.0,255.0,1.0,1.0])                                                                            |0.0  |
|(45,[19,20,23,24,25,26,28,29,30,31,36,37,38,3

# Normalization and Standarization

In [None]:
#standard scaler
#scaler = StandardScaler(inputCol = "features", outputCol = "scaledFeatures")

#MinMax (x - Min)/ (Max - min)
#scaler = MinMaxScaler(inputCol = "features", outputCol = "scaledFeatures")

#robust scaler (X - med)/ IQR  -----IQR = Q3- Q1

scaler = RobustScaler(inputCol = "features", outputCol = "scaledFeaturess")

scaler_model = scaler.fit(data)
data = scaler_model.transform(data)

data = data.select("scaledFeaturess", "label")
data.show(3, truncate = False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|scaledFeaturess                                                                                                                                                                                                 |label|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(45,[1,8,19,20,25,26,28,29,30,31,32,38,39,40,41,42,44],[7.328267477203648,0.0,0.02030456852791878,0.007984031936127744,0.0,0.0,0.0,0.04784688995215311,0.06779661016949151,8.5,0.04,1.0,6.0,2.0,1.0,6.0,2.0])   |2.0  |
|(45,[19,20,23,24,25,26,28,29,30,31,36,37,38,39,40,41,42,43,44],[0.3477157360406091,0.007984031936127744,0.0,0.0,0.0,0.0,0.0,0.01913

# Split Data: Training and Testing

In [None]:
train_data, test_data = data.randomSplit([0.7, 0.3], seed = 1234)
train_data.show(2, truncate = False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|scaledFeaturess                                                                                                                                                                                                                                                 |label|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(45,[0,1,2,6,8,9,10,12,13,14,15,19,20,25,28,29,30,32,38,39,40,41,42,44],[0.0,1.430597771023303,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0025380710659898475,0.001996007984031936,0.0,0.0,0.004784688995215311,1

# Apply Naive Bayes model

In [None]:
nb = NaiveBayes(featuresCol= "scaledFeaturess", labelCol = "label", smoothing= 0.5 ) # (0.1, 1)
model = nb.fit(train_data)

nb_predictions_test = model.transform(test_data)

In [None]:
nb_predictions_test.select('label', 'prediction').show(25)

+-----+----------+
|label|prediction|
+-----+----------+
| 18.0|       5.0|
| 11.0|      11.0|
| 12.0|      12.0|
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
|  3.0|       3.0|
| 18.0|      14.0|
| 14.0|      16.0|
| 12.0|      12.0|
| 16.0|      16.0|
|  2.0|       1.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
|  7.0|       2.0|
+-----+----------+
only showing top 25 rows



# Evaluate the Model

# Confusion Matrix

In [None]:
cm = nb_predictions_test.groupBy('label', 'prediction'). count()
cm.show(cm.count(), truncate = False)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|6.0  |1.0       |7    |
|6.0  |8.0       |34   |
|13.0 |11.0      |12   |
|7.0  |12.0      |66   |
|5.0  |1.0       |3    |
|18.0 |14.0      |2    |
|5.0  |4.0       |26   |
|12.0 |12.0      |4    |
|9.0  |10.0      |8    |
|1.0  |1.0       |31935|
|21.0 |16.0      |2    |
|4.0  |5.0       |1    |
|4.0  |2.0       |17   |
|10.0 |10.0      |5    |
|14.0 |16.0      |1    |
|2.0  |7.0       |35   |
|22.0 |16.0      |1    |
|6.0  |4.0       |264  |
|14.0 |14.0      |6    |
|7.0  |6.0       |16   |
|7.0  |2.0       |188  |
|2.0  |2.0       |26361|
|4.0  |8.0       |15   |
|8.0  |4.0       |13   |
|2.0  |15.0      |8    |
|18.0 |5.0       |1    |
|2.0  |3.0       |59   |
|7.0  |10.0      |3    |
|4.0  |4.0       |397  |
|20.0 |20.0      |1    |
|9.0  |9.0       |74   |
|10.0 |4.0       |31   |
|2.0  |4.0       |1760 |
|10.0 |5.0       |36   |
|6.0  |10.0      |9    |
|8.0  |8.0       |255  |
|7.0  |11.0      |3    |


In [None]:
confusion_matrix = cm.groupBy("label").pivot("prediction").sum("count").na.fill(0)
confusion_matrix.show()

+-----+-----+-----+-----+---+----+---+---+---+---+---+----+----+----+----+----+----+----+----+
|label|  0.0|  1.0|  2.0|3.0| 4.0|5.0|6.0|7.0|8.0|9.0|10.0|11.0|12.0|14.0|15.0|16.0|17.0|20.0|
+-----+-----+-----+-----+---+----+---+---+---+---+---+----+----+----+----+----+----+----+----+
|  8.0|    0|    0|    0|  0|  13|  0|  0|  0|255|  0|   0|   0|   0|   0|   0|   0|   0|   0|
|  0.0|83792|    0|   12|  0|   0|  0|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|
|  7.0|    0|    0|  188|  0|   0|  0| 16|  0|  0|  0|   3|   3|  66|   0|   0|   0|   0|   0|
| 18.0|    0|    0|    0|  0|   0|  1|  0|  0|  0|  0|   0|   0|   0|   2|   0|   0|   0|   0|
|  1.0|    0|31935|   12|  0|   0|  0|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|
|  4.0|    0|    4|   17|  0| 397|  1|  0|  0| 15|  0|   0|   0|   0|   0|   0|   0|   0|   0|
| 11.0|    0|    0|    0|  0|   0|  0|  0|  0|  0|  0|   0|  23|   0|   0|   0|   0|   0|   0|
| 21.0|    0|    0|    0|  0|   0|  0|  0|  0|  0|

# Additional Performance metrics



# Hamming Loss and Log Loss

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol = 'prediction', labelCol = 'label')

# compute
accuracy = evaluator.evaluate(nb_predictions_test, {evaluator.metricName: "accuracy"})
f1_score = evaluator.evaluate(nb_predictions_test, {evaluator.metricName: "f1"})
precision = evaluator.evaluate(nb_predictions_test, {evaluator.metricName: "precisionByLabel"})
recall = evaluator.evaluate(nb_predictions_test, {evaluator.metricName: "recallByLabel"})
hamming = evaluator.evaluate(nb_predictions_test, {evaluator.metricName: "hammingLoss"})
log = evaluator.evaluate(nb_predictions_test, {evaluator.metricName: "logLoss"})

# print
print("Accuracy:", round(accuracy, 5) * 100)
print("f1_score:", round(f1_score, 5) * 100)
print("precision:", round(precision, 5) * 100)
print("recall:", round(recall, 5) * 100)
print("hamming:", hamming)
print("log loss:", log)

Accuracy: 97.465
f1_score: 97.775
precision: 100.0
recall: 99.98599999999999
hamming: 0.025351788242786392
log loss: 0.4405305855092477


# Optimization (Grid Search and Random Search)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator



# Grid Search results

In [None]:
nb = NaiveBayes(featuresCol = 'scaledFeaturess', labelCol = 'label')

# Define the parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
              .addGrid(nb.smoothing, [0.1, 0.15, 0.2, 0.27, 0.3, 0.33, 0.4, 0.48, 0.5, 0.6, 0.7, 0.72, 0.76, 0.8, 0.9, 1]) \
              .build()

# Define the evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol = 'prediction', labelCol = 'label', metricName = 'accuracy')

# Define the cross_validator
crossval = CrossValidator(estimator = nb,
                          estimatorParamMaps = param_grid,
                          evaluator = evaluator,
                          numFolds = 5)

# Train the models
cv_model = crossval.fit(train_data)

# Get the best model
best_model = cv_model.bestModel

# Make predictions
predictions = best_model.transform(test_data)

# Evaluate
accuracy = evaluator.evaluate(predictions)

# Get the best smoothing value
best_smoothing = best_model.getOrDefault("smoothing")

# Print
print("best smoothing value: " , best_smoothing)
print("accuracy: ", accuracy)

best smoothing value:  0.1
accuracy:  0.9749192089591675
