# Hacker News Pipeline

We will take the pipeline we have been building and apply it to a real-world data project. Starting from JSON data returned by an API, we will filter, clean, aggregate, and summarize it through a sequence of pipeline tasks, each applying one transformation.
The data comes from a Hacker News (HN) API that returns [JSON](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON) data for the top stories of 2014.

We have already downloaded a list of JSON posts to a file called `hn_stories_2014.json`. The JSON file contains a single key, `stories`, which holds a list of stories (posts). Each post has a set of keys, but we will deal only with the following:

* created_at: A timestamp of the story's creation time.
* created_at_i: The story's creation time as a Unix epoch timestamp (seconds).
* url: The URL of the story link.
* objectID: The ID of the story.
* author: The story's author (username on HN).
* points: The number of upvotes the story had.
* title: The headline of the post.
* num_comments: The number of comments a post has.
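`created_at` and `created_at_i` describe the same moment in two encodings. The sketch below uses a made-up record (not real HN data) shaped like the file described above and checks that the two agree. The `"%Y-%m-%dT%H:%M:%SZ"` format string is an assumption about how `created_at` is written; the pipeline below uses `dateutil`'s `parse`, which accepts a wider range of formats.

```python
import json
from datetime import datetime, timezone

# Hypothetical payload with the same shape as hn_stories_2014.json:
# a top-level "stories" key holding a list of post dictionaries.
# The values are made up for illustration; they are not real HN data.
raw = """
{
  "stories": [
    {
      "created_at": "2014-01-01T00:00:00Z",
      "created_at_i": 1388534400,
      "url": "http://example.com/post",
      "objectID": "1",
      "author": "someone",
      "points": 120,
      "title": "Example story title",
      "num_comments": 15
    }
  ]
}
"""

stories = json.loads(raw)["stories"]
story = stories[0]

# created_at_i counts seconds since the Unix epoch; interpret it as UTC.
from_epoch = datetime.fromtimestamp(story["created_at_i"], tz=timezone.utc)

# created_at is an ISO-8601 string; this format assumes the "...Z" form used above.
from_iso = datetime.strptime(story["created_at"], "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)

print(from_epoch.isoformat())  # 2014-01-01T00:00:00+00:00
print(from_epoch == from_iso)  # True
```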



In [1]:
#import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

## Design pipeline

In [3]:
import json
import io
import csv
import re
from pipeline import Pipeline, build_csv
from datetime import datetime
from dateutil.parser import parse
from stop_words import stop_words

pipeline = Pipeline()

# read json hn_stories_2014.json and return the data as a list
@pipeline.task()
def file_to_json():
    filename = 'hn_stories_2014.json'
    with open(filename, 'r') as f:
        hn_dict = json.load(f)
    return hn_dict['stories']

# filter stories with more than 50 points and at least one comment,
# discarding Ask HN posts
@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):            
    def inner():
        for story in stories:
            not_ask = story['title'].find('Ask HN ') == -1
            if story['points'] > 50 and story['num_comments'] and not_ask:
                yield story
        
    return inner

@pipeline.task(depends_on=filter_stories)
def json_to_csv(filtered_stories):
    # Columns to export, in order; also written as the CSV header row,
    # which extract_titles skips with next(csv_rows)
    fields = [
        'objectID', 'created_at',
        'url', 'points', 'title'
    ]

    def parsed_func():
        for row in filtered_stories():
            parsed = []
            for fld in fields:
                fld_str = str(row[fld])
                if fld == 'created_at':
                    # Normalize the timestamp string with dateutil
                    fld_str = str(parse(fld_str))
                parsed.append(fld_str)
            yield parsed

    csv_file = build_csv(
        parsed_func(),
        header=fields,
        # Using a file-like object instead of `temporary.csv`.
        file=io.StringIO()
    )
    return csv_file

@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    def inner():
        csv_rows = csv.reader(csv_file, delimiter=',')
        header = next(csv_rows)
        for row in csv_rows:
            yield row[4]
    return inner

@pipeline.task(depends_on=extract_titles)
def clean_titles(title_gen):
    def inner():
        for title in title_gen():
            title = re.sub(r"[^\w\d\s]", "", title).lower()
            title = re.sub(r"\s+", r" ", title)
            yield title
    return inner

@pipeline.task(depends_on=clean_titles)
def build_keyword_dictionary(cleaned_gen):
    # Count how often each non-stop word appears across all titles
    keyword_dict = {}
    for title in cleaned_gen():
        words = title.split()
        for word in words:
            if word not in stop_words:
                if word in keyword_dict:
                    keyword_dict[word] += 1
                else:
                    keyword_dict[word] = 1
    return keyword_dict
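The `pipeline` module itself is not shown here. To give a rough idea of what `Pipeline.task(depends_on=...)` and `Pipeline.run()` have to do, here is a minimal sketch. It assumes each task has at most one upstream dependency and that `run()` returns a dict keyed by task function, which matches how `outputs[build_keyword_dictionary]` is used later. The class and its internals are hypothetical; the real module may differ.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


class Pipeline:
    """Hypothetical minimal stand-in for the `pipeline.Pipeline` class used above."""

    def __init__(self):
        # Maps each task function to the set of tasks it depends on.
        self.graph = {}

    def task(self, depends_on=None):
        def decorator(func):
            self.graph[func] = {depends_on} if depends_on is not None else set()
            # Return the function unchanged so it stays directly callable,
            # as in the "Test pipeline" cells.
            return func
        return decorator

    def run(self):
        outputs = {}
        # static_order() yields every task after all of its dependencies.
        for task in TopologicalSorter(self.graph).static_order():
            deps = self.graph.get(task, set())
            if deps:
                (dep,) = deps  # this sketch supports a single upstream task
                outputs[task] = task(outputs[dep])
            else:
                outputs[task] = task()
        return outputs


# Usage: three chained tasks.
pipeline = Pipeline()

@pipeline.task()
def numbers():
    return [1, 2, 3, 4]

@pipeline.task(depends_on=numbers)
def evens(nums):
    return [n for n in nums if n % 2 == 0]

@pipeline.task(depends_on=evens)
def total(nums):
    return sum(nums)

outputs = pipeline.run()
print(outputs[total])  # 6
```

The sketch passes whatever a task returns to its dependent unchanged. That is why the tasks above can return an `inner` function instead of a generator: each downstream task calls it to get a fresh iterator.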

## Test pipeline

In [40]:
stories = file_to_json()

In [41]:
gen = filter_stories(stories)()
test = next(gen)
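Ask HN posts are conventionally titled "Ask HN: ..." with a colon. Searching for `'Ask HN '` with a trailing space, as `filter_stories` does, therefore does not match them, which would explain why `ask` still ranks third in the keyword counts further down. Here is a sketch of a stricter predicate that uses `str.startswith` instead. `is_popular` and the sample titles are made up for illustration.

```python
def is_popular(story):
    """Hypothetical helper: >50 points, at least one comment, and no "Ask HN" prefix."""
    return (
        story["points"] > 50
        and story["num_comments"] > 0
        and not story["title"].startswith("Ask HN")
    )


# Made-up stories for illustration only.
sample = [
    {"title": "Ask HN: How do you learn a new language?", "points": 80, "num_comments": 12},
    {"title": "Show HN: A tiny static site generator", "points": 75, "num_comments": 4},
    {"title": "A quiet post", "points": 10, "num_comments": 0},
]

# The check used in filter_stories: the trailing space never matches "Ask HN:".
old_check = [s["title"].find("Ask HN ") == -1 for s in sample]
new_check = [is_popular(s) for s in sample]

print(old_check)  # [True, True, True]
print(new_check)  # [False, True, False]
```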

In [42]:
csv_file = json_to_csv(filter_stories(stories))
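`build_csv` comes from the same `pipeline` module. Here is a plausible sketch, assuming it writes an optional header row followed by the data rows and then rewinds the file object. This is an assumption, not the module's actual code. The sketch also shows why `extract_titles` calling `next(csv_rows)` only makes sense if a header row was written; otherwise the first story would be silently dropped.

```python
import csv
import io


def build_csv(lines, header=None, file=None):
    """Hypothetical stand-in for pipeline.build_csv: write rows, then rewind."""
    if file is None:
        file = io.StringIO()
    writer = csv.writer(file)
    if header:
        writer.writerow(header)
    writer.writerows(lines)
    file.seek(0)  # rewind so the next task reads from the start
    return file


# One made-up row; the comma in the title is quoted by csv.writer.
rows = [["1", "2014-01-01 00:00:00+00:00", "http://example.com", "120", "Hello, world"]]
csv_file = build_csv(
    rows,
    header=["objectID", "created_at", "url", "points", "title"],
    file=io.StringIO(),
)

reader = csv.reader(csv_file)
header = next(reader)  # skip the header row, as extract_titles does
titles = [row[4] for row in reader]
print(titles)  # ['Hello, world']
```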

In [43]:
title_gen = extract_titles(csv_file)

In [44]:
clean_gen = clean_titles(title_gen)
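To see what `clean_titles` does to a single headline, here are the same two substitutions applied to a made-up title (`clean_title` is a hypothetical helper name). `\d` is dropped from the character class because `\w` already matches digits, so the pattern behaves the same. The result can also explain keyword entries such as `dont` and `c`: apostrophes and the `++` in `C++` are stripped.

```python
import re


def clean_title(title):
    # Drop every character that is neither a word character nor whitespace, then lowercase.
    title = re.sub(r"[^\w\s]", "", title).lower()
    # Collapse runs of whitespace into a single space.
    return re.sub(r"\s+", " ", title)


cleaned = clean_title("Show HN: Don't   use C++ (yet)!")
print(cleaned)  # show hn dont use c yet
```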

In [4]:
outputs = pipeline.run()

In [26]:
sorted_keywords = sorted(outputs[build_keyword_dictionary].items(), key=lambda x: x[1])
top100 = sorted_keywords[-100:][::-1]
for word, counts in top100:
    print('word: '+word+', counts: '+str(counts))

word: new, counts: 184
word: google, counts: 166
word: ask, counts: 124
word: bitcoin, counts: 101
word: open, counts: 96
word: programming, counts: 91
word: web, counts: 89
word: data, counts: 87
word: video, counts: 78
word: python, counts: 76
word: code, counts: 73
word: facebook, counts: 71
word: released, counts: 70
word: using, counts: 69
word: source, counts: 68
word: free, counts: 65
word: game, counts: 64
word: javascript, counts: 64
word: 2014, counts: 64
word: 2013, counts: 64
word: internet, counts: 62
word: c, counts: 59
word: linux, counts: 58
word: microsoft, counts: 58
word: work, counts: 58
word: dont, counts: 57
word: app, counts: 57
word: pdf, counts: 54
word: software, counts: 53
word: language, counts: 53
word: use, counts: 52
word: startup, counts: 51
word: make, counts: 49
word: apple, counts: 49
word: time, counts: 48
word: security, counts: 47
word: yc, counts: 47
word: github, counts: 44
word: nsa, counts: 44
word: like, counts: 43
word: windows, counts: 43
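The counting in `build_keyword_dictionary` and the sort-and-slice step above map directly onto `collections.Counter` from the standard library. The sketch below uses a tiny made-up stop-word set and made-up titles, not the project's `stop_words` list or real data. `most_common(n)` returns the `n` highest counts in descending order and keeps tied words in first-seen order, so ties may be listed in a different order than with `[-100:][::-1]`.

```python
from collections import Counter

# Hypothetical stop words and cleaned titles, standing in for the real
# stop_words module and the output of clean_titles.
stop_words = {"a", "the", "of", "to", "is"}
cleaned_titles = [
    "the new google search",
    "bitcoin is the new gold",
    "a guide to python",
]

keyword_counts = Counter(
    word
    for title in cleaned_titles
    for word in title.split()
    if word not in stop_words
)

top = keyword_counts.most_common(2)
print(top)  # [('new', 2), ('google', 1)]
```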

## Conclusion
The results seem coherent: the most common words are new, google, ask, bitcoin, and open, all of which are common topics in technology and programming discussions. Our pipeline seems to work fine! Its simplicity and versatility let us build a streamlined sequence of data transformations with little effort.