# Anomaly detection with K-means Clustering 

Lab Exercises:
1. Implement a PySpark script to handle any missing values and scale numerical features.
2. Develop a PySpark script that uses the K-means algorithm to cluster data points.
3. Develop a PySpark script that labels data points as anomalies based on their cluster assignments.
4. Implement code to evaluate the effectiveness of the K-means clustering model in detecting anomalies.

In [1]:
import pyspark
import os
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.config("spark.driver.memory", "16g").appName('chapter_5').getOrCreate()



In [23]:
# read the CSV file into a Spark df without inferring the schema and without considering the first row as header
data_without_header = spark.read.option("inferSchema", True).option("header", False).csv("kddcup.csv")

# define the column names for df
column_names = [
    "duration", "protocol_type", "service", "flag",
    "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
    "hot", "num_failed_logins", "logged_in", "num_compromised",
    "root_shell", "su_attempted", "num_root", "num_file_creations",
    "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login", "count", "srv_count",
    "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
    "label"
]

# Convert the df without header to df with specified column names
data = data_without_header.toDF(*column_names)

In [24]:
from pyspark.sql.functions import col
# Selecting the label column from df and grouping them by label then count the each unique value in label in  descending ordr

data.select("label").groupBy("label").count().orderBy(col("count").desc()).show(25)

+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
|            phf.|     4|
|           perl.|     3|
|            spy.|     2|
+----------------+------+



In [25]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml import Pipeline
from pprint import pprint

# Drop the columns "protocol_type", "service", and "flag" from the DataFrame
# Keep only the numeric columns and cache the result for efficient reuse
numeric_only = data.drop("protocol_type", "service", "flag").cache()

# Create a VectorAssembler to assemble the feature vector from the numeric columns
# Set the input columns to be all columns except the last one
# Set the output column name as "featureVector"
assembler = VectorAssembler().setInputCols(numeric_only.columns[:-1]).setOutputCol("featureVector")

# Create a KMeans model for clustering
# Set the prediction column name as "cluster"
# Set the features column name as "featureVector"
kmeans = KMeans().setPredictionCol("cluster").setFeaturesCol("featureVector")

# Create a Pipeline to chain together the VectorAssembler and KMeans stages
pipeline = Pipeline().setStages([assembler, kmeans])

# Fit the Pipeline model to the numeric_only DataFrame
pipeline_model = pipeline.fit(numeric_only)

# Get the KMeans model from the fitted Pipeline model
kmeans_model = pipeline_model.stages[1]

# print cluster centers
pprint(kmeans_model.clusterCenters())

