### Key Terms: 
calculate the TF-IDF for a given subreddit.

Produce a Tag Cloud of the terms (note: this doesn’t have to be integrated into your code; simply including the image is enough).

In [1]:
subreddit='newzealand'
# word = 
concatenatedString = ""

#### Loading dataset 

In [2]:
# from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import from_json, col
conf = SparkConf().setAppName('FirstSpark2').setMaster('Spark')
sc = SparkContext.getOrCreate()

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.json("hdfs://orion11:20001/sample_sampled_reddit/")

#### Filtering data

In [3]:
df2 = df.filter(
~(df.body.like('[deleted]'))
    & ~(df.body.isNull())
    & ~(df.author.like('[deleted]'))
    & ~(df.author.like('AutoModerator')) 
    & ~(df.author.rlike("[bB][oO][tT]"))

)

print(type(df2))
print(df2.count())
df2.show(1)

<class 'pyspark.sql.dataframe.DataFrame'>
27303462
+--------+--------------+--------------+----------------------+-----------------+--------------------+---------+----------------+-------+-----------+-------------+-----+------+------+-------+---------+-----------+----+---------+--------------+-------+------------+-----+-----+------------+--------+---------------+------------+---+------------+
|archived|        author|author_cakeday|author_flair_css_class|author_flair_text|                body|body_html|controversiality|created|created_utc|distinguished|downs|edited|gilded|     id|  link_id|mod_reports|name|parent_id|removal_reason|replies|retrieved_on|saved|score|score_hidden|stickied|      subreddit|subreddit_id|ups|user_reports|
+--------+--------------+--------------+----------------------+-----------------+--------------------+---------+----------------+-------+-----------+-------------+-----+------+------+-------+---------+-----------+----+---------+--------------+-------+--------

In [4]:
subCount = df2.select("subreddit").distinct().count()
subCount

88429

#### Getting Query for chosen subreddit :

In [5]:
#subreddit='apple'
query_df = df2#.filter(df2.subreddit==subreddit)

# print(type(query_df))
# print(query_df.count())
# query_df.show(2)

#### Getting words in body of filtered subreddit rows

In [6]:
import pyspark.sql.functions as func
import re
from pyspark.sql.types import StringType, DoubleType, IntegerType

def preProcessBody(text):
    # lowercase
    text=text.lower()
    text=re.sub("[^A-Za-z]+"," ",text)
    return text.strip()

# print(preProcessBody("who_is_there"))

preProcessBodyUdf = func.udf(preProcessBody, StringType())

commentsTokensDF = (query_df
                    .select(
                        "subreddit",
                        func.explode(func.split(preProcessBodyUdf(query_df.body), "\s+")).alias("word")
                    )
                   )

#query_df.unpersist()
# commentsTokensDF.cache()

print(type(commentsTokensDF))
print(commentsTokensDF.count())
commentsTokensDF.show()

<class 'pyspark.sql.dataframe.DataFrame'>
903761043
+---------------+--------+
|      subreddit|    word|
+---------------+--------+
|weddingplanning|       i|
|weddingplanning|    need|
|weddingplanning|    hash|
|weddingplanning|     tag|
|weddingplanning|    help|
|weddingplanning|      my|
|weddingplanning|    last|
|weddingplanning|    name|
|weddingplanning|      is|
|weddingplanning|  sitter|
|weddingplanning|     and|
|weddingplanning|     his|
|weddingplanning|      is|
|weddingplanning|  ritter|
|weddingplanning|       i|
|weddingplanning|      ll|
|weddingplanning|      be|
|weddingplanning|changing|
|weddingplanning|      my|
|weddingplanning|    last|
+---------------+--------+
only showing top 20 rows



In [7]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords
stopWords = stopwords.words()

commentsTokensDF = commentsTokensDF.filter(commentsTokensDF.word.isin(*stopWords) == False)

print(commentsTokensDF.count())

[nltk_data] Downloading package stopwords to /home4/ajha6/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


433093657


#### calculating tf

In [8]:
%%time
wordTf = (commentsTokensDF.groupBy("subreddit","word")
        .agg(func.count("subreddit").alias("tf")))
# wordTf.cache()

print(type(wordTf))
print(wordTf.count())
wordTf.show()

<class 'pyspark.sql.dataframe.DataFrame'>
47460196
+-------------------+-----------+----+
|          subreddit|       word|  tf|
+-------------------+-----------+----+
|       windowsphone|        uwp|  95|
|                rit|     campus| 148|
|               pics|      rules|2098|
|    leagueoflegends|       case|3473|
|                wow|  sacrifice| 108|
|                DnD|     better| 946|
|              piano|    entered|   1|
|      AdviceAnimals|transported|  10|
|      AdviceAnimals|        ufo|   8|
|         television|    content| 383|
|                wow|       back|4536|
|      SquaredCircle|      clash| 148|
|            OkCupid|        tbh|  87|
|            teenmom|       feel| 240|
|            xboxone|         tv|1222|
|    TwoXChromosomes|       cute| 405|
|      todayilearned|        lol|2054|
|PoliticalDiscussion|     reason|1303|
|            Fitness|  guideline|  23|
|     DestinyTheGame|        roi| 419|
+-------------------+-----------+----+
only showing 

#### calculating df

In [9]:
%%time
wordDf = (commentsTokensDF.groupBy("word")
        .agg(func.countDistinct("subreddit").alias("df")))
# commentsTokensDF.unpersist()
# wordDf.cache()
wordDf.orderBy(wordDf.df, ascending=False).show()

