In [0]:
spark

In [1]:
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._


In [2]:
val spark = SparkSession.builder()
    // адрес мастера
    .master("local[*]")
    // имя приложения в интерфейсе спарка
    .appName("made-demo")
//     .config("spark.executor.memory",  "2g")
//     .config("spark.executor.cores", "2")
//     .config("spark.driver.memory", "2g")
    .getOrCreate()

In [3]:
val df = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ",")
    .csv("/my_new_folder/tripadvisor_hotel_reviews.csv")

df.show(5)

In [4]:
// 1. Приводим строки к нижнему регистру и удаляем все символы кроме строчных латинскх букв
val df2 = df.select(regexp_replace(lower(col("Review")), "[^a-z ]", "").as("Clean_string"))
df2.show(5)

In [5]:
// 2. Разбиваем строку по пробельным символам
var df3 = df2.select(split(col("Clean_string")," +").as("Array"))
df3.show(5)

In [6]:
// 3. Удаляем некоторые самые популярные стоп-слова, в том числе пустые элементы
val stopWords = Array("", "did", "got", "lot", "no", "nt", "th", "not")
df3 = df3.withColumn("Array", array_except(df3("Array"), lit(stopWords)))
df3.show(5)

In [7]:
// 4. Создаем новый столбец с длинной слов в предложении
var df4 = df3.select(
    col("Array"),
    size(col("Array")).as("total_word_sentence")
).orderBy("total_word_sentence")
df4.show(5)

In [8]:
// 5. Пронумеруем все предложения
val windowSpec = Window.partitionBy().orderBy("Array")
df4 = df4.withColumn("row_number",row_number.over(windowSpec))
df4.show(5)

In [9]:
// 6. Превратим столбец с массивами слов в столбец с отдельными словами
var df5 = df4.select(
    col("row_number"),
    col("total_word_sentence"), 
    explode(col("Array"))
    )
df5.show(5)

In [10]:
// 7. Считаем сколько раз встретилось слово в одном предложении
var df6_word = df5.groupBy("col", "row_number", "total_word_sentence").agg(count("col").as("word_sentence"))
df6_word.show(5)

In [11]:
// 8. Считаем в скольких предложениях (документах) встретилось слово
var df6_documents = df5.dropDuplicates().groupBy("col").agg(count("row_number").as("documents"))
df6_documents = df6_documents.orderBy(col("documents").desc).limit(100)
df6_documents.show(5)

In [12]:
// 9. Соединяем две таблицы, полученные на предыдущих двух шагах
var df6 = df6_documents.join(df6_word, "col")
df6.show(5)

In [13]:
// 10. Считаем общее количество предложений (документов) и вычисляем TF-IDF
val total_documents = df.count()
df6 = df6.withColumn("TF-IDF", (col("word_sentence") * total_documents) / (col("total_word_sentence") * col("documents")))
df6.orderBy(col("TF-IDF").desc).show(5)

In [14]:
// 11. Разворачиваем таблицу в ширину: строки - документы, столбцы - слова из списка топ-100 самых используемых
// Также заполняем пропуски нулями
var df_final = df6.select("row_number", "col", "TF-IDF").groupBy("row_number").pivot("col").sum("TF-IDF")
df_final = df_final.na.fill(value=0).orderBy("row_number")
println(df_final.count())
println(df_final.columns.size)
df_final.show(5)