## Import Dask and distributed scheduler

In [1]:
from distributed import Executor, hdfs, progress
e = Executor('127.0.0.1:8786')
e

<Executor: scheduler=127.0.0.1:8786 workers=56 threads=56>

## Load JSON data from HDFS using Python

In [2]:
import json
lines = hdfs.read_text('/user/hive/warehouse/reddit_json/RC_2015-*.json')  # 242 GB
js = lines.map(json.loads)

Setting global dask scheduler to use distributed


## Filter out movies subreddit and persist

In [3]:
movies = js.filter(lambda d: 'movies' in d['subreddit'])
movies = e.persist(movies)
progress(movies)

In [5]:
%time movies.take(10)[6]

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 18.2 ms


{u'archived': False,
 u'author': u'Da_Sau5_Boss',
 u'author_flair_css_class': None,
 u'author_flair_text': None,
 u'body': u'Dawn of the Apes was such an incredible movie, it should be up there in my opinion.',
 u'controversiality': 0,
 u'created_utc': u'1420070489',
 u'distinguished': None,
 u'downs': 0,
 u'edited': False,
 u'gilded': 0,
 u'id': u'cnasaeq',
 u'link_id': u't3_2qyq18',
 u'name': u't1_cnasaeq',
 u'parent_id': u't3_2qyq18',
 u'retrieved_on': 1425124264,
 u'score': 1,
 u'score_hidden': False,
 u'subreddit': u'movies',
 u'subreddit_id': u't5_2qh3s',
 u'ups': 1}

## Natural language processing with NLTK

In [6]:
import nltk
sentence = 'Dawn of the Apes was such an incredible movie, it should be up there in my opinion.'

In [7]:
words = nltk.word_tokenize(sentence)
words

['Dawn',
 'of',
 'the',
 'Apes',
 'was',
 'such',
 'an',
 'incredible',
 'movie',
 ',',
 'it',
 'should',
 'be',
 'up',
 'there',
 'in',
 'my',
 'opinion',
 '.']

In [8]:
nltk.pos_tag(words)

[('Dawn', 'NN'),
 ('of', 'IN'),
 ('the', 'DT'),
 ('Apes', 'NNP'),
 ('was', 'VBD'),
 ('such', 'JJ'),
 ('an', 'DT'),
 ('incredible', 'JJ'),
 ('movie', 'NN'),
 (',', ','),
 ('it', 'PRP'),
 ('should', 'MD'),
 ('be', 'VB'),
 ('up', 'RP'),
 ('there', 'RB'),
 ('in', 'IN'),
 ('my', 'PRP$'),
 ('opinion', 'NN'),
 ('.', '.')]

## Distributed language processing with NLTK: Extracting nouns and filtering words

In [4]:
import nltk
pos = e.persist(movies.pluck('body')              # ["The quick brown fox", ...]
                      .map(nltk.word_tokenize)    # ["The", "quick", "brown", "fox", ...]
                      .map(nltk.pos_tag)          # [("The", "Article"), ("quick", "ADJ"), ...]
                      .concat()                   # [("The", "Article"), ("quick", "ADJ"), ...]
                      .filter(lambda (word, pos): word.isalpha()))

progress(pos)

In [11]:
f = e.compute(pos.filter(lambda (word, type): type == 'NNP')  # [("fox", "NN"), ("dog", "NN"), ...]
                 .pluck(0)  # get words                      # ["fox", "dog", ...]
                 .frequencies()                              # [("fox", 435), ("dog", 1952), ...]
                 .topk(10, lambda (word, count): count))     # [("dog", 1952), ...]
progress(f)

In [12]:
f.result()

[(u'Marvel', 35452),
 (u'Star', 34849),
 (u'Batman', 31749),
 (u'Wars', 28875),
 (u'Man', 26423),
 (u'John', 25304),
 (u'Superman', 22476),
 (u'Hollywood', 19840),
 (u'Max', 19558),
 (u'CGI', 19304)]

## Visualize with Bokeh

In [13]:
import pandas as pd
df = pd.DataFrame(f.result(), columns=['Noun', 'Count'])

from bokeh.charts import Bar, show, output_notebook
from bokeh.charts.attributes import cat
output_notebook()

p = Bar(df,
        label=cat(columns='Noun', sort=False),
        values='Count',
        title='Top N nouns in r/movies subreddit')
show(p)