In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz

!tar -xvf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.master("local[*]").appName("hw4_tf_idf").getOrCreate()

In [17]:
spark

In [6]:
data = spark.read.option("header", "true") \
      .option("inferSchema", "true") \
      .option("sep", ",") \
      .csv("/content/tripadvisor_hotel_reviews.csv"
      )

In [7]:
data.show(10)

+--------------------+------+
|              Review|Rating|
+--------------------+------+
|nice hotel expens...|     4|
|ok nothing specia...|     2|
|nice rooms not 4*...|     3|
|unique, great sta...|     5|
|great stay great ...|     5|
|love monaco staff...|     5|
|cozy stay rainy c...|     5|
|excellent staff, ...|     4|
|hotel stayed hote...|     5|
|excellent stayed ...|     5|
+--------------------+------+
only showing top 10 rows



In [8]:
df = data.select("Review")
df.show(10)

+--------------------+
|              Review|
+--------------------+
|nice hotel expens...|
|ok nothing specia...|
|nice rooms not 4*...|
|unique, great sta...|
|great stay great ...|
|love monaco staff...|
|cozy stay rainy c...|
|excellent staff, ...|
|hotel stayed hote...|
|excellent stayed ...|
+--------------------+
only showing top 10 rows



#### 1. Delete special symbols

In [9]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import col

In [10]:
df1 = data.na.drop("all").withColumn("Review", regexp_replace(col("Review"), "[^a-z0-9 ]", ""))

In [11]:
df1.show(10)

+--------------------+------+
|              Review|Rating|
+--------------------+------+
|nice hotel expens...|     4|
|ok nothing specia...|     2|
|nice rooms not 4 ...|     3|
|unique great stay...|     5|
|great stay great ...|     5|
|love monaco staff...|     5|
|cozy stay rainy c...|     5|
|excellent staff h...|     4|
|hotel stayed hote...|     5|
|excellent stayed ...|     5|
+--------------------+------+
only showing top 10 rows



#### 2. Bring to lower case

In [12]:
from pyspark.sql.functions import lower

In [13]:
df2 = df1.withColumn("Review", lower(col("Review")))
df2.show(10)

+--------------------+------+
|              Review|Rating|
+--------------------+------+
|nice hotel expens...|     4|
|ok nothing specia...|     2|
|nice rooms not 4 ...|     3|
|unique great stay...|     5|
|great stay great ...|     5|
|love monaco staff...|     5|
|cozy stay rainy c...|     5|
|excellent staff h...|     4|
|hotel stayed hote...|     5|
|excellent stayed ...|     5|
+--------------------+------+
only showing top 10 rows



#### 3. Count word frequency

In [14]:
# from pyspark.sql.functions import split

In [15]:
from pyspark.sql.functions import *

In [16]:
df3 = df2.withColumn("Review", split(col("Review"), " ")).withColumn("id", monotonically_increasing_id())
df3.show(10)

