In [1]:
import pyspark.sql.functions as func
from pyspark.sql.types import StringType, DoubleType
import string
import re
import math

### Get sample data

In [16]:
%%time
sampleDF = sqlContext.read.json("hdfs://orion11:11001/sampled_reddit/*")
sampleDF.cache()
print(sampleDF.count())

309199315
CPU times: user 60.5 ms, sys: 37.4 ms, total: 97.9 ms
Wall time: 9min 8s


In [None]:
sampleDFv2 = sampleDF.sample(False, 0.1)
sampleDFv2.write.format('json').save('hdfs://orion11:11001/sampled_reddit_v2')

In [2]:
%%time
sampleDFv2 = spark.read.format('json').load('hdfs://orion11:11001/sampled_reddit_v2/*')
sampleDFv2.cache()
print(sampleDFv2.count())

30907764
CPU times: user 16.8 ms, sys: 7.73 ms, total: 24.5 ms
Wall time: 1min 18s


### Filter all comment generate by bot/ comment that is delete

In [17]:
%%time
botExpr = "[bB][oO][tT]"

filteredComment = (sampleDF
                   .filter(~(sampleDF.body.like("[deleted]") 
                             | sampleDF.body.like('[removed]') 
                             | sampleDF.author.rlike(botExpr)
                            )
                          )
                  )
sampleDF.unpersist()
filteredComment.cache()
print(filteredComment.count())

286117107
CPU times: user 37 ms, sys: 26.5 ms, total: 63.5 ms
Wall time: 5min 45s


### Calculate number of subreddit

In [18]:
subCount = filteredComment.select("subreddit").distinct().count()

### Generate TF (Term Frequency) for each subreddit

Function to change all special characters to white space

In [19]:

def preProcessBody(text):
    # lowercase
    text=text.lower()
    
    # remove special characters and digits
#     text=re.sub("(\\d|[^\\w|\\s]|(\_))+","",text)
#     text=re.sub("(\\s)+"," ",text)
    text=re.sub("[^A-Za-z]+"," ",text)
    return text.strip()

print(preProcessBody("dan_aykroyd_calls"))

preProcessBodyUdf = func.udf(preProcessBody, StringType())

dan aykroyd calls


Split body to get the list of word in each subreddit

In [20]:
%%time
commentsTokensDF = (filteredComment
                    .select(
                        "subreddit",
                        func.explode(func.split(preProcessBodyUdf(filteredComment.body), "\s+")).alias("word")
                    )
                   )
filteredComment.unpersist()
commentsTokensDF.cache()
commentsTokensDF.show()

+---------+--------+
|subreddit|    word|
+---------+--------+
|AskReddit|       i|
|AskReddit|    read|
|AskReddit|     the|
|AskReddit|   title|
|AskReddit|     and|
|AskReddit| thought|
|AskReddit|      of|
|AskReddit|    that|
|AskReddit|cheating|
|AskReddit|   bitch|
|AskReddit|   clown|
|AskReddit|    from|
|AskReddit|     the|
|AskReddit|glassjaw|
|AskReddit|   video|
|AskReddit| because|
|AskReddit|     you|
|AskReddit|      re|
|AskReddit|   about|
|AskReddit|      to|
+---------+--------+
only showing top 20 rows

CPU times: user 3.55 ms, sys: 3.52 ms, total: 7.07 ms
Wall time: 6.24 s


In [21]:
%%time
wordTf = (commentsTokensDF.groupBy("subreddit","word")
        .agg(func.count("subreddit").alias("tf")))
wordTf.cache()
wordTf.show()

+-----------+--------+------+
|  subreddit|    word|    tf|
+-----------+--------+------+
|programming|     and|420115|
|   politics|  theory| 21543|
| technology|    time| 81105|
|  worldnews| useless|  5748|
|     canada|   there| 96106|
|  AskReddit|    flaw|  5023|
|      scifi|   first|  8026|
|     videos|      be|503413|
|   politics| biggest| 21209|
|environment|possible|   985|
|   politics|slightly| 10076|
|   politics| captain|  1559|
|   politics|    shit|147387|
|     videos|   there|273701|
|     videos|    sure| 78840|
|        WTF|     pre|  3063|
|  AskReddit|   strip| 18294|
|      Music|  before| 47645|
|   business|     the|193401|
|   business|    that| 71784|
+-----------+--------+------+
only showing top 20 rows

