# Домашнее задание №4: Tf-Idf с помощью Spark DataFrame API


In [2]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

## SparkSession

In [3]:
// Spark session for this notebook, running locally ("local[*]" master).
val spark = SparkSession.builder()
  // application name shown in the Spark UI
  .appName("hw6")
  // master URL
  .master("local[*]")
  // Optional resource tuning, left disabled:
  //   .config("spark.executor.memory", "2g")
  //   .config("spark.executor.cores", "2")
  //   .config("spark.driver.memory", "2g")
  .getOrCreate()

In [None]:
import spark.implicits._

import spark.implicits._


## Read and preprocess

* Привести все к одному регистру
* Удалить все спецсимволы

In [None]:
// Load the reviews CSV and normalise the "Review" column: lower-case it, then
// remove every character that is not a word character, whitespace or a hyphen.
val text = {
  val reviews = spark.read
    .options(Map("header" -> "true", "inferSchema" -> "true"))
    .csv("tripadvisor_hotel_reviews.csv")
  val cleaned = regexp_replace(lower(col("Review")), "[^\\w\\s-]", "")
  reviews.select(cleaned.as("preprocessed_text"))
}

text: org.apache.spark.sql.DataFrame = [preprocessed_text: string]


In [None]:
// Tokenise each cleaned review into an array of words.
// The cleaning regex keeps every whitespace character, so split on any run of
// whitespace rather than a single space; otherwise tabs/newlines would stay
// embedded inside tokens. Empty strings (e.g. from leading whitespace) can still
// appear in the array.
val tokens = text
  .withColumn("tokens", split(col("preprocessed_text"), "\\s+"))

tokens: org.apache.spark.sql.DataFrame = [preprocessed_text: string, tokens: array<string>]


In [None]:
// Preview the first five tokenised reviews.
tokens.show(numRows = 5)

