In [None]:
import pandas as pd

# Load the CSV file into a DataFrame
df = pd.read_csv('/content/UNSW-NB15_3.csv')

# Display the first 5 rows
display(df.head().T)

# Print the column names and their data types
print(df.info())

Unnamed: 0,0,1,2,3,4
59.166.0.1,59.166.0.3,59.166.0.8,149.171.126.18,149.171.126.18,59.166.0.3
18247,54771,13289,1043,1043,10275
149.171.126.4,149.171.126.2,149.171.126.9,175.45.176.3,175.45.176.3,149.171.126.0
7662,27709,5190,53,53,25
tcp,tcp,tcp,udp,udp,tcp
FIN,FIN,FIN,INT,INT,FIN
0.119596,0.650574,0.00798,0.000005,0.000005,0.486578
4550,8928,2158,264,264,37462
68342,320,2464,0,0,3380
31,31,31,60,60,31


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 700000 entries, 0 to 699999
Data columns (total 49 columns):
 #   Column         Non-Null Count   Dtype  
---  ------         --------------   -----  
 0   59.166.0.1     700000 non-null  object 
 1   18247          700000 non-null  int64  
 2   149.171.126.4  700000 non-null  object 
 3   7662           700000 non-null  object 
 4   tcp            700000 non-null  object 
 5   FIN            700000 non-null  object 
 6   0.119596       700000 non-null  float64
 7   4550           700000 non-null  int64  
 8   68342          700000 non-null  int64  
 9   31             700000 non-null  int64  
 10  29             700000 non-null  int64  
 11  7              700000 non-null  int64  
 12  33             700000 non-null  int64  
 13  -              700000 non-null  object 
 14  300478.2813    700000 non-null  float64
 15  4514398.5      700000 non-null  float64
 16  78             700000 non-null  int64  
 17  80             700000 non-nul

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("UNSW-NB15").getOrCreate()

# Load the dataset into a Spark DataFrame
spark_df = spark.read.csv('/content/UNSW-NB15_3.csv', header=True, inferSchema=True)

# Display the schema and the first few rows
spark_df.printSchema()
spark_df.show(5)

