In [1]:
import org.apache.spark.sql._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf

import scala.util.matching.Regex

In [2]:
// Session used by every cell below: runs Spark locally under the application name "hw06".
val spark = SparkSession
  .builder()
  .appName("hw06")
  .master("local[*]")
  .getOrCreate()

spark = org.apache.spark.sql.SparkSession@2e0034d3


org.apache.spark.sql.SparkSession@2e0034d3

In [3]:
import spark.implicits._

In [4]:
// Relative path of the input CSV file with the hotel reviews.
val data_path: String = "./tripadvisor_hotel_reviews.csv"

data_path = ./tripadvisor_hotel_reviews.csv


./tripadvisor_hotel_reviews.csv

In [5]:
// Load the raw reviews: the first CSV line is treated as a header and column types are inferred.
// Declared as `var` because a later cell reassigns it.
var reviews = spark.read
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(data_path)

reviews = [Review: string, Rating: int]


[Review: string, Rating: int]

In [6]:
// Preview the first five raw rows.
reviews.show(numRows = 5)

+--------------------+------+
|              Review|Rating|
+--------------------+------+
|nice hotel expens...|     4|
|ok nothing specia...|     2|
|nice rooms not 4*...|     3|
|unique, great sta...|     5|
|great stay great ...|     5|
+--------------------+------+
only showing top 5 rows



In [7]:
// Keep only the review text and attach a sequential document id starting at 1.
// The window uses one constant partition and a constant sort key, so row_number just numbers
// the rows in whatever order Spark delivers them.
// NOTE(review): a constant partition key sends every row through a single window partition.
reviews = {
  val wholeTable = Window.partitionBy(lit(1)).orderBy(lit(1))
  reviews.select(col("Review"), row_number().over(wholeTable).as("DocId"))
}

reviews = [Review: string, DocId: int]


[Review: string, DocId: int]

In [8]:
// Preview the first five reviews together with their document ids.
reviews.show(numRows = 5)

+--------------------+-----+
|              Review|DocId|
+--------------------+-----+
|nice hotel expens...|    1|
|ok nothing specia...|    2|
|nice rooms not 4*...|    3|
|unique, great sta...|    4|
|great stay great ...|    5|
+--------------------+-----+
only showing top 5 rows



In [9]:
// The assignment asks to remove "special characters", so only ASCII letters and digits are kept;
// everything else is replaced with a space.
val cleanRegex: Regex = new Regex("[^A-Za-z0-9]+")

cleanRegex = [^A-Za-z0-9]+


[^A-Za-z0-9]+

In [10]:
/** Normalizes a review for tokenization.
  *
  * Lower-cases the text, replaces every match of `cleanRegex` with a single space and trims
  * the ends. Lower-casing uses `Locale.ROOT` so the result does not depend on the JVM default
  * locale (for example, a Turkish locale would otherwise map `I` to a non-ASCII letter).
  *
  * @param str raw review text; may be null (presumably what Spark passes for a missing value)
  * @return the cleaned text, or an empty string when `str` is null
  */
def cleanStr(str: String): String =
  Option(str).fold("") { text =>
    cleanRegex.replaceAllIn(text.toLowerCase(java.util.Locale.ROOT), " ").trim
  }

cleanStr: (str: String)String


In [11]:
// Expose cleanStr as a Spark SQL user-defined function so it can be applied to a column.
val cleanStrUDF = udf((rawReview: String) => cleanStr(rawReview))

cleanStrUDF = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))


UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

In [12]:
// Replace the raw Review column with its cleaned form, keeping the document id.
val cleanReviews = reviews.select(
  col("DocId"),
  cleanStrUDF(col("Review")).as("CleanReview")
)

cleanReviews = [DocId: int, CleanReview: string]


[DocId: int, CleanReview: string]

In [13]:
// Preview the first five cleaned reviews.
cleanReviews.show(numRows = 5)

+-----+--------------------+
|DocId|         CleanReview|
+-----+--------------------+
|    1|nice hotel expens...|
|    2|ok nothing specia...|
|    3|nice rooms not 4 ...|
|    4|unique great stay...|
|    5|great stay great ...|
+-----+--------------------+
only showing top 5 rows



In [14]:
// Tokenize each cleaned review by splitting on a single space; the text column is replaced
// by the resulting array of words.
val reviewWords = cleanReviews.select(
  col("DocId"),
  split(col("CleanReview"), " ").as("Words")
)

reviewWords = [DocId: int, Words: array<string>]


[DocId: int, Words: array<string>]

In [15]:
// Preview the first five tokenized reviews.
reviewWords.show(numRows = 5)

