In [35]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *

In [36]:
# Private IP address of the Spark master; also reused for the HDFS URL when the dataset is loaded.
spark_master_private_ip = '192.168.2.122'

# Creates (or reuses) the Spark session of the project. The session connects to
# the master on port 7077, turns on dynamic allocation together with shuffle
# tracking and the shuffle service, sets the executor idle timeout to 30s and
# sets spark.cores.max to 4.
spark = (
    SparkSession.builder
    .master(f'spark://{spark_master_private_ip}:7077')
    .appName('group-9_project')
    .config('spark.dynamicAllocation.enabled', True)
    .config('spark.dynamicAllocation.shuffleTracking.enabled', True)
    .config('spark.shuffle.service.enabled', True)
    .config('spark.dynamicAllocation.executorIdleTimeout', '30s')
    .config('spark.cores.max', 4)
    .getOrCreate()
)

# SQL context built on the session's SparkContext; it is used to read the dataset.
sql_context = SQLContext(spark.sparkContext)

In [37]:
# Reads the Reddit comments (JSON documents) from HDFS into a DataFrame and
# marks it for caching so that later computations can reuse it instead of
# reading the files again. The cache is only filled by the first action.
reddit_comments = (
    sql_context.read
    .json(f'hdfs://{spark_master_private_ip}:9000/group-9')
    .cache()
)

                                                                                

