In [1]:
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window

Intitializing Scala interpreter ...

Spark Web UI available at http://a951015de27f:4040
SparkContext available as 'sc' (version = 3.1.1, master = local[*], app id = local-1636913673132)
SparkSession available as 'spark'


import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window


In [2]:
var df = spark.read.option("header", "true")
                    .option("sep", ",")
                    .csv("../data/tripadvisor_hotel_reviews.csv")
df.show(5)

+--------------------+------+
|              Review|Rating|
+--------------------+------+
|nice hotel expens...|     4|
|ok nothing specia...|     2|
|nice rooms not 4*...|     3|
|unique, great sta...|     5|
|great stay great ...|     5|
+--------------------+------+
only showing top 5 rows



df: org.apache.spark.sql.DataFrame = [Review: string, Rating: string]


In [3]:
df = df.withColumn("review", lower(col("Review")));
df.show(5);

+--------------------+------+
|              review|Rating|
+--------------------+------+
|nice hotel expens...|     4|
|ok nothing specia...|     2|
|nice rooms not 4*...|     3|
|unique, great sta...|     5|
|great stay great ...|     5|
+--------------------+------+
only showing top 5 rows



df: org.apache.spark.sql.DataFrame = [review: string, Rating: string]


In [4]:
df.count

res2: Long = 20491


In [5]:
df.na.drop("all")
df.count

res3: Long = 20491


In [6]:
df = df
  .withColumn("review", regexp_replace(col("review"), "[^a-z0-9 ]", ""))
  .withColumn("doc_id", row_number().over(Window.orderBy("Rating"))).select(col("review"),col("doc_id"))
df.show(5)

+--------------------+------+
|              review|doc_id|
+--------------------+------+
|horrible customer...|     1|
|noise airconditio...|     2|
|bad choice booked...|     3|
|hated inn terribl...|     4|
|ace grunge lives ...|     5|
+--------------------+------+
only showing top 5 rows



df: org.apache.spark.sql.DataFrame = [review: string, doc_id: int]


In [7]:
val doc2word = df.withColumn("word", explode(split(trim(col("review")), " {1,}")))
doc2word.show(5)

+--------------------+------+--------+
|              review|doc_id|    word|
+--------------------+------+--------+
|horrible customer...|     1|horrible|
|horrible customer...|     1|customer|
|horrible customer...|     1| service|
|horrible customer...|     1|   hotel|
|horrible customer...|     1|    stay|
+--------------------+------+--------+
only showing top 5 rows



doc2word: org.apache.spark.sql.DataFrame = [review: string, doc_id: int ... 1 more field]


In [8]:
val review_len = doc2word.groupBy("doc_id").agg(count(col("doc_id")).as("len"))
review_len.show(5)

+------+---+
|doc_id|len|
+------+---+
|     1|214|
|     2| 86|
|     3|118|
|     4| 16|
|     5| 14|
+------+---+
only showing top 5 rows



review_len: org.apache.spark.sql.DataFrame = [doc_id: int, len: bigint]


In [9]:
val word_count = doc2word.groupBy("doc_id", "word")
                    .agg(count(col("word"))
                    .as("count"))
word_count.show(5)

+------+--------+-----+
|doc_id|    word|count|
+------+--------+-----+
|     1|horrible|    1|
|     1|customer|    1|
|     1| service|    1|
|     1|   hotel|    4|
|     1|    stay|    2|
+------+--------+-----+
only showing top 5 rows



word_count: org.apache.spark.sql.DataFrame = [doc_id: int, word: string ... 1 more field]


In [10]:
val tf = review_len
  .join(word_count, Seq("doc_id"), "inner")
  .withColumn("tf", col("count") / col("len"))
tf.show(5)

+------+---+--------+-----+--------------------+
|doc_id|len|    word|count|                  tf|
+------+---+--------+-----+--------------------+
|     1|214|horrible|    1|0.004672897196261682|
|     1|214|customer|    1|0.004672897196261682|
|     1|214| service|    1|0.004672897196261682|
|     1|214|   hotel|    4|0.018691588785046728|
|     1|214|    stay|    2|0.009345794392523364|
+------+---+--------+-----+--------------------+
only showing top 5 rows