+--------------------+------+---+
|              Review|Rating| id|
+--------------------+------+---+
|[nice, hotel, exp...|     4|  0|
|[ok, nothing, spe...|     2|  1|
|[nice, rooms, not...|     3|  2|
|[unique, great, s...|     5|  3|
|[great, stay, gre...|     5|  4|
|[love, monaco, st...|     5|  5|
|[cozy, stay, rain...|     5|  6|
|[excellent, staff...|     4|  7|
|[hotel, stayed, h...|     5|  8|
|[excellent, staye...|     5|  9|
+--------------------+------+---+
only showing top 10 rows



In [17]:
df4=df3.select("id", explode(col("Review")))
df4 = df4.withColumnRenamed("col", "word")

In [18]:
df4.show(10)

+---+-----------+
| id|       word|
+---+-----------+
|  0|       nice|
|  0|      hotel|
|  0|  expensive|
|  0|    parking|
|  0|        got|
|  0|       good|
|  0|       deal|
|  0|       stay|
|  0|      hotel|
|  0|anniversary|
+---+-----------+
only showing top 10 rows



In [19]:
word_freq = df4.groupBy("id","word").agg(count("word"))

In [20]:
word_freq.show(10)

+---+----------+-----------+
| id|      word|count(word)|
+---+----------+-----------+
|  0|      room|          3|
|  1|    better|          2|
|  6|attractive|          1|
|  6|  positive|          1|
|  7| concierge|          2|
| 10|        nt|          2|
| 12|     clean|          1|
| 12|   concert|          1|
| 15|      stay|          2|
| 16|      desk|          6|
+---+----------+-----------+
only showing top 10 rows



One review example

In [21]:
word_freq.filter(word_freq.id.isin(0)).show()

+---+-----------+-----------+
| id|       word|count(word)|
+---+-----------+-----------+
|  0|       room|          3|
|  0|      night|          2|
|  0|       took|          1|
|  0|        bed|          1|
|  0|      taken|          1|
|  0|     people|          1|
|  0|    staying|          1|
|  0|      music|          1|
|  0|       high|          1|
|  0|        got|          1|
|  0|      doors|          1|
|  0|   products|          1|
|  0|       woke|          1|
|  0|   location|          1|
|  0|        not|          2|
|  0|   previous|          1|
|  0|    pillows|          1|
|  0|        did|          2|
|  0|       deal|          1|
|  0|anniversary|          1|
+---+-----------+-----------+
only showing top 20 rows



In [22]:
num_words = word_freq.groupBy("id").agg(count(col("word")))
num_words = num_words.withColumnRenamed("count(word)", "words")

In [23]:
num_words.show(10)

+----+-----+
|  id|words|
+----+-----+
| 474|   98|
|1677|  100|
|2214|  101|
|2250|  128|
|2453|   94|
|2927|  477|
|3091|  373|
|3506|   29|
|4590|   98|
|5409|   52|
+----+-----+
only showing top 10 rows



In [24]:
word_freq = word_freq.join(num_words, "id")
word_freq.show(10)

+----+------------+-----------+-----+
|  id|        word|count(word)|words|
+----+------------+-----------+-----+
| 474|       great|          3|   98|
| 474|        rate|          2|   98|
| 474|        look|          1|   98|
|1677|        walk|          2|  100|
|1677|destinations|          1|  100|
|2214|       staff|          2|  101|
|2250|       miami|          1|  128|
|2453|   incapable|          1|   94|
|2927|    trolleys|          1|  477|
|2927|       makes|          1|  477|
+----+------------+-----------+-----+
only showing top 10 rows



In [25]:
word_freq = word_freq.select(col("id"), col("word"), (col("count(word)") / col("words")))
word_freq.show(10)

+----+------------+---------------------+
|  id|        word|(count(word) / words)|
+----+------------+---------------------+
| 474|       great| 0.030612244897959183|
| 474|        rate|  0.02040816326530612|
| 474|        look|  0.01020408163265306|
|1677|        walk|                 0.02|
|1677|destinations|                 0.01|
|2214|       staff| 0.019801980198019802|
|2250|       miami|            0.0078125|
|2453|   incapable| 0.010638297872340425|
|2927|    trolleys| 0.002096436058700...|
|2927|       makes| 0.002096436058700...|
+----+------------+---------------------+
only showing top 10 rows



In [26]:
word_freq = word_freq.withColumnRenamed("(count(word) / words)", "word_freq")

#### 4. Count number of documents with word

In [27]:
doc_freq = word_freq.groupBy("word").agg(countDistinct("id"))

In [28]:
doc_freq.show(10)

+---------+---------+
|     word|count(id)|
+---------+---------+
|   travel|     1330|
|   online|      360|
|    pools|      819|
|traveling|      436|
|  jewelry|       56|
|    spoil|       59|
|  barrier|      163|
|standards|      577|
|     jamb|        2|
|     hope|      540|
+---------+---------+
only showing top 10 rows



In [29]:
doc_freq = doc_freq.withColumnRenamed("count(id)", "num_docs")

#### 5. Top100 most frequent

In [29]:
doc_freq100 = doc_freq.orderBy(desc("num_docs")).limit(100)

In [30]:
doc_freq100.show(10)

+------+--------+
|  word|num_docs|
+------+--------+
|      |   20491|
| hotel|   16321|
|  room|   14053|
|   not|   12123|
| staff|   11522|
| great|   11020|
|  stay|   10095|
|  good|    9277|
|stayed|    8549|
|    nt|    8379|
+------+--------+
only showing top 10 rows



#### 6. Join and TD-IDF calc

In [30]:
tf_table = word_freq.join(doc_freq, "word")

In [31]:
tf_table.show(10)

+---------+-----+--------------------+--------+
|     word|   id|           word_freq|num_docs|
+---------+-----+--------------------+--------+
|   travel|11190|0.006944444444444444|    1330|
|    pools|11190|0.006944444444444444|     819|
| lacrosse|  474| 0.02040816326530612|       1|
|traveling| 2040|0.011363636363636364|     436|
| everyday| 7225|               0.008|     514|
| medasian|13401|0.005714285714285714|       1|
|     hope| 2927|0.002096436058700...|     540|
|    pools| 1697| 0.00510204081632653|     819|
|   travel| 9945|              0.0125|    1330|
|   travel| 8484|0.010752688172043012|    1330|
+---------+-----+--------------------+--------+
only showing top 10 rows



In [34]:
df1.count()

20491

In [36]:
tf_table2 = tf_table.select(col("id"), col("word"), (col("word_freq") * log(20491 / col("num_docs"))))
tf_table2=tf_table2.withColumnRenamed("(word_freq * ln((20491 / num_docs)))", "tf_idf")

In [40]:
tf_table2.show(10)

+-----+---------+--------------------+
|   id|     word|              tf_idf|
+-----+---------+--------------------+
|11190|   travel|0.018991714049451616|
|11190|    pools|0.022358728892248356|
|  474| lacrosse| 0.20260696008850676|
| 2040|traveling| 0.04375112273849771|
| 7225| everyday| 0.02948414223105334|
|13401| medasian|  0.0567299488247819|
| 2927|     hope|0.007623001896810...|
| 1697|    pools|0.016426821226957974|
| 9945|   travel|0.034185085289012915|
| 8484|   travel|0.029406524979796057|
+-----+---------+--------------------+
only showing top 10 rows



#### 7. Pivot table

In [44]:
spark.conf.set("spark.sql.pivotMaxValues", 100000)

In [49]:
tf_table_small = tf_table2.limit(100) ##Colab out of memory with full table

In [53]:
pivot_result = tf_table_small.groupBy("id")
pivot_result = pivot_result.pivot("word").max("tf_idf").na.fill(0.0)

In [54]:
pivot_result.show(10)

+----------+---+--------+-------+------+------+---------------+-------+--------+---------+-------------------+-----+-----------+------+-----------+--------------------+---------+-----+-----+----+-------+----------+-----+----------+------+------+--------------------+---------+---------------+-----+---------+----------+------+-----+--------------------+-------------------+------+------+
|        id|675|barbizon|barrier|biting|brands|breakfastdinner|bricked|cautious|connected|           everyday|exept|formalities|gloria|handicapped|                hope|indicator|inner|input|jamb|jewelry|lifeguards|monte|occidental|online|peolpe|               pools|recognize|reconditionning|spoil|standards|strawberry|tothis|trail|              travel|          traveling|voyage|waters|
+----------+---+--------+-------+------+------+---------------+-------+--------+---------+-------------------+-----+-----------+------+-----------+--------------------+---------+-----+-----+----+-------+----------+-----+----