In [38]:
# Prints the schema (column names, types and nullability) of the Reddit comments DataFrame.
reddit_comments.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: boolean (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



In [39]:
# Builds the document corpus: keeps only the comment body of every Reddit
# comment and exposes it under the column name 'document'.
documents = reddit_comments.select(reddit_comments['body'].alias('document'))

In [40]:
# Displays the first rows of the document corpus
# (show() prints at most 20 rows by default and truncates long values).
documents.show()

+--------------------+
|            document|
+--------------------+
|A look at Vietnam...|
|The site states "...|
|Jython related to...|
|           [deleted]|
|Saft is by far th...|
|           [deleted]|
|How to take panor...|
|I donât know wh...|
|LinkIt by Marc, a...|
|Making websites r...|
|On the bright sid...|
|Like a lot of peo...|
|This is comment t...|
|           [deleted]|
|           [deleted]|
|           [deleted]|
|           [deleted]|
|It's a New York T...|
|[Here's the copy ...|
|The best thing ab...|
+--------------------+
only showing top 20 rows



In [41]:
# Pre-processes each document in the document corpus.
def pre_process(documents):
    """Drops deleted/removed comments and tokenizes the remaining documents.

    Args:
        documents: DataFrame with a string column named 'document'.

    Returns:
        DataFrame with a single column 'document' that holds, for every kept
        row, the array of lowercased tokens obtained by splitting the text on
        whitespace. Empty tokens are not part of the array.
    """
    # array_remove requires Spark 2.4 or later.
    from pyspark.sql.functions import array_remove

    # Lowercases the documents.
    lower_words = lower(col('document'))

    # Tokenizes the documents (splits on runs of whitespace).
    whitespace = '\\s+'
    tokenize_words = split(lower_words, whitespace)

    # split() produces empty strings when a document starts or ends with
    # whitespace (or is empty). Those are not real terms and would otherwise
    # be counted as tokens downstream, so they are removed here.
    tokens = array_remove(tokenize_words, '')

    # Filters out deleted/removed documents, then applies the tokenization.
    return documents.filter((col('document') != '[deleted]') & (col('document') != '[removed]')) \
                    .select(tokens.alias('document'))

In [42]:
# Applies pre_process() to the corpus: deleted/removed comments are dropped and
# the remaining documents are lowercased and tokenized.
# NOTE(review): this rebinds `documents`, so running the cell a second time
# feeds the already tokenized frame back into pre_process(). Re-run the cell
# that builds `documents` from `reddit_comments` before re-running this one.
documents = pre_process(documents)
documents.show()

+--------------------+
|            document|
+--------------------+
|[a, look, at, vie...|
|[the, site, state...|
|[jython, related,...|
|[saft, is, by, fa...|
|[how, to, take, p...|
|[i, donât, know...|
|[linkit, by, marc...|
|[making, websites...|
|[on, the, bright,...|
|[like, a, lot, of...|
|[this, is, commen...|
|[it's, a, new, yo...|
|[[here's, the, co...|
|[the, best, thing...|
|[you, can, rank, ...|
|[just, testing, t...|
|            [ye, ye]|
|[_we, didn't, tor...|
|[interesting, art...|
|[reddit, got, a, ...|
+--------------------+
only showing top 20 rows



In [43]:
# Adds a unique identifier column 'doc_id' to every document in the corpus.
# NOTE(review): per the Spark documentation, monotonically_increasing_id()
# guarantees unique, increasing ids but not consecutive ones, so doc_id should
# not be relied on as a dense 0..N-1 range.
documents = documents.withColumn('doc_id', monotonically_increasing_id())
documents.show()

+--------------------+------+
|            document|doc_id|
+--------------------+------+
|[a, look, at, vie...|     0|
|[the, site, state...|     1|
|[jython, related,...|     2|
|[saft, is, by, fa...|     3|
|[how, to, take, p...|     4|
|[i, donât, know...|     5|
|[linkit, by, marc...|     6|
|[making, websites...|     7|
|[on, the, bright,...|     8|
|[like, a, lot, of...|     9|
|[this, is, commen...|    10|
|[it's, a, new, yo...|    11|
|[[here's, the, co...|    12|
|[the, best, thing...|    13|
|[you, can, rank, ...|    14|
|[just, testing, t...|    15|
|            [ye, ye]|    16|
|[_we, didn't, tor...|    17|
|[interesting, art...|    18|
|[reddit, got, a, ...|    19|
+--------------------+------+
only showing top 20 rows



In [44]:
# Explodes every document into one row per token. Each row keeps the full
# token array and the document id next to the individual token.
unfolded_documents = documents.select(
    'document',
    'doc_id',
    explode('document').alias('token'),
)
unfolded_documents.show()

+--------------------+------+---------------+
|            document|doc_id|          token|
+--------------------+------+---------------+
|[a, look, at, vie...|     0|              a|
|[a, look, at, vie...|     0|           look|
|[a, look, at, vie...|     0|             at|
|[a, look, at, vie...|     0|        vietnam|
|[a, look, at, vie...|     0|            and|
|[a, look, at, vie...|     0|         mexico|
|[a, look, at, vie...|     0|        exposes|
|[a, look, at, vie...|     0|            the|
|[a, look, at, vie...|     0|           myth|
|[a, look, at, vie...|     0|             of|
|[a, look, at, vie...|     0|         market|
|[a, look, at, vie...|     0|liberalisation.|
|[the, site, state...|     1|            the|
|[the, site, state...|     1|           site|
|[the, site, state...|     1|         states|
|[the, site, state...|     1|          "what|
|[the, site, state...|     1|            can|
|[the, site, state...|     1|              i|
|[the, site, state...|     1|     

In [45]:
from pyspark.sql.window import Window

# Term frequency (tf) per document: the number of occurrences of a token in a
# document divided by the total number of tokens in that document. The counts
# are computed per (doc_id, token) pair and then normalised by the sum of the
# counts within the same doc_id.
token_tf = (
    unfolded_documents
    .groupBy(col('doc_id'), col('token'))
    .agg(count(col('document')).alias('tf'))
    .withColumn('tf', col('tf') / sum(col('tf')).over(Window.partitionBy(col('doc_id'))))
)

token_tf.show()

+------+---------------+--------------------+
|doc_id|          token|                  tf|
+------+---------------+--------------------+
|     0|        exposes| 0.08333333333333333|
|     0|            and| 0.08333333333333333|
|     0|           myth| 0.08333333333333333|
|     0|         mexico| 0.08333333333333333|
|     0|             at| 0.08333333333333333|
|     0|         market| 0.08333333333333333|
|     0|        vietnam| 0.08333333333333333|
|     0|            the| 0.08333333333333333|
|     0|           look| 0.08333333333333333|
|     0|liberalisation.| 0.08333333333333333|
|     0|              a| 0.08333333333333333|
|     0|             of| 0.08333333333333333|
|     1|          there|0.009009009009009009|
|     1|           your|0.009009009009009009|
|     1|          specs|0.018018018018018018|
|     1|           they|0.009009009009009009|
|     1|           web.|0.009009009009009009|
|     1|             me|0.018018018018018018|
|     1|       personal|0.00900900

In [46]:
# Document frequency (df): for every token, the number of distinct documents
# that contain it, ordered from the most to the least widespread token.
token_df = (
    unfolded_documents
    .groupBy(col('token'))
    .agg(countDistinct(col('doc_id')).alias('df'))
    .orderBy(col('df').desc())
)
token_df.show()

+-----+---+
|token| df|
+-----+---+
|  the|618|
|   to|523|
|    a|476|
|   of|452|
|   is|427|
|  and|421|
|    i|412|
| that|374|
|   in|359|
|   it|302|
| this|300|
|  for|291|
|  you|258|
|   on|247|
|   be|242|
| have|224|
|  but|218|
|  are|214|
|  not|212|
| with|204|
+-----+---+
only showing top 20 rows



In [47]:
# NOTE: this import rebinds the name log10 to the plain Python function and
# thereby shadows the Spark column function of the same name that the star
# import from pyspark.sql.functions provides.
from math import log10

# Total number of documents in the corpus.
N = documents.count()

# Calculates the inverse document frequency (idf) of a term where
# N is the total number of documents in the corpus and
# df is the document frequency of that term.
def idf(N, df):
    """Returns log10(N / df) as a Python float.

    Args:
        N: total number of documents in the corpus.
        df: number of documents that contain the term.
    """
    return log10(N / df)

# Defines a UDF for the idf() function. The return type is declared explicitly:
# without it udf() falls back to a string return type, which would make 'idf'
# a string column instead of a numeric one.
udf_idf = udf(lambda df: idf(N, df), 'double')

# Calculates the idf for each term.
token_idf = token_df.withColumn('idf', udf_idf(col('df')))

token_idf.show()

+-----+---+-------------------+
|token| df|                idf|
+-----+---+-------------------+
|  the|618|0.18901489649493047|
|   to|523| 0.2615016827164721|
|    a|476| 0.3023964188632532|
|   of|452| 0.3248649367723642|
|   is|427| 0.3495754965587225|
|  and|421|  0.355721275748078|
|    i|412|0.36510615555061177|
| that|374|0.40713176938326623|
|   in|359|0.42490892300542715|
|   it|302|0.49999642862659566|
| this|300| 0.5028821168640839|
|  for|291|  0.516110382597839|
|  you|258| 0.5683836656205161|
|   on|247| 0.5873064183240806|
|   be|242| 0.5961880056033151|
| have|224| 0.6297553532495835|
|  but|218| 0.6415468779791416|
|  are|214| 0.6495895982345555|
|  not|212| 0.6536675106549948|
| with|204| 0.6703732041578476|
+-----+---+-------------------+
only showing top 20 rows



In [48]:
# Calculates the term frequency–inverse document frequency (tf-idf) of every
# (token, document) pair: the tf rows are joined with the idf of the same
# token, and tf-idf is the product of the two values.
token_tf_idf = (
    token_tf
    .join(token_idf, on='token')
    .withColumn('tf_idf', col('tf') * col('idf'))
)
token_tf_idf.show()

+---------------+------+--------------------+---+-------------------+--------------------+
|          token|doc_id|                  tf| df|                idf|              tf_idf|
+---------------+------+--------------------+---+-------------------+--------------------+
|        exposes|     0| 0.08333333333333333|  3|  2.502882116864084| 0.20857350973867364|
|            and|     0| 0.08333333333333333|421|  0.355721275748078|0.029643439645673165|
|           myth|     0| 0.08333333333333333|  4|  2.377943380255784| 0.19816194835464865|
|         mexico|     0| 0.08333333333333333|  1| 2.9800033715837464| 0.24833361429864553|
|             at|     0| 0.08333333333333333|160| 0.7758833889278216| 0.06465694907731846|
|         market|     0| 0.08333333333333333|  6| 2.2018521212001025|  0.1834876767666752|
|        vietnam|     0| 0.08333333333333333|  2|  2.678973375919765| 0.22324778132664708|
|            the|     0| 0.08333333333333333|618|0.18901489649493047| 0.01575124137457754|

In [49]:
# Stops the Spark session that was created at the top of the notebook.
spark.stop()