For this Pipeline building task, the data we will use comes from a Hacker News (HN) API that returns JSON data of the top stories in 2014.

In [18]:
from datetime import datetime
from pipeline import build_csv, Pipeline
from stop_words import stop_words

pipeline = Pipeline()

import json
import io
import csv
import string

In [19]:
@pipeline.task()
def file_to_json():
  with open('hn_stories_2014.json', 'r') as f:
    data = json.load(f)
    stories = data['stories']
    return stories

In [20]:
stories = file_to_json()
print(stories[:1])

[{'story_text': '', 'created_at': '2014-05-29T08:25:40Z', 'story_title': None, 'story_id': None, 'comment_text': None, 'created_at_i': 1401351940, 'url': 'https://duckduckgo.com/settings', 'parent_id': None, 'objectID': '7815290', 'author': 'TuxLyn', 'points': 1, 'title': 'DuckDuckGo Settings', '_tags': ['story', 'author_TuxLyn', 'story_7815290'], 'num_comments': 0, '_highlightResult': {'story_text': {'matchedWords': [], 'value': '', 'matchLevel': 'none'}, 'author': {'matchedWords': [], 'value': 'TuxLyn', 'matchLevel': 'none'}, 'url': {'matchedWords': [], 'value': 'https://duckduckgo.com/settings', 'matchLevel': 'none'}, 'title': {'matchedWords': [], 'value': 'DuckDuckGo Settings', 'matchLevel': 'none'}}, 'story_url': None}]


In [21]:
@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def is_popular(story):
        return story['points'] > 50 and story['num_comments'] > 1 and not story['title'].startswith('Ask HN')

    return (
        story for story in stories
        if is_popular(story)
    )

In [22]:
# convert to csv
@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
  lines = []
  for story in stories:
    lines.append(
        (story['objectID'], datetime.strptime(story['created_at'], "%Y-%m-%dT%H:%M:%SZ"), story['url'], story['points'], story['title'])
        )
    return build_csv(lines, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())


In [23]:
# extract title column
@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):   # this return a generator of every story title
  reader = csv.reader(csv_file)
  header = next(reader)
  print('header: ', header)

  idx = header.index('title')
  print('idx: ', idx)
  return (line[idx] for line in reader)

In [24]:
# clean the titles output
@pipeline.task(depends_on=extract_titles)
def clean_title(titles):
  def inner(title):
    processed = title.lower()
    processed.replace(string.punctuation," ")
    return processed
  return (inner(title) for title in titles)

# def clean_title(titles):
#     for title in titles:
#         title = title.lower()
#         title = ''.join(c for c in title if c not in string.punctuation)
#         yield title

In [25]:
# Create the Word Frequency Dictionary
@pipeline.task(depends_on=clean_title)

# function that returns a dictionary of the word frequency of all
# HN titles.
def build_keyword_dictionary(titles):
    word_freq = {}
    for title in titles:
        for word in title.split(' '):
            if word and word not in stop_words:
                if word not in word_freq:
                    word_freq[word] = 1
                word_freq[word] += 1
    return word_freq

In [26]:
# sort the top words
# pipeline.task() function that depends on the build_keyword_dictionary() function
@pipeline.task(depends_on=build_keyword_dictionary)

# function that returns a list of the top 100 tuples
def top_words(word_freq):
    freq_tuple = [
        (word, word_freq[word])
        for word in sorted(word_freq, key=word_freq.get, reverse=True)
    ]
    return freq_tuple[:100]

# print the output of the new task function
ran = pipeline.run()
print(ran[top_words])

header:  ['objectID', 'created_at', 'url', 'points', 'title']
idx:  4
[('true', 2), ('goodbye:', 2), ('‘using', 2), ('truecrypt', 2), ('secure’', 2)]
