In [1]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types._


Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.31.213:4040
SparkContext available as 'sc' (version = 3.0.1, master = local[*], app id = local-1637828871498)
SparkSession available as 'spark'


import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types._


In [2]:
val spark = SparkSession.builder()
    // адрес мастера
    .master("local[*]")
    // имя приложения в интерфейсе спарка
    .appName("made-demo")
    .getOrCreate()

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@13cdca98


In [3]:
val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("tripadvisor_hotel_reviews.csv")

df: org.apache.spark.sql.DataFrame = [Review: string, Rating: int]


In [4]:
// Привести все к одному регистру
// Удалить все спецсимволы
val lower = udf((col: String) => {
    col.toLowerCase
})
val df_cleaned = df
    .withColumn("Review", regexp_replace(lower(col("Review")), "[^a-z0-9 ']", "")).withColumn("id",monotonicallyIncreasingId)

lower: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3395/0x00000008413eb840@7f719d7a,StringType,List(Some(class[value[0]: string])),None,true,true)
df_cleaned: org.apache.spark.sql.DataFrame = [Review: string, Rating: int ... 1 more field]


In [5]:
// Найдем топ100 частых слов
val flatten = df_cleaned
  .select(split(col("Review"), " "))
  .as[Array[String]]
  .flatMap(x => x)
val top100 = flatten
  .groupBy(col("value"))
  .count()
  .orderBy(desc("count"))
  .limit(100)
  .select("value")

top100.show

+--------+
|   value|
+--------+
|   hotel|
|        |
|    room|
|     not|
|   great|
|     n't|
|    good|
|   staff|
|    stay|
|     did|
|    just|
|    nice|
|   rooms|
|      no|
|location|
|  stayed|
| service|
|    time|
|   beach|
|   night|
+--------+
only showing top 20 rows



flatten: org.apache.spark.sql.Dataset[String] = [value: string]
top100: org.apache.spark.sql.DataFrame = [value: string]


In [6]:
val get_wordcounts = udf((col: String) => {
    col.split(" ").groupBy(identity).mapValues(_.size)
})
val df_wordcounts = df_cleaned.withColumn("wordcounts", get_wordcounts(col("Review")))
df_wordcounts.show

