In [1]:
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
def removePunctuation(column):
    """Strip punctuation from a text column and tokenise it into lowercase words.

    Every character that is not whitespace, an ASCII letter or a digit is
    removed; the remaining text is lower-cased, trimmed and split into tokens.

    Args:
        column: Column holding the raw review text.

    Returns:
        An array-of-strings Column aliased ``'Review'``.
    """
    # Raw string: the pattern reaches Spark unchanged without relying on
    # Python passing the invalid escape ``\s`` through (it warns on recent
    # Python versions).
    cleaned = F.regexp_replace(F.concat_ws(" ", column), r'[^\sa-zA-Z0-9]', '')
    normalised = F.trim(F.lower(cleaned))
    # Split on runs of whitespace. Splitting on one literal space produced
    # empty-string tokens wherever stripped punctuation left two spaces in a
    # row, and kept tabs/newlines inside tokens; both skew the word counts.
    return F.split(normalised, r'\s+').alias('Review')

In [3]:
# Create (or reuse) a Spark session running locally with the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('PySpark_Tutorial')
    .getOrCreate()
)

In [4]:
# Both CSV columns are declared as nullable strings.
data_schema = [
    StructField(column_name, StringType(), True)
    for column_name in ('Review', 'Rating')
]
final_struc = StructType(data_schema)

In [5]:
# Load the reviews with the explicit schema; the first line is NOT treated
# as a header here, so it arrives as an ordinary data row.
csv_path = 'tripadvisor_hotel_reviews.csv'
df = (
    spark.read
    .schema(final_struc)
    .option('header', False)
    .csv(csv_path)
)

In [6]:
# Give every review a consecutive integer id (1..N).
#
# The window is ordered by monotonically_increasing_id() so the numbering
# follows the order in which Spark reads the rows. Ordering by a constant
# literal made every row tie, which left the review -> id mapping arbitrary
# and free to change each time the lazy plan is re-evaluated.
# Note: a window without partitionBy still moves all rows to one partition.
w = Window.orderBy(F.monotonically_increasing_id())
df = (
    # The CSV was read without header handling, so the header line shows up as
    # a row whose Review equals the literal 'Review'; drop it. Rows with a
    # null Review are dropped by this comparison as well.
    df.where(df['Review'] != 'Review')
    .withColumn("id", F.row_number().over(w))
)

In [8]:
# Tokenise each review and record its token count in 'count'.
#
# Output columns: id, Review (array of tokens), count (long).
# F.size reads the array length directly. The earlier explode + groupby on
# ('id', 'Review') produced the same number per review, but needed a shuffle
# keyed on the whole token array to get it. The cast keeps 'count' a long,
# as groupby().count() returned.
df_count = (
    df.select('id', removePunctuation(F.col('Review')))
    .withColumn('count', F.size('Review').cast('long'))
)

In [9]:
# Term frequency: occurrences of a word within a review ('count') divided by
# the review's total number of tokens ('len').
# Output columns: id, words, len, count, tf.
tf = (
    df_count
    .select(
        'id',
        F.col('count').alias('len'),
        F.explode('Review').alias('words'),
    )
    .groupby('id', 'words', 'len')
    .count()
    .withColumn('tf', F.col('count') / F.col('len'))
)

In [10]:
# Inverse document frequency per word.
#
# 'total' is the number of reviews that contain the word: each (id, word)
# pair is counted once, however often the word repeats inside a review.
# Output columns: words, total, idf.
n_docs = df.count()  # NOTE: this is an action and runs a Spark job immediately
idf = (
    df_count
    .select('id', F.explode('Review').alias('words'))
    .distinct()
    .groupby('words')
    .count()
    .withColumnRenamed('count', 'total')
    # Smoothed, log-scaled IDF: log((N + 1) / (df + 1)), the formula used by
    # Spark MLlib's IDF. The previous N / (1 + df) had no logarithm, so the
    # weight grew linearly with rarity instead of logarithmically.
    .withColumn(
        'idf',
        F.log((F.lit(n_docs) + F.lit(1)) / (F.col('total') + F.lit(1))),
    )
)

In [36]:
# Keep only the 100 words that occur in the most reviews, then weight each
# review's term frequency by that word's idf.
# Output columns: words, id, tf-idf.
tf_idf = (
    tf.join(
        idf.orderBy(F.desc('total')).limit(100).drop('total'),
        on=['words'],
        how='inner',
    )
    .select(
        'words',
        'id',
        (F.col('tf') * F.col('idf')).alias('tf-idf'),
    )
)

In [38]:
# Reshape to one row per review id and one column per retained word; each
# cell is the summed tf-idf of that word in that review.
# NOTE: this rebinds tf_idf, so the cell cannot be re-run on its own result
# (the 'words' column no longer exists after the pivot).
tf_idf = tf_idf.groupBy('id').pivot('words').agg(F.sum('tf-idf'))

In [40]:
# Write the pivoted matrix to the 'tf_idf' output directory as CSV.
# - header=True: the pivoted columns are named after words, so a file without
#   a header row cannot be interpreted.
# - mode('overwrite'): the default mode raises if the path already exists,
#   which made the notebook fail on every run after the first.
(
    tf_idf.write
    .format('csv')
    .option('header', True)
    .mode('overwrite')
    .save('tf_idf')
)