In [42]:
from pyspark.sql.functions import split
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec, PCA, VectorAssembler
from pyspark.ml.clustering import KMeans

In [4]:
spark = SparkSession.builder.appName("Beer Reviews Clustering").getOrCreate()

24/11/11 12:44:11 WARN Utils: Your hostname, Abhays-MacBook-Air-3.local resolves to a loopback address: 127.0.0.1; using 192.168.0.36 instead (on interface en0)
24/11/11 12:44:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 12:44:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
beers = spark.read.csv(path="processed_data/merged_list.csv", header=True, inferSchema=True)

                                                                                

In [6]:
beers = beers.withColumn("style_tokens", split(beers["style"], " "))
beers = beers.withColumn("availability_tokens", split(beers["availability"], " "))

# Apply Word2Vec on `style_tokens`
style_word2vec = Word2Vec(vectorSize=50, minCount=0, inputCol="style_tokens", outputCol="style_embed")
style_model = style_word2vec.fit(beers)
beers = style_model.transform(beers)

# Apply Word2Vec on `availability_tokens`
availability_word2vec = Word2Vec(vectorSize=50, minCount=0, inputCol="availability_tokens", outputCol="availability_embed")
availability_model = availability_word2vec.fit(beers)
beers = availability_model.transform(beers)

24/11/11 12:44:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [7]:
beers = beers.drop("style_tokens", "availability_tokens")

In [8]:
# Apply PCA on style embeddings
style_pca = PCA(k=10, inputCol="style_embed", outputCol="style_embed_pca")
style_pca_model = style_pca.fit(beers)
beers = style_pca_model.transform(beers)

# Apply PCA on availability embeddings
availability_pca = PCA(k=10, inputCol="availability_embed", outputCol="availability_embed_pca")
availability_pca_model = availability_pca.fit(beers)
beers = availability_pca_model.transform(beers)

24/11/11 12:44:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [9]:
beers = beers.drop("style_embed", "availability_embed")
beers = beers.withColumn("abv", beers["abv"].cast("int")).dropna(subset="abv")

In [48]:
assembler = VectorAssembler(
    inputCols=["look", "smell", "taste", "feel", "style_embed_pca", "availability_embed_pca"],
    outputCol="features"
)

features_df = assembler.transform(beers)

In [59]:
kmeans = KMeans(k=40, seed=1, featuresCol="features")
model = kmeans.fit(features_df)
predictions = model.transform(features_df)

                                                                                

In [66]:
pca = PCA(k=3, inputCol="features", outputCol="feature")
pca_model = pca.fit(predictions)
data = pca_model.transform(predictions)

                                                                                

In [67]:
data.show(5)

+---+--------------------+----------+-------+--------------------+------------+---+-------+-----+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+--------------------+----------------------+--------------------+----------+--------------------+
| id|                beer|brewery_id|country|               style|availability|abv|retired|count|              look|             smell|             taste|              feel|           overall|             score|             brewery|     style_embed_pca|availability_embed_pca|            features|prediction|             feature|
+---+--------------------+----------+-------+--------------------+------------+---+-------+-----+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+--------------------+----------------------+--------------------+----------+--------------------+
| 26|Wachu

In [72]:
df = data.drop("style_embed_pca", "availability_embed_pca", "features").toPandas()

                                                                                

In [73]:
df["prediction"].value_counts()

prediction
0     7500
14    6406
6     5329
39    5169
7     4450
2     4303
25    3852
21    3842
36    3760
38    3756
27    3645
3     3548
15    3398
4     3365
17    3199
28    2965
1     2874
26    2775
34    2607
35    2562
10    1899
5     1871
31    1841
13    1751
12    1663
19    1377
23    1305
18    1298
24    1281
30    1244
9     1183
22    1066
8     1010
37     903
33     773
32     750
29     648
16     508
20     415
11     320
Name: count, dtype: int64

In [75]:
df = df.rename(columns={"prediction":"cluster", "feature":"features"})

In [76]:
df

Unnamed: 0,id,beer,brewery_id,country,style,availability,abv,retired,count,look,smell,taste,feel,overall,score,brewery,cluster,features
0,26,Wachusett Octoberfest Ale,20,US,American Amber / Red Ale,Rotating,5,f,87,3.574713,3.301724,3.451149,3.451149,3.522989,3.437356,Wachusett Brewing Company,35,"[0.7835619022802504, -1.9435483736425239, -0.3..."
1,27,Quinn's Amber Ale,20,US,Irish Red Ale,Rotating,4,f,52,3.591346,3.341346,3.408654,3.432692,3.509615,3.426538,Wachusett Brewing Company,35,"[0.7473625667763036, -1.899459959713967, -0.13..."
2,31,Blanche De Chambly,22,CA,Belgian Witbier,Year-round,5,f,1387,3.883201,3.834715,3.886806,3.816510,3.960707,3.882617,Unibroue,2,"[0.6290555815125006, -0.17287367775654713, -0...."
3,34,La Fin Du Monde,22,CA,Belgian Tripel,Year-round,9,f,4181,4.248625,4.251614,4.381069,4.287491,4.317388,4.320847,Unibroue,2,"[0.5358773284349162, -0.38704074505242575, -0...."
4,44,Lager,155,CA,German Pilsner,Year-round,5,f,66,3.363636,3.155303,3.537879,3.431818,3.621212,3.441970,Upper Canada Brewing Company,14,"[0.819409332165983, -1.2123827897278476, -3.18..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
102406,371149,Gummy Tron,38311,US,American IPA,Limited (brewed once),7,f,4,3.875000,3.937500,3.937500,4.000000,3.937500,3.940000,Noon Whistle Brewing,38,"[-1.3657593978265683, -3.143520730872862, -1.7..."
102407,371163,More Brewing Co. Oktoberfest,49747,US,German Märzen / Oktoberfest,Fall,6,f,4,3.687500,3.625000,3.687500,3.562500,3.687500,3.662500,More Brewing Co.,14,"[0.2504266529593981, -0.5529632637013648, -2.7..."
102408,371194,EXPO CHGO,28178,US,American IPA,Limited (brewed once),6,f,4,4.250000,4.250000,4.312500,4.125000,4.187500,4.252500,Pipeworks Brewing Company,38,"[-1.430364003627926, -3.266723212875886, -1.87..."
102409,371438,Pete's Secret Stache,48813,US,New England IPA,Rotating,6,f,5,4.300000,4.300000,4.250000,4.250000,4.300000,4.276000,Revision Brewing Company,33,"[0.4917143096231401, -3.2187608302697877, -3.0..."


In [77]:
df.to_csv("processed_data/embedded_list.csv", header=True, index=False)