In [0]:
// Evaluate the session handle so the notebook echoes it. `spark` is not defined in this
// file; presumably the notebook kernel provides it (confirm).
spark

In [1]:
// Load the hotel-review CSV: the first row is the header, column types are inferred,
// and fields are comma-separated.
val data = spark.read
  .format("csv")
  .options(Map("header" -> "true", "inferSchema" -> "true", "sep" -> ","))
  .load("/notebook/tripadvisor_hotel_reviews.csv")

// Preview the first five rows.
data.show(5)

In [2]:
// Tokenise every review, producing one output row per (review, token) occurrence:
//   1. tag each input row with a generated `review_id`;
//   2. lower-case the text, collapse every run of non-alphanumeric characters into a
//      single space, and trim the ends;
//   3. rename the text column to `review`;
//   4. split on spaces and explode into a `token` column;
//   5. drop empty tokens.
val data_with_id = data
  .withColumn("review_id", monotonically_increasing_id())
  .withColumn("Review", trim(regexp_replace(lower(col("Review")), "[^a-zA-Z0-9]+", " ")))
  .withColumnRenamed("Review", "review")
  // Refer to the column by its new name so this also resolves when
  // spark.sql.caseSensitive is enabled.
  .withColumn("token", explode(split(col("review"), " ")))
  // A review with no alphanumeric characters becomes "" after cleanup, and splitting ""
  // yields a single empty token. That is not a term and must not be counted in TF/DF.
  .filter(col("token") =!= "")

data_with_id.show(5)

In [3]:
// Term frequency: for each (review_id, token) pair, count the rows whose review text
// is non-null, i.e. how often the token occurs in that review.
val TF = data_with_id
  .groupBy(col("review_id"), col("token"))
  .agg(count(col("Review")).as("tf"))
  .sort(col("review_id"))

TF.show(5)


In [4]:
// Document frequency: the number of distinct reviews each token appears in, restricted
// to the 100 most widespread tokens.
val DF = data_with_id
  .groupBy("token")
  .agg(countDistinct("review_id") as "df")
  // The secondary sort on `token` is a tie-breaker. Without it, tokens sharing the same
  // `df` at the cutoff are chosen arbitrarily, and the top-100 set may change every time
  // this lazily evaluated DataFrame is recomputed.
  .orderBy(desc("df"), asc("token"))
  .limit(100)

DF.show(5)

In [5]:
// Total number of input rows (reviews), as a Double so the ratio below is fractional.
val reviewsNumber = data.count().toDouble

// Scalar form of the IDF formula, ln(N / (df + 1)). It is no longer used to build `IDF`,
// but stays defined so that any other cell referencing `calcIdfUdf` keeps working.
val calcIdfUdf = udf { df: Long => math.log(reviewsNumber / (df.toDouble + 1)) }

// Inverse document frequency: idf = ln(N / (df + 1)). The +1 smooths the denominator.
// This uses the built-in `log` column function rather than the UDF, so the expression
// stays visible to the query optimiser and avoids per-row UDF invocation.
val IDF = DF.withColumn("idf", log(lit(reviewsNumber) / (col("df") + 1)))

IDF.show()

In [6]:
// Attach each token's IDF to its per-review term frequency and multiply the two.
// The inner join keeps only tokens present in `IDF`.
val TF_IDF = {
  val tfWithIdf = TF.join(IDF, Seq("token"), "inner")
  tfWithIdf.withColumn("tf_idf", col("tf").multiply(col("idf")))
}

TF_IDF.show()

In [7]:
// Reshape to a wide matrix: one row per review, one column per token, each cell holding
// the TF-IDF score rounded to 4 decimal places. Rows are ordered by review_id.
val result = {
  // First non-null tf_idf within each (review_id, token) cell, rounded.
  val roundedScore = round(first(col("tf_idf"), ignoreNulls = true), 4)
  TF_IDF
    .groupBy(col("review_id"))
    .pivot("token")
    .agg(roundedScore)
    .sort(col("review_id"))
}

result.show()
          