In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("GMMExample").getOrCreate()

# Read CSV data from the data folder: the header row supplies column names and
# column types are inferred (inferSchema=True).
DATA_PATH = "../data/Thursday.csv"
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# Flow features to keep, plus the target column "label" (used only for evaluation).
columns_to_keep = [
    "protocol",
    "src2dst_packets",
    "dst2src_packets",
    "src2dst_bytes",
    "dst2src_bytes",
    "bidirectional_duration_ms",
    "bidirectional_min_ps",
    "bidirectional_max_ps",
    "bidirectional_mean_ps",
    "bidirectional_stddev_ps",
    "src2dst_max_ps",
    "src2dst_min_ps",
    "src2dst_mean_ps",
    "src2dst_stddev_ps",
    "dst2src_max_ps",
    "dst2src_min_ps",
    "dst2src_mean_ps",
    "dst2src_stddev_ps",
    "bidirectional_mean_piat_ms",
    "bidirectional_stddev_piat_ms",
    "bidirectional_max_piat_ms",
    "bidirectional_min_piat_ms",
    "src2dst_mean_piat_ms",
    "src2dst_stddev_piat_ms",
    "src2dst_max_piat_ms",
    "src2dst_min_piat_ms",
    "dst2src_mean_piat_ms",
    "dst2src_stddev_piat_ms",
    "dst2src_max_piat_ms",
    "dst2src_min_piat_ms",
    "bidirectional_fin_packets",
    "bidirectional_syn_packets",
    "bidirectional_rst_packets",
    "bidirectional_psh_packets",
    "bidirectional_ack_packets",
    "bidirectional_urg_packets",
    "bidirectional_cwr_packets",
    "bidirectional_ece_packets",
    "src2dst_psh_packets",
    "dst2src_psh_packets",
    "src2dst_urg_packets",
    "dst2src_urg_packets",
    # Target column; rename here if the dataset uses a different name (e.g. "attack_type")
    "label"
]

# Requested columns that are absent from the CSV are skipped below. Report them so a
# schema mismatch (e.g. a differently named label column) is visible instead of
# silently disabling the cells that check `"label" in df.columns`.
missing_columns = [c for c in columns_to_keep if c not in df.columns]
if missing_columns:
    print(f"Warning: columns not found in {DATA_PATH}, skipping: {missing_columns}")

df = df.select([col(c) for c in columns_to_keep if c in df.columns])
df.show(5)

In [None]:
# Raw class distribution, most frequent first. Guarded like the other label cells,
# because "label" is only kept when it exists in the CSV.
if "label" in df.columns:
    df.groupBy("label").count().orderBy("count", ascending=False).show()
else:
    print("No label column found; skipping class counts.")

In [None]:
import matplotlib.pyplot as plt

# Bar chart of how many rows carry each label value (skipped when there is no label).
if "label" not in df.columns:
    print("No label column found for class visualization.")
else:
    # The aggregated table is tiny (one row per label), so collecting it to pandas is cheap.
    class_counts = df.groupBy("label").count().toPandas()

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.bar(class_counts["label"].astype(str), class_counts["count"])
    ax.set_xlabel("Class Label")
    ax.set_ylabel("Number of Samples")
    ax.set_title("Number of Samples in Each Class")
    plt.setp(ax.get_xticklabels(), rotation=45)
    fig.tight_layout()
    plt.show()

In [None]:
from pyspark.sql.functions import when

# Collapse the multi-class labels into a binary target of strings: rows labelled
# exactly "BENIGN" keep that value and every other non-null label becomes "ANOMALY".
# The label stays a string here; the encoding cell later turns it into a numeric index.
# Null labels are deliberately left null (instead of falling into the "ANOMALY"
# branch) so the dropna() step removes unlabeled rows rather than counting them as attacks.
if "label" in df.columns:
    df = df.withColumn(
        "label",
        when(col("label") == "BENIGN", "BENIGN")
        .when(col("label").isNotNull(), "ANOMALY"),
    )
    df.groupBy("label").count().show()
else:
    print("No label column found for encoding.")

In [None]:
# Step 2: Data Cleaning (drop missing values)
from pyspark.sql.functions import isnan, isnull

# NaN can only occur in floating-point columns. Applying isnan() to a string column
# (e.g. "label") makes Spark insert an implicit string->double cast. That is wasted
# work, and depending on ANSI settings it may fail on values like "BENIGN".
# So every column is checked for null, and only float/double columns also for NaN.
float_cols = {name for name, dtype in df.dtypes if dtype in ("double", "float")}

nan_condition = None
for c in df.columns:
    cond = isnull(col(c))
    if c in float_cols:
        cond = cond | isnan(col(c))
    nan_condition = cond if nan_condition is None else nan_condition | cond

# Preview a few offending rows (nothing to check if the frame has no columns).
if nan_condition is not None:
    df.filter(nan_condition).show(5, truncate=False)

df_clean = df.dropna()
print("Rows after dropna():", df_clean.count())

In [None]:
# Step 3: Data Encoding (categorical to numeric)
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import StringType

from pyspark.sql.types import NumericType

# Split columns into string (categorical) and numeric feature columns. The target
# "label" is excluded from BOTH lists, so it can never leak into the feature vector
# whatever its type.
categorical_cols = [
    field.name for field in df_clean.schema.fields
    if isinstance(field.dataType, StringType) and field.name != "label"
]
# NumericType covers byte/short/integer/long/float/double/decimal columns alike.
numeric_cols = [
    field.name for field in df_clean.schema.fields
    if isinstance(field.dataType, NumericType) and field.name != "label"
]

