In [1]:
import pyspark
from pyspark.sql import SparkSession

# Obtain the SparkSession used by every cell below (an existing session is
# reused if there is one, otherwise a new one is created).
spark = SparkSession.builder.getOrCreate()



#### We define our own column names. Then, we show all possible labels in decreasing order of their count

In [18]:
# The KDD Cup file has no header row: let Spark infer the column types and
# attach the column names ourselves afterwards.
data_without_header = (
    spark.read
    .option("inferSchema", "True")
    .option("header", "False")
    .csv("data/kddcup.data_10_percent_corrected")
)

# One name per CSV column, in file order; the last column is the label.
column_names = [
    "duration", "protocol_type", "service", "flag", "src_bytes", "dst_bytes",
    "land", "wrong_fragment", "urgent", "hot", "num_failed_logins", "logged_in",
    "num_compromised", "root_shell", "su_attempted", "num_root",
    "num_file_creations", "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login", "count", "srv_count", "serror_rate",
    "srv_serror_rate", "rerror_rate", "srv_rerror_rate", "same_srv_rate",
    "diff_srv_rate", "srv_diff_host_rate", "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate", "dst_host_rerror_rate",
    "dst_host_srv_rerror_rate", "label",
]

data = data_without_header.toDF(*column_names)

from pyspark.sql.functions import col

# Number of rows per label, most frequent label first (show() prints 20 rows).
(
    data
    .select("label")
    .groupBy("label")
    .count()
    .orderBy(col("count").desc())
    .show()
)

+----------------+------+
|           label| count|
+----------------+------+
|          smurf.|280790|
|        neptune.|107201|
|         normal.| 97278|
|           back.|  2203|
|          satan.|  1589|
|        ipsweep.|  1247|
|      portsweep.|  1040|
|    warezclient.|  1020|
|       teardrop.|   979|
|            pod.|   264|
|           nmap.|   231|
|   guess_passwd.|    53|
|buffer_overflow.|    30|
|           land.|    21|
|    warezmaster.|    20|
|           imap.|    12|
|        rootkit.|    10|
|     loadmodule.|     9|
|      ftp_write.|     8|
|       multihop.|     7|
+----------------+------+
only showing top 20 rows



#### Assemble the numeric columns into a single feature vector with VectorAssembler (the input format KMeans expects), then fit a KMeans model on it.

In [23]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans,KMeansModel
from pyspark.ml import Pipeline

# Keep only the numeric columns (plus the label); cached because the same
# frame is reused by several fits further down.
numeric_only = data.drop("protocol_type", "service", "flag").cache()

# Every column except the last one ("label") goes into the feature vector.
assembler = VectorAssembler(inputCols=numeric_only.columns[:-1], outputCol="featureVector")

# No k is given here, so KMeans runs with its default number of clusters.
kmeans = KMeans(predictionCol="cluster", featuresCol="featureVector")

pipeline = Pipeline(stages=[assembler, kmeans])
pipeline_model = pipeline.fit(numeric_only)
print(pipeline_model)

# The fitted KMeans model is the final stage of the fitted pipeline.
kmeans_model = pipeline_model.stages[-1]

from pprint import pprint
pprint(kmeans_model.clusterCenters())  # the printed output contains 2 cluster centers

