In [6]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer


In [2]:
spark = SparkSession.builder.appName("hackers").getOrCreate()

24/01/20 22:18:14 WARN Utils: Your hostname, DESKTOP-0HJMRJF resolves to a loopback address: 127.0.1.1; using 172.20.13.164 instead (on interface eth0)
24/01/20 22:18:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/20 22:18:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/20 22:18:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
dataset = spark.read.csv('./data/ML/clustering/hack_data.csv', inferSchema=True, header=True)

In [4]:
dataset.show()

+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|            Location|WPM_Typing_Speed|
+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+
|                    8.0|           391.09|              1|             2.96|            7.0|            Slovenia|           72.37|
|                   20.0|           720.99|              0|             3.04|            9.0|British Virgin Is...|           69.08|
|                   31.0|           356.32|              1|             3.71|            8.0|             Tokelau|           70.58|
|                    2.0|           228.08|              1|             2.48|            8.0|             Bolivia|            70.8|
|                   20.0|            408.5|              0|             3.57

In [5]:
dataset.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)


In [7]:
indexer = StringIndexer(inputCol="Location", outputCol="Location_index")

In [8]:
df_encoded = indexer.fit(dataset).transform(dataset)

In [9]:
df_encoded.show()

+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|            Location|WPM_Typing_Speed|Location_index|
+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------+
|                    8.0|           391.09|              1|             2.96|            7.0|            Slovenia|           72.37|          88.0|
|                   20.0|           720.99|              0|             3.04|            9.0|British Virgin Is...|           69.08|          47.0|
|                   31.0|           356.32|              1|             3.71|            8.0|             Tokelau|           70.58|          92.0|
|                    2.0|           228.08|              1|             2.48|            8.0|             Bolivia|    

In [11]:
df_encoded.columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'Location',
 'WPM_Typing_Speed',
 'Location_index']

In [12]:
assembler = VectorAssembler(inputCols=['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'WPM_Typing_Speed',
 'Location_index'], outputCol='features')

In [13]:
final_data = assembler.transform(df_encoded)

In [14]:
final_data.show()

+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------+--------------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|            Location|WPM_Typing_Speed|Location_index|            features|
+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------+--------------------+
|                    8.0|           391.09|              1|             2.96|            7.0|            Slovenia|           72.37|          88.0|[8.0,391.09,1.0,2...|
|                   20.0|           720.99|              0|             3.04|            9.0|British Virgin Is...|           69.08|          47.0|[20.0,720.99,0.0,...|
|                   31.0|           356.32|              1|             3.71|            8.0|             Tokelau|           70.58|          92.0|[31.0,356.32,1

In [17]:
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

In [18]:
scaler_model = scaler.fit(final_data)

In [19]:
final_data = scaler_model.transform(final_data)

In [20]:
final_data.show()

+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------+--------------------+--------------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|            Location|WPM_Typing_Speed|Location_index|            features|     scaled_features|
+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+--------------+--------------------+--------------------+
|                    8.0|           391.09|              1|             2.96|            7.0|            Slovenia|           72.37|          88.0|[8.0,391.09,1.0,2...|[0.56785108466505...|
|                   20.0|           720.99|              0|             3.04|            9.0|British Virgin Is...|           69.08|          47.0|[20.0,720.99,0.0,...|[1.41962771166263...|
|                   31.0|           356.32|            

In [36]:
kmeans3 = KMeans(k=3, featuresCol='scaled_features')
kmeans2 = KMeans(k=2, featuresCol='scaled_features')

In [37]:
model_k3 = kmeans3.fit(final_data)
model_k2 = kmeans2.fit(final_data)

In [39]:
model_k3.summary.trainingCost

766.2453058299543

In [40]:
model_k2.summary.trainingCost

933.9678527392769

In [44]:
center_k3 = model_k3.clusterCenters()
center_k3

[array([2.99991988, 2.92319035, 1.05261534, 3.20390443, 4.51321315,
        3.28474   , 1.22567775]),
 array([1.30217042, 1.25830099, 0.        , 1.35793211, 2.57251009,
        5.24230473, 1.29928065]),
 array([1.21780112, 1.37901802, 1.99757683, 1.37198977, 2.55237797,
        5.29152222, 1.34848733])]

In [45]:
center_k2 = model_k2.clusterCenters()
center_k2

[array([2.99991988, 2.92319035, 1.05261534, 3.20390443, 4.51321315,
        3.28474   , 1.22567775]),
 array([1.26023837, 1.31829808, 0.99280765, 1.36491885, 2.5625043 ,
        5.26676612, 1.32373666])]

In [66]:
transformed_k3 = model_k3.transform(final_data)

In [67]:
transformed_k2 =model_k2.transform(final_data)

In [68]:
transformed_k3.select('prediction').groupby('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   84|
|         2|   83|
|         0|  167|
+----------+-----+


In [69]:
transformed_k2.select('prediction').groupby('prediction').count().show()


+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+
