In [3]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('kmeans1').getOrCreate()


In [4]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [5]:
df = spark.read.load("./finalEdited-HumanHappiness-BDAS-mining.csv", format="csv", header=True, inferSchema=True)

# vector Assembler

In [6]:
# The input columns are the feature column names, and the output column is what you'd like the new column to be named. 
assembler = VectorAssembler(
    inputCols=['Year',
 'Rule of Law',
 'Disappearances, Conflicts, and Terrorism',
 'Women Security & Safety',
 'Security & Safety',
 'Women Movement',
 'Movement',
 'Legal and Regulatory Restrictions',
 'State Control over Internet Access',
 'Expression & Information',
 'Same Sex Relationships',
 'Divorce',
 'PERSONAL FREEDOM (Score)',
 'PERSONAL FREEDOM (Rank)',
 'Government  enterprises and investment',
 'Top marginal income tax rate',
 'Legal enforcement of contracts',
 'Reliability of police',
 'Gender Legal Rights Adjustment',
 'Money growth',
 'Inflation: Most recent year',
 'Compliance costs of importing and exporting',
 'Regulatory trade barriers',
 'Foreign ownership/investment restrictions',
 'Freedom to trade internationally',
 'Hiring regulations and minimum wage',
 'Labour market regulations',
 'Licensing restrictions',
 'Business regulations',
 'ECONOMIC FREEDOM (Score)',
 'ECONOMIC FREEDOM (Rank)'],
    outputCol="features")

In [7]:
# Now that we've created the assembler variable, let's actually transform the data.
output = assembler.transform(df)

In [8]:
output.show(2, truncate=False)

+----+---------+--------------------------+---------------------+--------------------+------------------------+-----------+----------------------------------------+-----------------------+-----------------+--------------+-----------+---------------------------------+----------------------------------+------------------------+----------------------+-------+------------------------+-----------------------+--------------------------------------+----------------------------+------------------------------+---------------------+------------------------------+------------+---------------------------+-------------------------------------------+-------------------------+-----------------------------------------+--------------------------------+-----------------------------------+-------------------------+----------------------+--------------------+------------------------+-----------------------+----------------------------------------------------------------------------------------------------

# StandardScaler

In [9]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
standard_scaler = StandardScaler(inputCol="features", outputCol="scaled")

In [10]:
train = standard_scaler.fit(output).transform(output)