PipelineModel_604d930b5970
[array([4.79793956e+01, 1.62207883e+03, 8.68534183e+02, 4.45326100e-05,
       6.43293794e-03, 1.41694668e-05, 3.45168212e-02, 1.51815716e-04,
       1.48247035e-01, 1.02121372e-02, 1.11331525e-04, 3.64357718e-05,
       1.13517671e-02, 1.08295211e-03, 1.09307315e-04, 1.00805635e-03,
       0.00000000e+00, 0.00000000e+00, 1.38658354e-03, 3.32286248e+02,
       2.92907143e+02, 1.76685418e-01, 1.76607809e-01, 5.74330999e-02,
       5.77183920e-02, 7.91548844e-01, 2.09816404e-02, 2.89968625e-02,
       2.32470732e+02, 1.88666046e+02, 7.53781203e-01, 3.09056111e-02,
       6.01935529e-01, 6.68351484e-03, 1.76753957e-01, 1.76441622e-01,
       5.81176268e-02, 5.74111170e-02]),
 array([2.0000000e+00, 6.9337564e+08, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 1.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0

##### Checking which cluster each label belongs to

In [21]:
# Attach the predicted cluster id to every row of the numeric frame.
with_cluster = pipeline_model.transform(numeric_only)

# Rows per (cluster, label), grouped by cluster and largest count first.
(
    with_cluster
    .select("cluster", "label")
    .groupBy("cluster", "label")
    .count()
    .orderBy(col("cluster"), col("count").desc())
    .show(25)
)

+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|          smurf.|280790|
|      0|        neptune.|107201|
|      0|         normal.| 97278|
|      0|           back.|  2203|
|      0|          satan.|  1589|
|      0|        ipsweep.|  1247|
|      0|      portsweep.|  1039|
|      0|    warezclient.|  1020|
|      0|       teardrop.|   979|
|      0|            pod.|   264|
|      0|           nmap.|   231|
|      0|   guess_passwd.|    53|
|      0|buffer_overflow.|    30|
|      0|           land.|    21|
|      0|    warezmaster.|    20|
|      0|           imap.|    12|
|      0|        rootkit.|    10|
|      0|     loadmodule.|     9|
|      0|      ftp_write.|     8|
|      0|       multihop.|     7|
|      0|            phf.|     4|
|      0|           perl.|     3|
|      0|            spy.|     2|
|      1|      portsweep.|     1|
+-------+----------------+------+



#### Choosing a good k value by checking the training cost with different values of k

In [34]:
from pyspark.sql import DataFrame
from random import randint

def clustering_score(input_data,k):
    """Fit a k-cluster KMeans on the numeric columns and return its training cost.

    Parameters
    ----------
    input_data : Spark DataFrame. The three categorical columns are dropped
        and the last remaining column is excluded from the features
        (presumably the label -- verify against the caller).
    k : number of clusters to fit.

    Returns
    -------
    The ``trainingCost`` reported by the fitted KMeans model's summary.
    """
    numeric_frame = input_data.drop("protocol_type", "service", "flag")
    to_vector = VectorAssembler(
        inputCols=numeric_frame.columns[:-1],
        outputCol="featureVector",
    )
    # A new random seed is drawn on every call, so repeated runs may differ.
    clusterer = KMeans(
        seed=randint(100, 100000),
        k=k,
        predictionCol="cluster",
        featuresCol="featureVector",
    )
    fitted = Pipeline(stages=[to_vector, clusterer]).fit(numeric_frame)
    return fitted.stages[-1].summary.trainingCost

# Print the training cost for k = 20, 40, 60 and 80 (the stop value 100 is excluded).
for k in range(20, 100, 20):
    print(clustering_score(numeric_only, k))

34526681198781.35
29665347132096.52
7464324642470.006
5140760167652.76


In [35]:
def clustering_score1(input_data,k):
    """Same scoring as ``clustering_score`` but with more iterations and a tighter tolerance.

    Parameters
    ----------
    input_data : Spark DataFrame. The three categorical columns are dropped
        and the last remaining column is excluded from the features
        (presumably the label -- verify against the caller).
    k : number of clusters to fit.

    Returns
    -------
    The ``trainingCost`` reported by the fitted KMeans model's summary.
    """
    numeric_frame = input_data.drop("protocol_type", "service", "flag")
    to_vector = VectorAssembler(
        inputCols=numeric_frame.columns[:-1],
        outputCol="featureVector",
    )
    # Up to 40 iterations with convergence tolerance 1e-5; seed is random per call.
    clusterer = KMeans(
        seed=randint(100, 100000),
        k=k,
        maxIter=40,
        tol=1.0e-5,
        predictionCol="cluster",
        featuresCol="featureVector",
    )
    fitted = Pipeline(stages=[to_vector, clusterer]).fit(numeric_frame)
    return fitted.stages[-1].summary.trainingCost

# Print k next to its training cost for k = 20, 40, 60, 80 and 100.
for k in range(20, 101, 20):
    print(k, clustering_score1(numeric_only, k))

20 34526683484566.53
40 13451819211364.23
60 15431179848879.225
80 1010315645827.3057
100 6585306634183.479


#### Feature Normalization - Scale the feature vector with StandardScaler. The training cost is much lower after scaling, but it is now measured in scaled units, so it is not directly comparable with the unscaled costs above.

In [41]:
from pyspark.ml.feature import StandardScaler


def clustering_score2(input_data,k):
    """Training cost of a k-cluster KMeans fitted on standard-deviation-scaled numeric features.

    Parameters
    ----------
    input_data : Spark DataFrame. The three categorical columns are dropped
        and the last remaining column is excluded from the features
        (presumably the label -- verify against the caller).
    k : number of clusters to fit.

    Returns
    -------
    The ``trainingCost`` reported by the fitted KMeans model's summary.
    """
    numeric_frame = input_data.drop("protocol_type", "service", "flag")
    to_vector = VectorAssembler(
        inputCols=numeric_frame.columns[:-1],
        outputCol="featureVector",
    )
    # Divide each feature by its standard deviation; the mean is not subtracted.
    to_unit_std = StandardScaler(
        inputCol="featureVector",
        outputCol="scaledFeatureVector",
        withStd=True,
        withMean=False,
    )
    # KMeans reads the scaled vector; seed is random per call.
    clusterer = KMeans(
        seed=randint(100, 100000),
        k=k,
        maxIter=40,
        tol=1.0e-5,
        predictionCol="cluster",
        featuresCol="scaledFeatureVector",
    )
    fitted = Pipeline(stages=[to_vector, to_unit_std, clusterer]).fit(numeric_frame)
    return fitted.stages[-1].summary.trainingCost

# Print k next to its training cost for k = 60, 90, ..., 270 on the scaled features.
for k in range(60, 271, 30):
    print(k, clustering_score2(numeric_only, k))

60 590415.1909242754
90 323029.990531517
120 239660.47827700234
150 182955.6791634747
180 149045.53049408298
210 127174.45940500751
240 112796.81047590676
270 103331.46127469104


#### Handling Categorical Features using One-Hot Encoding. 

In [46]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

def one_hot_pipeline(input_col):
    """Build an index-then-one-hot-encode pipeline for one string column.

    Parameters
    ----------
    input_col : name of the categorical column to encode.

    Returns
    -------
    A ``(pipeline, output_column_name)`` tuple. The unfitted pipeline writes
    the index to ``<input_col>_indexed`` and the encoded vector to
    ``<input_col>_vec``; the second tuple element is that vector column name.
    """
    indexed_col = input_col + "_indexed"
    vector_col = input_col + "_vec"
    to_index = StringIndexer(inputCol=input_col, outputCol=indexed_col)
    to_one_hot = OneHotEncoder(inputCol=indexed_col, outputCol=vector_col)
    return Pipeline(stages=[to_index, to_one_hot]), vector_col

def clustering_score3(input_data,k):
    """Training cost of a k-cluster KMeans using numeric and one-hot-encoded categorical features.

    The three categorical columns (``protocol_type``, ``service``, ``flag``)
    are one-hot encoded, combined with every other column except ``label``,
    scaled by standard deviation, and clustered.

    Parameters
    ----------
    input_data : Spark DataFrame containing the categorical columns and ``label``.
    k : number of clusters to fit.

    Returns
    -------
    The ``trainingCost`` reported by the fitted KMeans model's summary.
    """
    proto_type_pipeline,proto_type_vec_col = one_hot_pipeline("protocol_type")
    service_pipeline,service_vec_col = one_hot_pipeline("service")
    flag_pipeline,flag_vec_col = one_hot_pipeline("flag")

    numeric_cols = set(input_data.columns) - {"label","protocol_type","service","flag"}
    encoded_cols = {proto_type_vec_col,service_vec_col,flag_vec_col}
    # Sort the names: iterating a set of strings can yield a different order
    # in each interpreter run, which would change the feature vector layout.
    assemble_cols = sorted(numeric_cols | encoded_cols)

    assembler = VectorAssembler().setInputCols(assemble_cols).setOutputCol("featureVector")
    # Divide each feature by its standard deviation; the mean is not subtracted.
    scaler = StandardScaler().setInputCol("featureVector").setOutputCol("scaledFeatureVector").\
                setWithStd(True).setWithMean(False)
    # Seed is drawn at random on every call, so repeated runs may differ.
    kmeans = KMeans().setSeed(randint(100,100000)).setK(k).setMaxIter(40).setTol(1.0e-5).\
                setPredictionCol("cluster").setFeaturesCol("scaledFeatureVector")
    pipeline = Pipeline().setStages([proto_type_pipeline,service_pipeline,flag_pipeline,assembler,scaler,kmeans])
    pipeline_model = pipeline.fit(input_data)
    # The fitted KMeans model is the final stage of the fitted pipeline.
    kmeans_model = pipeline_model.stages[-1]
    return kmeans_model.summary.trainingCost

# Print k next to its training cost for k = 60, 90, ..., 270, categorical features included.
for k in range(60, 271, 30):
    print(k, clustering_score3(data, k))

60 16871919.58534714
90 6957457.837591373
120 1497204.954539814
150 1098811.7808264522
180 766537.3279262196
210 586282.7890263062
240 477041.19137717434
270 443718.55476593855


#### Using labels with entropy. The dataframe now contains cluster, label, count and the probability of that label occurring within its cluster

In [57]:
from math import log
from pyspark.sql import functions as f
from pyspark.sql import Window

def entropy(counts):
    """Return the Shannon entropy, using the natural logarithm, of a set of counts.

    Non-positive counts are ignored. Each remaining count is turned into a
    proportion of the positive total, and ``-p * log(p)`` is summed over them.
    An input with no positive counts gives 0.
    """
    positive = list(filter(lambda c: c > 0, counts))
    total = sum(positive)
    result = 0
    for value in positive:
        share = value / total
        result -= share * log(share)
    return result

# Rows per (cluster, label) for the currently fitted `pipeline_model`.
# NOTE(review): this uses whichever `pipeline_model` is live in the kernel;
# the output shown below looks like it came from the first, default-k fit -- confirm.
cluster_label = pipeline_model.transform(data).select("cluster", "label")
df = (
    cluster_label
    .groupBy("cluster", "label")
    .count()
    .orderBy("cluster")
)

# p_col: share of each label among the rows of its own cluster.
w = Window.partitionBy("cluster")
p_col = df["count"] / f.sum(df["count"]).over(w)
with_p_col = df.withColumn("p_col", p_col)
with_p_col.show()

# Per cluster: base-2 entropy of its label distribution and its row count,
# then the entropy multiplied by the cluster size.
result = (
    with_p_col
    .groupBy("cluster")
    .agg(
        (-f.sum(f.col("p_col") * f.log2(f.col("p_col")))).alias("entropy"),
        f.sum(f.col("count")).alias("cluster_size"),
    )
    .withColumn("weightedClusterEntropy", f.col("entropy") * f.col("cluster_size"))
)
weighted_cluster_entropy_avg = result.agg(f.sum(f.col("weightedClusterEntropy"))).collect()

# Size-weighted average entropy over all rows; displayed as the cell's output.
weighted_cluster_entropy_avg[0][0] / data.count()

+-------+----------------+------+--------------------+
|cluster|           label| count|               p_col|
+-------+----------------+------+--------------------+
|      0|        ipsweep.|  1247|0.002524189304076758|
|      0|       teardrop.|   979|0.001981701145702603|
|      0|buffer_overflow.|    30|6.072628638516659E-5|
|      0|          smurf.|280790|  0.5683777984696976|
|      0|        neptune.|107201| 0.21699728755920814|
|      0|   guess_passwd.|    53|1.072831059471276...|
|      0|         normal.| 97278| 0.19691105623254118|
|      0|           perl.|     3| 6.07262863851666E-6|
|      0|     loadmodule.|     9|1.821788591554997...|
|      0|      portsweep.|  1039|0.002103153718472...|
|      0|            pod.|   264| 5.34391320189466E-4|
|      0|       multihop.|     7|1.416946682320553...|
|      0|          satan.|  1589|0.003216468968867657|
|      0|           nmap.|   231|4.675924051657828E-4|
|      0|           back.|  2203|  0.0044593336302174|
|      0| 

1.557605039016584

### Final Pipeline

In [67]:
def fit_pipeline_4(data,k):
    """Fit the full pipeline: one-hot encoding, assembling, scaling and KMeans.

    The three categorical columns (``protocol_type``, ``service``, ``flag``)
    are one-hot encoded, combined with every other column except ``label``,
    scaled with ``StandardScaler`` (default settings), and clustered.

    Parameters
    ----------
    data : Spark DataFrame containing the categorical columns and ``label``.
    k : number of clusters to fit.

    Returns
    -------
    The fitted pipeline model; its prediction column is ``cluster``.
    """
    proto_type_pipeline,proto_type_vec_col = one_hot_pipeline("protocol_type")
    service_pipeline,service_vec_col = one_hot_pipeline("service")
    flag_pipeline,flag_vec_col = one_hot_pipeline("flag")

    numeric_cols = set(data.columns) - {"label","protocol_type","service","flag"}
    encoded_cols = {proto_type_vec_col,service_vec_col,flag_vec_col}
    # Sort the names: iterating a set of strings can yield a different order
    # in each interpreter run, which would change the feature vector layout
    # (and therefore the layout of the cluster centers).
    assemble_cols = sorted(numeric_cols | encoded_cols)

    assembler = VectorAssembler(inputCols=assemble_cols,outputCol="featureVector")
    scaler = StandardScaler(inputCol="featureVector",outputCol="scaledFeatureVector")
    # Seed is drawn at random on every call, so repeated fits may differ.
    kmeans = KMeans(seed=randint(100,100000),k=k,predictionCol="cluster",featuresCol="scaledFeatureVector",maxIter=40,tol=1.0e-5)
    pipeline = Pipeline(stages=[proto_type_pipeline,service_pipeline,flag_pipeline,assembler,scaler,kmeans])
    return pipeline.fit(data)

# NOTE(review): disabled draft of an entropy-based score built on fit_pipeline_4.
# It is not runnable as written. Before re-enabling:
#   - `transform(input_data)` is called as a bare name; it needs to be invoked
#     on `pipeline_model`.
#   - the final division uses the notebook-level `data`, not `input_data`.
# def clusering_score_4(input_data,k): 
#     pipeline_model = fit_pipeline_4(input_data,k)
#     cluster_label = transform(input_data).select("cluster","label")
#     df = cluster_label.groupBy("cluster","label").count().orderBy("cluster")
#     w = Window.partitionBy("cluster")
#     p_col = df["count"]/f.sum(df["count"]).over(w)
#     with_p_col = df.withColumn("p_col",p_col)
#     with_p_col.show()

#     result = with_p_col.groupBy("cluster").agg((-f.sum(col("p_col")*f.log2(col("p_col")))).alias("entropy"),
#                     f.sum(col("count")).alias("cluster_size"))
#     result = result.withColumn("weightedClusterEntropy",f.col("entropy")*f.col("cluster_size"))
#     weighted_cluster_entropy_avg = result.agg(f.sum(col("weightedClusterEntropy"))).collect()
#     return  weighted_cluster_entropy_avg[0][0]/data.count()

# Fit the final pipeline with 180 clusters (this rebinds `pipeline_model`).
pipeline_model = fit_pipeline_4(data, 180)

# Rows per (cluster, label), sorted by cluster and then by label.
count_by_cluster_label = (
    pipeline_model
    .transform(data)
    .select("cluster", "label")
    .groupBy("cluster", "label")
    .count()
    .orderBy("cluster", "label")
)
count_by_cluster_label.show()

+-------+----------+------+
|cluster|     label| count|
+-------+----------+------+
|      0|  neptune.| 36502|
|      0|portsweep.|    10|
|      1|  ipsweep.|     4|
|      1|     nmap.|     1|
|      1|   normal.|   337|
|      1|portsweep.|     1|
|      1|    smurf.|280787|
|      2|  ipsweep.|     3|
|      2|  neptune.|   112|
|      2|portsweep.|     1|
|      2|    satan.|     1|
|      3|  neptune.|    86|
|      3|    satan.|     1|
|      4|  neptune.|    89|
|      5|  neptune.|   106|
|      6|  neptune.|    99|
|      7|  neptune.|    24|
|      7|portsweep.|     3|
|      8|  neptune.|   101|
|      9|  neptune.|    82|
+-------+----------+------+
only showing top 20 rows

