In [0]:
val df = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ",")
    .csv("/opt/zeppelin/notebook/tripadvisor_hotel_reviews.csv")

df.head

In [1]:
// привести все к одному регистру
val df_lower = df
    .withColumn("Review", lower(col("Review")))
    .select("Review")
df_lower.head

In [2]:
// удалить все спецсимволы (кроме \\s и латинский букв)
val df_clear = df_lower
    .withColumn("Review", regexp_replace(col("Review"), "[^a-z ]", ""))
df_clear.head

In [3]:
// разбить предложение на массивы слов
val df_words = df_clear
    .withColumn("Review", trim(col("Review")))
    .withColumn("Review", split(col("Review"), "\\s+"))
df_words.head

In [4]:
// развернуть массивы слов отдельные строки с идентификатором отзыва
val df_words_counted = df_words
    .withColumn("ReviewId", monotonically_increasing_id())
    .withColumn("Word", explode(col("Review")))
    .select("ReviewId", "Word")
df_words_counted.show(200)

In [5]:
// посчитать частоту слова в рамках каждого документа: tf

import org.apache.spark.sql.expressions.Window

val revieIdWordWindow = Window.partitionBy("Reviewid", "Word")
val revieIdWindow = Window.partitionBy("Reviewid")
val wordWindow = Window.partitionBy("Word")

val df_calculated_tf = df_words_counted
    .withColumn("FD", count(col("Word")).over(revieIdWordWindow))
    .dropDuplicates("ReviewId", "Word", "FD")
    .withColumn("SFD", count(col("ReviewId")).over(revieIdWindow))
    .withColumn("TF", col("FD") / col("SFD"))
    .orderBy("ReviewId")

df_calculated_tf.show(300)

In [6]:
// посчитать количество документов со словом, взять только 100 самых встречаемых

val DN = df.count()
val df_calculated_idf = df_calculated_tf
    .select("ReviewId", "Word")
    .withColumn("DF", count(col("Word")).over(wordWindow))
    .select("Word", "DF")
    .dropDuplicates("Word")
    .orderBy(desc("DF"))
    .limit(100)
    .withColumn("DN", lit(DN))
    .withColumn("IDF", log(col("DN") / col("DF")))
    
    
df_calculated_idf.show(100)

In [7]:
// cджойнить две полученные таблички и посчитать Tf-Idf (только для слов из предыдущего пункта)
val tf_final = df_calculated_tf
    .select("ReviewId", "Word", "TF")
    .dropDuplicates()

val idf_final = df_calculated_idf
    .select("Word", "IDF")
    .dropDuplicates()
    
val tf_idf = tf_final
    .join(idf_final, Seq("Word"), "inner")
    .withColumn("TFIDF", col("TF") * col("IDF"))
    .select("ReviewId", "Word", "TFIDF")
tf_idf.show(100)

In [8]:
// запайвотить табличку

import org.apache.spark.sql._

val tf_idf_final = tf_idf
    .groupBy("ReviewId")
    .pivot("Word")
    .max("TFIDF")
    .na.fill(0)

tf_idf_final
    .select(tf_idf_final.columns.take(15).map(col): _*)
    .show(5, 10)