+-----+--------------------+
|DocId|               Words|
+-----+--------------------+
|    1|[nice, hotel, exp...|
|    2|[ok, nothing, spe...|
|    3|[nice, rooms, not...|
|    4|[unique, great, s...|
|    5|[great, stay, gre...|
+-----+--------------------+
only showing top 5 rows



In [16]:
// One row per (document, word occurrence).
// Uses the built-in explode instead of a typed flatMap: rows whose Words array is null or
// empty simply produce no output instead of failing. Empty-string tokens (what split returns
// for a review that cleaned down to "") are dropped so they are not counted as a word.
// DocId is cast to long to keep the bigint column type the typed version produced.
val documentsFlat = reviewWords
  .select(col("DocId").cast("long").as("DocId"), explode(col("Words")).as("Word"))
  .where(col("Word") =!= "")

documentsFlat = [DocId: bigint, Word: string]


[DocId: bigint, Word: string]

In [17]:
// Preview the first five (document, word) rows.
documentsFlat.show(numRows = 5)

+-----+---------+
|DocId|     Word|
+-----+---------+
|    1|     nice|
|    1|    hotel|
|    1|expensive|
|    1|  parking|
|    1|      got|
+-----+---------+
only showing top 5 rows



In [18]:
// Number of occurrences of each word inside each document.
val wordCounts = documentsFlat
  .groupBy(col("DocId"), col("Word"))
  .count()
  .withColumnRenamed("count", "WordCount")

wordCounts = [DocId: bigint, Word: string ... 1 more field]


[DocId: bigint, Word: string ... 1 more field]

In [19]:
// Preview five per-document word counts.
wordCounts.show(numRows = 5)

+-----+---------+---------+
|DocId|     Word|WordCount|
+-----+---------+---------+
|    2| printout|        1|
|    4|   showed|        1|
|    4|bathrobes|        1|
|    6|   closed|        1|
|    6|   mighty|        1|
+-----+---------+---------+
only showing top 5 rows



In [20]:
// Length of every document, computed as the sum of its per-word counts.
val docLengths = wordCounts
  .groupBy(col("DocId"))
  .agg(sum(col("WordCount")).alias("DocLen"))

docLengths = [DocId: bigint, DocLen: bigint]


[DocId: bigint, DocLen: bigint]

In [21]:
// Preview five document lengths.
docLengths.show(numRows = 5)

+-----+------+
|DocId|DocLen|
+-----+------+
|   26|    73|
|  964|    60|
| 1697|    37|
| 1950|    67|
| 2040|    25|
+-----+------+
only showing top 5 rows



In [22]:
// Term frequency: occurrences of the word in a document divided by that document's length.
// Sorted by document, most frequent words first.
val wordFreqs = {
  val countsWithLength = wordCounts.join(docLengths, "DocId")
  countsWithLength
    .withColumn("WordFreq", col("WordCount").divide(col("DocLen")))
    .sort(col("DocId").asc, col("WordCount").desc)
}

wordFreqs = [DocId: bigint, Word: string ... 3 more fields]


[DocId: bigint, Word: string ... 3 more fields]

In [23]:
// Preview the five most frequent words of the first document.
wordFreqs.show(numRows = 5)

+-----+-------+---------+------+--------------------+
|DocId|   Word|WordCount|DocLen|            WordFreq|
+-----+-------+---------+------+--------------------+
|    1|   nice|        5|    88|0.056818181818181816|
|    1|   room|        3|    88| 0.03409090909090909|
|    1|parking|        3|    88| 0.03409090909090909|
|    1|   stay|        2|    88|0.022727272727272728|
|    1|    did|        2|    88|0.022727272727272728|
+-----+-------+---------+------+--------------------+
only showing top 5 rows



In [24]:
// Total number of documents (one row of reviewWords per review); used as the IDF numerator.
val totalDocuments = reviewWords.count()

totalDocuments = 20491


20491

In [25]:
// Per-word document statistics, most widespread words first:
//   DocCount   - number of documents containing the word
//   DocFreq    - DocCount / totalDocuments
//   InvDocFreq - 1 / DocFreq
//   IDF        - natural logarithm of InvDocFreq
val documentFreqs = {
  val docsPerWord = wordCounts
    .groupBy(col("Word"))
    .agg(count(col("DocId")).alias("DocCount"))
  val withDocFreq = docsPerWord.withColumn("DocFreq", col("DocCount") / lit(totalDocuments))
  val withIdf = withDocFreq
    .withColumn("InvDocFreq", lit(1) / col("DocFreq"))
    .withColumn("IDF", log(col("InvDocFreq")))
  withIdf.sort(col("DocCount").desc)
}

documentFreqs = [Word: string, DocCount: bigint ... 3 more fields]


[Word: string, DocCount: bigint ... 3 more fields]

In [26]:
// Preview the five words that appear in the most documents.
documentFreqs.show(numRows = 5)

+-----+--------+------------------+------------------+-------------------+
| Word|DocCount|           DocFreq|        InvDocFreq|                IDF|
+-----+--------+------------------+------------------+-------------------+
|hotel|   16402| 0.800448977599922|1.2492988659919522|0.22258248674047598|
| room|   14206|0.6932799765750818|1.4424186963254961| 0.3663213547790772|
|  not|   12155|0.5931872529403153|1.6858083093377212| 0.5222451575866088|
|staff|   11768|0.5743009125957738|   1.7412474507138|  0.554601782055559|
|great|   11128|0.5430676882533796|1.8413910855499638| 0.6105213107335535|
+-----+--------+------------------+------------------+-------------------+
only showing top 5 rows



"Взять только 100 самых встречаемых" здесь трактуется как 100 слов, встречающихся в наибольшем числе документов

In [27]:
// The 100 words that occur in the largest number of documents.
// The ordering is stated explicitly here, with Word as a tie-breaker, so the selection does
// not rely on how documentFreqs happened to be sorted and stays reproducible when several
// words share the same DocCount at the cut-off.
val top100DocFreqs = documentFreqs
  .orderBy(col("DocCount").desc, col("Word").asc)
  .limit(100)

top100DocFreqs = [Word: string, DocCount: bigint ... 3 more fields]


[Word: string, DocCount: bigint ... 3 more fields]

In [28]:
// TF-IDF restricted to the top-100 words: the inner join keeps only (document, word) rows
// whose word is in top100DocFreqs, then multiplies term frequency by the word's IDF.
val top100TfIdf = {
  val idfByWord = top100DocFreqs.select(col("Word"), col("IDF"))
  wordFreqs
    .join(idfByWord, "Word")
    .withColumn("TfIdf", col("WordFreq").multiply(col("IDF")))
}

top100TfIdf = [Word: string, DocId: bigint ... 5 more fields]


[Word: string, DocId: bigint ... 5 more fields]

In [29]:
// Preview five TF-IDF rows.
top100TfIdf.show(numRows = 5)

+-----+-----+---------+------+--------------------+-------------------+--------------------+
| Word|DocId|WordCount|DocLen|            WordFreq|                IDF|               TfIdf|
+-----+-----+---------+------+--------------------+-------------------+--------------------+
| nice|    1|        5|    88|0.056818181818181816| 1.0050827448124307| 0.05710697413706992|
| room|    1|        3|    88| 0.03409090909090909| 0.3663213547790772|0.012488228003832176|
|  did|    1|        2|    88|0.022727272727272728|   1.03884637496524|0.023610144885573635|
|night|    1|        2|    88|0.022727272727272728| 1.1189235612361903|0.025430080937186145|
|hotel|    1|        2|    88|0.022727272727272728|0.22258248674047598|0.005058692880465363|
+-----+-----+---------+------+--------------------+-------------------+--------------------+
only showing top 5 rows



In [30]:
// Document-by-word TF-IDF matrix: one row per DocId, one column per word.
// Words absent from a document come out of the pivot as null and are replaced with 0.
// The result is sorted by DocId and persisted because it is both displayed and written out.
// NOTE(review): a document with no row in top100TfIdf gets no row here — confirm that is intended.
val top100TfIdfPivot = {
  val scores = top100TfIdf.select("DocId", "Word", "TfIdf")
  val matrix = scores.groupBy("DocId").pivot("Word").sum("TfIdf")
  matrix.na.fill(0).sort("DocId").persist()
}

top100TfIdfPivot = [DocId: bigint, 10: double ... 99 more fields]


[DocId: bigint, 10: double ... 99 more fields]

In [31]:
// Print the first five documents vertically (one line per column).
top100TfIdfPivot.show(numRows = 5, truncate = -1, vertical = true)

-RECORD 0----------------------------
 DocId       | 1                     
 10          | 0.0                   
 2           | 0.0                   
 3           | 0.0                   
 4           | 0.0                   
 5           | 0.0                   
 area        | 0.0                   
 arrived     | 0.024336415983565462  
 away        | 0.0                   
 bar         | 0.0                   
 bathroom    | 0.0                   
 beach       | 0.0                   
 beautiful   | 0.0                   
 bed         | 0.018957759647777545  
 best        | 0.0                   
 better      | 0.0                   
 big         | 0.0                   
 bit         | 0.0                   
 booked      | 0.0                   
 breakfast   | 0.0                   
 buffet      | 0.0                   
 check       | 0.02152366609603048   
 city        | 0.0                   
 clean       | 0.01098007637530946   
 close       | 0.0                   
 comfortable

In [32]:
// Export the matrix as comma-separated CSV with a header row into ./results,
// replacing any previous output; coalesce(1) reduces the data to one partition first.
top100TfIdfPivot
  .coalesce(1)
  .write
  .format("csv")
  .mode("overwrite")
  .option("sep", ",")
  .option("header", "true")
  .save("./results")