In [2]:
import pyspark
import os
import sys
from pyspark import SparkContext
# Point both PySpark interpreter settings at the Python executable running
# this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) the notebook's Spark session, requesting 16g of driver
# memory.
spark = (
    SparkSession.builder
    .appName("chapter_5")
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)



In [12]:
# Read the headerless CSV and let Spark infer each column's type.
data_without_header = spark.read.csv(
    "kddcup.data_10_percent_corrected",
    header=False,
    inferSchema=True,
)

# Column names in file order; "label" is the final column.
column_names = [
    "duration", "protocol_type", "service", "flag",
    "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
    "hot", "num_failed_logins", "logged_in", "num_compromised",
    "root_shell", "su_attempted", "num_root", "num_file_creations",
    "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login",
    "count", "srv_count",
    "serror_rate", "srv_serror_rate",
    "rerror_rate", "srv_rerror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
    "label",
]

# Replace Spark's auto-generated column names with the names above.
data = data_without_header.toDF(*column_names)

In [14]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml import Pipeline
from pprint import pprint

# Drop the three categorical columns and cache the remainder, which is reused
# for both fitting and transforming.
numeric_only = data.drop("protocol_type", "service", "flag").cache()

# Assemble every column except the last one (the label) into one vector.
assembler = VectorAssembler(
    inputCols=numeric_only.columns[:-1],
    outputCol="featureVector",
)

# K-means with library defaults; only the input/output column names are set.
kmeans = KMeans(predictionCol="cluster", featuresCol="featureVector")

pipeline = Pipeline(stages=[assembler, kmeans])
pipeline_model = pipeline.fit(numeric_only)

# The fitted K-means model is the last of the two pipeline stages.
kmeans_model = pipeline_model.stages[-1]

pprint(kmeans_model.clusterCenters())

