**CLUSTERING WITH SPARK**<br />
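Clustering groups unlabelled data so that points in the same cluster are more similar to each other than to points in other clusters. The main difficulty with k-means is choosing the number of clusters "k", which must be fixed before training. Domain knowledge is the best guide for picking "k". When it is not available, the "elbow method" can be used to estimate a reasonable value: compute the within-cluster sum of squared errors for several values of "k" and look for the point where the decrease levels off.
<br />*Extra reading: An Introduction to Statistical Learning by Gareth James et al., Chapter 10*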
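
The elbow heuristic described above can be sketched without Spark. The block below is a minimal illustration in plain Python, not the method used later in this notebook: `kmeans_1d` is a hypothetical helper that runs Lloyd's algorithm on a made-up one-dimensional list, and the "largest second difference" rule in `elbow_k` is only one of several ways to locate the bend in the cost curve.

```python
def kmeans_1d(points, k, iters=20):
    """Tiny Lloyd's algorithm on 1-D data; returns (centers, wssse)."""
    pts = sorted(points)
    # Deterministic start: k evenly spaced points from the sorted data.
    step = max(1, len(pts) // k)
    centers = pts[::step][:k]
    for _ in range(iters):
        # Assignment step: each point joins its nearest center.
        groups = [[] for _ in centers]
        for p in pts:
            nearest = min(range(len(centers)), key=lambda i: abs(p - centers[i]))
            groups[nearest].append(p)
        # Update step: move each center to the mean of its group
        # (an empty group keeps its previous center).
        new_centers = [sum(g) / len(g) if g else c for g, c in zip(groups, centers)]
        if new_centers == centers:
            break
        centers = new_centers
    # Cost: squared distance from each point to its nearest center.
    wssse = sum(min((p - c) ** 2 for c in centers) for p in pts)
    return centers, wssse


def elbow_k(costs):
    """Pick the k with the sharpest bend (largest second difference) in the cost curve."""
    ks = sorted(costs)
    best_k, best_bend = None, float("-inf")
    for prev_k, k, next_k in zip(ks, ks[1:], ks[2:]):
        bend = (costs[prev_k] - costs[k]) - (costs[k] - costs[next_k])
        if bend > best_bend:
            best_k, best_bend = k, bend
    return best_k


# Toy data (made up for illustration): two tight groups near 0 and 9.
toy = [0.0, 0.1, 0.2, 9.0, 9.1, 9.2]
costs = {k: kmeans_1d(toy, k)[1] for k in range(1, 5)}
for k in sorted(costs):
    print(k, round(costs[k], 4))
print("elbow at k =", elbow_k(costs))
```

The cost keeps shrinking as "k" grows, so the smallest cost is never the right criterion on its own; the heuristic looks for the last "k" that buys a large improvement.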

In [1]:
%%time
# Spark installation on Colab: PySpark, a Java 8 runtime, the Spark 2.4.5 binaries, and findspark
!pip install pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=3c09d10ce5b391b57986a1ddd06985bd480451902e74ebbb5cd3d306c317fd68
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5
CPU 

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"


import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Clustering").getOrCreate()

In [4]:
from google.colab import files
files.upload()

Saving sample_kmeans_data.txt to sample_kmeans_data.txt


{'sample_kmeans_data.txt': b'0 1:0.0 2:0.0 3:0.0\n1 1:0.1 2:0.1 3:0.1\n2 1:0.2 2:0.2 3:0.2\n3 1:9.0 2:9.0 3:9.0\n4 1:9.1 2:9.1 3:9.1\n5 1:9.2 2:9.2 3:9.2\n'}

In [5]:
sampleDf = spark.read.format('libsvm').load("sample_kmeans_data.txt")
sampleDf.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+
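
The `libsvm` text format stores one row per line as `label index:value index:value ...`, with 1-based indices. The sketch below is a plain-Python illustration of that layout, not Spark's actual reader; `parse_libsvm_line` is a hypothetical helper, and skipping zero values is an assumption made so that the result matches the empty sparse row `(3,[],[])` printed above.

```python
def parse_libsvm_line(line):
    """Parse 'label idx:val idx:val ...' into (label, indices, values).

    Indices in the file are 1-based; they are shifted to 0-based here.
    Zero values are skipped (an assumption, to mirror the sparse rows above).
    """
    tokens = line.split()
    label = float(tokens[0])
    indices, values = [], []
    for token in tokens[1:]:
        idx, val = token.split(":")
        if float(val) != 0.0:
            indices.append(int(idx) - 1)
            values.append(float(val))
    return label, indices, values


print(parse_libsvm_line("0 1:0.0 2:0.0 3:0.0"))  # all-zero row: no stored entries
print(parse_libsvm_line("3 1:9.0 2:9.0 3:9.0"))
```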



In [6]:
finalData = sampleDf.select('features')
finalData.show(5)

+--------------------+
|            features|
+--------------------+
|           (3,[],[])|
|(3,[0,1,2],[0.1,0...|
|(3,[0,1,2],[0.2,0...|
|(3,[0,1,2],[9.0,9...|
|(3,[0,1,2],[9.1,9...|
+--------------------+
only showing top 5 rows



In [7]:
from pyspark.ml.clustering import KMeans

# Trains a k-means model.
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(finalData)

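# Evaluate clustering by computing the Within Set Sum of Squared Errors (WSSSE).
# Note: computeCost works on the Spark 2.4.5 install used here but was removed in Spark 3.0.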
wssse = model.computeCost(finalData)
print("Within Set Sum of Squared Errors = " + str(wssse))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Within Set Sum of Squared Errors = 0.07499999999994544
Cluster Centers: 
[9.1 9.1 9.1]
[0.05 0.05 0.05]
[0.2 0.2 0.2]
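
To see what the WSSSE figure measures, it can be recomputed by hand from the six input points and the three centers printed above. This is a plain-Python sketch of the definition (sum of squared distances from each point to its nearest center), not the Spark implementation.

```python
# The six sample points (every row has the same value in all three dimensions).
points = [[v] * 3 for v in (0.0, 0.1, 0.2, 9.0, 9.1, 9.2)]
# Cluster centers as printed above.
centers = [[9.1] * 3, [0.05] * 3, [0.2] * 3]


def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


assignments = []
wssse = 0.0
for p in points:
    distances = [squared_distance(p, c) for c in centers]
    nearest = distances.index(min(distances))
    assignments.append(nearest)
    wssse += distances[nearest]

print(assignments)
print(round(wssse, 6))
```

The total agrees with the value Spark reported (about 0.075) up to floating-point rounding.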


In [8]:
results = model.transform(finalData)
results.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|           (3,[],[])|         1|
|(3,[0,1,2],[0.1,0...|         1|
|(3,[0,1,2],[0.2,0...|         2|
|(3,[0,1,2],[9.0,9...|         0|
|(3,[0,1,2],[9.1,9...|         0|
|(3,[0,1,2],[9.2,9...|         0|
+--------------------+----------+



In [9]:
from google.colab import files
files.upload()

Saving seeds_dataset.csv to seeds_dataset.csv


{'seeds_dataset.csv': b'area,perimeter,compactness,length_of_kernel,width_of_kernel,asymmetry_coefficient,length_of_groove\n15.26,14.84,0.871,5.763,3.312,2.221,5.22\n14.88,14.57,0.8811,5.553999999999999,3.333,1.018,4.956\n14.29,14.09,0.905,5.291,3.3369999999999997,2.699,4.825\n13.84,13.94,0.8955,5.324,3.3789999999999996,2.259,4.805\n16.14,14.99,0.9034,5.6579999999999995,3.562,1.355,5.175\n14.38,14.21,0.8951,5.386,3.312,2.4619999999999997,4.956\n14.69,14.49,0.8799,5.563,3.259,3.5860000000000003,5.218999999999999\n14.11,14.1,0.8911,5.42,3.302,2.7,5.0\n16.63,15.46,0.8747,6.053,3.465,2.04,5.877000000000001\n16.44,15.25,0.888,5.8839999999999995,3.505,1.969,5.5329999999999995\n15.26,14.85,0.8696,5.7139999999999995,3.242,4.543,5.314\n14.03,14.16,0.8796,5.438,3.201,1.7169999999999999,5.001\n13.89,14.02,0.888,5.439,3.199,3.986,4.738\n13.78,14.06,0.8759,5.479,3.156,3.136,4.872\n13.74,14.05,0.8744,5.482,3.114,2.932,4.825\n14.59,14.28,0.8993,5.351,3.333,4.185,4.781000000000001\n13.99,13.83,0.9183,

In [11]:
seedDataset = spark.read.csv("seeds_dataset.csv", inferSchema=True, header=True)
seedDataset.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)



In [12]:
seedDataset.show(5)

+-----+---------+-----------+------------------+------------------+---------------------+----------------+
| area|perimeter|compactness|  length_of_kernel|   width_of_kernel|asymmetry_coefficient|length_of_groove|
+-----+---------+-----------+------------------+------------------+---------------------+----------------+
|15.26|    14.84|      0.871|             5.763|             3.312|                2.221|            5.22|
|14.88|    14.57|     0.8811| 5.553999999999999|             3.333|                1.018|           4.956|
|14.29|    14.09|      0.905|             5.291|3.3369999999999997|                2.699|           4.825|
|13.84|    13.94|     0.8955|             5.324|3.3789999999999996|                2.259|           4.805|
|16.14|    14.99|     0.9034|5.6579999999999995|             3.562|                1.355|           5.175|
+-----+---------+-----------+------------------+------------------+---------------------+----------------+
only showing top 5 rows



In [0]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=seedDataset.columns, outputCol='features')
finalData = assembler.transform(seedDataset)

In [19]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
scaler_model = scaler.fit(finalData)
finalData = scaler_model.transform(finalData)
finalData.printSchema()

root
 |-- area: double (nullable = true)
 |-- perimeter: double (nullable = true)
 |-- compactness: double (nullable = true)
 |-- length_of_kernel: double (nullable = true)
 |-- width_of_kernel: double (nullable = true)
 |-- asymmetry_coefficient: double (nullable = true)
 |-- length_of_groove: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
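
A rough picture of what the scaler does, assuming `StandardScaler` is left at its defaults (`withStd=True`, `withMean=False`): each feature is divided by its sample standard deviation but is not centered. The plain-Python sketch below applies that to the first five `area` values shown earlier; because it uses only five rows, the numbers differ from what Spark computes on the full dataset.

```python
import statistics

# First five 'area' values from the table shown earlier.
area = [15.26, 14.88, 14.29, 13.84, 16.14]

std = statistics.stdev(area)      # sample standard deviation (divides by n - 1)
scaled = [x / std for x in area]  # scale only; the mean is NOT subtracted

print(round(statistics.stdev(scaled), 6))  # unit standard deviation after scaling
print(round(statistics.mean(scaled), 2))   # mean stays far from zero
```

Because the mean is not removed, scaled values (and any cluster centers computed from them) need not lie near zero.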



In [21]:
# Trains a k-means model.
kmeans = KMeans(featuresCol='scaledFeatures', k=3).setSeed(1)
model = kmeans.fit(finalData)

# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = model.computeCost(finalData)
print("Within Set Sum of Squared Errors = " + str(wssse))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Within Set Sum of Squared Errors = 428.6333432285446
Cluster Centers: 
[ 6.35645488 12.40730852 37.41990178 13.93860446  9.7892399   2.41585013
 12.29286107]
[ 4.07135818 10.14438097 35.86461803 11.81349589  7.53471695  3.18317127
 10.39230304]
[ 4.94114963 10.95557919 37.3028184  12.42383591  8.60815545  1.80983376
 10.40657797]


In [26]:
model.transform(finalData).select('area','perimeter','prediction').show()

+-----+---------+----------+
| area|perimeter|prediction|
+-----+---------+----------+
|15.26|    14.84|         2|
|14.88|    14.57|         2|
|14.29|    14.09|         2|
|13.84|    13.94|         2|
|16.14|    14.99|         2|
|14.38|    14.21|         2|
|14.69|    14.49|         2|
|14.11|     14.1|         2|
|16.63|    15.46|         0|
|16.44|    15.25|         2|
|15.26|    14.85|         2|
|14.03|    14.16|         2|
|13.89|    14.02|         2|
|13.78|    14.06|         2|
|13.74|    14.05|         2|
|14.59|    14.28|         2|
|13.99|    13.83|         2|
|15.69|    14.75|         2|
| 14.7|    14.21|         2|
|12.72|    13.57|         1|
+-----+---------+----------+
only showing top 20 rows



In [27]:
from google.colab import files
files.upload()

Saving hack_data.csv to hack_data.csv


{'hack_data.csv': b"Session_Connection_Time,Bytes Transferred,Kali_Trace_Used,Servers_Corrupted,Pages_Corrupted,Location,WPM_Typing_Speed\n8.0,391.09,1,2.96,7.0,Slovenia,72.37\n20.0,720.99,0,3.04,9.0,British Virgin Islands,69.08\n31.0,356.32,1,3.71,8.0,Tokelau,70.58\n2.0,228.08,1,2.48,8.0,Bolivia,70.8\n20.0,408.5,0,3.57,8.0,Iraq,71.28\n1.0,390.69,1,2.79,9.0,Marshall Islands,71.57\n18.0,342.97,1,5.1,7.0,Georgia,72.32\n22.0,101.61,1,3.03,7.0,Timor-Leste,72.03\n15.0,275.53,1,3.53,8.0,Palestinian Territory,70.17\n12.0,424.83,1,2.53,8.0,Bangladesh,69.99\n15.0,249.09,1,3.39,9.0,Northern Mariana Islands,70.77\n32.0,242.48,0,4.24,8.0,Zimbabwe,67.93\n23.0,514.54,0,3.18,8.0,Isle of Man,68.56\n9.0,284.77,0,3.12,9.0,Sao Tome and Principe,70.82\n27.0,779.25,1,2.37,8.0,Greece,72.73\n12.0,307.31,1,3.22,7.0,Solomon Islands,67.95\n21.0,355.94,1,2.0,7.0,Guinea-Bissau,72.0\n10.0,372.65,0,3.33,7.0,Burkina Faso,69.19\n20.0,347.23,1,2.33,7.0,Mongolia,70.41\n22.0,456.57,0,1.52,8.0,Nigeria,69.35\n25.0,582.03,

In [28]:
hackDataset = spark.read.csv('hack_data.csv', inferSchema=True, header=True)
hackDataset.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)



In [29]:
hackDataset.show()

+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+
|Session_Connection_Time|Bytes Transferred|Kali_Trace_Used|Servers_Corrupted|Pages_Corrupted|            Location|WPM_Typing_Speed|
+-----------------------+-----------------+---------------+-----------------+---------------+--------------------+----------------+
|                    8.0|           391.09|              1|             2.96|            7.0|            Slovenia|           72.37|
|                   20.0|           720.99|              0|             3.04|            9.0|British Virgin Is...|           69.08|
|                   31.0|           356.32|              1|             3.71|            8.0|             Tokelau|           70.58|
|                    2.0|           228.08|              1|             2.48|            8.0|             Bolivia|            70.8|
|                   20.0|            408.5|              0|             3.57

In [31]:
hackDataset.columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'Location',
 'WPM_Typing_Speed']

In [0]:
# 'Location' is a string column, so it is left out of the feature vector.
assembler = VectorAssembler(
    inputCols=['Session_Connection_Time',
               'Bytes Transferred',
               'Kali_Trace_Used',
               'Servers_Corrupted',
               'Pages_Corrupted',
               'WPM_Typing_Speed'],
    outputCol='features')
finalData = assembler.transform(hackDataset)
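
The column list above omits `Location`, the only string column in the schema. Instead of typing the names by hand, the numeric columns could be picked by type. The sketch below works on a hard-coded list of `(name, type)` pairs in the shape that `DataFrame.dtypes` returns, copied from the printed schema; `NUMERIC_TYPES` is an illustrative set, not an exhaustive list of Spark's numeric types.

```python
# (name, type) pairs in the shape returned by DataFrame.dtypes,
# copied from the schema printed earlier.
dtypes = [
    ("Session_Connection_Time", "double"),
    ("Bytes Transferred", "double"),
    ("Kali_Trace_Used", "int"),
    ("Servers_Corrupted", "double"),
    ("Pages_Corrupted", "double"),
    ("Location", "string"),
    ("WPM_Typing_Speed", "double"),
]

# Illustrative set of type names treated as numeric (an assumption).
NUMERIC_TYPES = {"int", "bigint", "float", "double"}

feature_cols = [name for name, dtype in dtypes if dtype in NUMERIC_TYPES]
print(feature_cols)
```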

In [35]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
scaler_model = scaler.fit(finalData)
finalData = scaler_model.transform(finalData)
finalData.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)



In [37]:
# Train a k-means model with k=2 on the scaled features.
kmeans = KMeans(featuresCol='scaledFeatures', k=2).setSeed(1)
model_k2 = kmeans.fit(finalData)

# Evaluate the k=2 model (first WSSSE line in the output).
wssse = model_k2.computeCost(finalData)
print("Within Set Sum of Squared Errors = " + str(wssse))


# Train a k-means model with k=3 on the scaled features.
kmeans = KMeans(featuresCol='scaledFeatures', k=3).setSeed(1)
model_k3 = kmeans.fit(finalData)

# Evaluate the k=3 model (second WSSSE line in the output).
wssse = model_k3.computeCost(finalData)
print("Within Set Sum of Squared Errors = " + str(wssse))

Within Set Sum of Squared Errors = 601.7707512676716
Within Set Sum of Squared Errors = 434.75507308487647
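
WSSSE generally goes down as "k" goes up, so the two numbers above are easier to compare as a relative improvement. A small plain-Python sketch using the printed values:

```python
# WSSSE values printed above for k=2 and k=3.
wssse_k2 = 601.7707512676716
wssse_k3 = 434.75507308487647

# Fraction of the k=2 cost that is removed by adding a third cluster.
relative_drop = (wssse_k2 - wssse_k3) / wssse_k2
print(f"{relative_drop:.1%}")
```

A drop of this size does not by itself say which "k" is better; it is one input to the elbow heuristic and has to be read alongside domain knowledge.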


In [39]:
model_k3.transform(finalData).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   88|
|         2|   79|
|         0|  167|
+----------+-----+



In [40]:
model_k2.transform(finalData).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+
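
One way to compare the two tables is by how evenly the rows are split across clusters. The sketch below computes a simple imbalance ratio from the printed counts; `imbalance` is a hypothetical helper, and whether an even split is meaningful for this dataset depends on domain knowledge that the notebook does not state.

```python
# Cluster sizes copied from the two tables above.
counts_k2 = {1: 167, 0: 167}
counts_k3 = {1: 88, 2: 79, 0: 167}


def imbalance(counts):
    """Ratio of the largest to the smallest cluster (1.0 means perfectly even)."""
    return max(counts.values()) / min(counts.values())


print(imbalance(counts_k2))
print(round(imbalance(counts_k3), 2))
```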

