# PySpark TF-IDF implementation

Stages:


*   Convert all characters to lowercase; 
*   Remove special characters;
*   Count frequency of every word in documents;
*   Count number of documents with words;
*   Use only top 100 frequent words;
*   Join two tables with TF and DF and count TF-IDF;
*   Pivot final table.


In [1]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.0
!wget -q  https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
# unzip it
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
# install findspark 
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
df = spark.read.csv("tripadvisor_hotel_reviews.csv", header=True)
df.show()

+--------------------+------+
|              Review|Rating|
+--------------------+------+
|nice hotel expens...|     4|
|ok nothing specia...|     2|
|nice rooms not 4*...|     3|
|unique, great sta...|     5|
|great stay great ...|     5|
|love monaco staff...|     5|
|cozy stay rainy c...|     5|
|excellent staff, ...|     4|
|hotel stayed hote...|     5|
|excellent stayed ...|     5|
|poor value stayed...|     2|
|nice value seattl...|     4|
|nice hotel good l...|     4|
|nice hotel not ni...|     3|
|great hotel night...|     4|
|horrible customer...|     1|
|disappointed say ...|     2|
|fantastic stay mo...|     5|
|good choice hotel...|     5|
|hmmmmm say really...|     3|
+--------------------+------+
only showing top 20 rows



In [5]:
df = (
    df
    .withColumn(
        'Review', 
        F.lower(F.col('Review'))
    )
    .withColumn(
        'Review',
        F.regexp_replace(
            F.col("Review"), 
            "[^a-z ]", 
            ""
        )
    )
    .withColumn(
        'Words',
        F.split(
            F.col('Review'),
            pattern=' ',
        )
    )
)

df.show()