+--------------------+--------------------+
|   preprocessed_text|              tokens|
+--------------------+--------------------+
|nice hotel expens...|[nice, hotel, exp...|
|ok nothing specia...|[ok, nothing, spe...|
|nice rooms not 4 ...|[nice, rooms, not...|
|unique great stay...|[unique, great, s...|
|great stay great ...|[great, stay, gre...|
+--------------------+--------------------+
only showing top 5 rows



## Tf Idf

In [None]:
// Tag every review with a document id.
// `monotonically_increasing_id()` replaces the deprecated
// `monotonicallyIncreasingId()`; the ids are unique but not guaranteed to be
// consecutive, so they must only be used as opaque keys.
val tokens_docid = tokens.withColumn("doc_id", monotonically_increasing_id())

tokens_docid: org.apache.spark.sql.DataFrame = [preprocessed_text: string, tokens: array<string> ... 1 more field]


In [None]:
// Every existing column plus one exploded `token` column (one row per array element).
val cols = tokens_docid.columns.map(col) :+ explode(col("tokens")).as("token")
// Explode the token arrays and discard empty-string tokens.
val unfolded = tokens_docid
  .select(cols: _*)
  .where(col("token") =!= "")

cols: Array[org.apache.spark.sql.Column] = Array(preprocessed_text, tokens, doc_id, explode(tokens) AS `token`)
unfolded: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [preprocessed_text: string, tokens: array<string> ... 2 more fields]


In [None]:
// Preview the first five (document, token) rows.
unfolded.show(numRows = 5)

+--------------------+--------------------+------+---------+
|   preprocessed_text|              tokens|doc_id|    token|
+--------------------+--------------------+------+---------+
|nice hotel expens...|[nice, hotel, exp...|     0|     nice|
|nice hotel expens...|[nice, hotel, exp...|     0|    hotel|
|nice hotel expens...|[nice, hotel, exp...|     0|expensive|
|nice hotel expens...|[nice, hotel, exp...|     0|  parking|
|nice hotel expens...|[nice, hotel, exp...|     0|      got|
+--------------------+--------------------+------+---------+
only showing top 5 rows



### Tf
* Посчитать частоту слова в документе (отзыве)

In [None]:
// Term frequency: number of occurrences of each token inside each document.
// Count the grouped `token` column itself (never null after the empty-token
// filter) instead of the unrelated `tokens` array column.
val tf = unfolded
  .groupBy("doc_id", "token")
  .agg(count(col("token")).as("tf"))

tf: org.apache.spark.sql.DataFrame = [doc_id: bigint, token: string ... 1 more field]


In [None]:
tf.show(5)

+------+----------+---+
|doc_id|     token| tf|
+------+----------+---+
|     0|      room|  3|
|     1|    better|  2|
|     6|attractive|  1|
|     6|  positive|  1|
|     7| concierge|  2|
+------+----------+---+
only showing top 5 rows



### Df
* Посчитать количество документов со словом
* Взять только 100 самых часто встречающихся слов

In [None]:
// Document frequency: number of distinct documents containing each token,
// restricted to the 100 tokens with the highest document frequency.
val df = unfolded
  .groupBy("token")
  .agg(countDistinct("doc_id").as("df_"))
  // Secondary sort on the token makes the top-100 cut deterministic when several
  // tokens share the same document frequency (the plan is re-evaluated lazily
  // by every downstream action).
  .orderBy(col("df_").desc, col("token").asc)
  .limit(100)

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [token: string, df_: bigint]


In [None]:
// Preview the five tokens with the highest document frequency.
df.show(numRows = 5)

+-----+-----+
|token|  df_|
+-----+-----+
|hotel|16312|
| room|14046|
|  not|12123|
|staff|11517|
|great|11015|
+-----+-----+
only showing top 5 rows



### Idf

In [None]:
// Total number of rows (documents) in the corpus; used as the numerator of the IDF.
// Immutable on purpose: the value is captured by the IDF UDF closure.
val numOfDocuments = text.count()

numOfDocuments: Long = 20491


In [None]:
/** Smoothed inverse document frequency: `ln((docCount + 1) / (curDf + 1))`.
  *
  * @param docCount total number of documents in the corpus
  * @param curDf    number of documents that contain the term
  * @return the natural logarithm of the smoothed ratio
  */
def calcIdf(docCount: Long, curDf: Long): Double = {
  // Adding 1 to both counts keeps the ratio defined when curDf is 0.
  val smoothedTotal = docCount + 1.0
  val smoothedDf = curDf + 1.0
  math.log(smoothedTotal / smoothedDf)
}

calcIdf: (docCount: Long, curDf: Long)Double


In [None]:
// Spark UDF wrapping calcIdf: maps a document-frequency value to its IDF,
// using the corpus size held in numOfDocuments.
val dufCalcIdf = udf((curDf: Long) => calcIdf(numOfDocuments, curDf))

dufCalcIdf: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4210/0x0000000841489840@5ca62cc5,DoubleType,List(Some(class[value[0]: bigint])),None,false,true)


In [None]:
// Attach the IDF value to each of the selected tokens.
val idf = df.withColumn("idf", dufCalcIdf(col("df_")))

idf: org.apache.spark.sql.DataFrame = [token: string, df_: bigint ... 1 more field]


In [None]:
// Preview the first five tokens together with their IDF.
idf.show(numRows = 5)

+-----+-----+-------------------+
|token|  df_|                idf|
+-----+-----+-------------------+
|hotel|16312| 0.2280722301227903|
| room|14046|0.37762571622818303|
|  not|12123| 0.5248476068813992|
|staff|11517| 0.5761235369924752|
|great|11015| 0.6206858046506021|
+-----+-----+-------------------+
only showing top 5 rows



In [None]:
// Sanity check: number of tokens kept in the IDF table.
idf.count()

res40: Long = 100


### Merge
* Сджойнить две полученные таблички и посчитать Tf-Idf (только для слов из предыдущего пункта)

In [None]:
// Combine term frequencies with IDF values. The inner join on `token` keeps only
// the tokens present in the IDF table; tf-idf is the product of the two factors.
val tfidf = tf
  .join(idf, Seq("token"), "inner")
  .withColumn("tfidf", col("tf") * col("idf"))

tfidf: org.apache.spark.sql.DataFrame = [token: string, doc_id: bigint ... 4 more fields]


In [None]:
// Preview the first five tf-idf rows.
tfidf.show(numRows = 5)

+------+------+---+-----+-------------------+------------------+
| token|doc_id| tf|  df_|                idf|             tfidf|
+------+------+---+-----+-------------------+------------------+
|  room|     0|  3|14046|0.37762571622818303| 1.132877148684549|
|better|     1|  2| 3239| 1.8444612362727177|3.6889224725454355|
|    nt|    10|  2| 8379| 0.8941866515829642|1.7883733031659284|
| clean|    12|  1| 7641| 0.9863752170522476|0.9863752170522476|
|  stay|    15|  2|10087| 0.7086879674143374|1.4173759348286747|
+------+------+---+-----+-------------------+------------------+
only showing top 5 rows



## Запайвотить
* Запайвотить табличку

In [None]:
// Reshape to one row per document and one column per token, holding the tf-idf
// value. Tokens that do not occur in a document are left as null.
val pivot = tfidf
  .groupBy("doc_id")
  .pivot(col("token"))
  .sum("tfidf")

pivot: org.apache.spark.sql.DataFrame = [doc_id: bigint, 2: double ... 99 more fields]


In [None]:
// Persist the pivoted tf-idf table as parquet, written from two partitions.
// The former `header` option was dropped: it is a CSV option and has no effect
// on the parquet writer.
pivot
  .repartition(2)
  .write
  .parquet("hw6_solution")