In [None]:
from pyspark.sql import *


In [3]:
from pyspark.sql import SparkSession
# Reuse an already-running session if there is one; otherwise start a new
# session under the application name 'find_hacker'.
spark = (
    SparkSession.builder
    .appName('find_hacker')
    .getOrCreate()
)

In [4]:
from pyspark.ml.clustering import KMeans

In [1]:
# Load the session log. The first row is treated as the header and column
# types are inferred from the data instead of defaulting to strings.
dataset = spark.read.csv(
    "hack_data.csv",
    header=True,
    inferSchema=True,
)

In [2]:
# Preview the first ten rows to sanity-check the load.
dataset.show(n=10)

+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|            Location|WPM_Typing_Speed|
+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+
|                    8.0|           391.09|              1|             2.96|            7.0|            Slovenia|           72.37|
|                   20.0|           720.99|              0|             3.04|            9.0|British Virgin Is...|           69.08|
|                   31.0|           356.32|              1|             3.71|            8.0|             Tokelau|           70.58|
|                    2.0|           228.08|              1|             2.48|            8.0|             Bolivia|            70.8|
|                   20.0|            408.5|              0|             3.57

In [5]:
# Print the inferred column names and types. Per the recorded output,
# Location is the only string column; it is not among the feature columns
# assembled further down.
dataset.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)



Importing VectorAssembler and creating our Features
We use VectorAssembler to combine the selected input columns into a single column in which each row of the DataFrame holds one feature vector. To build the clusters, we first choose the columns that the features column will be assembled from. Here we are using the columns:

Session_Connection_Time
Bytes Transferred
Kali_Trace_Used
Servers_Corrupted
Pages_Corrupted
WPM_Typing_Speed : Words Per Minute

In [6]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
 
# Numeric columns used to describe each session; the string column Location
# is not listed.
feat_cols = [
    'Session_Connection_Time',
    'Bytes Transferred',
    'Kali_Trace_Used',
    'Servers_Corrupted',
    'Pages_Corrupted',
    'WPM_Typing_Speed',
]

# Pack the selected columns into a single vector column named 'features'.
vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol='features')
final_data = vec_assembler.transform(dataset)

Importing the StandardScaler Library and Creating Scaler
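Centering and scaling happen independently on each feature by computing the relevant statistics on the samples in the training set. Mean and standard deviation are then stored to be used on later data using the transform method. Note that the scaler below is created with withMean=False, so the features are only scaled to unit standard deviation and are not centered.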

Standardization of a dataset is a common requirement for many machine learning estimators: they might behave badly if the individual features do not more or less look like standard normally distributed data.

In [7]:
from pyspark.ml.feature import StandardScaler
 
# Scale every feature to unit standard deviation; the mean is not subtracted
# (withMean=False), so values are rescaled but not centered.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False,
)

Computing Summary Statistics
Let’s compute summary statistics by fitting the StandardScaler, then normalize each feature to have unit standard deviation.

In [8]:
# Learn the per-feature scaling statistics from the assembled data, then add
# the 'scaledFeatures' column that the clustering models read.
scalerModel = scaler.fit(final_data)
cluster_final_data = scalerModel.transform(final_data)

# Two candidate estimators: one for the "3 hackers" hypothesis and one for
# the "2 hackers" hypothesis. K-means starts from randomly chosen centres, so
# the seed is pinned to keep cluster assignments and costs repeatable across
# kernel restarts.
kmeans3 = KMeans(featuresCol='scaledFeatures', k=3, seed=42)
kmeans2 = KMeans(featuresCol='scaledFeatures', k=2, seed=42)

Building KMeans Model and Calculating WSSSE (Within Set Sum of Squared Errors)
We first have to build our models, passing the desired number of clusters to the algorithm. We then compute the Within Set Sum of Squared Errors (WSSSE) for each model and use these values to help figure out whether we have 2 or 3 hackers.

In [10]:
# Fit both candidate models on the scaled feature vectors.
model_k3 = kmeans3.fit(cluster_final_data)
model_k2 = kmeans2.fit(cluster_final_data)

# KMeansModel.computeCost() was deprecated in Spark 2.4 and removed in
# Spark 3.0. The fit summary reports the cost over the training data as
# `trainingCost` (available from Spark 2.4), and the training data is the
# same DataFrame the old call was scoring here.
wssse_k3 = model_k3.summary.trainingCost
wssse_k2 = model_k2.summary.trainingCost

# Report the two costs, separated by a divider line.
print("With K=3")
print("Within Set Sum of Squared Errors = " + str(wssse_k3))
print('--'*30)
print("With K=2")
print("Within Set Sum of Squared Errors = " + str(wssse_k2))

With K=3
Within Set Sum of Squared Errors = 434.1492898715845
------------------------------------------------------------
With K=2
Within Set Sum of Squared Errors = 601.7707512676716


In [11]:
#Checking the Elbow Point (WSSSE)
for k in range(2,9):
    kmeans = KMeans(featuresCol='scaledFeatures',k=k)
    model = kmeans.fit(cluster_final_data)
    wssse = model.computeCost(cluster_final_data)
    print("With K={}".format(k))
    print("Within Set Sum of Squared Errors = " + str(wssse))
    print('--'*30)

With K=2
Within Set Sum of Squared Errors = 601.7707512676716
------------------------------------------------------------
With K=3
Within Set Sum of Squared Errors = 434.1492898715845
------------------------------------------------------------
With K=4
Within Set Sum of Squared Errors = 267.1336116887891
------------------------------------------------------------
With K=5
Within Set Sum of Squared Errors = 400.35725711494183
------------------------------------------------------------
With K=6
Within Set Sum of Squared Errors = 232.7357957188543
------------------------------------------------------------
With K=7
Within Set Sum of Squared Errors = 221.5875750519926
------------------------------------------------------------
With K=8
Within Set Sum of Squared Errors = 209.433374495304
------------------------------------------------------------


In [12]:
#Final Check for the number of Hacker
# Assign every session to a cluster with the K=3 model and show how many
# sessions land in each cluster.
(
    model_k3
    .transform(cluster_final_data)
    .groupBy('prediction')
    .count()
    .show()
)

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   83|
|         2|   84|
|         0|  167|
+----------+-----+



In [13]:
# Same per-cluster session count, this time for the K=2 model.
(
    model_k2
    .transform(cluster_final_data)
    .groupBy('prediction')
    .count()
    .show()
)


+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+