CPU times: user 156 ms, sys: 58.8 ms, total: 215 ms
Wall time: 17min 37s


### Calculate DF(Document frequence)

In [22]:
%%time
wordDf = (commentsTokensDF.groupBy("word")
        .agg(func.countDistinct("subreddit").alias("df")))
commentsTokensDF.unpersist()
wordDf.cache()
wordDf.show()

+------------------+-----+
|              word|   df|
+------------------+-----+
|             still|55613|
|foreveralonedating|   47|
|           barrier| 6319|
|              some|71724|
|             anime| 7112|
|             those|48150|
|      accumulation| 1946|
|         viewpoint| 4384|
|            harder|15883|
|       piljrhjuvew|    2|
|               art|22895|
|             spoil| 4860|
|              hope|39658|
|          ligament|  787|
|         imitation| 2762|
|       battlefront| 1281|
|           persist| 3143|
|           flashed| 2517|
|            whaaat| 1412|
|          incoming| 6075|
+------------------+-----+
only showing top 20 rows

CPU times: user 295 ms, sys: 174 ms, total: 468 ms
Wall time: 13min 36s


### Calculate IDF(Inverse Document Frequency)
Base on DF, we will calculate IDF
IDF(t,D) = log[ (|D| + 1) / (DF(t,D) + 1) ]

In [23]:
def calcIdf(docCount, df):
    return math.log((float(docCount) + 1) / (float(df) + 1))
#     return math.log((float(docCount) + 1))

calcIdfUdf = func.udf(calcIdf, DoubleType())

In [24]:
%%time
wordIdf = (wordDf
           .withColumn("idf", calcIdfUdf(func.lit(subCount), wordDf.df)))
wordDf.unpersist()
wordIdf.cache()
wordIdf.show()

+-------------+-----+------------------+
|         word|   df|               idf|
+-------------+-----+------------------+
|    connected|11822| 2.985299969173641|
|  transmitted| 2131|4.6982862497510185|
|         some|71724|1.1825073941871884|
|       harder|15883| 2.690034442785612|
|     whaaaaat| 1071| 5.385820693406007|
|          few|45680|1.6336642995279118|
|         hope|39658|1.7750288476025882|
|     debunked| 2477| 4.547894972847667|
|    recognize|11571|3.0067583689407282|
|gratification| 2457| 4.555998744910774|
|          art|22895|2.3243845332405217|
|socialization| 1374| 5.136893024936082|
|        monte| 1957| 4.783423211946299|
|        those|48150|  1.58100484959416|
|    indicator| 5867|3.6858328957738817|
|   fermenting|  597| 5.969511281086122|
|      rfcpool|    1|11.669954854476808|
|        still|55613|1.4369117879328663|
|   kingseeker|   25| 9.105005497015272|
|     priority| 9363|3.2184742064259524|
+-------------+-----+------------------+
only showing top

### Calculate TF-IDF for selected subreddt
Calculate TF-IDF by getting the TF of selected subreddit, then multiply it with calculated IDF

In [25]:
%%time
selectSub = "fireemblem"
selectSubTf = wordTf.filter(wordTf.subreddit.like(selectSub))
selectSubTf.show()

