

This implementation performs the following changes:
1. Changes any instances of "null" or NaN for numbers to the mean value.
2. Changes any null values for strings to 'null'
3. Uses a StringIndexer on the following features to convert them to numerical data. Each indexed column is named by appending '_indexed' to the original feature name. a. 'service' b. 'conn_state' c. 'history' d. 'proto' e. 'dest_ip_zeek' f. 'community_id' g. 'uid' h. 'src_ip_zeek'
4. The original feature columns (not _indexed) are removed.
5. Uses a StringIndexer on the class labels, 'label_tactic', to convert them to numerical data.
6. Uses a VectorAssembler on the new features.
7. Trains a linear SVM (LinearSVC) wrapped in a One-vs-Rest (OvR) classifier and reports accuracy on a held-out test split.

In [None]:
# Install PySpark into the notebook environment (IPython shell escape;
# only works inside a notebook/IPython session, not plain Python).
!pip install pyspark

In [2]:
# PySpark Imports
import pyspark
from pyspark.sql import SparkSession

# ML Classifier Imports
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import OneVsRest
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import mean, col

In [4]:
# Initialize the Spark session.
#
# The master is "local[4]" so that the four cores requested below are
# actually used: a bare "local" master runs everything on a single worker
# thread regardless of the core settings.
spark = (
    SparkSession.builder.appName("ce53")
    .master("local[4]")
    .config("spark.driver.cores", "4")
    .config("spark.driver.memory", "5g")
    # NOTE: under a local master there are no separate executors, so the
    # executor and dynamic-allocation settings below are not expected to
    # have any effect; they are kept so the same builder can be pointed
    # at a cluster master without further changes.
    .config("spark.executor.memory", "5g")
    .config("spark.executor.cores", "4")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "4")
    .config("spark.executor.instances", "2")
    .getOrCreate()
)

In [5]:
# Parquet input files (currently two sample files from the dataset website)
parquet_files = [
    "/content/drive/MyDrive/School/Parquet/part-00000-7c2e9adb-5430-4792-a42b-10ff5bbd46e8-c000.snappy.parquet",
    "/content/drive/MyDrive/School/Parquet/part-00000-df678a79-4a73-452b-8e72-d624b2732f17-c000.snappy.parquet",
]

# Read all files into a single DataFrame. Parquet files embed their own
# schema, so no schema-inference option is passed (inferSchema is a
# CSV/JSON reader option and has no meaning here).
df = spark.read.parquet(*parquet_files)

In [None]:
# Print the column names and data types of the loaded DataFrame
df.printSchema()

In [None]:
# Examine the unique values of every column in the DataFrame.
# The loop variable is deliberately NOT called `col`: that name is the
# pyspark.sql.functions.col helper imported above, and rebinding it to a
# string here would break every later `col(...)` call in the notebook.
# NOTE: collect() pulls every distinct value to the driver, which can be
# expensive for high-cardinality columns.
for column_name in df.columns:
    unique_values = df.select(column_name).distinct().collect()
    print(f"Unique values for column '{column_name}': {unique_values}")

In [6]:
from pyspark.sql.functions import mean, col

# Numeric columns whose missing values are imputed with the column mean
numeric_columns = ['resp_pkts', 'orig_ip_bytes', 'missed_bytes', 'duration', 'orig_pkts',
                   'resp_ip_bytes', 'dest_port_zeek', 'orig_bytes', 'resp_bytes',
                   'src_port_zeek', 'ts']

# Compute every mean in one aggregation pass; the single result row is
# turned into a {column name: mean} dict.
mean_values = df.select(
    [mean(col(name)).alias(name) for name in numeric_columns]
).first().asDict()

# A column that is entirely null has no mean (None); fillna cannot accept
# None as a replacement value, so such columns are left untouched.
fill_values = {name: value for name, value in mean_values.items() if value is not None}

# A dict passed to fillna maps each column to its own replacement value,
# so all columns are filled in a single call (the `subset` argument is
# ignored when the value is a dict and is therefore not passed).
if fill_values:
    df = df.fillna(fill_values)

# Show updated DataFrame
df.show()

+---------+-------+-------------+----------+------------+-----+--------------------+----------+---------------+---------+--------------------+-------------+--------------+----------+----------+--------------------+-------+----------+------------------+-------------+-------------------+--------------------+------------+
|resp_pkts|service|orig_ip_bytes|local_resp|missed_bytes|proto|            duration|conn_state|   dest_ip_zeek|orig_pkts|        community_id|resp_ip_bytes|dest_port_zeek|orig_bytes|local_orig|            datetime|history|resp_bytes|               uid|src_port_zeek|                 ts|         src_ip_zeek|label_tactic|
+---------+-------+-------------+----------+------------+-----+--------------------+----------+---------------+---------+--------------------+-------------+--------------+----------+----------+--------------------+-------+----------+------------------+-------------+-------------------+--------------------+------------+
|  1689365|   NULL|    141906660|    