+--------------------+------+---+--------------------+
|              Review|Rating| id|          wordcounts|
+--------------------+------+---+--------------------+
|nice hotel expens...|     4|  0|[maybe -> 1, peop...|
|ok nothing specia...|     2|  1|[pm -> 1, guest -...|
|nice rooms not 4 ...|     3|  2|[45 -> 1, wakeup ...|
|unique great stay...|     5|  3|[rock -> 1, feel ...|
|great stay great ...|     5|  4|[lights -> 1, lau...|
|love monaco staff...|     5|  5|[used -> 1, handb...|
|cozy stay rainy c...|     5|  6|[rainy -> 1, inte...|
|excellent staff h...|     4|  7|[used -> 1, regar...|
|hotel stayed hote...|     5|  8|[sturdiness -> 1,...|
|excellent stayed ...|     5|  9|[particularly -> ...|
|poor value stayed...|     2| 10|[quotient -> 1, s...|
|nice value seattl...|     4| 11|[people -> 1, are...|
|nice hotel good l...|     4| 12|[panel -> 1, rese...|
|nice hotel not ni...|     3| 13|[4th -> 1, tiring...|
|great hotel night...|     4| 14|[business -> 2, l...|
|horrible 

get_wordcounts: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4100/0x0000000841625840@542a5e90,MapType(StringType,IntegerType,false),List(Some(class[value[0]: string])),None,true,true)
df_wordcounts: org.apache.spark.sql.DataFrame = [Review: string, Rating: int ... 2 more fields]


In [7]:
// Найдем N для idf
val num_reviews = df.count()
// найдем список уникальных слов для каждого документа, суммируя их получим число документов по каждому слову
val document_counts = df_wordcounts
  .select(col("wordcounts"))
  .as[Map[String, Int]]
  .flatMap(x => x)
  .groupBy("_1")
  .count
  .select(col("_1").as("value"), col("count"))
// Найдем idf
val df_idf = top100
  .join(document_counts, Seq("value"), "left")
  .withColumn("idf", log(lit(num_reviews).as[Double]/col("count")))

num_reviews: Long = 20491
document_counts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]
df_idf: org.apache.spark.sql.DataFrame = [value: string, count: bigint ... 1 more field]


In [8]:
// Найдем tfidf
val idfmap = df_idf.select("value", "idf").map(row => row.getString(0) -> row.getDouble(1)).collect().toMap
val get_tfidf = udf((col: String) => {
    val split = col.split(" ")
    val wordcounts = split.groupBy(identity).mapValues(_.size)
    val words = split.size
    val freqs = (for((word, count) <- wordcounts) yield (word, count.toDouble/words)).toMap
    (for ((word, idf) <- idfmap) yield (word, idf * freqs.getOrElse(word, 0.0))).toMap
})
val df_tfidf = df_cleaned.withColumn("tfidf", get_tfidf(col("Review")))

idfmap: scala.collection.immutable.Map[String,Double] = Map(beautiful -> 1.7973874969055945, "" -> 8.829128755668723, buffet -> 2.088215462632154, recommend -> 1.575422496110829, hotels -> 1.8419463430552638, people -> 1.6031621991999818, city -> 1.9378416693938931, area -> 1.5968774311120884, booked -> 1.865623461582094, nights -> 1.6664728937591844, stayed -> 0.8741714483105988, bit -> 1.9344587162352413, location -> 0.9201290389322669, 4 -> 2.0606355440200934, got -> 1.5939900373832527, want -> 1.8767188361461544, best -> 1.569778078491152, fantastic -> 2.1724022314903313, good -> 0.792447546683561, hotel -> 0.2275331431861368, problem -> 2.1905609665022023, big -> 2.1210446718156533, going -> 2.0691003887160413, excellent -> 1.5336198505105894, bathroom -> 1.57212604857665, price ->...


In [9]:
// Создадим колонки для tfidf значений топ100 слов
val keys = top100.collect().map(f=>f.get(0))
val keyCols = keys.map(f=> col("tfidf").getItem(f).as(f.toString))
val df_result = df_tfidf.select(col("id") +: keyCols:_*)
df_result.show(false)

+---+---------------------+---+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------------+-----+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+------+---------------------+--------------------+----+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------------

|5  |0.0033960170624796537|0.0|0.005629102525911755 |0.00391702455664365  |0.0                  |0.006674451812163077 |0.005913787661817619 |0.004296473985406747 |0.010566350949102904 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.006523667524705961 |0.00888754112972346  |0.0                 |0.0  |0.0                 |0.0                 |0.0                 |0.0                  |0.009856023499408997|0.0                 |0.0   |0.0                  |0.010069138988833557|0.0 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                  |0.0                 |0.0                 |0.0                 |0.011714761779784715|0.0                 |0.02557936490618044 |0.0                 |0.0                 |0.0                 |0.0                |0.0                 |0.0                 |0.0                 |0.013413339529146227|0.01390611256288

|9  |0.0065009469481753365|0.0|0.010775710549602502 |0.0                  |0.01772211318942649  |0.0                  |0.0                  |0.032898715088257376 |0.0                  |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.024976327094588534 |0.0                  |0.0                 |0.0  |0.0                 |0.0                 |0.0                 |0.0                  |0.0                 |0.0                 |0.0   |0.0                  |0.0                 |0.0 |0.0                 |0.03588889998903361 |0.041201100571770455|0.0                 |0.0                 |0.0                  |0.043817710014588264|0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.04772443967446847|0.0                 |0.04831587776069645 |0.0                 |0.0                 |0.0             

|13 |0.0065009469481753365|0.0|0.005387855274801251 |0.007498304151289273 |0.0                  |0.0                  |0.0                  |0.016449357544128688 |0.010113507336998494 |0.0                 |0.0                 |0.02903845534546118 |0.01284517694137272 |0.015739149983626657|0.0                 |0.0                  |0.0                  |0.0                 |0.0  |0.0                 |0.0                 |0.01834668124300379 |0.0                  |0.0                 |0.0                 |0.0   |0.0                  |0.0                 |0.0 |0.0                 |0.017944449994516803|0.0                 |0.0                 |0.022771286248332183|0.0                  |0.0                 |0.0                 |0.023825219779550804|0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                |0.0                 |0.0                 |0.0                 |0.0                 |0.0             

|17 |0.01422082144913355  |0.0|0.007857288942418491 |0.0072900179248645705|0.004307458066874494 |0.0062109482140961965|0.005503107963080284 |0.0                  |0.004916288288818712 |0.0                 |0.0                 |0.0                 |0.0                 |0.007650975686485179|0.0                 |0.0060706350577124915|0.008270350773492663 |0.008352229206376936|0.0  |0.0                 |0.0                 |0.0                 |0.0073496033642591295|0.0                 |0.0                 |0.0   |0.0                  |0.0                 |0.0 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                 |0.0                  |0.02130027570153596 |0.0                 |0.0                 |0.010901236656188555|0.0                 |0.01190151006051451 |0.0                 |0.0                 |0.0                 |0.0                |0.0                 |0.0                 |0.010940434000769645|0.012481857617399962|0.0             

keys: Array[Any] = Array(hotel, "", room, not, great, n't, good, staff, stay, did, just, nice, rooms, no, location, stayed, service, time, beach, night, clean, day, breakfast, food, like, resort, really, place, pool, people, friendly, small, little, got, walk, excellent, area, 2, best, helpful, restaurant, bar, bathroom, trip, restaurants, water, bed, recommend, beautiful, went, view, floor, comfortable, desk, 5, 3, nights, right, want, way, better, wonderful, free, hotels, make, city, away, bit, booked, reviews, large, price, street, minutes, say, quite, buffet, new, days, lobby, 4, loved, experience, going, close, morning, definitely, big, lovely, airport, fantastic, think, took, check, problem, lot, perfect, walking, need, bad)
keyCols: Array[org.apache.spark.sql.Column] = Array(tfid...