+----------+----------+-----+
| subreddit|      word|   tf|
+----------+----------+-----+
|fireemblem|  carrying|  161|
|fireemblem|    curves|   21|
|fireemblem|    fockin|    3|
|fireemblem|highlander|    2|
|fireemblem|       ing|   85|
|fireemblem| localised|   36|
|fireemblem|       one|22464|
|fireemblem|       saw| 1163|
|fireemblem|     stage|  210|
|fireemblem|    aether|  412|
|fireemblem|appearance|  294|
|fireemblem|   choices|  520|
|fireemblem|         g|  640|
|fireemblem|      grit|   13|
|fireemblem|   harsher|    8|
|fireemblem|  included|  296|
|fireemblem|   partner|  414|
|fireemblem|   closely|   66|
|fireemblem|     terms|  784|
|fireemblem|      tits|   91|
+----------+----------+-----+
only showing top 20 rows

CPU times: user 2.28 ms, sys: 472 µs, total: 2.76 ms
Wall time: 138 ms


In [27]:
%%time
selectedSubTfIdf = (selectSubTf
      .join(wordIdf, ["word"],how='left')
      .withColumn("tf_idf", wordTf.tf * wordIdf.idf))

CPU times: user 1.73 ms, sys: 230 µs, total: 1.96 ms
Wall time: 14.9 ms


In [30]:
selectedSubTfIdf.sort(func.desc("tf_idf")).show(100)

+----------+----------+------+------+-------------------+------------------+
|      word| subreddit|    tf|    df|                idf|            tf_idf|
+----------+----------+------+------+-------------------+------------------+
|        fe|fireemblem| 22885|  6198| 3.6309587673348345| 83094.49139045768|
|       the|fireemblem|221970|164283|0.35375011858968597| 78521.91382335259|
|         i|fireemblem|184815|154085| 0.4178358679530063| 77222.33593573487|
|       and|fireemblem|133288|145529|0.47496450512938215|63307.068959685086|
|        to|fireemblem|143585|156128|0.40466416744067746|58103.704481969675|
|      that|fireemblem| 86926|124927| 0.6276091847040444| 54555.55598958376|
|        it|fireemblem| 97826|134417| 0.5543924083617696| 54233.99174039847|
|         a|fireemblem|136153|158611|0.38888578767322224| 52947.96664907223|
|       you|fireemblem| 86694|129987| 0.5879046175519596| 50967.80291404958|
|         s|fireemblem| 86509|129846| 0.5889899217938064|50952.929144460395|

As you can see, many work in here didn't make sense when standalone like (i, and, it, a, of, etc.)  
So we need to filter it out, in this project we use nltk library with stopwords to filter all of that

In [33]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords
stopWords = stopwords.words()

[nltk_data] Downloading package stopwords to /home4/hpbui/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [45]:
(selectedSubTfIdf
 .select("word", "tf_idf")
 .filter(selectedSubTfIdf.word.isin(*stopWords) == False)
 .sort(func.desc("tf_idf"))
 .show(100))

+----------+------------------+
|      word|            tf_idf|
+----------+------------------+
|        fe| 83094.49139045768|
|      game|43631.672126168676|
|    corrin| 39396.38380247276|
|     units|37982.897272947914|
| awakening| 37872.55254421729|
|      like|32942.633562011244|
|    emblem|31996.905642781894|
|     fates|30770.915139697416|
|   chapter|30548.201362604625|
|     chrom| 25534.42309782244|
|    really|25004.524652778502|
|      unit| 24616.88490053598|
|characters|24071.708261331292|
| character|23585.985889383137|
|     would|23359.745884367996|
|  conquest|20558.557685208278|
|      good|20385.601273246928|
|       get|19787.592041238448|
|     games|19423.484548911947|
|      even|19421.936726754284|
|      nohr|19153.733863832244|
|     think|19121.913405633608|
|       ike| 18576.95590531542|
|      fire|18005.140382141424|
|      much| 17214.30337370549|
|    lucina| 17113.72224186086|
|birthright| 16984.97127053859|
|       use| 16544.41568688506|
|    pre

In [41]:
(selectedSubTfIdf
 .select("word", "tf_idf")
 .filter(selectedSubTfIdf.word.isin(*stopWords) == False)
 .sort(func.desc("tf_idf"))
 .limit(2000)
 .coalesce(1).write.csv('hdfs://orion11:11001/wordCloud.csv'))