In [11]:
train.select("scaled").show(2, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|scaled                                                                                                                                                                                                                                                                                                                                                                                                                                       

In [12]:
scalerModel = standard_scaler.fit(output)

In [13]:
# Normalize each feature to have unit standard deviation.
cluster_final_data = scalerModel.transform(output)

In [14]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('find_hacker').getOrCreate()
from pyspark.ml.clustering import KMeans

In [15]:
kmeans3 = KMeans(featuresCol='scaled',k=3)
kmeans2 = KMeans(featuresCol='scaled',k=2)
kmeans4 = KMeans(featuresCol='scaled',k=4)
kmeans5 = KMeans(featuresCol='scaled',k=5)

In [16]:
model_k3 = kmeans3.fit(cluster_final_data)
model_k2 = kmeans2.fit(cluster_final_data)
model_k4 = kmeans4.fit(cluster_final_data)
model_k5 = kmeans5.fit(cluster_final_data)

In [17]:
wssse_k3 = model_k3.computeCost(cluster_final_data)
wssse_k2 = model_k2.computeCost(cluster_final_data)
wssse_k4 = model_k4.computeCost(cluster_final_data)
wssse_k5 = model_k5.computeCost(cluster_final_data)

In [28]:
print("With K=3")
print("Within Set Sum of Squared Errors = " + str(wssse_k3))
print('--'*30)
print("With K=2")
print("Within Set Sum of Squared Errors = " + str(wssse_k2))
print('--'*30)
print("With K=4")
print("Within Set Sum of Squared Errors = " + str(wssse_k4))
print('--'*30)
print("With K=5")
print("Within Set Sum of Squared Errors = " + str(wssse_k5))

With K=3
Within Set Sum of Squared Errors = 30424.831201529385
------------------------------------------------------------
With K=2
Within Set Sum of Squared Errors = 37088.31139936291
------------------------------------------------------------
With K=4
Within Set Sum of Squared Errors = 27325.001510417664
------------------------------------------------------------
With K=5
Within Set Sum of Squared Errors = 25040.35169856845


In [19]:
for k in range(2,10):
    kmeans = KMeans(featuresCol='scaled',k=k)
    model = kmeans.fit(cluster_final_data)
    wssse = model.computeCost(cluster_final_data)
    print("With K={}".format(k))
    print("Within Set Sum of Squared Errors = " + str(wssse))
    print('--'*30)

With K=2
Within Set Sum of Squared Errors = 37088.31139936291
------------------------------------------------------------
With K=3
Within Set Sum of Squared Errors = 30424.831201529385
------------------------------------------------------------
With K=4
Within Set Sum of Squared Errors = 27325.001510417664
------------------------------------------------------------
With K=5
Within Set Sum of Squared Errors = 25040.35169856845
------------------------------------------------------------
With K=6
Within Set Sum of Squared Errors = 24171.434321481815
------------------------------------------------------------
With K=7
Within Set Sum of Squared Errors = 23309.555882499706
------------------------------------------------------------
With K=8
Within Set Sum of Squared Errors = 22588.624274133326
------------------------------------------------------------
With K=9
Within Set Sum of Squared Errors = 21866.685797117418
------------------------------------------------------------


In [20]:
model_k3.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  667|
|         2|   32|
|         0|  870|
+----------+-----+



In [21]:
model_k2.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  793|
|         0|  776|
+----------+-----+



In [22]:
model_k4.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  471|
|         3|  441|
|         2|   32|
|         0|  625|
+----------+-----+



In [23]:
model_k5.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  468|
|         3|  155|
|         4|  621|
|         2|   32|
|         0|  293|
+----------+-----+



In [29]:
# Training a k-means model for cluster=5... we will do below steps for which this model fits best.
kmeans5 = KMeans(featuresCol='scaled',k=5)
model = kmeans5.fit(cluster_final_data)

In [30]:
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[705.36243466   2.16293188   3.64369963   2.227037     3.71313739
   1.56349489   1.82440406   3.23609883   1.4965975    3.82989334
   0.98236163   1.29816508   3.3413114    2.83715371   1.38201204
   2.19391129   1.75576456   1.19505937   5.67606167   7.86962824
   5.74621623   1.24878615   2.04945155   1.78668491   3.90382065
   1.76630298   3.26115979   3.00177986   3.3713686    4.68307871
   2.95751015]
[7.05294148e+02 4.18291192e+00 5.01619481e+00 3.81856816e+00
 5.40903205e+00 2.71857173e+00 3.43968350e+00 3.43784534e+00
 2.37683999e+00 5.17503755e+00 2.34096712e+00 2.57572154e+00
 5.27977090e+00 5.63985404e-01 2.61045794e+00 2.37531281e+00
 3.07980457e+00 2.93642758e+00 7.17811523e+00 8.62210767e+00
 6.72690337e+00 3.14145991e+00 3.97531817e+00 3.31071306e+00
 5.73216322e+00 2.22053619e+00 4.05058395e+00 3.80560514e+00
 4.87358726e+00 6.51373021e+00 5.93147120e-01]
[7.04478961e+02 0.00000000e+00 4.66095035e-01 0.00000000e+00
 5.17205051e-01 0.00000000e+00 0.000

In [31]:
model.transform(cluster_final_data).select('prediction').show()

+----------+
|prediction|
+----------+
|         1|
|         0|
|         0|
|         4|
|         1|
|         1|
|         1|
|         3|
|         4|
|         3|
|         0|
|         4|
|         4|
|         1|
|         4|
|         4|
|         3|
|         4|
|         4|
|         3|
+----------+
only showing top 20 rows

