## Import Spark and MlLIB library

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import  KMeans

spark = SparkSession.builder.appName('KMeans_Clustering').getOrCreate()

## Load Dataset

In [2]:
#import the dataset
hack_df = spark.read.csv("hack_data.csv",header=True,inferSchema=True)

Describe the dataset

In [3]:
hack_df.describe().show()

+-------+-----------------------+------------------+------------------+-----------------+------------------+-----------+------------------+
|summary|Session_Connection_Time| Bytes Transferred|   Kali_Trace_Used|Servers_Corrupted|   Pages_Corrupted|   Location|  WPM_Typing_Speed|
+-------+-----------------------+------------------+------------------+-----------------+------------------+-----------+------------------+
|  count|                    334|               334|               334|              334|               334|        334|               334|
|   mean|     30.008982035928145| 607.2452694610777|0.5119760479041916|5.258502994011977|10.838323353293413|       NULL|57.342395209580864|
| stddev|     14.088200614636158|286.33593163576757|0.5006065264451406| 2.30190693339697|  3.06352633036022|       NULL| 13.41106336843464|
|    min|                    1.0|              10.0|                 0|              1.0|               6.0|Afghanistan|              40.0|
|    max|           

print the Schema and know about the data

In [4]:
hack_df.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)



## Import Vector Assembler

In [5]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [6]:
input_cols = ['Session_Connection_Time', 'Bytes Transferred', 'Kali_Trace_Used',
             'Servers_Corrupted', 'Pages_Corrupted','WPM_Typing_Speed']

In [8]:
assembler = VectorAssembler(inputCols = input_cols, outputCol='features')

In [9]:
final_data = assembler.transform(hack_df)

In [11]:
#Import StandardScaler
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

In [12]:
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(final_data)

In [13]:
# Normalize each feature to have unit standard deviation.
cluster_final_data = scalerModel.transform(final_data)

** let's find out whether its 2 or 3! **

In [14]:
kmeans3 = KMeans(featuresCol='scaledFeatures',k=3)
kmeans2 = KMeans(featuresCol='scaledFeatures',k=2)

In [15]:
model_k3 = kmeans3.fit(cluster_final_data)
model_k2 = kmeans2.fit(cluster_final_data)

In [17]:
#Make Predictions
predicitons_k3 = model_k3.transform(cluster_final_data)
predicitons_k2 = model_k2.transform(cluster_final_data)

In [18]:
from pyspark.ml.evaluation import ClusteringEvaluator
# Evaluate Clustering 
evaluator = ClusteringEvaluator()
evaluate_k3 = evaluator.evaluate(predicitons_k3)
evaluate_k2 = evaluator.evaluate(predicitons_k2) 

In [19]:
print(evaluate_k3,evaluate_k2)

0.3068084951287429 0.6683623593283755


In [20]:
for k in range(2,9):
    kmeans = KMeans(featuresCol='scaledFeatures',k=k)
    model = kmeans.fit(cluster_final_data)
    wssse = model.transform(cluster_final_data)
    print("With K={}".format(k))
    print("Within Set Sum of Squared Errors = " + str(wssse))
    print('--'*30)

With K=2
Within Set Sum of Squared Errors = DataFrame[Session_Connection_Time: double, Bytes Transferred: double, Kali_Trace_Used: int, Servers_Corrupted: double, Pages_Corrupted: double, Location: string, WPM_Typing_Speed: double, features: vector, scaledFeatures: vector, prediction: int]
------------------------------------------------------------
With K=3
Within Set Sum of Squared Errors = DataFrame[Session_Connection_Time: double, Bytes Transferred: double, Kali_Trace_Used: int, Servers_Corrupted: double, Pages_Corrupted: double, Location: string, WPM_Typing_Speed: double, features: vector, scaledFeatures: vector, prediction: int]
------------------------------------------------------------
With K=4
Within Set Sum of Squared Errors = DataFrame[Session_Connection_Time: double, Bytes Transferred: double, Kali_Trace_Used: int, Servers_Corrupted: double, Pages_Corrupted: double, Location: string, WPM_Typing_Speed: double, features: vector, scaledFeatures: vector, prediction: int]
-----

In [21]:
model_k3.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         2|   83|
|         0|   84|
+----------+-----+



In [22]:
model_k2.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+

