In [None]:
#Anomaly detection with K-means Clustering
import pyspark
from pyspark.sql import SparkSession


In [None]:
# Start (or reuse) the Spark session used by the rest of the notebook.
ss = (
    SparkSession.builder
    .appName('k means clustering')
    .getOrCreate()
)

# The file is read without a header row; column types are inferred by Spark.
data_partial = ss.read.csv(
    'kddcup.data_10_percent_corrected',
    header=False,
    inferSchema=True,
)

# Positional column names applied to the header-less file (42 names, the
# last one being the label column).
col_names = [
    "duration", "protocol_type", "service", "flag", "src_bytes", "dst_bytes",
    "land", "wrong_fragment", "urgent", "hot", "num_failed_logins",
    "logged_in", "num_compromised", "root_shell", "su_attempted", "num_root",
    "num_file_creations", "num_shells", "num_access_files",
    "num_outbound_cmds", "is_host_login", "is_guest_login", "count",
    "srv_count", "serror_rate", "srv_serror_rate", "rerror_rate",
    "srv_rerror_rate", "same_srv_rate", "diff_srv_rate",
    "srv_diff_host_rate", "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
    "label",
]

# Rename the auto-generated columns and preview the first rows.
data = data_partial.toDF(*col_names)
data.show()


+--------+-------------+-------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate|dst_hos

In [None]:
# Report the number of rows first, then the column types Spark inferred.
entry_count = data.count()
print("Dataframe contains " + str(entry_count) + " entries")
print('The schema of the database is')
data.printSchema()

Dataframe contains 494021 entries
The schema of the database is
root
 |-- duration: integer (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- num_compromised: integer (nullable = true)
 |-- root_shell: integer (nullable = true)
 |-- su_attempted: integer (nullable = true)
 |-- num_root: integer (nullable = true)
 |-- num_file_creations: integer (nullable = true)
 |-- num_shells: integer (nullable = true)
 |-- num_access_files: integer (nullable = true)
 |-- num_outbound_cmds: integer (nullable = true)
 |-- is_host_login: integer (nullable = true)
 |-- is_guest_login

In [None]:
from pyspark.sql.functions import col
# Number of rows for each distinct value of the label column.
label_counts = data.select('label').groupBy('label').count()
label_counts.show()

+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|            pod.|   264|
|   guess_passwd.|    53|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|     loadmodule.|     9|
|buffer_overflow.|    30|
|       teardrop.|   979|
|           perl.|     3|
|        neptune.|107201|
|         normal.| 97278|
|           imap.|    12|
|           nmap.|   231|
|          satan.|  1589|
|           land.|    21|
|      ftp_write.|     8|
|            phf.|     4|
|       multihop.|     7|
|           back.|  2203|
|    warezmaster.|    20|
+----------------+------+
only showing top 20 rows



In [None]:
#1. Implement a PySpark script to handle any missing values and scale numerical features.
# Discard every row that has a null in any column, then report what is left.
df = data.dropna()
remaining_rows = df.count()
print("Dataframe contains " + str(remaining_rows) + " entries")
# no null entries were present

Dataframe contains 494021 entries


In [None]:
#2. Develop a PySpark script that uses the K-means algorithm to cluster data points.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml import Pipeline

# Drop the string-typed columns, which cannot be used for K-means clustering.
numeric_only = df.drop("protocol_type", "service", "flag").cache()

# Use all columns except the last one ("label") to fit the K-means model.
feature_cols = numeric_only.columns[:-1]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="featureVector")
kmeans = KMeans(featuresCol="featureVector", predictionCol="cluster")

pipeline = Pipeline(stages=[assembler, kmeans])
pipeline_model = pipeline.fit(numeric_only)

# The fitted K-means model is the second stage of the fitted pipeline.
kmeans_model = pipeline_model.stages[1]
print(kmeans_model.clusterCenters())