# Index each categorical column into "<name>_idx". Build df_encoded instead of
# reassigning df_clean, so re-running this cell does not fail on "_idx" output
# columns that already exist.
df_encoded = df_clean
for col_name in categorical_cols:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_idx").fit(df_encoded)
    df_encoded = indexer.transform(df_encoded)

# Encode the target. String labels go through a StringIndexer into "label_idx";
# with the default ordering the most frequent class gets index 0. Numeric labels
# are used as-is, and a missing label column disables supervised evaluation.
if "label" in df_encoded.columns:
    if isinstance(df_encoded.schema["label"].dataType, StringType):
        label_indexer = StringIndexer(inputCol="label", outputCol="label_idx").fit(df_encoded)
        df_encoded = label_indexer.transform(df_encoded)
        label_col = "label_idx"
    else:
        label_col = "label"
else:
    label_col = None

feature_cols = [name + "_idx" for name in categorical_cols] + numeric_cols
df_encoded.select(feature_cols + ([label_col] if label_col else [])).show(5)

In [None]:
# Step 4: Data Normalization (scaling)
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Pack every feature column into one vector column, then rescale it with StandardScaler
# (default parameters). NOTE(review): the scaler is fit on all rows, before the
# train/test split made in the next cell, so test rows influence the scaling statistics.
assembler = VectorAssembler().setInputCols(feature_cols).setOutputCol("features_vec")
df_features = assembler.transform(df_encoded)

scaler = StandardScaler().setInputCol("features_vec").setOutputCol("features_scaled")
scaler_model = scaler.fit(df_features)
df_scaled = scaler_model.transform(df_features)

# Peek at the scaled vectors.
df_scaled.select("features_scaled").show(5, truncate=False)

In [None]:
# Step 5: Train/Test Gaussian Mixture Model
from pyspark.ml.clustering import GaussianMixture

# Keep only the scaled vector (exposed as "features") and, if there is one, the encoded
# target (exposed as "label"), so the model and evaluators see conventional names.
data = df_scaled.select(["features_scaled"] + ([label_col] if label_col else []))
data = data.withColumnRenamed("features_scaled", "features")
if label_col:
    data = data.withColumnRenamed(label_col, "label")

# Reproducible 70/30 split.
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Two-component Gaussian mixture, fit on the training rows only.
gmm = GaussianMixture().setFeaturesCol("features").setK(2).setSeed(42)
model = gmm.fit(train_data)

# Assign test rows to clusters.
predictions = model.transform(test_data)
predictions.show(5)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

from pyspark.sql.functions import when

# GMM is unsupervised: its cluster ids (0..k-1) are arbitrary and need not match the
# label indices (cluster 0 may hold mostly label-1 rows). Scoring raw cluster ids
# against labels can therefore report an inverted accuracy/F1. Instead, map each
# cluster to the majority label of the *training* rows assigned to it, then score the
# test set with that mapping.
if "label" in predictions.columns:
    # Evaluators need double-typed columns (casting is idempotent, safe to re-run).
    predictions = predictions.withColumn("prediction", col("prediction").cast("double"))
    predictions = predictions.withColumn("label", col("label").cast("double"))

    # At most k * n_labels rows, so collecting to the driver is cheap.
    train_counts = (
        model.transform(train_data)
        .groupBy("prediction", "label")
        .count()
        .collect()
    )
    cluster_to_label = {}
    cluster_best_count = {}
    for row in train_counts:
        cluster = row["prediction"]
        if row["count"] > cluster_best_count.get(cluster, -1):
            cluster_best_count[cluster] = row["count"]
            cluster_to_label[cluster] = float(row["label"])
    print("Cluster -> label mapping (from training data):", cluster_to_label)

    # Clusters that received no training rows keep their raw id as a fallback.
    mapped_prediction = col("prediction")
    for cluster, label_value in cluster_to_label.items():
        mapped_prediction = when(col("prediction") == cluster, label_value).otherwise(mapped_prediction)

    # New frame (raw cluster id kept in "cluster") so `predictions` is never remapped twice.
    gmm_scored = (
        predictions
        .withColumn("cluster", col("prediction"))
        .withColumn("prediction", mapped_prediction)
    )

    accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = accuracy_evaluator.evaluate(gmm_scored)
    print(f"Accuracy: {accuracy}")

    f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
    f1 = f1_evaluator.evaluate(gmm_scored)
    print(f"F1 Score: {f1}")

    # `evaluator` stays bound for any later cell that reuses it; it measures accuracy,
    # so reusing it reports what its name suggests.
    evaluator = accuracy_evaluator
else:
    print("No label column found; skipping classification metrics.")

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Supervised baseline: a decision tree on the same features and train/test split
# as the GMM, so the two can be compared.
if "label" in train_data.columns:
    dt = DecisionTreeClassifier(featuresCol="features", labelCol="label", seed=42)
    dt_model = dt.fit(train_data)
    dt_predictions = dt_model.transform(test_data)
    dt_predictions.show(5)

    # Evaluate Decision Tree model (evaluators expect double-typed columns).
    dt_predictions = (
        dt_predictions
        .withColumn("prediction", col("prediction").cast("double"))
        .withColumn("label", col("label").cast("double"))
    )

    # Use a dedicated accuracy evaluator rather than the shared `evaluator` variable
    # from an earlier cell: that variable's metric depends on what the earlier cell
    # last assigned, and it does not exist if that cell skipped evaluation.
    dt_accuracy = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy"
    ).evaluate(dt_predictions)
    print(f"Decision Tree Accuracy: {dt_accuracy}")

    dt_f1 = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="f1"
    ).evaluate(dt_predictions)
    print(f"Decision Tree F1 Score: {dt_f1}")
else:
    print("No label column found; skipping Decision Tree classification.")