In [1]:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession

import scala.collection.breakOut
import scala.collection.mutable._
import scala.math

In [2]:
val spark = SparkSession.builder()
    .master("local[*]")
    .config("spark.executor.cores", "6")
    .appName("tfIdf")
    .getOrCreate()

spark = org.apache.spark.sql.SparkSession@7dd2a32


org.apache.spark.sql.SparkSession@7dd2a32

In [3]:
import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

In [4]:
val options = Map(
    "header" -> "true", "sep" -> ",", "inferSchema" -> "true")
val reviews = spark.read
    .options(options)
    .csv("./tripadvisor_hotel_reviews.csv").select("Review")
reviews.show(5, 80, false)

+--------------------------------------------------------------------------------+
|                                                                          Review|
+--------------------------------------------------------------------------------+
|nice hotel expensive parking got good deal stay hotel anniversary, arrived la...|
|ok nothing special charge diamond member hilton decided chain shot 20th anniv...|
|nice rooms not 4* experience hotel monaco seattle good hotel n't 4* level.pos...|
|unique, great stay, wonderful time hotel monaco, location excellent short str...|
|great stay great stay, went seahawk game awesome, downfall view building did ...|
+--------------------------------------------------------------------------------+
only showing top 5 rows



options = Map(inferSchema -> true, sep -> ,, header -> true)
reviews = [Review: string]


[Review: string]

In [5]:
// drop empty row
val existReviews = reviews.na.drop(Seq("Review"))
existReviews.show(5, 80, false)
existReviews.getClass

+--------------------------------------------------------------------------------+
|                                                                          Review|
+--------------------------------------------------------------------------------+
|nice hotel expensive parking got good deal stay hotel anniversary, arrived la...|
|ok nothing special charge diamond member hilton decided chain shot 20th anniv...|
|nice rooms not 4* experience hotel monaco seattle good hotel n't 4* level.pos...|
|unique, great stay, wonderful time hotel monaco, location excellent short str...|
|great stay great stay, went seahawk game awesome, downfall view building did ...|
+--------------------------------------------------------------------------------+
only showing top 5 rows



existReviews = [Review: string]


class org.apache.spark.sql.Dataset

In [6]:
// get review in lowercase
val lowerReviews = existReviews.select(lower(col("Review")) as "Review")
lowerReviews.show(5, 80, false)

+--------------------------------------------------------------------------------+
|                                                                          Review|
+--------------------------------------------------------------------------------+
|nice hotel expensive parking got good deal stay hotel anniversary, arrived la...|
|ok nothing special charge diamond member hilton decided chain shot 20th anniv...|
|nice rooms not 4* experience hotel monaco seattle good hotel n't 4* level.pos...|
|unique, great stay, wonderful time hotel monaco, location excellent short str...|
|great stay great stay, went seahawk game awesome, downfall view building did ...|
+--------------------------------------------------------------------------------+
only showing top 5 rows



lowerReviews = [Review: string]


[Review: string]

In [7]:
// split review on words
val splitReviews = lowerReviews.select(split(col("Review"), "[^a-zA-z0-9*']{1,}") as "Review").as[Seq[String]]
splitReviews.show(5, 80, false)

