# Imports

In [1]:
import org.apache.spark.sql._
import org.apache.spark.sql.{functions => F}
import spark.implicits._

# Constants

In [3]:
// Name passed to the SparkSession builder for this job.
val appName: String = "MADE_hw4"
// Number of tokens (ranked by document frequency) kept as the TF-IDF vocabulary.
val topN: Int = 100
// CSV file the reviews are read from.
val inputPath: String = "tripadvisor_hotel_reviews.csv"
// Location the final TF-IDF table is written to.
val outputPath: String = "tfidf.csv"

# Loading Data

In [5]:
// Spark session running locally on all available cores (`local[*]`);
// `getOrCreate` returns an existing session if one is already active.
val spark = {
    val sessionBuilder = SparkSession.builder().appName(appName).master("local[*]")
    sessionBuilder.getOrCreate()
}

In [6]:
// Reviews read from `inputPath` (first CSV line is treated as the header),
// with a generated `Id` column so each review can be referenced later.
// Cached because it is reused by several downstream steps.
val reviews = {
    val rawReviews = spark.read.option("header", true).csv(inputPath)
    rawReviews.withColumn("Id", F.monotonically_increasing_id()).cache()
}

// Number of reviews; used as the corpus size in the IDF formula.
val numReviews = reviews.count()

In [7]:
// Print a preview of the loaded reviews (`show` prints the first 20 rows by default).
reviews.show

# Preprocessing

In [9]:
// One row per (review Id, token) occurrence.
//
// Each review is normalised by removing every character that is not an ASCII
// letter, digit, hyphen or space, trimming the ends and lower-casing. The text is
// then split on *runs* of spaces: deleting punctuation can leave several spaces
// in a row, and splitting on a single space would turn those into empty-string
// tokens. An entirely empty review still splits into one empty token, so empty
// tokens are filtered out explicitly as well.
val parsedTokens = {
    val cleanedReview = F.lower(
        F.trim(
            F.regexp_replace(F.col("Review"), "[^A-Za-z0-9- ]", "")
        )
    )

    reviews
        .select(
            F.col("Id"),
            F.explode(F.split(cleanedReview, " +")).alias("Token")
        )
        .where(F.col("Token") =!= "")
        .cache
}

In [10]:
// Print a preview of the (Id, Token) rows.
parsedTokens.show

# Calculating TF and DF

In [12]:
// Term frequency: how many times each token occurs in each review.
// The column is named `Tf` (matching `Df` / `Idf` and the `F.col("Tf")` lookup
// used when the TF-IDF score is computed) so that the lookup does not depend on
// Spark's case-insensitive column resolution.
val tf = (
    parsedTokens.groupBy("Id", "Token")
    .agg(F.count(F.lit(1)).alias("Tf"))
    .cache
)

In [13]:
// Print a preview of the per-review token counts.
tf.show

In [14]:
// Document frequency: for every token, the number of distinct reviews it occurs in.
val df = (
    parsedTokens
    .groupBy(F.col("Token"))
    .agg(F.countDistinct(F.col("Id")).as("Df"))
    .cache()
)

In [15]:
// Print a preview of the per-token document frequencies.
df.show

 

# Calculating IDF

In [17]:
// Inverse document frequency for the `topN` tokens with the highest document
// frequency: Idf = ln(numReviews / Df).
//
// Ties on `Df` are broken by `Token` so that the vocabulary selected at the
// cut-off is deterministic. The result is cached because it is consumed twice
// downstream (to build the Id x Token grid and to weight the term frequencies),
// and both uses must see the same vocabulary.
val idf = (
    df.orderBy(F.desc("Df"), F.asc("Token"))
    .limit(topN)
    .withColumn("Idf", F.log(F.lit(numReviews) / F.col("Df")))
    .select("Token", "Idf")
    .cache
)

In [18]:
// Print a preview of the vocabulary tokens and their IDF values.
idf.show

# Calculating TFIDF

In [20]:
// Full grid of (review Id, vocabulary token) pairs, built as a cross join so that
// every review gets a row for every vocabulary token, whether or not the token
// occurs in that review.
val tfIdfUnfilled = {
    val reviewIds = reviews.select(F.col("Id")).distinct()
    val vocabulary = idf.select(F.col("Token"))
    reviewIds.crossJoin(vocabulary).cache()
}

In [21]:
// Print a preview of the (Id, Token) grid.
tfIdfUnfilled.show

In [22]:
// Final TF-IDF matrix: one row per review `Id`, one column per vocabulary token.
//
// Steps:
//   1. weight each (Id, Token) count by the token's IDF (the inner join with `idf`
//      keeps vocabulary tokens only);
//   2. left-join the scores onto the full Id x Token grid and replace the missing
//      scores (token absent from the review) with 0;
//   3. pivot tokens into columns. Every (Id, Token) pair has exactly one row at
//      this point, so `first` simply picks that single value.
//
// The score column is spelled `TfIdf` everywhere so the code does not rely on
// case-insensitive column resolution, and the null fill is limited to that column.
val tfIdf = {
    val weighted = (
        tf.join(idf, Seq("Token"))
        .withColumn("TfIdf", F.col("Tf") * F.col("Idf"))
        .select("Id", "Token", "TfIdf")
    )

    (
        tfIdfUnfilled.join(weighted, Seq("Id", "Token"), "left")
        .na.fill(0.0, Seq("TfIdf"))
        .groupBy("Id")
        .pivot("Token")
        .agg(F.first(F.col("TfIdf")))
        .cache
    )
}

In [23]:
// Print a preview of the pivoted TF-IDF table.
tfIdf.show

# Saving results

In [25]:
// Write the TF-IDF table as CSV, replacing any previous output at `outputPath`.
// The header row is required: after the pivot the column names are the tokens
// themselves, so without it the file would not say which column is which token.
tfIdf.write.mode("overwrite").option("header", true).csv(outputPath)