In [None]:
// Echo the SparkSession that the kernel pre-creates (its start-up log announces it as 'spark').
spark

In [1]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

Intitializing Scala interpreter ...

Spark Web UI available at http://bf2c77dcebef:4040
SparkContext available as 'sc' (version = 3.1.1, master = local[*], app id = local-1635377626546)
SparkSession available as 'spark'


import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.types._


In [2]:
// Obtain the session used by the rest of the notebook. getOrCreate() hands back an
// already-running session when one exists (the kernel start-up log reports one as
// 'spark'); in that case the master/appName given here do not reconfigure the
// running SparkContext.
val spark = SparkSession
  .builder()
  .appName("hw4")
  .master("local[*]")
  .getOrCreate()

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5964df80


In [3]:
// Load the TripAdvisor reviews CSV and keep only the free-text "Review" column.
// Built as a single immutable expression instead of a `var` that is reassigned.
// inferSchema is intentionally not enabled: it makes Spark scan the whole file just
// to type the columns, yet only "Review" is kept, and with inference off every CSV
// column is read as a string anyway, so the resulting schema is unchanged.
val data = spark.read
  .option("header", "true")
  .option("sep", ",")
  .csv("/mnt/data/tripadvisor_hotel_reviews.csv")
  .select("Review")

data: org.apache.spark.sql.DataFrame = [Review: string]
data: org.apache.spark.sql.DataFrame = [Review: string]


In [4]:
// Preview the first five reviews (long values are truncated in the display).
data.show(numRows = 5)

+--------------------+
|              Review|
+--------------------+
|nice hotel expens...|
|ok nothing specia...|
|nice rooms not 4*...|
|unique, great sta...|
|great stay great ...|
+--------------------+
only showing top 5 rows



In [6]:
// Print the DataFrame schema. printSchema() is side-effecting (it writes to stdout),
// so it is called with explicit parentheses per Scala convention.
data.printSchema()

root
 |-- Review: string (nullable = true)



In [7]:
// Count the loaded reviews. count() is a Spark action that runs a job, so it is
// called with explicit parentheses to signal the side effect.
data.count()

res3: Long = 20491


In [8]:
// Discard reviews whose text is null. "Review" is the only column, so checking just
// that column with "all" is the same as checking every column.
val dataWithoutNan = data.na.drop("all", Seq("Review"))

dataWithoutNan: org.apache.spark.sql.DataFrame = [Review: string]


# Приведение к нижнему регистру

In [12]:
// Normalise case in place so that e.g. "Hotel" and "hotel" become the same token later.
val lowerData = dataWithoutNan.withColumn("Review", lower($"Review"))

lowerData: org.apache.spark.sql.DataFrame = [Review: string]


In [13]:
// Preview the lower-cased reviews.
lowerData.show(numRows = 5)

+--------------------+
|              Review|
+--------------------+
|nice hotel expens...|
|ok nothing specia...|
|nice rooms not 4*...|
|unique, great sta...|
|great stay great ...|
+--------------------+
only showing top 5 rows



# Деление на слова

In [14]:
// Tokenise each review: split on runs of non-word characters (Java regex \W, i.e.
// anything outside [a-zA-Z0-9_]), turning the Review string into an array of tokens.
// Empty tokens are possible here, e.g. when a review starts with a separator.
val words = {
  val nonWordRun = "\\W+"
  lowerData.withColumn("Review", split($"Review", nonWordRun))
}

words: org.apache.spark.sql.DataFrame = [Review: array<string>]


In [15]:
// Preview the token arrays of the first ten reviews.
words.show(numRows = 10)