+--------------------------------------------------------------------------------+
|                                                                          Review|
+--------------------------------------------------------------------------------+
|[nice, hotel, expensive, parking, got, good, deal, stay, hotel, anniversary, ...|
|[ok, nothing, special, charge, diamond, member, hilton, decided, chain, shot,...|
|[nice, rooms, not, 4*, experience, hotel, monaco, seattle, good, hotel, n't, ...|
|[unique, great, stay, wonderful, time, hotel, monaco, location, excellent, sh...|
|[great, stay, great, stay, went, seahawk, game, awesome, downfall, view, buil...|
+--------------------------------------------------------------------------------+
only showing top 5 rows



splitReviews = [Review: array<string>]


[Review: array<string>]

In [8]:
val numWordsInReview = splitReviews.withColumn("lenReview", size(col("Review")))
numWordsInReview.show(5, 80, false)

+--------------------------------------------------------------------------------+---------+
|                                                                          Review|lenReview|
+--------------------------------------------------------------------------------+---------+
|[nice, hotel, expensive, parking, got, good, deal, stay, hotel, anniversary, ...|       89|
|[ok, nothing, special, charge, diamond, member, hilton, decided, chain, shot,...|      257|
|[nice, rooms, not, 4*, experience, hotel, monaco, seattle, good, hotel, n't, ...|      225|
|[unique, great, stay, wonderful, time, hotel, monaco, location, excellent, sh...|       91|
|[great, stay, great, stay, went, seahawk, game, awesome, downfall, view, buil...|      192|
+--------------------------------------------------------------------------------+---------+
only showing top 5 rows



numWordsInReview = [Review: array<string>, lenReview: int]


[Review: array<string>, lenReview: int]

In [9]:
val wordFreqInReview = splitReviews.rdd.map(row => {
    val wordFreq = row.foldLeft(new HashMap[String, Int]()) {
        (map, word) => {
            map += word -> (map.getOrElse(word, 0) + 1)
            map
        }
    }
    wordFreq
})

wordFreqInReview = MapPartitionsRDD[33] at map at <console>:46


MapPartitionsRDD[33] at map at <console>:46

In [10]:
val wordFreqInReviews = wordFreqInReview
    .flatMap(_.keySet)
    .map((_, 1))
    .reduceByKey(_ + _)

wordFreqInReviews = ShuffledRDD[36] at reduceByKey at <console>:49


ShuffledRDD[36] at reduceByKey at <console>:49

In [11]:
val numWords: Int = 100
val ordering = Ordering.by[(String, Int), Int](_._2)
val topWordFreqInReviews = wordFreqInReviews.top(numWords)(ordering)

numWords = 100
ordering = scala.math.Ordering$$anon$9@51a40604
topWordFreqInReviews = Array(("",20491), (hotel,16399), (room,14201), (not,12155), (staff,11766), (great,11126), (stay,10192), (good,9402), (stayed,8569), (rooms,8532), (n't,8378), (location,8359), (clean,7793), (just,7767), (nice,7499), (breakfast,7408), (did,7243), (no,6895), (night,6688), (service,6380), (time,6283), (friendly,5993), (day,5968), (food,5609), (like,5399), (place,5366), (helpful,5268), (really,4893), (small,4886), (the,4758), (walk,4744), (excellent,4528), (2,4495), (little,4415), (bathroom,4357), (best,4303), (area,4259), (recommend,4252), (got,4175), (people,4158), (pool,4101), (5,4009), (restaurants,3991), (beach,3972), (comfortable,3951), (3...


Array(("",20491), (hotel,16399), (room,14201), (not,12155), (staff,11766), (great,11126), (stay,10192), (good,9402), (stayed,8569), (rooms,8532), (n't,8378), (location,8359), (clean,7793), (just,7767), (nice,7499), (breakfast,7408), (did,7243), (no,6895), (night,6688), (service,6380), (time,6283), (friendly,5993), (day,5968), (food,5609), (like,5399), (place,5366), (helpful,5268), (really,4893), (small,4886), (the,4758), (walk,4744), (excellent,4528), (2,4495), (little,4415), (bathroom,4357), (best,4303), (area,4259), (recommend,4252), (got,4175), (people,4158), (pool,4101), (5,4009), (restaurants,3991), (beach,3972), (comfortable,3951), (3...

In [12]:
val numReview = wordFreqInReview.count().toDouble
val idfs = topWordFreqInReviews.map{case (word, count) =>
    (word, math.log(numReview / count))    
}.toMap

numReview = 20491.0
idfs = Map(beautiful -> 1.7868425837289796, shower -> 2.1194179939457767, "" -> 0.0, buffet -> 2.07064717943434, recommend -> 1.572596304874994, hotels -> 1.816713206143153, people -> 1.5949515759188744, city -> 1.8950561683692124, area -> 1.570951374413619, booked -> 1.8543380753504273, nights -> 1.6523646695004262, stayed -> 0.8718347256677137, bit -> 1.933446057920855, location -> 0.8966469626376764, 4 -> 1.8931100114137265, got -> 1.5908714070518768, want -> 1.8713142768138489, best -> 1.560673311498233, fantastic -> 2.1554094691672194, good -> 0.7790633327527566, hotel -> 0.22276540799338684, problem -> 2.154146576976639, big -> 2.116172554991653, going -> 2.0652438471062875, excellent -> 1.5097054244538135,...


Map(beautiful -> 1.7868425837289796, shower -> 2.1194179939457767, "" -> 0.0, buffet -> 2.07064717943434, recommend -> 1.572596304874994, hotels -> 1.816713206143153, people -> 1.5949515759188744, city -> 1.8950561683692124, area -> 1.570951374413619, booked -> 1.8543380753504273, nights -> 1.6523646695004262, stayed -> 0.8718347256677137, bit -> 1.933446057920855, location -> 0.8966469626376764, 4 -> 1.8931100114137265, got -> 1.5908714070518768, want -> 1.8713142768138489, best -> 1.560673311498233, fantastic -> 2.1554094691672194, good -> 0.7790633327527566, hotel -> 0.22276540799338684, problem -> 2.154146576976639, big -> 2.116172554991653, going -> 2.0652438471062875, excellent -> 1.5097054244538135,...

In [13]:
val wordToIndex = idfs.keys.zipWithIndex.toMap
val indexToWord = idfs.keys.map(key => (wordToIndex(key), key)).toMap

wordToIndex = Map(beautiful -> 0, shower -> 1, "" -> 2, buffet -> 3, recommend -> 4, hotels -> 5, people -> 6, city -> 7, area -> 8, booked -> 9, nights -> 10, stayed -> 11, bit -> 12, location -> 13, 4 -> 14, got -> 15, want -> 16, best -> 17, fantastic -> 18, good -> 19, hotel -> 20, problem -> 21, big -> 22, going -> 23, excellent -> 24, bathroom -> 25, price -> 26, wonderful -> 27, clean -> 28, comfortable -> 29, just -> 30, trip -> 31, night -> 32, walk -> 33, 5 -> 34, quiet -> 35, 10 -> 36, perfect -> 37, walking -> 38, quite -> 39, nice -> 40, resort -> 41, restaurant -> 42, way -> 43, desk -> 44, service -> 45, rooms -> 46, bed -> 47, did -> 48, small -> 49, water -> 50, minutes -> 51, experience -> 52, need -> 53, check -> 54, restaur...


Map(beautiful -> 0, shower -> 1, "" -> 2, buffet -> 3, recommend -> 4, hotels -> 5, people -> 6, city -> 7, area -> 8, booked -> 9, nights -> 10, stayed -> 11, bit -> 12, location -> 13, 4 -> 14, got -> 15, want -> 16, best -> 17, fantastic -> 18, good -> 19, hotel -> 20, problem -> 21, big -> 22, going -> 23, excellent -> 24, bathroom -> 25, price -> 26, wonderful -> 27, clean -> 28, comfortable -> 29, just -> 30, trip -> 31, night -> 32, walk -> 33, 5 -> 34, quiet -> 35, 10 -> 36, perfect -> 37, walking -> 38, quite -> 39, nice -> 40, resort -> 41, restaurant -> 42, way -> 43, desk -> 44, service -> 45, rooms -> 46, bed -> 47, did -> 48, small -> 49, water -> 50, minutes -> 51, experience -> 52, need -> 53, check -> 54, restaur...

In [14]:
val tfIdfReviews = wordFreqInReview.map(wordFreqs => {
    val wordTotalInReview = wordFreqs.values.sum
    val wordScore = wordFreqs.filter{
        case (word, freq) => wordToIndex.contains(word)
    }.map{
        case (word, freq) => (wordToIndex(word), idfs(word) * wordFreqs(word) / wordTotalInReview)
    }.toSeq
    Vectors.sparse(wordToIndex.size, wordScore).toArray
})

tfIdfReviews = MapPartitionsRDD[38] at map at <console>:50


MapPartitionsRDD[38] at map at <console>:50

In [15]:
tfIdfReviews.toDF.show(5,100, false)

+----------------------------------------------------------------------------------------------------+
|                                                                                               value|
+----------------------------------------------------------------------------------------------------+
|[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.01792080422380758, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0100746849734...|
|[0.0, 0.0, 0.0, 0.0, 0.0, 0.014137845962203525, 0.0, 0.0, 0.0, 0.007215323250390767, 0.0, 0.0, 0....|
|[0.0, 0.009419635528647897, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.01...|
|[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.03452640383326635, 0.0, 0.0, 0.0, 0.0, 0.0098532633256...|
|[0.0, 0.0, 0.0, 0.0, 0.00819060575455726, 0.009462047948662256, 0.0, 0.009870084210256315, 0.0081...|
+----------------------------------------------------------------------------------------------------+
only showing top 5 rows

