In [97]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
#spark = sqlContext

In [98]:
beer = spark.read.csv('F:/JupyterML/ML_Practice/datasets/beers.csv',sep =',',inferSchema='true',header ='true')

In [99]:
beer.show(4)

+---+-----+----+----+-------------------+--------------------+----------+------+
|_c0|  abv| ibu|  id|               name|               style|brewery_id|ounces|
+---+-----+----+----+-------------------+--------------------+----------+------+
|  0| 0.05|null|1436|           Pub Beer| American Pale Lager|       408|  12.0|
|  1|0.066|null|2265|        Devil's Cup|American Pale Ale...|       177|  12.0|
|  2|0.071|null|2264|Rise of the Phoenix|        American IPA|       177|  12.0|
|  3| 0.09|null|2263|           Sinister|American Double /...|       177|  12.0|
+---+-----+----+----+-------------------+--------------------+----------+------+
only showing top 4 rows



In [100]:
beer = beer[['abv','style','brewery_id','ounces']]

In [101]:
beer.show(3)

+-----+--------------------+----------+------+
|  abv|               style|brewery_id|ounces|
+-----+--------------------+----------+------+
| 0.05| American Pale Lager|       408|  12.0|
|0.066|American Pale Ale...|       177|  12.0|
|0.071|        American IPA|       177|  12.0|
+-----+--------------------+----------+------+
only showing top 3 rows



In [102]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,VectorIndexer,OneHotEncoder, VectorAssembler, IndexToString
from pyspark.sql.functions import *

In [103]:
beer = beer.na.drop()

In [104]:
beer.count()

2346

In [105]:
(traindf, testdf) = beer.randomSplit([0.7,0.3],seed =42)

In [106]:
traindf.count()

1666

In [107]:
beer.dtypes

[('abv', 'double'),
 ('style', 'string'),
 ('brewery_id', 'int'),
 ('ounces', 'double')]

In [108]:
cat_features = [t[0] for t in beer.dtypes if t[1] =='string']
num_features = [t[0] for t in beer.dtypes if t[1] =='int' or t[1] =='double']

In [109]:
cat_features

['style']

In [110]:
num_features

['abv', 'brewery_id', 'ounces']

In [111]:
beer.select(num_features).show(5)

+-----+----------+------+
|  abv|brewery_id|ounces|
+-----+----------+------+
| 0.05|       408|  12.0|
|0.066|       177|  12.0|
|0.071|       177|  12.0|
| 0.09|       177|  12.0|
|0.075|       177|  12.0|
+-----+----------+------+
only showing top 5 rows



In [112]:
styleIndexer = StringIndexer(inputCol='style', outputCol='Indexedstyle')

In [113]:
input_col = ['abv', 'brewery_id', 'ounces','Indexedstyle']

In [114]:
assembler = VectorAssembler(inputCols=input_col,outputCol="features")

In [115]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StandardScaler

In [116]:
scaler = StandardScaler(inputCol = 'features', outputCol = 'scaledFeatures')

In [117]:
pipeline_features = [styleIndexer,assembler,scaler]

In [118]:
pipeline = Pipeline(stages=pipeline_features)

In [119]:
pipe = pipeline.fit(beer)
p_fea = pipe.transform(beer)

In [120]:
p_fea.show(4,False)

+-----+------------------------------+----------+------+------------+----------------------+---------------------------------------------------------------------------+
|abv  |style                         |brewery_id|ounces|Indexedstyle|features              |scaledFeatures                                                             |
+-----+------------------------------+----------+------+------------+----------------------+---------------------------------------------------------------------------+
|0.05 |American Pale Lager           |408       |12.0  |14.0        |[0.05,408.0,12.0,14.0]|[3.690912908094385,2.611791589562038,5.143333636031784,0.7239446974073551] |
|0.066|American Pale Ale (APA)       |177       |12.0  |1.0         |[0.066,177.0,12.0,1.0]|[4.872005038684589,1.1330566454717665,5.143333636031784,0.0517103355290968]|
|0.071|American IPA                  |177       |12.0  |0.0         |[0.071,177.0,12.0,0.0]|[5.2410963294940265,1.1330566454717665,5.143333636031784,0.0]  

In [121]:
kmeans3 = KMeans(featuresCol = 'scaledFeatures', k=3)
kmeans4 = KMeans(featuresCol = 'scaledFeatures', k=4)
kmeans5 = KMeans(featuresCol = 'scaledFeatures', k=5)

In [122]:
model3 = kmeans3.fit(p_fea)

In [123]:
print('WSSSE:', model3.computeCost(p_fea))

WSSSE: 6034.318067860257


In [124]:
model4 = kmeans4.fit(p_fea)
print('WSSSE:', model4.computeCost(p_fea))

WSSSE: 5066.012872310219


In [125]:
model5 = kmeans5.fit(p_fea)
print('WSSSE:', model5.computeCost(p_fea))

WSSSE: 4463.228829303852


In [126]:
kmeans6 = KMeans(featuresCol = 'scaledFeatures', k=6)
kmeans7 = KMeans(featuresCol = 'scaledFeatures', k=7)
kmeans2 = KMeans(featuresCol = 'scaledFeatures', k=2)
model6 = kmeans6.fit(p_fea)
model7 = kmeans7.fit(p_fea)
model2 = kmeans2.fit(p_fea)
print('WSSSE:', model6.computeCost(p_fea))
print('WSSSE:', model7.computeCost(p_fea))
print('WSSSE:', model2.computeCost(p_fea))

WSSSE: 4064.386848131942
WSSSE: 3416.601691185484
WSSSE: 7394.593349405141


In [127]:
aaa = model5.transform(p_fea)

In [128]:
centers = model5.clusterCenters()
print(centers)

[array([4.33376969, 1.99850107, 7.11515652, 0.48096996]), array([4.11636484, 0.70748702, 5.51791816, 0.39580444]), array([6.32524977, 1.11513259, 6.20451011, 0.95361664]), array([4.36515625, 1.16949018, 5.92407511, 2.9103884 ]), array([4.03724143, 2.52704487, 5.14333364, 0.49782459])]


In [129]:
aaa.show(4)

+-----+--------------------+----------+------+------------+--------------------+--------------------+----------+
|  abv|               style|brewery_id|ounces|Indexedstyle|            features|      scaledFeatures|prediction|
+-----+--------------------+----------+------+------------+--------------------+--------------------+----------+
| 0.05| American Pale Lager|       408|  12.0|        14.0|[0.05,408.0,12.0,...|[3.69091290809438...|         4|
|0.066|American Pale Ale...|       177|  12.0|         1.0|[0.066,177.0,12.0...|[4.87200503868458...|         1|
|0.071|        American IPA|       177|  12.0|         0.0|[0.071,177.0,12.0...|[5.24109632949402...|         1|
| 0.09|American Double /...|       177|  12.0|         4.0|[0.09,177.0,12.0,...|[6.64364323456989...|         2|
+-----+--------------------+----------+------+------------+--------------------+--------------------+----------+
only showing top 4 rows



In [130]:
aaa.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  833|
|         3|  269|
|         4|  574|
|         2|  265|
|         0|  405|
+----------+-----+