In [7]:
# Drop the datetime column
df = df.drop("datetime")

# Categorical columns (including the class label 'label_tactic') that are
# converted to numeric indices
columns_to_index = ['service', 'conn_state', 'history', 'proto', 'dest_ip_zeek', 'community_id', 'uid', 'src_ip_zeek', 'label_tactic']

# Replace missing values in these columns with the literal string 'null'
# so that they are indexed as their own category. One call covers every
# listed column.
df = df.fillna('null', subset=columns_to_index)

# One (unfitted) StringIndexer per column; each writes to "<column>_indexed".
# Fitting is left to the Pipeline so the indexers are not fitted twice.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_indexed")
    for name in columns_to_index
]

# Chain indexers together
pipeline = Pipeline(stages=indexers)

# Fit all indexers and append the indexed columns
df_indexed = pipeline.fit(df).transform(df)

# Drop the original (non-indexed) columns
df_indexed = df_indexed.drop(*columns_to_index)

# Display the first rows of the indexed DataFrame
df_indexed.show()

+---------+-------------+----------+------------+--------------------+---------+-------------+--------------+----------+----------+----------+-------------+-------------------+---------------+------------------+---------------+-------------+--------------------+--------------------+-----------+-------------------+--------------------+
|resp_pkts|orig_ip_bytes|local_resp|missed_bytes|            duration|orig_pkts|resp_ip_bytes|dest_port_zeek|orig_bytes|local_orig|resp_bytes|src_port_zeek|                 ts|service_indexed|conn_state_indexed|history_indexed|proto_indexed|dest_ip_zeek_indexed|community_id_indexed|uid_indexed|src_ip_zeek_indexed|label_tactic_indexed|
+---------+-------------+----------+------------+--------------------+---------+-------------+--------------+----------+----------+----------+-------------+-------------------+---------------+------------------+---------------+-------------+--------------------+--------------------+-----------+-------------------+-----------

In [None]:
# Check for null values in each column.
# All counts are computed in a single aggregation instead of launching
# one Spark job per column. The import is local to this cell so that
# `col` is guaranteed to be the pyspark function here.
from pyspark.sql.functions import col, count, when

# count() ignores nulls, and when() without otherwise() yields null for
# non-matching rows, so each aggregate counts exactly the null entries.
null_counts = df_indexed.select(
    [count(when(col(name).isNull(), 1)).alias(name) for name in df_indexed.columns]
).first().asDict()

for column in df_indexed.columns:
    null_count = null_counts[column]
    print(f"Null count in column {column}: {null_count}")

In [8]:
# Name of the indexed target (label) column
label_column = 'label_tactic_indexed'

# Feature columns: every column except the label. Filtering (rather than
# list.remove) means a missing label column does not raise ValueError
# here, matching the "if it's in the list" intent.
columns_to_assemble = [name for name in df_indexed.columns if name != label_column]

# Combine the feature columns into a single vector column "features"
assembler = VectorAssembler(inputCols=columns_to_assemble, outputCol="features")

# Transform the DataFrame
df_assembled = assembler.transform(df_indexed)

# Keep only the assembled features and the label
df_assembled = df_assembled.select("features", label_column)

# Show the schema of the DataFrame
df_assembled.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label_tactic_indexed: double (nullable = false)



In [9]:
# Column holding the numeric class label
label_column = "label_tactic_indexed"

# 80/20 train/test split with a fixed seed
splits = df_assembled.randomSplit([0.8, 0.2], seed=1)
train_data = splits[0]
test_data = splits[1]

# Base binary classifier: linear SVM limited to 10 iterations
svm = LinearSVC(
    maxIter=10,
    featuresCol="features",
    labelCol=label_column,
)

# Wrap the binary SVM in One-vs-Rest to handle the multiclass label
ovr = OneVsRest(labelCol=label_column, classifier=svm)

# Train on the training split, then score the test split
svm_model = ovr.fit(train_data)
predictions = svm_model.transform(test_data)

# Report accuracy on the test split
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol=label_column,
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 1.0


In [10]:
# Stop the Spark session; this also stops the underlying SparkContext.
spark.stop()