[array([4.79793956e+01, 1.62207883e+03, 8.68534183e+02, 4.45326100e-05,
       6.43293794e-03, 1.41694668e-05, 3.45168212e-02, 1.51815716e-04,
       1.48247035e-01, 1.02121372e-02, 1.11331525e-04, 3.64357718e-05,
       1.13517671e-02, 1.08295211e-03, 1.09307315e-04, 1.00805635e-03,
       0.00000000e+00, 0.00000000e+00, 1.38658354e-03, 3.32286248e+02,
       2.92907143e+02, 1.76685418e-01, 1.76607809e-01, 5.74330999e-02,
       5.77183920e-02, 7.91548844e-01, 2.09816404e-02, 2.89968625e-02,
       2.32470732e+02, 1.88666046e+02, 7.53781203e-01, 3.09056111e-02,
       6.01935529e-01, 6.68351484e-03, 1.76753957e-01, 1.76441622e-01,
       5.81176268e-02, 5.74111170e-02]), array([2.0000000e+00, 6.9337564e+08, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 1.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00,

In [None]:
#without normalization, fitting data
from pyspark.sql import DataFrame
from random import randint

def clustering_score(input_data, k):
    """Fit K-means with ``k`` clusters on unscaled numeric features.

    The string columns ``protocol_type``, ``service`` and ``flag`` are dropped
    and every remaining column except the last one is assembled into the
    feature vector. The K-means seed is drawn at random on every call.

    Args:
        input_data: DataFrame whose last column (after the drop) is not a feature.
        k: number of clusters.

    Returns:
        The ``trainingCost`` reported by the fitted model's summary.
    """
    numeric_df = input_data.drop("protocol_type", "service", "flag")
    feature_cols = numeric_df.columns[:-1]

    to_vector = VectorAssembler(inputCols=feature_cols, outputCol="featureVector")
    clusterer = KMeans(
        seed=randint(100, 100000),
        k=k,
        maxIter=40,
        tol=1.0e-5,
        predictionCol="cluster",
        featuresCol="featureVector",
    )

    fitted = Pipeline(stages=[to_vector, clusterer]).fit(numeric_df)
    # The K-means model is the last stage of the fitted pipeline.
    return fitted.stages[-1].summary.trainingCost

# Training cost on the unscaled features for k = 20, 40, ..., 100.
for k in range(20, 101, 20):
    cost = clustering_score(numeric_only, k)
    print(k, cost)

20 19217321125975.074
40 16863401436310.416
60 15585437610061.24
80 8336330447610.666
100 20792203780086.074


In [None]:
from pyspark.ml.feature import StandardScaler

def clustering_score_2(input_data, k):
    """Fit K-means with ``k`` clusters on std-scaled numeric features.

    Same preparation as the unscaled variant (string columns dropped, last
    remaining column excluded), but the assembled vector is divided by each
    feature's standard deviation (no mean-centering) before clustering. The
    K-means seed is drawn at random on every call.

    Args:
        input_data: DataFrame whose last column (after the drop) is not a feature.
        k: number of clusters.

    Returns:
        The ``trainingCost`` reported by the fitted model's summary.
    """
    numeric_df = input_data.drop("protocol_type", "service", "flag")
    feature_cols = numeric_df.columns[:-1]

    to_vector = VectorAssembler(inputCols=feature_cols, outputCol="featureVector")
    standardize = StandardScaler(
        inputCol="featureVector",
        outputCol="scaledFeatureVector",
        withStd=True,
        withMean=False,
    )
    clusterer = KMeans(
        seed=randint(100, 100000),
        k=k,
        maxIter=40,
        tol=1.0e-5,
        predictionCol="cluster",
        featuresCol="scaledFeatureVector",
    )

    fitted = Pipeline(stages=[to_vector, standardize, clusterer]).fit(numeric_df)
    # The K-means model is the last stage of the fitted pipeline.
    return fitted.stages[-1].summary.trainingCost

# Training cost on the scaled numeric features for k = 60, 90, ..., 270.
for k in range(60, 271, 30):
    cost = clustering_score_2(numeric_only, k)
    print(k, cost)


60 582762.7552196753
90 351061.45690443396
120 227514.07598669516
150 182726.99736269115
180 163193.50072085622
210 129149.55164067016
240 115569.90127791351
270 98602.2076593401


In [None]:
#encode categorical variables (flag, protocol type and service)
from pyspark.ml.feature import OneHotEncoder, StringIndexer

def one_hot_pipeline(input_col):
    """Build an (unfitted) index-then-one-hot pipeline for one string column.

    Args:
        input_col: name of the column to encode.

    Returns:
        A tuple ``(pipeline, output_col)`` where ``pipeline`` chains a
        ``StringIndexer`` into a ``OneHotEncoder`` and ``output_col`` is the
        name of the encoded vector column (``input_col + "_vec"``).
    """
    indexed_col = input_col + "_indexed"
    vec_col = input_col + "_vec"
    stages = [
        StringIndexer(inputCol=input_col, outputCol=indexed_col),
        OneHotEncoder(inputCol=indexed_col, outputCol=vec_col),
    ]
    return Pipeline(stages=stages), vec_col

In [None]:
#add encoding function to clustering score function and choose k
def clustering_score_3(input_data, k):
    """Fit K-means with ``k`` clusters on scaled numeric + one-hot features.

    ``protocol_type``, ``service`` and ``flag`` are one-hot encoded; every
    other column except ``label`` is used as-is. The assembled vector is
    divided by each feature's standard deviation (no mean-centering) before
    clustering. The K-means seed is drawn at random on every call.

    Args:
        input_data: DataFrame containing the three categorical columns and
            a ``label`` column.
        k: number of clusters.

    Returns:
        The ``trainingCost`` reported by the fitted model's summary.
    """
    proto_type_pipeline, proto_type_vec_col = one_hot_pipeline("protocol_type")
    service_pipeline, service_vec_col = one_hot_pipeline("service")
    flag_pipeline, flag_vec_col = one_hot_pipeline("flag")

    # Build the feature list in a fixed order (original column order, then the
    # encoded vectors). A set would make the feature-vector layout depend on
    # Python's per-process string hashing and so differ between kernels.
    excluded = {"label", "protocol_type", "service", "flag"}
    assemble_cols = [c for c in input_data.columns if c not in excluded]
    assemble_cols += [proto_type_vec_col, service_vec_col, flag_vec_col]

    assembler = VectorAssembler().setInputCols(assemble_cols).setOutputCol("featureVector")
    scaler = StandardScaler().setInputCol("featureVector").setOutputCol("scaledFeatureVector").setWithStd(True).setWithMean(False)
    kmeans = KMeans().setSeed(randint(100,100000)).setK(k).setMaxIter(40).setTol(1.0e-5).setPredictionCol("cluster").setFeaturesCol("scaledFeatureVector")
    pipeline = Pipeline().setStages([proto_type_pipeline, service_pipeline, flag_pipeline, assembler, scaler, kmeans])
    pipeline_model = pipeline.fit(input_data)

    # The K-means model is the last stage of the fitted pipeline.
    kmeans_model = pipeline_model.stages[-1]
    return kmeans_model.summary.trainingCost

# Training cost with the one-hot encoded categoricals for k = 60, 90, ..., 270.
for k in range(60, 271, 30):
    cost = clustering_score_3(data, k)
    print(k, cost)


60 17088399.538600437
90 5010084.76856782
120 1451693.8729224904
150 1006816.6599392737
180 894501.8462999687
210 597317.0182113763
240 535500.66037835
270 407747.9736698347


In [None]:
#3. Develop a PySpark script that labels data points as anomalies based on their cluster assignments.
from math import log
def entropy(counts):
    """Return the Shannon entropy (natural log) of a collection of counts.

    Non-positive counts are ignored. The remaining counts are normalised to
    proportions that sum to one before the entropy is computed.

    Args:
        counts: iterable of numeric counts.

    Returns:
        ``sum(-p * log(p))`` over the proportions; ``0`` when no count is
        positive.
    """
    positive = [count for count in counts if count > 0]
    total = sum(positive)
    proportions = (count / total for count in positive)
    return sum(-1 * share * log(share) for share in proportions)

In [None]:
from pyspark.sql import functions as fun
from pyspark.sql import Window

# Attach a cluster id to every row and keep only the columns the score needs.
# NOTE(review): `pipeline_model` is whatever an earlier cell last bound to that
# name - confirm it is the model this score is meant to evaluate.
cluster_label = pipeline_model.transform(data).select("cluster", "label")

# Row count for every (cluster, label) pair.
df = (
    cluster_label
    .groupBy("cluster", "label")
    .count()
    .orderBy("cluster")
)

# Share of each label inside its own cluster.
w = Window.partitionBy("cluster")
p_col = df['count'] / fun.sum(df['count']).over(w)
with_p_col = df.withColumn("p_col", p_col)

# Per cluster: base-2 entropy of the label shares and the cluster's row count.
label_share = fun.col("p_col")
entropy_expr = (-fun.sum(label_share * fun.log2(label_share))).alias("entropy")
size_expr = fun.sum(fun.col("count")).alias("cluster_size")
result = with_p_col.groupBy("cluster").agg(entropy_expr, size_expr)
result = result.withColumn(
    'weightedClusterEntropy',
    fun.col('entropy') * fun.col('cluster_size'),
)

# Size-weighted mean entropy: sum of (entropy * size) divided by the row count.
weighted_cluster_entropy_avg = result.agg(fun.sum(fun.col('weightedClusterEntropy'))).collect()
weighted_cluster_entropy_avg[0][0]/data.count()

1.557605039016584

In [None]:
def fit_pipeline_4(data, k):
    """Fit the full encode -> assemble -> scale -> K-means pipeline.

    ``protocol_type``, ``service`` and ``flag`` are one-hot encoded; every
    other column except ``label`` is used as-is. The assembled vector is
    divided by each feature's standard deviation (no mean-centering) before
    clustering. The K-means seed is drawn at random on every call.

    Args:
        data: DataFrame containing the three categorical columns and a
            ``label`` column.
        k: number of clusters.

    Returns:
        The fitted pipeline model; its ``transform`` adds a ``cluster`` column.
    """
    proto_type_pipeline, proto_type_vec_col = one_hot_pipeline("protocol_type")
    service_pipeline, service_vec_col = one_hot_pipeline("service")
    flag_pipeline, flag_vec_col = one_hot_pipeline("flag")

    # Build the feature list in a fixed order (original column order, then the
    # encoded vectors). A set would make the feature-vector layout depend on
    # Python's per-process string hashing and so differ between kernels.
    excluded = {"label", "protocol_type", "service", "flag"}
    assemble_cols = [c for c in data.columns if c not in excluded]
    assemble_cols += [proto_type_vec_col, service_vec_col, flag_vec_col]

    assembler = VectorAssembler(inputCols=assemble_cols, outputCol="featureVector")
    scaler = StandardScaler(inputCol="featureVector", outputCol="scaledFeatureVector", withStd=True, withMean=False)
    kmeans = KMeans(seed=randint(100, 100000), k=k, predictionCol="cluster", featuresCol="scaledFeatureVector", maxIter=40, tol=1.0e-5)
    pipeline = Pipeline(stages=[proto_type_pipeline, service_pipeline, flag_pipeline, assembler, scaler, kmeans])
    return pipeline.fit(data)

def clustering_score_4(input_data, k):
    """Score a ``k``-cluster fit by the size-weighted average label entropy.

    The pipeline is fitted on ``input_data``, every row is assigned to a
    cluster, and for each cluster the base-2 entropy of its label
    distribution is computed. The per-cluster entropies are then averaged,
    weighted by cluster size.

    Args:
        input_data: DataFrame accepted by ``fit_pipeline_4`` and containing a
            ``label`` column.
        k: number of clusters.

    Returns:
        Sum over clusters of ``entropy * cluster_size`` divided by the number
        of rows in ``input_data``.
    """
    pipeline_model = fit_pipeline_4(input_data, k)
    cluster_label = pipeline_model.transform(input_data).select("cluster", "label")

    # Row count per (cluster, label) pair. No sort is needed here: the window
    # and the aggregation below do not depend on row order.
    label_counts = cluster_label.groupBy("cluster", "label").count()

    # Share of each label inside its own cluster.
    w = Window.partitionBy("cluster")
    p_col = label_counts['count'] / fun.sum(label_counts['count']).over(w)
    with_p_col = label_counts.withColumn("p_col", p_col)

    # The negation must be parenthesised before aliasing: `-x.alias("entropy")`
    # negates the already-aliased column, so the result would not be named
    # "entropy" and the lookup below would fail.
    result = with_p_col.groupBy("cluster").agg(
        (-fun.sum(col("p_col") * fun.log2(col("p_col")))).alias("entropy"),
        fun.sum(col("count")).alias("cluster_size"),
    )
    result = result.withColumn('weightedClusterEntropy', col('entropy') * col('cluster_size'))
    weighted_cluster_entropy_avg = result.agg(fun.sum(col('weightedClusterEntropy'))).collect()
    return weighted_cluster_entropy_avg[0][0] / input_data.count()

In [None]:
# Refit with k=180 and tabulate how the labels are distributed across clusters.
pipeline_model = fit_pipeline_4(data, 180)
count_by_cluster_label = (
    pipeline_model.transform(data)
    .select("cluster", "label")
    .groupBy("cluster", "label")
    .count()
    .orderBy("cluster", "label")
)
count_by_cluster_label.show()

+-------+------------+-----+
|cluster|       label|count|
+-------+------------+-----+
|      0|    ipsweep.|    4|
|      0|       nmap.|    1|
|      0|     normal.|  343|
|      0|  portsweep.|    1|
|      0|      smurf.|12651|
|      1|    neptune.|  106|
|      1|  portsweep.|    1|
|      2|     normal.|    3|
|      2|warezmaster.|    1|
|      3|    ipsweep.|    1|
|      3|    neptune.|   97|
|      4|    neptune.|   89|
|      4|  portsweep.|    2|
|      5|     normal.|    8|
|      5|      satan.|    1|
|      6|    neptune.|  101|
|      7|    neptune.|  101|
|      8|    neptune.|45534|
|      9|    neptune.|   92|
|      9|  portsweep.|    2|
+-------+------------+-----+
only showing top 20 rows