[array([4.79793956e+01, 1.62207883e+03, 8.68534183e+02, 4.45326100e-05,
       6.43293794e-03, 1.41694668e-05, 3.45168212e-02, 1.51815716e-04,
       1.48247035e-01, 1.02121372e-02, 1.11331525e-04, 3.64357718e-05,
       1.13517671e-02, 1.08295211e-03, 1.09307315e-04, 1.00805635e-03,
       0.00000000e+00, 0.00000000e+00, 1.38658354e-03, 3.32286248e+02,
       2.92907143e+02, 1.76685418e-01, 1.76607809e-01, 5.74330999e-02,
       5.77183920e-02, 7.91548844e-01, 2.09816404e-02, 2.89968625e-02,
       2.32470732e+02, 1.88666046e+02, 7.53781203e-01, 3.09056111e-02,
       6.01935529e-01, 6.68351484e-03, 1.76753957e-01, 1.76441622e-01,
       5.81176268e-02, 5.74111170e-02]),
 array([2.0000000e+00, 6.9337564e+08, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 1.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00, 0.0000000e+00,
       0.0000000e+00, 0.0000000e+00, 0.0000000e+00

In [17]:
from pyspark.sql.functions import col
# Assign a cluster to every row, then count rows per (cluster, label) pair,
# listing the largest groups first within each cluster.
with_cluster = pipeline_model.transform(numeric_only)

(
    with_cluster
    .select("cluster", "label")
    .groupBy("cluster", "label")
    .count()
    .orderBy(col("cluster"), col("count").desc())
    .show(25)
)

+-------+----------------+------+
|cluster|           label| count|
+-------+----------------+------+
|      0|          smurf.|280790|
|      0|        neptune.|107201|
|      0|         normal.| 97278|
|      0|           back.|  2203|
|      0|          satan.|  1589|
|      0|        ipsweep.|  1247|
|      0|      portsweep.|  1039|
|      0|    warezclient.|  1020|
|      0|       teardrop.|   979|
|      0|            pod.|   264|
|      0|           nmap.|   231|
|      0|   guess_passwd.|    53|
|      0|buffer_overflow.|    30|
|      0|           land.|    21|
|      0|    warezmaster.|    20|
|      0|           imap.|    12|
|      0|        rootkit.|    10|
|      0|     loadmodule.|     9|
|      0|      ftp_write.|     8|
|      0|       multihop.|     7|
|      0|            phf.|     4|
|      0|           perl.|     3|
|      0|            spy.|     2|
|      1|      portsweep.|     1|
+-------+----------------+------+



In [34]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.sql import DataFrame
from random import randint
def one_hot_pipeline(input_col):
    """Build an index-then-one-hot pipeline for one categorical column.

    Args:
        input_col: Name of the column to encode.

    Returns:
        A ``(pipeline, output_col)`` tuple: the unfitted two-stage Pipeline
        (StringIndexer followed by OneHotEncoder) and the name of the vector
        column it produces, ``input_col + "_vec"``.
    """
    indexed_col = input_col + "_indexed"
    vec_col = input_col + "_vec"
    stages = [
        StringIndexer(inputCol=input_col, outputCol=indexed_col),
        OneHotEncoder(inputCol=indexed_col, outputCol=vec_col),
    ]
    return Pipeline(stages=stages), vec_col

In [35]:
from pyspark.sql import functions as fun
from pyspark.sql import Window
# Size-weighted average label entropy of the current clustering: compute the
# entropy of the label distribution inside each cluster, weight it by the
# cluster's row count, and divide the total by the number of rows.
#
# NOTE(review): this reads whatever `pipeline_model` is bound to when the cell
# runs. That name is assigned both by the earlier two-stage K-means cell and
# by the later `fit_pipeline_4(data, 180)` cell, so the value depends on cell
# execution order -- confirm which model the recorded output belongs to.
cluster_label = pipeline_model.transform(data).select("cluster", "label")

# One row per (cluster, label) pair. No sort is applied: the window and the
# aggregations below do not depend on row order.
df = cluster_label.groupBy("cluster", "label").count()

# p_col is the share of a cluster's rows that carry a given label.
w = Window.partitionBy("cluster")
p_col = df["count"] / fun.sum(df["count"]).over(w)
with_p_col = df.withColumn("p_col", p_col)

# `fun.col` is used throughout so the cell relies only on its own
# `functions as fun` import, not on a bare `col` imported in another cell.
result = with_p_col.groupBy("cluster").agg(
    (-fun.sum(fun.col("p_col") * fun.log2(fun.col("p_col")))).alias("entropy"),
    fun.sum(fun.col("count")).alias("cluster_size"),
)
result = result.withColumn(
    "weightedClusterEntropy", fun.col("entropy") * fun.col("cluster_size")
)
weighted_cluster_entropy_avg = result.agg(
    fun.sum(fun.col("weightedClusterEntropy"))
).collect()
weighted_cluster_entropy_avg[0][0] / data.count()

0.024287268880425223

In [36]:
def fit_pipeline_4(data, k, seed=None):
    """Fit the one-hot + scaling + K-means pipeline on ``data``.

    The categorical columns ``protocol_type``, ``service`` and ``flag`` are
    one-hot encoded, assembled with every other non-label column into
    ``featureVector``, scaled by standard deviation (without centering) into
    ``scaledFeatureVector``, and clustered with K-means.

    Args:
        data: DataFrame holding the three categorical columns, a ``label``
            column, and the remaining feature columns.
        k: Number of clusters.
        seed: Optional K-means seed. When ``None`` (the default) a random
            seed between 100 and 100000 is drawn, so repeated calls may give
            different clusterings; pass an int for a reproducible fit.

    Returns:
        The fitted PipelineModel; transforming with it adds a ``cluster``
        column.
    """
    proto_type_pipeline, proto_type_vec_col = one_hot_pipeline("protocol_type")
    service_pipeline, service_vec_col = one_hot_pipeline("service")
    flag_pipeline, flag_vec_col = one_hot_pipeline("flag")

    # Build the assembler inputs in DataFrame column order, followed by the
    # encoded vectors. A set was used before; its iteration order can change
    # between interpreter runs, which reshuffles the feature-vector layout.
    vec_cols = [proto_type_vec_col, service_vec_col, flag_vec_col]
    excluded = {"label", "protocol_type", "service", "flag", *vec_cols}
    assemble_cols = [c for c in data.columns if c not in excluded] + vec_cols

    assembler = VectorAssembler(inputCols=assemble_cols, outputCol="featureVector")
    scaler = StandardScaler(
        inputCol="featureVector",
        outputCol="scaledFeatureVector",
        withStd=True,
        withMean=False,
    )

    if seed is None:
        # Preserve the original behaviour: a fresh random seed per call.
        seed = randint(100, 100000)
    kmeans = KMeans(
        seed=seed,
        k=k,
        predictionCol="cluster",
        featuresCol="scaledFeatureVector",
        maxIter=40,
        tol=1.0e-5,
    )

    pipeline = Pipeline(
        stages=[
            proto_type_pipeline,
            service_pipeline,
            flag_pipeline,
            assembler,
            scaler,
            kmeans,
        ]
    )
    return pipeline.fit(data)
def clustering_score_4(input_data, k):
    """Score a ``k``-cluster fit by its size-weighted average label entropy.

    Fits ``fit_pipeline_4(input_data, k)``, assigns every row to a cluster,
    computes the entropy (base 2) of the label distribution inside each
    cluster, and returns the average of those entropies weighted by cluster
    size.

    Args:
        input_data: DataFrame accepted by ``fit_pipeline_4``; must include a
            ``label`` column.
        k: Number of clusters to fit.

    Returns:
        Sum over clusters of ``entropy * cluster_size`` divided by the total
        number of clustered rows.
    """
    pipeline_model = fit_pipeline_4(input_data, k)
    cluster_label = pipeline_model.transform(input_data).select("cluster", "label")

    # One row per (cluster, label) pair; row order is irrelevant to the
    # window and aggregations below, so no sort is applied.
    label_counts = cluster_label.groupBy("cluster", "label").count()

    # Share of each cluster's rows that carry a given label.
    by_cluster = Window.partitionBy("cluster")
    label_share = fun.col("count") / fun.sum(fun.col("count")).over(by_cluster)
    with_p_col = label_counts.withColumn("p_col", label_share)

    # The negation must be parenthesized before `.alias`: `-expr.alias(name)`
    # negates the already-aliased column, so the result is no longer named
    # "entropy" and the lookup below would not resolve.
    per_cluster = with_p_col.groupBy("cluster").agg(
        (-fun.sum(fun.col("p_col") * fun.log2(fun.col("p_col")))).alias("entropy"),
        fun.sum(fun.col("count")).alias("cluster_size"),
    )

    # Every row belongs to exactly one cluster, so the cluster sizes sum to
    # the row count; taking both totals here avoids a second pass over
    # `input_data`.
    totals = per_cluster.agg(
        fun.sum(fun.col("entropy") * fun.col("cluster_size")).alias("weighted_entropy"),
        fun.sum(fun.col("cluster_size")).alias("n_rows"),
    ).collect()[0]
    return totals["weighted_entropy"] / totals["n_rows"]


In [31]:
# Refit the one-hot/scaled pipeline with k=180 and list how many rows of each
# label land in each cluster.
pipeline_model = fit_pipeline_4(data, 180)
count_by_cluster_label = (
    pipeline_model.transform(data)
    .select("cluster", "label")
    .groupBy("cluster", "label")
    .count()
    .orderBy("cluster", "label")
)
count_by_cluster_label.show()

+-------+-----------+------+
|cluster|      label| count|
+-------+-----------+------+
|      0|   neptune.| 36502|
|      0| portsweep.|    10|
|      1|   ipsweep.|     4|
|      1|      nmap.|     1|
|      1|    normal.|   337|
|      1| portsweep.|     1|
|      1|     smurf.|280787|
|      2|  teardrop.|   711|
|      3|    normal.|   537|
|      3|     satan.|     1|
|      4|   ipsweep.|     2|
|      4|   neptune.|   103|
|      4|    normal.|    51|
|      5|     satan.|  1067|
|      6|   neptune.|   101|
|      6| portsweep.|     4|
|      6|     satan.|     1|
|      7|   neptune.|    21|
|      8| ftp_write.|     2|
|      8|loadmodule.|     2|
+-------+-----------+------+
only showing top 20 rows