tf: org.apache.spark.sql.DataFrame = [doc_id: int, len: bigint ... 3 more fields]


In [11]:
val top100 = word_count.groupBy("word")
    .agg(count(col("word"))
    .as("count"))
    .orderBy(desc("count"))
    .limit(100)
    .withColumn("review_count", lit(review_len.count()))
top100.show(10)

+------+-----+------------+
|  word|count|review_count|
+------+-----+------------+
| hotel|16321|       20491|
|  room|14053|       20491|
|   not|12123|       20491|
| staff|11522|       20491|
| great|11020|       20491|
|  stay|10095|       20491|
|  good| 9277|       20491|
|stayed| 8549|       20491|
|    nt| 8379|       20491|
| rooms| 8338|       20491|
+------+-----+------------+
only showing top 10 rows



top100: org.apache.spark.sql.DataFrame = [word: string, count: bigint ... 1 more field]


In [12]:
val idf = top100
  .withColumn("idf", log(col("review_count") / col("count")))
idf.show(5)

+-----+-----+------------+-------------------+
| word|count|review_count|                idf|
+-----+-----+------------+-------------------+
|hotel|16321|       20491| 0.2275331431861368|
| room|14053|       20491|0.37714986923608756|
|  not|12123|       20491| 0.5248812905902491|
|staff|11522|       20491| 0.5757275140445041|
|great|11020|       20491| 0.6202739616299271|
+-----+-----+------------+-------------------+
only showing top 5 rows



idf: org.apache.spark.sql.DataFrame = [word: string, count: bigint ... 2 more fields]


In [13]:
val tf_idf = tf
  .join(idf, Seq("word"), joinType = "inner")
  .withColumn("tf_idf", (col("tf") * col("idf")))
tf_idf.show(5)

+-------+------+---+-----+--------------------+-----+------------+------------------+--------------------+
|   word|doc_id|len|count|                  tf|count|review_count|               idf|              tf_idf|
+-------+------+---+-----+--------------------+-----+------------+------------------+--------------------+
|service|     1|214|    1|0.004672897196261682| 6228|       20491|1.1909305113829436|0.005565095847583...|
|  hotel|     1|214|    4|0.018691588785046728|16321|       20491|0.2275331431861368|0.004252955947404426|
|   stay|     1|214|    2|0.009345794392523364|10095|       20491|0.7079455135898947| 0.00661631321112051|
|      3|     1|214|    1|0.004672897196261682| 3375|       20491|1.8035904410302024|0.008427992715094403|
|   free|     1|214|    1|0.004672897196261682| 3065|       20491|1.8999381959665207|0.008878215869002433|
+-------+------+---+-----+--------------------+-----+------------+------------------+--------------------+
only showing top 5 rows



tf_idf: org.apache.spark.sql.DataFrame = [word: string, doc_id: int ... 7 more fields]


In [14]:
val pivot_tf_idf = tf_idf
  .groupBy(col("doc_id"))
  .pivot("word")
  .sum("tf_idf")
  .withColumnRenamed("sum", "tf_idf")
  .na.fill(0)

pivot_tf_idf: org.apache.spark.sql.DataFrame = [doc_id: int, 2: double ... 99 more fields]


In [15]:
pivot_tf_idf.show(2, 20, true)

-RECORD 0---------------------------
 doc_id      | 1                    
 2           | 0.0                  
 3           | 0.008427992715094403 
 4           | 0.0                  
 5           | 0.0                  
 area        | 0.0                  
 arrived     | 0.0                  
 away        | 0.0                  
 bar         | 0.0                  
 bathroom    | 0.0                  
 beach       | 0.0                  
 beautiful   | 0.0                  
 bed         | 0.00790212954030082  
 beds        | 0.0                  
 best        | 0.0                  
 better      | 0.0                  
 big         | 0.0                  
 bit         | 0.0                  
 booked      | 0.00871786664290698  
 breakfast   | 0.0                  
 buffet      | 0.0                  
 city        | 0.0                  
 clean       | 0.0                  
 close       | 0.0                  
 comfortable | 0.0                  
 day         | 0.0                  
 