+------+-----+
|  word|   df|
+------+-----+
|  like|33541|
|   get|29643|
| would|28875|
|  http|27192|
|  good|26544|
| think|26339|
|  know|25847|
|  time|25223|
|really|25154|
|   www|24593|
|   see|24042|
|  well|23472|
|people|23369|
|  much|23176|
|  make|22796|
| could|22565|
|thanks|22091|
|    go|22046|
| https|21941|
| still|21590|
+------+-----+
only showing top 20 rows

CPU times: user 32.6 ms, sys: 15.7 ms, total: 48.2 ms
Wall time: 52.6 s


#### calculating idf

In [10]:
%%time
import pyspark.sql.functions as func
from pyspark.sql.types import StringType, DoubleType
import string
import re
import math

def calcIdf(docCount, df):
    return math.log((float(docCount) + 1) / (float(df) + 1))
#     return math.log((float(docCount) + 1))

calcIdfUdf = func.udf(calcIdf, DoubleType())


wordIdf = (wordDf
           .withColumn("idf", calcIdfUdf(func.lit(subCount), wordDf.df)))
# wordDf.unpersist()
# wordIdf.cache()
wordIdf.show()

+----------+-----+------------------+
|      word|   df|               idf|
+----------+-----+------------------+
|      earl|  677|4.8708192696293855|
|commanders|  398| 5.401005140679917|
|    brands| 1916|3.8314498145241362|
|     yoshi|  341| 5.555155820507175|
|    online|11389| 2.049475501128553|
|  tripping| 1102| 4.384177538316278|
|  nicotine|  564|5.0531408264233395|
|      neet|  169| 6.254168120519519|
|   tanques|    7| 9.310525015889946|
|      hope|14654| 1.797429704424788|
|  fiscally|  302| 5.676233752060412|
| recognize| 3858| 3.131803196032162|
| litterers|   17| 8.499594799673616|
|    harder| 5942|  2.69999722220312|
|     papeg|    1|10.696819377009836|
| arguments| 2843|3.4369997666466494|
|     still|21590| 1.409934717393867|
| standards| 3488| 3.232596116383007|
|       art| 6541|2.6039683494714514|
| connected| 3791| 3.149317694194868|
+----------+-----+------------------+
only showing top 20 rows

CPU times: user 17.8 ms, sys: 16.9 ms, total: 34.7 ms
Wall tim

#### Filtering for a subreddit

In [11]:
%%time
selectSub = subreddit
selectSubTf = wordTf.filter(wordTf.subreddit.like(selectSub))

# wordIdf.unpersist()

selectSubTf.show()

+----------+--------------+---+
| subreddit|          word| tf|
+----------+--------------+---+
|newzealand|        system|259|
|newzealand|     countries|155|
|newzealand|        double| 47|
|newzealand|         happy|147|
|newzealand|          boat| 32|
|newzealand|      tailored|  3|
|newzealand|      building|121|
|newzealand|         march| 23|
|newzealand|         texts| 17|
|newzealand|sterilisations|  1|
|newzealand|          jerk| 12|
|newzealand|      curtains|  5|
|newzealand|         share| 50|
|newzealand|        motifs|  1|
|newzealand|        degree| 84|
|newzealand|          move|160|
|newzealand|     dickinson|  1|
|newzealand|  unreasonable| 11|
|newzealand|     manhattan|  1|
|newzealand|   trafficking|  4|
+----------+--------------+---+
only showing top 20 rows

CPU times: user 11.8 ms, sys: 4.85 ms, total: 16.6 ms
Wall time: 23.5 s


#### Calculating tdf-idf

In [12]:
%%time
selectedSubTfIdf = (selectSubTf
      .join(wordIdf, ["word"],how='left')
      .withColumn("tf_idf", wordTf.tf * wordIdf.idf))

selectedSubTfIdf.sort(func.desc("tf_idf")).show()

+----------+----------+----+-----+------------------+------------------+
|      word| subreddit|  tf|   df|               idf|            tf_idf|
+----------+----------+----+-----+------------------+------------------+
|        nz|newzealand|1694| 1738|3.9289010432154976| 6655.558367207053|
|    people|newzealand|2433|23369|1.3307581300368772| 3237.734530379722|
|  auckland|newzealand| 515|  270|  5.78784773669008| 2980.741584395391|
|        gt|newzealand|1548|19047| 1.535249169403082| 2376.565714235971|
|   zealand|newzealand| 548| 1270| 4.242407286380327| 2324.839192936419|
|     would|newzealand|2056|28875| 1.119200478310729| 2301.076183406859|
|      like|newzealand|2323|33541|0.9694128936782076|2251.9461520144764|
|       get|newzealand|1959|29643|1.0929515344985314| 2141.092056082623|
|     think|newzealand|1568|26339|1.2111225822725087|1899.0402090032935|
|     maori|newzealand| 280|  179|  6.19700970667957|1735.1627178702797|
|       new|newzealand|1140|20515|1.461006209060621

In [15]:
df_fDist = selectedSubTfIdf.select('word', 'tf_idf').orderBy('tf_idf', ascending=False) #converting RDD to spark dataframe
df_fDist.createOrReplaceTempView("myTable") 
# df211 = spark.sql("SELECT _1 AS Keywords, _2 as Frequency from myTable limit 20") #renaming columns 
pandD = df_fDist.toPandas() #converting spark dataframes to pandas dataframes

pandD.head(2)

Unnamed: 0,word,tf_idf
0,nz,6655.558367
1,people,3237.73453


In [16]:
df_fDist.limit(500).write.csv('hdfs://orion11:20001/wordCloud4.csv')

#### Checking word cloud

![wordCloud](https://drive.google.com/file/d/17TYbMWBAosbh1x8VJK_CskhlEJ3Tab80/view?usp=sharing)

link - https://drive.google.com/file/d/17TYbMWBAosbh1x8VJK_CskhlEJ3Tab80/view?usp=sharing