[array([4.79793956e+01, 1.62207883e+03, 8.68534183e+02, 4.45326100e-05,
       6.43293794e-03, 1.41694668e-05, 3.45168212e-02, 1.51815716e-04,
       1.48247035e-01, 1.02121372e-02, 1.11331525e-04, 3.64357718e-05,
       1.13517671e-02, 1.08295211e-03, 1.09307315e-04, 1.00805635e-03,
       0.00000000e+00, 0.00000000e+00, 1.38658354e-03, 3.32286248e+02,
       2.92907143e+02, 1.76685418e-01, 1.76607809e-01, 5.74330999e-02,
       5.77183920e-02, 7.91548844e-01, 2.09816404e-02, 2.89968625e-02,
       2.32470732e+02, 1.88666046e+02, 7.53781203e-01, 3.09056111e-02,
       6.01935529e-01, 6.68351484e-03, 1.76753957e-01, 1.76441622e-01,
       5.81176268e-02, 5.74111170e-02]),
 array([2.0000000e+00, 6.9337564e+08, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 1.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00

In [26]:
# Transform the numeric_only DataFrame using the fitted Pipeline model
with_cluster = pipeline_model.transform(numeric_only)

# Select the "cluster" and "label" columns from the transformed DataFrame
# Group the DataFrame by "cluster" and "label"
# Count the occurrences of each unique combination of "cluster" and "label"
# Order the result by "cluster" and the count of occurrences in descending order
# Show the top 25 rows of the result
with_cluster.select("cluster", "label").groupBy("cluster", "label").count().orderBy(col("cluster"), col("count").desc()).show(25)


+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|          smurf.|280790|
|      0|        neptune.|107201|
|      0|         normal.| 97278|
|      0|           back.|  2203|
|      0|          satan.|  1589|
|      0|        ipsweep.|  1247|
|      0|      portsweep.|  1039|
|      0|    warezclient.|  1020|
|      0|       teardrop.|   979|
|      0|            pod.|   264|
|      0|           nmap.|   231|
|      0|   guess_passwd.|    53|
|      0|buffer_overflow.|    30|
|      0|           land.|    21|
|      0|    warezmaster.|    20|
|      0|           imap.|    12|
|      0|        rootkit.|    10|
|      0|     loadmodule.|     9|
|      0|      ftp_write.|     8|
|      0|       multihop.|     7|
|      0|            phf.|     4|
|      0|           perl.|     3|
|      0|            spy.|     2|
|      1|      portsweep.|     1|
+-------+----------------+------+



In [27]:
from pyspark.sql import DataFrame
from random import randint

def clustering_score(input_data, k):
    # drop non-numeric columns
    input_numeric_only = input_data.drop("protocol_type", "service", "flag")
    
    # create a vctorassembler to assemble the feature vector from the numeric columns
    assembler = VectorAssembler().setInputCols(input_numeric_only.columns[:-1]).setOutputCol("featureVector")
    
    # create a KMeans model with a random seed, specified k
    kmeans = KMeans().setSeed(randint(100,100000)).setK(k).setPredictionCol("cluster").setFeaturesCol("featureVector")
    
    # create a pipeline to chain together the vctorassembler and KMeans stages
    pipeline = Pipeline().setStages([assembler, kmeans])
    
    # fit the pipeline model to the input df
    pipeline_model = pipeline.fit(input_numeric_only)
    
    # get the KMeans model from the fitted pipeline model
    kmeans_model = pipeline_model.stages[-1]
    
    # get the training cost from KMeans modfel
    training_cost = kmeans_model.summary.trainingCost
    
    return training_cost

# iterate through a range of values for k
for k in list(range(20,100, 20)):
    print(clustering_score(numeric_only, k))

24257686995955.965
15536711568008.854
19215204119441.668
595801400002.5406


In [30]:
def clustering_score_1(input_data, k):
    input_numeric_only = input_data.drop("protocol_type", "service", "flag")
    assembler = VectorAssembler().setInputCols(input_numeric_only.columns[:-1]).setOutputCol("featureVector")
    kmeans = KMeans().setSeed(randint(100,100000)).setK(k).setMaxIter(40).setTol(1.0e-5).\
        setPredictionCol("cluster").\
        setFeaturesCol("featureVector")
    pipeline = Pipeline().setStages([assembler, kmeans])
    pipeline_model = pipeline.fit(input_numeric_only)
    kmeans_model = pipeline_model.stages[-1]
    training_cost = kmeans_model.summary.trainingCost
    return training_cost

for k in list(range(20,101, 20)):
    print(k, clustering_score_1(numeric_only, k))

20 22967263578826.312
40 24051449729302.156
60 4217842680460.3354
80 11601400189802.18
100 10914450899300.459


In [18]:
from pyspark.ml.feature import StandardScaler

def clustering_score_2(input_data, k):
    input_numeric_only = input_data.drop("protocol_type", "service", "flag")
    
    assembler = VectorAssembler().setInputCols(input_numeric_only.columns[:-1]).setOutputCol("featureVector")
    
    # Create a standardscaler to scale the feature vector
    # StandardScaler standardizes features by removing the mean and scaling to unit variance
    scaler = StandardScaler().setInputCol("featureVector").setOutputCol("scaledFeatureVector").\
        setWithStd(True).setWithMean(False)
    
    kmeans = KMeans().\
        setSeed(randint(100,100000)).\
        setK(k).\
        setMaxIter(40).\
        setTol(1.0e-5).\
        setPredictionCol("cluster").\
        setFeaturesCol("scaledFeatureVector")
    
    # Create a Pipeline to chain together the VectorAssembler, StandardScaler, and KMeans stages
    pipeline = Pipeline().setStages([assembler, scaler, kmeans])
    pipeline_model = pipeline.fit(input_numeric_only)
    kmeans_model = pipeline_model.stages[-1]
    training_cost = kmeans_model.summary.trainingCost
    return training_cost

for k in list(range(300, 1101, 30)):
    print(k, clustering_score_2(numeric_only, k))


300 88978.02401692684
330 82721.35008461926
360 77102.15117534516
390 72495.57251859624
420 66867.30733985612
450 62903.26332547334
480 58422.70941652663
510 55438.201560245056
540 53443.773659448816
570 51363.73048668775
600 49083.723416677196
630 47455.25610319255
660 45640.43960431267
690 43419.99424008578
720 42411.61144291233
750 40927.25326573158
780 38535.83910908871
810 38056.34156665825
840 36322.18692424943
870 35696.49706375343
900 33995.724754888855
930 34043.309811621126
960 32377.997125947786
990 31089.609252061957
1020 30680.39374282738
1050 29470.31790763838
1080 29397.136154894517


In [32]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Define a function to create a pipeline for one-hot encoding of a categorical column
def one_hot_pipeline(input_col):
    # Create a StringIndexer to convert categorical values into indices
    indexer = StringIndexer().setInputCol(input_col).setOutputCol(input_col +"_indexed")
    
    # Create a OneHotEncoder to encode the indexed categorical values into a sparse vector representation
    encoder = OneHotEncoder().setInputCol(input_col + "_indexed").setOutputCol(input_col + "_vec")
    
    # Create a Pipeline to chain together the StringIndexer and OneHotEncoder stages
    pipeline = Pipeline().setStages([indexer, encoder])
    
    # Return the pipeline and the output column name of the OneHotEncoder stage
    return pipeline, input_col + "_vec"


In [None]:
from math import log

# Define a function to calculate the entropy given counts of different values
def entropy(counts):
    # Filter out counts with zero values
    values = [c for c in counts if (c > 0)]
    
    # Calculate the total count
    n = sum(values)
    
    # Calculate the probability of each value
    p = [v/n for v in values]
    
    # Calculate the entropy using the formula: -sum(p_i * log(p_i))
    return sum([-1*(p_v) * log(p_v) for p_v in p])

# The following code seems to be incomplete or out of place
# It appears to be a part of the previous conversation related to one-hot encoding using StringIndexer and OneHotEncoder
# I'm not sure where you intended this code to be placed or how it relates to the previous code snippet
# Please provide additional context or clarification if needed
indexer = StringIndexer().setInputCol(input_col).\
    setOutputCol(input_col +"_indexed")
encoder = OneHotEncoder().setInputCol(input_col + "_indexed").setOutputCol(input_col + "_vec")
pipeline = Pipeline().setStages([indexer, encoder])
return pipeline, input_col + "_vec"

In [35]:
from pyspark.sql import functions as fun
from pyspark.sql import Window

# Transform the DataFrame using the fitted Pipeline model and select relevant columns
cluster_label = pipeline_model.transform(data).select("cluster", "label")

# Group the DataFrame by "cluster" and "label", counting the occurrences of each combination and ordering by "cluster"
df = cluster_label.groupBy("cluster", "label").count().orderBy("cluster")

# Define a window specification partitioned by "cluster"
w = Window.partitionBy("cluster")

# Calculate the probability of each label within each cluster
p_col = df['count'] / fun.sum(df['count']).over(w)

# Add a column "p_col" representing the calculated probabilities
with_p_col = df.withColumn("p_col", p_col)

# calculate the entropy for each cluster
result = with_p_col.groupBy("cluster").agg((-fun.sum(col("p_col") * fun.log2(col("p_col")))).\
                                           alias("entropy"),fun.sum(col("count")).alias("cluster_size"))

# calculate the weighted cluster entropy for each cluster
result = result.withColumn('weightedClusterEntropy',fun.col('entropy') * fun.col('cluster_size'))

# aggregate the weighted cluster entropy across all clusters
weighted_cluster_entropy_avg = result.agg(fun.sum(col('weightedClusterEntropy'))).collect()

# calculate the average weighted cluster entropy over all clusters and total records
average_weighted_cluster_entropy = weighted_cluster_entropy_avg[0][0] / data.count()

print(average_weighted_cluster_entropy)

1.557605039016584

In [36]:
def fit_pipeline_4(data, k):
    # Apply one-hot encoding pipelines to "protocol_type", "service", and "flag" columns
    (proto_type_pipeline, proto_type_vec_col) = one_hot_pipeline("protocol_type")
    (service_pipeline, service_vec_col) = one_hot_pipeline("service")
    (flag_pipeline, flag_vec_col) = one_hot_pipeline("flag")
    
    # Determine columns for vector assembly excluding "label", "protocol_type", "service", "flag"
    assemble_cols = set(data.columns) - {"label", "protocol_type", "service", "flag"} | {proto_type_vec_col, service_vec_col, flag_vec_col}
    
    # Create a VectorAssembler to assemble the feature vector
    assembler = VectorAssembler(inputCols=list(assemble_cols), outputCol="featureVector")
    
    # Create a StandardScaler for feature scaling
    scaler = StandardScaler(inputCol="featureVector", outputCol="scaledFeatureVector", withStd=True, withMean=False)  
    kmeans = KMeans(seed=randint(100, 100000), k=k, predictionCol="cluster", featuresCol="scaledFeatureVector", maxIter=40, tol=1.0e-5)
    # Create a Pipeline to chain together the preprocessing and clustering stages
    pipeline = Pipeline(stages=[proto_type_pipeline, service_pipeline, flag_pipeline, assembler, scaler, kmeans])
    
    # Fit the Pipeline model to the data
    return pipeline.fit(data)


In [37]:
def clustering_score_4(input_data, k):
    # Fit the pipeline to the input data to obtain the pipeline model
    pipeline_model = fit_pipeline_4(input_data, k)
    
    # Transform the input data using the fitted pipeline model and select relevant columns
    cluster_label = pipeline_model.transform(input_data).select("cluster", "label")
    
    # Group the DataFrame by "cluster" and "label", counting the occurrences of each combination and ordering by "cluster"
    df = cluster_label.groupBy("cluster", "label").count().orderBy("cluster")
    
    # Define a window specification partitioned by "cluster"
    w = Window.partitionBy("cluster")
    
    # Calculate the probability of each label within each cluster
    p_col = df['count'] / fun.sum(df['count']).over(w)
    
    # Add a column "p_col" representing the calculated probabilities
    with_p_col = df.withColumn("p_col", p_col)
    
    # Calculate the entropy for each cluster
    result = with_p_col.groupBy("cluster").agg(-fun.sum(col("p_col") * fun.log2(col("p_col"))).alias("entropy"),
                                               fun.sum(col("count")).alias("cluster_size"))
    
    # Calculate the weighted cluster entropy for each cluster
    result = result.withColumn('weightedClusterEntropy', col('entropy') * col('cluster_size'))
    
    # Aggregate the weighted cluster entropy across all clusters
    weighted_cluster_entropy_avg = result.agg(fun.sum(col('weightedClusterEntropy'))).collect()
    
    # Calculate the average weighted cluster entropy over all clusters and total records
    return weighted_cluster_entropy_avg[0][0] / input_data.count()


In [38]:
# Fit the pipeline to the data with k=180
pipeline_model = fit_pipeline_4(data, 180)

# Transform the data using the fitted pipeline model
transformed_data = pipeline_model.transform(data)

# Group the transformed data by "cluster" and "label", count the occurrences of each combination, and order by "cluster" and "label"
count_by_cluster_label = transformed_data.select("cluster", "label").groupBy("cluster", "label").count().orderBy("cluster", "label")

# Show result
count_by_cluster_label.show()


+-------+----------+------+
|cluster|     label| count|
+-------+----------+------+
|      0|  neptune.| 82132|
|      0|portsweep.|    15|
|      1|  ipsweep.|     4|
|      1|     nmap.|     1|
|      1|   normal.|   337|
|      1|      pod.|     5|
|      1|portsweep.|     1|
|      1|    smurf.|280787|
|      2|  neptune.|   102|
|      2|portsweep.|     1|
|      2|    satan.|     1|
|      3|  neptune.|    98|
|      4|  neptune.|    79|
|      4|portsweep.|     1|
|      4|    satan.|     1|
|      5|  neptune.|    98|
|      5|portsweep.|     1|
|      6|  neptune.|    92|
|      6|    satan.|     1|
|      7|  ipsweep.|     1|
+-------+----------+------+
only showing top 20 rows