+--------------------+------+--------------------+
|              Review|Rating|               Words|
+--------------------+------+--------------------+
|nice hotel expens...|     4|[nice, hotel, exp...|
|ok nothing specia...|     2|[ok, nothing, spe...|
|nice rooms not  e...|     3|[nice, rooms, not...|
|unique great stay...|     5|[unique, great, s...|
|great stay great ...|     5|[great, stay, gre...|
|love monaco staff...|     5|[love, monaco, st...|
|cozy stay rainy c...|     5|[cozy, stay, rain...|
|excellent staff h...|     4|[excellent, staff...|
|hotel stayed hote...|     5|[hotel, stayed, h...|
|excellent stayed ...|     5|[excellent, staye...|
|poor value stayed...|     2|[poor, value, sta...|
|nice value seattl...|     4|[nice, value, sea...|
|nice hotel good l...|     4|[nice, hotel, goo...|
|nice hotel not ni...|     3|[nice, hotel, not...|
|great hotel night...|     4|[great, hotel, ni...|
|horrible customer...|     1|[horrible, custom...|
|disappointed say ...|     2|[d

In [6]:
tf_table = (
    df
    .select(['Words'])
    .withColumnRenamed(
        'Words',
        'Tokens',
    )
    .withColumn(
        'Document_ID',
        F.monotonically_increasing_id()
    )
    .withColumn(
        'Token',
        F.explode(F.col('Tokens'))
    )
    .groupBy(
        [F.col('Document_ID'), F.col('Token')]
    )
    .agg(
        {"Token": 'count'}
    )
    .withColumnRenamed(
        'count(Token)',
        'Token_in_document_count'
    )
    .filter(
        F.col('Token') != ''
    )
    .orderBy(
        F.col('Document_ID'),
        F.col('Token_in_document_count').desc(),
        F.col('Token'),
    ) 
)

tf_table.show()

+-----------+-----------+-----------------------+
|Document_ID|      Token|Token_in_document_count|
+-----------+-----------+-----------------------+
|          0|       nice|                      5|
|          0|    parking|                      3|
|          0|       room|                      3|
|          0|        did|                      2|
|          0|      hotel|                      2|
|          0|      night|                      2|
|          0|        not|                      2|
|          0|       stay|                      2|
|          0|  advantage|                      1|
|          0|     advice|                      1|
|          0|anniversary|                      1|
|          0|    arrived|                      1|
|          0|      aveda|                      1|
|          0|      bangs|                      1|
|          0|       bath|                      1|
|          0|        bed|                      1|
|          0|      check|                      1|


In [7]:
idf_table = (
    tf_table
    .groupby('Token')
    .agg(
        {'Document_ID': 'count'}
    )
    .withColumnRenamed(
        'count(Document_ID)',
        'Token_in_all_documents_count'
    )
    .orderBy(
        F.col('Token_in_all_documents_count').desc()
    )
    .limit(100)
    .withColumn(
        'Documents_count',
        F.lit(df.count())
    )
)

idf_table.show()

+---------+----------------------------+---------------+
|    Token|Token_in_all_documents_count|Documents_count|
+---------+----------------------------+---------------+
|    hotel|                       16325|          20491|
|     room|                       14056|          20491|
|      not|                       12124|          20491|
|    staff|                       11528|          20491|
|    great|                       11021|          20491|
|     stay|                       10096|          20491|
|     good|                        9280|          20491|
|   stayed|                        8552|          20491|
|       nt|                        8383|          20491|
|    rooms|                        8341|          20491|
| location|                        8172|          20491|
|     just|                        7736|          20491|
|    clean|                        7651|          20491|
|     nice|                        7420|          20491|
|      did|                    

In [8]:
tf_idf_table = (
    tf_table
    .join(
        (
            tf_table
            .groupBy('Document_ID')
            .agg(
                {"Token_in_document_count": "sum"}
            )
            .withColumnRenamed(
                'sum(Token_in_document_count)',
                'Tokens_count'
            )
        ),
        on='Document_ID',
        how='left'
    )
    .join(
        idf_table,
        on='Token',
        how='left'
    )
)

tf_idf_table.show()

+----------+-----------+-----------------------+------------+----------------------------+---------------+
|     Token|Document_ID|Token_in_document_count|Tokens_count|Token_in_all_documents_count|Documents_count|
+----------+-----------+-----------------------+------------+----------------------------+---------------+
|      room|          0|                      3|          86|                       14056|          20491|
|    better|          1|                      2|         243|                        3244|          20491|
|attractive|          6|                      1|          98|                        null|           null|
|  positive|          6|                      1|          98|                        null|           null|
| concierge|          7|                      2|          85|                        null|           null|
|        nt|         10|                      2|          44|                        8383|          20491|
|     clean|         12|             

In [9]:
tf_idf_table = (
    tf_idf_table
    .na
    .drop(
        subset=['Token_in_all_documents_count']
    )
    .withColumn(
        'TF',
        F.col('Token_in_document_count') / F.col('Tokens_count')
    )
    .withColumn(
        'IDF',
        F.log2(F.col('Documents_count') / F.col('Token_in_all_documents_count'))
    )
    .withColumn(
        'TF-IDF',
        F.col('TF') * F.col('IDF')
    )
)

(
    tf_idf_table
    .select(['Token', 'Document_ID', 'TF-IDF'])
    .show()
)

+---------+-----------+--------------------+
|    Token|Document_ID|              TF-IDF|
+---------+-----------+--------------------+
|     room|          0|0.018969917298355492|
|   better|          1|   0.021885964343023|
|       nt|         10| 0.05861144808162943|
|    clean|         12|0.016919882906726882|
|     stay|         15|0.009772311740784861|
|     desk|         16| 0.06791032495372712|
|      bed|         19|0.016141640469173452|
|excellent|         30| 0.05977208382864487|
|   really|         32|0.025233545234842725|
|   street|         70| 0.09023604479773865|
|      bed|         80|0.024871303171889705|
|      day|        116|0.025712667815178728|
|     just|        125| 0.02066662782967244|
|   little|        133|0.027726300229127476|
|   hotels|        146| 0.01870759696543231|
|      did|        153|0.007976294401516069|
|  walking|        173| 0.05654406021356884|
|       no|        176|  0.0078983576505205|
| bathroom|        189| 0.10309538426972249|
|      day

In [10]:
tf_idf_table.count()

444239

In [11]:
(
    tf_idf_table
    .limit(5)
    .groupBy('Token')
    .pivot('Document_ID')
    .agg(
        F.first(F.col('TF-IDF'))
    )
    .show()
)

+------+--------------------+-----------------+-------------------+--------------------+--------------------+
| Token|                   0|                1|                 10|                  12|                  15|
+------+--------------------+-----------------+-------------------+--------------------+--------------------+
|  room|0.018969917298355492|             null|               null|                null|                null|
|better|                null|0.021885964343023|               null|                null|                null|
|    nt|                null|             null|0.05861144808162943|                null|                null|
| clean|                null|             null|               null|0.016919882906726882|                null|
|  stay|                null|             null|               null|                null|0.009772311740784861|
+------+--------------------+-----------------+-------------------+--------------------+--------------------+