+--------------------+
|              Review|
+--------------------+
|[nice, hotel, exp...|
|[ok, nothing, spe...|
|[nice, rooms, not...|
|[unique, great, s...|
|[great, stay, gre...|
|[love, monaco, st...|
|[cozy, stay, rain...|
|[excellent, staff...|
|[hotel, stayed, h...|
|[excellent, staye...|
+--------------------+
only showing top 10 rows



In [16]:
// Tag every review with an id. monotonically_increasing_id() yields unique but not
// consecutive values (the partition id sits in the upper bits), and Spark documents it
// as non-deterministic because it depends on partitioning. This frame feeds several
// separately evaluated branches (document lengths, word counts, document frequencies)
// that are later joined back together on doc_id, so it is cached to give all of them
// the same ids.
val documents = words.withColumn("doc_id", monotonically_increasing_id()).cache()

documents: org.apache.spark.sql.DataFrame = [Review: array<string>, doc_id: bigint]


In [18]:
// Preview the first four tokenised reviews together with their assigned ids.
documents.show(numRows = 4)

+--------------------+------+
|              Review|doc_id|
+--------------------+------+
|[nice, hotel, exp...|     0|
|[ok, nothing, spe...|     1|
|[nice, rooms, not...|     2|
|[unique, great, s...|     3|
+--------------------+------+
only showing top 4 rows



# Расчёт частоты слов в документе (TF)

In [22]:
// Expose the tokenised reviews to Spark SQL and flatten them into one (word, doc_id)
// row per token occurrence. createOrReplaceTempView replaces registerTempTable, which
// has been deprecated since Spark 2.0.
// rlike performs a regex *search*, not a full match, so the pattern is anchored with
// ^...$: only tokens made entirely of lowercase letters/hyphens, at least two characters
// long, are kept. Unanchored, tokens such as "4th" passed merely because they contain
// two letters. Empty tokens from the split are dropped as well.
documents.createOrReplaceTempView("documents")

val wordDoc = spark.sql("""
    select word, doc_id from documents lateral view explode(Review) tab as word where rlike(word, '^[a-z-]{2,}$')
""")

wordDoc: org.apache.spark.sql.DataFrame = [word: string, doc_id: bigint]


In [23]:
// Preview the first six (word, doc_id) pairs.
wordDoc.show(numRows = 6)

+---------+------+
|     word|doc_id|
+---------+------+
|     nice|     0|
|    hotel|     0|
|expensive|     0|
|  parking|     0|
|      got|     0|
|     good|     0|
+---------+------+
only showing top 6 rows



In [24]:
// Document length: number of kept tokens per review (non-null words counted per
// doc_id). Cached, since the TF step joins against it.
val docLength = wordDoc
  .groupBy($"doc_id")
  .agg(count($"word").as("total_words"))
  .cache()

docLength: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [doc_id: bigint, total_words: bigint]


In [25]:
// Preview a few document lengths.
docLength.show(numRows = 5)

+------+-----------+
|doc_id|total_words|
+------+-----------+
|    26|         42|
|    29|         34|
|   474|        112|
|   964|         86|
|  1677|        120|
+------+-----------+
only showing top 5 rows



In [26]:
// Raw term counts: how many times each word occurs in each document. Rows are sorted
// by count, descending (this only affects display order), and the result is cached.
val wordFreq = wordDoc
  .groupBy($"word", $"doc_id")
  .count()
  .orderBy($"count".desc)
  .cache()

wordFreq: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [word: string, doc_id: bigint ... 1 more field]


In [27]:
// Show the most repeated (word, document) pairs.
wordFreq.show(numRows = 5)

+-----+-----------+-----+
| word|     doc_id|count|
+-----+-----------+-----+
|  the|17179871450|   46|
|  did|25769803883|   37|
|hotel|25769806217|   35|
|hotel|25769806595|   34|
| food|25769804737|   33|
+-----+-----------+-----+
only showing top 5 rows



In [31]:
// Term frequency: a word's count in a document divided by that document's length
// (the division of the two bigint columns yields a double).
val tf = wordFreq
  .join(docLength, Seq("doc_id"))
  .select($"doc_id", $"word", ($"count" / $"total_words").as("tf"))

tf: org.apache.spark.sql.DataFrame = [doc_id: bigint, word: string ... 1 more field]


In [32]:
// Preview a few TF values.
tf.show(numRows = 5)

+------+--------+--------------------+
|doc_id|    word|                  tf|
+------+--------+--------------------+
|    26|shopping|0.023809523809523808|
|    26|  anywho|0.023809523809523808|
|    26|   clean|0.023809523809523808|
|    26|    stay|0.023809523809523808|
|    26|    vibe|0.023809523809523808|
+------+--------+--------------------+
only showing top 5 rows



# Расчёт количества документов, содержащих слово (DF), и выбор 100 самых часто встречающихся слов

In [39]:
// Corpus size for IDF: number of distinct documents that contain at least one kept token.
val totalDoc = wordDoc.agg(countDistinct($"doc_id")).head().getLong(0)

totalDoc: Long = 20491


In [40]:
// Vocabulary size for TF-IDF: keep this many words with the highest document frequency.
val topNFreq: Int = 100

topNFreq: Int = 100


In [41]:
// Document frequency (df): number of distinct documents each word occurs in. Only the
// topNFreq most widespread words are kept, and the corpus size is attached as a
// constant column for the IDF step.
// Ties in df are broken alphabetically. Without a total order, orderBy + limit is free
// to pick any subset of equally frequent words at the cut-off, and since this frame is
// not cached (it is evaluated again by each consumer) the chosen vocabulary could differ
// between evaluations.
val docFreq = wordDoc
  .groupBy("word")
  .agg(countDistinct("doc_id").as("df"))
  .orderBy(desc("df"), col("word").asc)
  .limit(topNFreq)
  .withColumn("total_doc", lit(totalDoc))

docFreq: org.apache.spark.sql.DataFrame = [word: string, df: bigint ... 1 more field]


In [42]:
// Show the words that occur in the most documents.
docFreq.show(numRows = 5)

+-----+-----+---------+
| word|   df|total_doc|
+-----+-----+---------+
|hotel|16400|    20491|
| room|14202|    20491|
|  not|12155|    20491|
|staff|11768|    20491|
|great|11127|    20491|
+-----+-----+---------+
only showing top 5 rows



# Расчёт TF-IDF

In [43]:
// TF-IDF = tf * ln(N / df), computed only for the selected vocabulary: the inner join
// on "word" drops every word not in docFreq. Cached for the pivot below.
val tfIdf = {
  val idf = log($"total_doc" / $"df")
  tf.join(docFreq, Seq("word"))
    .select($"doc_id", $"word", ($"tf" * idf).as("tf_idf"))
    .cache()
}

tfIdf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [doc_id: bigint, word: string ... 1 more field]


In [44]:
// Pivot into a document x word matrix: one row per doc_id, one column per vocabulary
// word, holding that word's TF-IDF in the document (each (doc_id, word) pair is unique,
// so max just picks the value). Missing combinations become 0.
// NOTE(review): documents containing none of the selected words have no rows in tfIdf
// and therefore do not appear in this matrix at all.
val tfIdfMatrix = tfIdf
  .groupBy($"doc_id")
  .pivot("word")
  .max("tf_idf")
  .na.fill(0)
  .cache()

tfIdfMatrix: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [doc_id: bigint, area: double ... 99 more fields]


In [47]:
// Show five rows vertically (one field per line), truncating values to 50 characters,
// since the matrix is too wide for the tabular layout.
tfIdfMatrix.show(numRows = 5, truncate = 50, vertical = true)

-RECORD 0----------------------------
 doc_id      | 474                   
 area        | 0.0                   
 arrived     | 0.0                   
 away        | 0.0                   
 bad         | 0.0                   
 bar         | 0.0                   
 bathroom    | 0.0                   
 beach       | 0.0                   
 beautiful   | 0.0                   
 bed         | 0.0                   
 beds        | 0.0                   
 best        | 0.0                   
 better      | 0.0                   
 big         | 0.0                   
 bit         | 0.0                   
 booked      | 0.016556589958485957  
 breakfast   | 0.00908294899928728   
 buffet      | 0.0                   
 check       | 0.0                   
 city        | 0.0                   
 clean       | 0.008629493418258532  
 close       | 0.0                   
 comfortable | 0.014696580838620016  
 day         | 0.0                   
 days        | 0.0                   
 definitely 