root
 |-- 59.166.0.1: string (nullable = true)
 |-- 18247: integer (nullable = true)
 |-- 149.171.126.4: string (nullable = true)
 |-- 7662: string (nullable = true)
 |-- tcp: string (nullable = true)
 |-- FIN: string (nullable = true)
 |-- 0.119596: double (nullable = true)
 |-- 4550: integer (nullable = true)
 |-- 68342: integer (nullable = true)
 |-- 31: integer (nullable = true)
 |-- 29: integer (nullable = true)
 |-- 7: integer (nullable = true)
 |-- 33: integer (nullable = true)
 |-- -: string (nullable = true)
 |-- 300478.2813: double (nullable = true)
 |-- 4514398.5: double (nullable = true)
 |-- 78: integer (nullable = true)
 |-- 80: integer (nullable = true)
 |-- 25518: integer (nullable = true)
 |-- 25519: integer (nullable = true)
 |-- 1818376620: long (nullable = true)
 |-- 1818657356: long (nullable = true)
 |-- 58: integer (nullable = true)
 |-- 854: integer (nullable = true)
 |-- 024: integer (nullable = true)
 |-- 025: integer (nullable = true)
 |-- 87.137592: double (

Let's start by checking for missing values in the Spark DataFrame.

In [None]:
from pyspark.sql.functions import col, sum

# Check for null values in each column
spark_df.select(*(sum(col("`" + c + "`").isNull().cast("integer")).alias(c) for c in spark_df.columns)).show()

+----------+-----+-------------+----+---+---+--------+----+-----+---+---+---+---+---+-----------+---------+---+---+-----+-----+----------+----------+---+---+---+---+---------+---------+------------+------------+--------+-------+--------+--------+--------+---+---+------+------+---+---+---+---+---+---+---+---+------+---+
|59.166.0.1|18247|149.171.126.4|7662|tcp|FIN|0.119596|4550|68342| 31| 29|  7| 33|  -|300478.2813|4514398.5| 78| 80|25518|25519|1818376620|1818657356| 58|854|024|025|87.137592|85.643619|142423112928|142423112929|1.549156|1.50719|0.000644|0.000521|0.000123|035|036|  _c37|  _c38|   |  6|241|242|  5|144|145|246|  _c47|048|
+----------+-----+-------------+----+---+---+--------+----+-----+---+---+---+---+---+-----------+---------+---+---+-----+-----+----------+----------+---+---+---+---+---------+---------+------------+------------+--------+-------+--------+--------+--------+---+---+------+------+---+---+---+---+---+---+---+---+------+---+
|         0|    0|            0|   0|

In [None]:
# Drop columns with a large number of missing values
columns_to_drop = ["_c37", "_c38", "_c47"]
spark_df = spark_df.drop(*columns_to_drop)

# Verify that the columns have been dropped
print("Columns after dropping:", spark_df.columns)

Columns after dropping: ['59.166.0.1', '18247', '149.171.126.4', '7662', 'tcp', 'FIN', '0.119596', '4550', '68342', '31', '29', '7', '33', '-', '300478.2813', '4514398.5', '78', '80', '25518', '25519', '1818376620', '1818657356', '58', '854', '024', '025', '87.137592', '85.643619', '142423112928', '142423112929', '1.549156', '1.50719', '0.000644', '0.000521', '0.000123', '035', '036', ' ', '6', '241', '242', '5', '144', '145', '246', '048']


In [None]:
# Define new column names based on UNSW-NB15 dataset documentation
new_columns = [
    "srcip", "sport", "dstip", "dsport", "proto", "state", "dur", "sbytes",
    "dbytes", "sttl", "dttl", "sloss", "dloss", "service", "sload", "dload",
    "spkts", "dpkts", "swin", "dwin", "stcpb", "dtcpb", "smeansz", "dmeansz",
    "trans_depth", "res_bdy_len", "sjit", "djit", "stime", "ltime", "sintpkt",
    "dintpkt", "tcprtt", "synack", "ackdat", "is_sm_ips_ports",
    "ct_state_ttl", "ct_dst_ltm", "ct_src_dport_ltm", "ct_dst_sport_ltm",
    "ct_dst_src_ltm", "is_ftp_login", "ct_ftp_cmd", "ct_flw_http_mthd",
    "ct_srv_dst", "is_ssh", "ct_shost_dport", "ct_srv_src"
]

# Get the current column names
current_columns = spark_df.columns

# Create a mapping of old to new column names
rename_mapping = dict(zip(current_columns, new_columns))

# Rename the columns
spark_df = spark_df.select([col("`" + c + "`").alias(rename_mapping.get(c, c)) for c in current_columns])

# Display the new column names and schema
print("Columns after renaming:", spark_df.columns)
spark_df.printSchema()

Columns after renaming: ['srcip', 'sport', 'dstip', 'dsport', 'proto', 'state', 'dur', 'sbytes', 'dbytes', 'sttl', 'dttl', 'sloss', 'dloss', 'service', 'sload', 'dload', 'spkts', 'dpkts', 'swin', 'dwin', 'stcpb', 'dtcpb', 'smeansz', 'dmeansz', 'trans_depth', 'res_bdy_len', 'sjit', 'djit', 'stime', 'ltime', 'sintpkt', 'dintpkt', 'tcprtt', 'synack', 'ackdat', 'is_sm_ips_ports', 'ct_state_ttl', 'ct_dst_ltm', 'ct_src_dport_ltm', 'ct_dst_sport_ltm', 'ct_dst_src_ltm', 'is_ftp_login', 'ct_ftp_cmd', 'ct_flw_http_mthd', 'ct_srv_dst', 'is_ssh']
root
 |-- srcip: string (nullable = true)
 |-- sport: integer (nullable = true)
 |-- dstip: string (nullable = true)
 |-- dsport: string (nullable = true)
 |-- proto: string (nullable = true)
 |-- state: string (nullable = true)
 |-- dur: double (nullable = true)
 |-- sbytes: integer (nullable = true)
 |-- dbytes: integer (nullable = true)
 |-- sttl: integer (nullable = true)
 |-- dttl: integer (nullable = true)
 |-- sloss: integer (nullable = true)
 |-- 

In [None]:
# Identify categorical columns (object/string types) from the schema
categorical_cols = [field.name for field in spark_df.schema.fields if field.dataType.simpleString() == "string"]

# Display unique values and their counts for each categorical column
for col_name in categorical_cols:
    print(f"Unique values and counts for column: {col_name}")
    spark_df.groupBy(col_name).count().show(truncate=False)

Unique values and counts for column: srcip
+--------------+-----+
|srcip         |count|
+--------------+-----+
|149.171.126.16|2    |
|59.166.0.5    |41996|
|10.40.170.2   |302  |
|59.166.0.8    |39440|
|10.40.85.1    |588  |
|149.171.126.18|64749|
|59.166.0.3    |41800|
|175.45.176.1  |66063|
|149.171.126.15|44870|
|10.40.85.10   |341  |
|149.171.126.4 |5    |
|59.166.0.6    |40384|
|59.166.0.4    |42452|
|149.171.126.17|11   |
|149.171.126.13|108  |
|149.171.126.10|6    |
|59.166.0.1    |42262|
|149.171.126.3 |5    |
|59.166.0.0    |41477|
|59.166.0.2    |41602|
+--------------+-----+
only showing top 20 rows

Unique values and counts for column: dstip
+--------------+-----+
|dstip         |count|
+--------------+-----+
|149.171.126.16|6915 |
|10.40.170.2   |302  |
|59.166.0.8    |2    |
|10.40.85.1    |428  |
|149.171.126.18|80821|
|224.0.0.5     |640  |
|192.168.241.50|117  |
|149.171.126.7 |40556|
|59.166.0.3    |1    |
|175.45.176.1  |26950|
|149.171.126.15|43788|
|149.171.126.4

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Identify categorical columns (excluding the target column if it were present, but we don't have one yet)
# For now, we'll encode all string type columns identified earlier
categorical_cols = [field.name for field in spark_df.schema.fields if field.dataType.simpleString() == "string"]

# Create StringIndexer and OneHotEncoder stages for each categorical column
indexers = [StringIndexer(inputCol=col_name, outputCol=col_name + "_indexed", handleInvalid="keep") for col_name in categorical_cols]
encoders = [OneHotEncoder(inputCol=col_name + "_indexed", outputCol=col_name + "_encoded") for col_name in categorical_cols]

# Create a pipeline
pipeline = Pipeline(stages=indexers + encoders)

# Fit the pipeline to the data and transform the data
pipeline_model = pipeline.fit(spark_df)
spark_df_encoded = pipeline_model.transform(spark_df)

# Display the schema of the encoded DataFrame to see the new columns
print("Schema after encoding categorical features:")
spark_df_encoded.printSchema()

# Show the first few rows with the new encoded columns
print("First 5 rows after encoding:")
spark_df_encoded.show(5)

Schema after encoding categorical features:
root
 |-- srcip: string (nullable = true)
 |-- sport: integer (nullable = true)
 |-- dstip: string (nullable = true)
 |-- dsport: string (nullable = true)
 |-- proto: string (nullable = true)
 |-- state: string (nullable = true)
 |-- dur: double (nullable = true)
 |-- sbytes: integer (nullable = true)
 |-- dbytes: integer (nullable = true)
 |-- sttl: integer (nullable = true)
 |-- dttl: integer (nullable = true)
 |-- sloss: integer (nullable = true)
 |-- dloss: integer (nullable = true)
 |-- service: string (nullable = true)
 |-- sload: double (nullable = true)
 |-- dload: double (nullable = true)
 |-- spkts: integer (nullable = true)
 |-- dpkts: integer (nullable = true)
 |-- swin: integer (nullable = true)
 |-- dwin: integer (nullable = true)
 |-- stcpb: long (nullable = true)
 |-- dtcpb: long (nullable = true)
 |-- smeansz: integer (nullable = true)
 |-- dmeansz: integer (nullable = true)
 |-- trans_depth: integer (nullable = true)
 |-- re

In [None]:
from pyspark.ml.feature import VectorAssembler

# Identify numerical columns (excluding the time-based columns if they are not intended as features)
# For now, let's include all numerical columns except 'stime' and 'ltime' as they might represent timestamps
numerical_cols = [field.name for field in spark_df_encoded.schema.fields if field.dataType.simpleString() != "string" and field.name not in ["stime", "ltime"]]

# Identify the encoded categorical columns
encoded_categorical_cols = [col_name + "_encoded" for col_name in categorical_cols]

# Combine all feature columns
feature_cols = numerical_cols + encoded_categorical_cols

# Create a VectorAssembler to combine all feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the DataFrame to include the features vector
spark_df_features = assembler.transform(spark_df_encoded)

# Drop the original categorical columns and the intermediate indexed columns
columns_to_drop_final = categorical_cols + [col_name + "_indexed" for col_name in categorical_cols]
spark_df_final = spark_df_features.drop(*columns_to_drop_final)

# Display the schema and the first few rows of the final DataFrame
print("Schema after assembling features and dropping intermediate columns:")
spark_df_final.printSchema()

print("First 5 rows of the final DataFrame with features vector:")
spark_df_final.select("features").show(5, truncate=False)

Schema after assembling features and dropping intermediate columns:
root
 |-- sport: integer (nullable = true)
 |-- dur: double (nullable = true)
 |-- sbytes: integer (nullable = true)
 |-- dbytes: integer (nullable = true)
 |-- sttl: integer (nullable = true)
 |-- dttl: integer (nullable = true)
 |-- sloss: integer (nullable = true)
 |-- dloss: integer (nullable = true)
 |-- sload: double (nullable = true)
 |-- dload: double (nullable = true)
 |-- spkts: integer (nullable = true)
 |-- dpkts: integer (nullable = true)
 |-- swin: integer (nullable = true)
 |-- dwin: integer (nullable = true)
 |-- stcpb: long (nullable = true)
 |-- dtcpb: long (nullable = true)
 |-- smeansz: integer (nullable = true)
 |-- dmeansz: integer (nullable = true)
 |-- trans_depth: integer (nullable = true)
 |-- res_bdy_len: integer (nullable = true)
 |-- sjit: double (nullable = true)
 |-- djit: double (nullable = true)
 |-- stime: integer (nullable = true)
 |-- ltime: integer (nullable = true)
 |-- sintpkt: do

In [None]:
# Check unique values and counts for the 'is_ssh' column
print("Unique values and counts for 'is_ssh' column:")
spark_df_final.groupBy("is_ssh").count().show()

Unique values and counts for 'is_ssh' column:
+------+------+
|is_ssh| count|
+------+------+
|     1|157425|
|     0|542575|
+------+------+



In [None]:
# Split the data into training and testing sets (e.g., 80% train, 20% test)
train_df, test_df = spark_df_final.randomSplit([0.8, 0.2], seed=42)

# Print the number of rows in each set to verify the split
print("Training data count:", train_df.count())
print("Testing data count:", test_df.count())

Training data count: 559845
Testing data count: 140155


In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize the Random Forest Classifier model
# We use the 'features' column as input and 'is_ssh' as the label column
# Increased maxBins to handle high cardinality categorical features
rf = RandomForestClassifier(labelCol="is_ssh", featuresCol="features", numTrees=10, maxBins=65536) # Increased maxBins

# Train the model on the training data
rf_model = rf.fit(train_df)

print("Random Forest Model trained successfully!")

Random Forest Model trained successfully!


In [None]:
# Make predictions on the test data
predictions = rf_model.transform(test_df)

# Select (prediction, true label) pairs and compute evaluation metrics
evaluator_roc = BinaryClassificationEvaluator(labelCol="is_ssh", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
evaluator_pr = BinaryClassificationEvaluator(labelCol="is_ssh", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

# Calculate Area Under ROC Curve (AUC)
auc = evaluator_roc.evaluate(predictions)
print(f"Area Under ROC Curve (AUC): {auc:.4f}")

# Calculate Area Under PR Curve (AUPRC)
auprc = evaluator_pr.evaluate(predictions)
print(f"Area Under PR Curve (AUPRC): {auprc:.4f}")

# You can also show some prediction examples
print("Sample Predictions:")
predictions.select("features", "is_ssh", "prediction", "probability").show(5)

Area Under ROC Curve (AUC): 0.9900
Area Under PR Curve (AUPRC): 0.9733
Sample Predictions:
+--------------------+------+----------+--------------------+
|            features|is_ssh|prediction|         probability|
+--------------------+------+----------+--------------------+
|(116734,[2,10,16,...|     0|       0.0|[0.78983220958132...|
|(116734,[2,10,16,...|     0|       0.0|[0.75638931689210...|
|(116734,[2,10,16,...|     0|       0.0|[0.78983220958132...|
|(116734,[2,10,16,...|     0|       0.0|[0.78983220958132...|
|(116734,[2,10,16,...|     0|       0.0|[0.78983220958132...|
+--------------------+------+----------+--------------------+
only showing top 5 rows

