## PKA-KMEANS

Setting up the PySpark environment and creating a Spark session.


In [None]:
import pyspark
import os
import sys
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .appName("MovieRecommendationSystem") \
    .getOrCreate()


In [None]:
spark = SparkSession.builder \
    .config("spark.driver.memory", "16g") \
    .appName('chapter_5') \
    .getOrCreate()


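Setting the driver memory to 16 GB and creating a Spark session with that configuration. Note that `getOrCreate()` returns the session created in the previous cell if it is still running, so the driver memory setting may not take effect unless this cell runs first or the earlier session is stopped.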


In [None]:
data_without_header = spark.read.option("inferSchema", True).\
    option("header", False).\
    csv("data/kddcup.data_10_percent_corrected")

column_names = ["duration", "protocol_type", "service", "flag",
    "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
    "hot", "num_failed_logins", "logged_in", "num_compromised",
    "root_shell", "su_attempted", "num_root", "num_file_creations",
    "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login", "count", "srv_count",
    "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
    "label"]

data = data_without_header.toDF(*column_names)

In [None]:
from pyspark.sql.functions import col
data.select("label").groupBy("label").count().orderBy(col("count").desc()).show(25)

- The data is read into a DataFrame `data_without_header` with schema inference enabled
(`inferSchema` is `True`) and no header row.
- The column names are listed in `column_names`, since the file itself has none.
- The DataFrame `data` is created by applying those names with `toDF`.
- The `select` method picks the "label" column, followed by `groupBy` and `count`
to count the occurrences of each label.
- The results are ordered by count in descending order, and up to 25 rows are displayed.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml import Pipeline
numeric_only = data.drop("protocol_type", "service", "flag").cache()
assembler = VectorAssembler().setInputCols(numeric_only.columns[:-1]).\
setOutputCol("featureVector")
kmeans = KMeans().setPredictionCol("cluster").setFeaturesCol("featureVector")
pipeline = Pipeline().setStages([assembler, kmeans])
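pipeline_model = pipeline.fit(numeric_only)
kmeans_model = pipeline_model.stages[1]
from pprint import pprint
pprint(kmeans_model.clusterCenters())

# Assign each row to a cluster, then count labels within each cluster
# (the transform step described in the notes below).
with_cluster = pipeline_model.transform(numeric_only)
with_cluster.select("cluster", "label").\
    groupBy("cluster", "label").count().\
    orderBy(col("cluster"), col("count").desc()).\
    show()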


- Columns "protocol_type", "service", and "flag" are dropped from the DataFrame to retain only
numeric features.
- `VectorAssembler` assembles every remaining column except the last one ("label") into a
single vector column named "featureVector".
- `KMeans` is configured with "featureVector" as the features column and "cluster" as the
prediction column. No `k` is set, so the default of 2 clusters is used.
- A `Pipeline` is created with the `assembler` and `kmeans` stages.
- The pipeline is fitted on the `numeric_only` data, producing `pipeline_model`.
- The `clusterCenters()` method returns the centroid coordinates of the clusters found by
KMeans, which are printed using `pprint`.
- The `transform` method of the trained `pipeline_model` adds a "cluster" column to the
`numeric_only` DataFrame.
- The `select` method picks the "cluster" and "label" columns.
- The `groupBy` and `count` methods count the occurrences of each label within
each cluster.
- The results are ordered first by "cluster" and then by count in descending order, and displayed.

In [None]:
from pyspark.sql import DataFrame
from random import randint

def clustering_score(input_data, k):
    input_numeric_only = input_data.drop("protocol_type", "service", "flag")
    assembler = VectorAssembler().setInputCols(input_numeric_only.columns[:-1]).\
        setOutputCol("featureVector")
    kmeans = KMeans().setSeed(randint(100,100000)).setK(k).\
        setPredictionCol("cluster").setFeaturesCol("featureVector")
    pipeline = Pipeline().setStages([assembler, kmeans])
    pipeline_model = pipeline.fit(input_numeric_only)
    kmeans_model = pipeline_model.stages[-1]
    training_cost = kmeans_model.summary.trainingCost
    return training_cost

for k in list(range(20,100, 20)):
    print(clustering_score(numeric_only, k))

- The `clustering_score` function takes input data and a number of
clusters `k` as parameters.
- Columns "protocol_type", "service", and "flag" are dropped from the input
data to retain only numeric features.
- `VectorAssembler` is used to assemble the numeric features into a
single vector column named "featureVector".
- `KMeans` clustering algorithm is applied with "featureVector" as
features, "cluster" as the prediction column, and a random seed.
- A `Pipeline` is created with stages including the `assembler` and
`kmeans`.
- The `pipeline_model` is trained on the `input_numeric_only` data.
- The training cost of the KMeans model is calculated using
`kmeans_model.summary.trainingCost`.
- The function is then called for different values of `k` ranging from 20 to 80
in steps of 20, and the training costs are printed.
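
As a rough illustration of what the training cost measures, the sketch below computes the sum of squared Euclidean distances from each point to its nearest centroid in plain Python, which is how Spark documents `trainingCost`. The toy points, the centroids, and the helper names `squared_distance` and `kmeans_cost` are made up for illustration and are not part of the notebook.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length sequences."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def kmeans_cost(points, centroids):
    """Sum, over all points, of the squared distance to the nearest centroid."""
    return sum(min(squared_distance(p, c) for c in centroids) for p in points)


# Hypothetical toy data: two tight groups of 2-D points.
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 0.0), (10.0, 2.0)]

# One centroid for everything versus one centroid per group.
cost_k1 = kmeans_cost(points, [(5.0, 1.0)])
cost_k2 = kmeans_cost(points, [(0.0, 1.0), (10.0, 1.0)])

print(cost_k1, cost_k2)
```

With well-placed centroids this cost can only go down as `k` grows, so a lower cost at a larger `k` is expected and is not by itself evidence of a better clustering.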

In [None]:
def clustering_score_1(input_data, k):
    input_numeric_only = input_data.drop("protocol_type", "service", "flag")
    assembler = VectorAssembler().\
        setInputCols(input_numeric_only.columns[:-1]).setOutputCol("featureVector")
    kmeans = KMeans().setSeed(randint(100,100000)).setK(k).setMaxIter(40).\
        setTol(1.0e-5).setPredictionCol("cluster").setFeaturesCol("featureVector")
    pipeline = Pipeline().setStages([assembler, kmeans])
    pipeline_model = pipeline.fit(input_numeric_only)
    kmeans_model = pipeline_model.stages[-1]
    training_cost = kmeans_model.summary.trainingCost
    return training_cost

for k in list(range(20,101, 20)):
    print(k, clustering_score_1(numeric_only, k))

- The `clustering_score_1` function is the same as `clustering_score` except that
it sets the maximum number of iterations and the convergence tolerance explicitly.
- Columns "protocol_type", "service", and "flag" are dropped from the input
data to retain only numeric features.
- `VectorAssembler` is used to assemble the numeric features into a
single vector column named "featureVector".
- `KMeans` clustering algorithm is applied with "featureVector" as
features, "cluster" as the prediction column, a random seed, maximum
iterations set to 40, and a tolerance of 1.0e-5.
- A `Pipeline` is created with stages including the `assembler` and
`kmeans`.
- The `pipeline_model` is trained on the `input_numeric_only` data.
- The training cost of the KMeans model is calculated using
`kmeans_model.summary.trainingCost`.
- The function is then called for different values of `k` ranging from 20 to
100 in steps of 20, and the training costs along with `k` values are printed.
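
To make the roles of the iteration cap and the tolerance concrete, here is a minimal one-dimensional version of the k-means loop in plain Python. It is a simplified stand-in and not Spark's implementation: the function name `lloyd_1d`, the toy values, and the exact stopping rule (stop once no center moves by more than `tol`) are assumptions chosen for illustration.

```python
def lloyd_1d(values, centers, max_iter=40, tol=1.0e-5):
    """Run a basic k-means loop on 1-D data; return (centers, iterations_used)."""
    centers = list(centers)
    for iteration in range(1, max_iter + 1):
        # Assignment step: group each value with its nearest center.
        groups = [[] for _ in centers]
        for v in values:
            nearest = min(range(len(centers)), key=lambda i: abs(v - centers[i]))
            groups[nearest].append(v)
        # Update step: move each center to the mean of its group
        # (an empty group keeps its old center).
        new_centers = [
            sum(g) / len(g) if g else c for g, c in zip(groups, centers)
        ]
        # Stop early once no center moved by more than the tolerance.
        moved = max(abs(n - c) for n, c in zip(new_centers, centers))
        centers = new_centers
        if moved <= tol:
            return centers, iteration
    # The iteration cap was reached before the centers settled.
    return centers, max_iter


# Hypothetical toy data with two obvious groups and poor starting centers.
values = [1.0, 2.0, 3.0, 11.0, 12.0, 13.0]
converged = lloyd_1d(values, [0.0, 5.0])
capped = lloyd_1d(values, [0.0, 5.0], max_iter=1)

print(converged)
print(capped)
```

A higher iteration cap and a smaller tolerance give the centers more opportunity to settle, at the price of longer training.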

In [None]:
from pyspark.ml.feature import StandardScaler

def clustering_score_2(input_data, k):
    input_numeric_only = input_data.drop("protocol_type", "service", "flag")
    assembler = VectorAssembler().\
        setInputCols(input_numeric_only.columns[:-1]).\
        setOutputCol("featureVector")
    scaler = StandardScaler().setInputCol("featureVector").\
        setOutputCol("scaledFeatureVector").\
        setWithStd(True).setWithMean(False)
    kmeans = KMeans().setSeed(randint(100,100000)).\
        setK(k).setMaxIter(40).\
        setTol(1.0e-5).setPredictionCol("cluster").\
        setFeaturesCol("scaledFeatureVector")
    pipeline = Pipeline().setStages([assembler, scaler, kmeans])
    pipeline_model = pipeline.fit(input_numeric_only)
    kmeans_model = pipeline_model.stages[-1]
    training_cost = kmeans_model.summary.trainingCost
    return training_cost

for k in list(range(60, 271, 30)):
    print(k, clustering_score_2(numeric_only, k))


- The `clustering_score_2` function is an extension of the previous
function but includes a `StandardScaler` to standardize the features.
- Columns "protocol_type", "service", and "flag" are dropped from the input
data to retain only numeric features.
- `VectorAssembler` is used to assemble the numeric features into a
single vector column named "featureVector".
- `StandardScaler` rescales "featureVector" to unit standard deviation without
centering it (`withStd=True`, `withMean=False`) and outputs "scaledFeatureVector".
- `KMeans` clustering algorithm is applied with "scaledFeatureVector" as
features, "cluster" as the prediction column, a random seed, maximum
iterations set to 40, and a tolerance of 1.0e-5.
- A `Pipeline` is created with stages including the `assembler`, `scaler`,
and `kmeans`.
- The `pipeline_model` is trained on the `input_numeric_only` data.
- The training cost of the KMeans model is calculated using
`kmeans_model.summary.trainingCost`.
- The function is then called for different values of `k` ranging from 60 to
270 in steps of 30, and the training costs along with `k` values are printed.
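
The effect of scaling without centering can be sketched in plain Python. The example below divides each column by its sample standard deviation so that a large-valued feature no longer dominates the distance calculation. The helper name `scale_to_unit_std`, the toy rows, and the choice to map a constant column to 0.0 are assumptions made for this illustration.

```python
from statistics import stdev


def scale_to_unit_std(rows):
    """Divide each column by its sample standard deviation, without centering."""
    columns = list(zip(*rows))
    stds = [stdev(column) for column in columns]
    # A constant column has zero spread; map it to 0.0 in this sketch.
    return [
        [value / s if s > 0 else 0.0 for value, s in zip(row, stds)]
        for row in rows
    ]


# Hypothetical rows: a byte count on a large scale and a rate in [0, 1].
rows = [[1000.0, 0.1], [3000.0, 0.3], [5000.0, 0.5]]
scaled = scale_to_unit_std(rows)

for row in scaled:
    print(row)
```

After scaling, both columns span a comparable range, so each contributes similarly to the squared distances that k-means minimizes.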

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

def one_hot_pipeline(input_col):
    indexer = StringIndexer().setInputCol(input_col).setOutputCol(input_col + "_indexed")
    encoder = OneHotEncoder().setInputCol(input_col + "_indexed").\
        setOutputCol(input_col + "_vec")
    pipeline = Pipeline().setStages([indexer, encoder])
    return pipeline, input_col + "_vec"


- The `one_hot_pipeline` function is defined to create a pipeline that
includes `StringIndexer` and `OneHotEncoder` stages.
- `StringIndexer` is used to convert the categorical variable into indices.
- `OneHotEncoder` is used to encode the indexed categorical variable into
a one-hot encoded vector.
- The pipeline and the output column name for the one-hot encoded vector
are returned by the function.
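
The two encoding steps can be mimicked in plain Python to show what ends up in the vector column. This sketch assumes the indexer orders categories by descending frequency (ties broken alphabetically) and that the encoder drops the last category, which I believe matches the Spark defaults. The helper names `string_index` and `one_hot` and the sample values are hypothetical.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to an index, most frequent first."""
    counts = Counter(values)
    # Assumed ordering: descending frequency, ties broken alphabetically.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Return a 0/1 list for `index`; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if position == index else 0.0 for position in range(size)]


# Hypothetical sample of the protocol_type column.
protocols = ["tcp", "udp", "tcp", "icmp", "tcp", "udp"]
index_of = string_index(protocols)
encoded = [one_hot(index_of[p], len(index_of)) for p in protocols]

print(index_of)
print(encoded[0], encoded[1], encoded[3])
```

Encoding categories as 0/1 vectors avoids implying an order or distance between them, which a single integer index would do.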

In [None]:
def clustering_score_3(input_data, k):
    proto_type_pipeline, proto_type_vec_col = one_hot_pipeline("protocol_type")
    service_pipeline, service_vec_col = one_hot_pipeline("service")
    flag_pipeline, flag_vec_col = one_hot_pipeline("flag")
    assemble_cols = set(input_data.columns) - \
        {"label", "protocol_type", "service", "flag"} | \
        {proto_type_vec_col, service_vec_col, flag_vec_col}
    assembler = VectorAssembler().setInputCols(list(assemble_cols)).\
        setOutputCol("featureVector")
    scaler = StandardScaler().setInputCol("featureVector").\
        setOutputCol("scaledFeatureVector").setWithStd(True).setWithMean(False)
    kmeans = KMeans().setSeed(randint(100,100000)).setK(k).setMaxIter(40).\
        setTol(1.0e-5).setPredictionCol("cluster").\
        setFeaturesCol("scaledFeatureVector")
    pipeline = Pipeline().setStages([proto_type_pipeline, service_pipeline,
        flag_pipeline, assembler, scaler, kmeans])
    pipeline_model = pipeline.fit(input_data)
    kmeans_model = pipeline_model.stages[-1]
    training_cost = kmeans_model.summary.trainingCost
    return training_cost

for k in list(range(60, 271, 30)):
    print(k, clustering_score_3(data, k))

- The `clustering_score_3` function extends the previous clustering
function (`clustering_score_2`) to include one-hot encoding for
categorical variables.
- `one_hot_pipeline` is utilized to create pipelines for one-hot encoding of
the categorical variables "protocol_type", "service", and "flag".
- The assembled columns for the `VectorAssembler` are updated to
include the one-hot encoded vectors.
- The rest of the pipeline stages remain the same as in
`clustering_score_2`.
- The function is called for different values of `k` ranging from 60 to 270 in
steps of 30, and the training costs along with `k` values are printed.

In [None]:
from math import log

def entropy(counts):
    values = [c for c in counts if (c > 0)]
    n = sum(values)
    p = [v/n for v in values]
    return sum([-1*(p_v) * log(p_v) for p_v in p])

In [None]:
from pyspark.sql import functions as fun
from pyspark.sql import Window
cluster_label = pipeline_model.\
    transform(data).\
    select("cluster", "label")

df = cluster_label.\
    groupBy("cluster", "label").\
    count().orderBy("cluster")

w = Window.partitionBy("cluster")
p_col = df['count'] / fun.sum(df['count']).over(w)
with_p_col = df.withColumn("p_col", p_col)

result = with_p_col.groupBy("cluster").\
    agg((-fun.sum(col("p_col") * fun.log2(col("p_col"))))
            .alias("entropy"),
        fun.sum(col("count"))
            .alias("cluster_size"))

result = result.withColumn('weightedClusterEntropy',
    fun.col('entropy') * fun.col('cluster_size'))

weighted_cluster_entropy_avg = result.\
    agg(fun.sum(
        col('weightedClusterEntropy'))).\
    collect()

weighted_cluster_entropy_avg[0][0]/data.count()


- The `entropy` function calculates the entropy of a distribution from raw counts
as `-sum(p * log(p))`, using the natural logarithm. The Spark cell uses `log2`
instead, so its values differ from the helper's by a constant factor.
- In the Spark cell, `p_col` is each label's share of its cluster: the label count
divided by the cluster total, computed over a window partitioned by "cluster".
- The entropy of each cluster is multiplied by the cluster size, and these
weighted entropies are summed over all clusters.
- Finally, the weighted average cluster entropy is obtained by dividing that sum
by the total number of data points.
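
The same calculation can be followed on a small example in plain Python. The sketch below takes hypothetical label counts per cluster, computes each cluster's entropy in bits, and averages the entropies weighted by cluster size. The function names and the toy counts are illustrative assumptions, not part of the notebook.

```python
from math import log2


def entropy_bits(counts):
    """Shannon entropy, in bits, of a distribution given as raw counts."""
    total = sum(counts)
    return -sum((c / total) * log2(c / total) for c in counts if c > 0)


def weighted_average_entropy(label_counts_by_cluster):
    """Average the per-cluster entropies, weighting each cluster by its size."""
    total_points = sum(sum(counts) for counts in label_counts_by_cluster.values())
    weighted_sum = sum(
        entropy_bits(counts) * sum(counts)
        for counts in label_counts_by_cluster.values()
    )
    return weighted_sum / total_points


# Hypothetical label counts: cluster 0 holds one label, cluster 1 is an even mix of two.
label_counts = {0: [8], 1: [4, 4]}
score = weighted_average_entropy(label_counts)

print(score)
```

A cluster containing a single label contributes zero entropy, so lower scores indicate clusters that line up more closely with the labels.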

In [None]:
def fit_pipeline_4(data, k):
    (proto_type_pipeline, proto_type_vec_col) = one_hot_pipeline("protocol_type")
    (service_pipeline, service_vec_col) = one_hot_pipeline("service")
    (flag_pipeline, flag_vec_col) = one_hot_pipeline("flag")
    assemble_cols = set(data.columns) - {"label", "protocol_type", "service", "flag"} | \
        {proto_type_vec_col, service_vec_col, flag_vec_col}
    assembler = VectorAssembler(inputCols=list(assemble_cols),
        outputCol="featureVector")
    scaler = StandardScaler(inputCol="featureVector",
        outputCol="scaledFeatureVector", withStd=True, withMean=False)
    kmeans = KMeans(seed=randint(100, 100000), k=k, predictionCol="cluster",
        featuresCol="scaledFeatureVector", maxIter=40, tol=1.0e-5)
    pipeline = Pipeline(stages=[proto_type_pipeline, service_pipeline,
        flag_pipeline, assembler, scaler, kmeans])
    return pipeline.fit(data)

def clustering_score_4(input_data, k):
    pipeline_model = fit_pipeline_4(input_data, k)
    cluster_label = pipeline_model.transform(input_data).select("cluster", "label")
    df = cluster_label.groupBy("cluster", "label").count().orderBy("cluster")
    w = Window.partitionBy("cluster")
    p_col = df['count'] / fun.sum(df['count']).over(w)
    with_p_col = df.withColumn("p_col", p_col)
    # Negate before aliasing, as in the previous cell, so the column is named "entropy".
    result = with_p_col.groupBy("cluster").agg(
        (-fun.sum(col("p_col") * fun.log2(col("p_col")))).alias("entropy"),
        fun.sum(col("count")).alias("cluster_size"))
    result = result.withColumn('weightedClusterEntropy',
        col('entropy') * col('cluster_size'))
    weighted_cluster_entropy_avg = result.agg(
        fun.sum(col('weightedClusterEntropy'))).collect()
    return weighted_cluster_entropy_avg[0][0] / input_data.count()


- The `fit_pipeline_4` function builds and fits the same pipeline as `clustering_score_3`,
but passes the settings as constructor keyword arguments and returns the fitted pipeline
model instead of the training cost.
- The `clustering_score_4` function fits that pipeline for a given `k` and returns the
weighted average cluster entropy, computed the same way as in the previous cell, for
KMeans clustering with one-hot encoded categorical variables.


In [None]:
pipeline_model = fit_pipeline_4(data, 180)
count_by_cluster_label = pipeline_model.transform(data).\
select("cluster", "label").\
groupBy("cluster", "label").\
count().orderBy("cluster", "label")
count_by_cluster_label.show()

- The `fit_pipeline_4` function is called with `k=180` to fit the pipeline, including the
KMeans clustering model.
- The `transform` method of the fitted pipeline model is applied to the input data.
- The resulting DataFrame is grouped by "cluster" and "label" to count the occurrences of
each label within each cluster, and the counts are displayed ordered by cluster and label.