# Hacker News Pipeline
## Introduction
In this project, we will create a pipeline that returns JSON data of the top stories from [Hacker News](https://news.ycombinator.com/) in 2014. Using this pipeline, we will then run a sequence of basic natural language processing tasks. Our goal is to find the top 100 keywords of Hacker News posts in 2014.

## Pipeline Class
Here we will create a `Pipeline` class that uses multiple dependencies to perform tasks on our data. We will first start by creating a `DAG` (Directed Acyclic Graph) class so we can use it to enhance our pipeline task scheduling.

In [1]:
from datetime import datetime
import itertools
import json
import io
import csv
import string
from collections import deque
from stop_words import stop_words


class DAG():
    def __init__(self):
        self.graph = {}

    def in_degrees(self):
        in_degrees = {}
        for node in self.graph:
            if node not in in_degrees:
                in_degrees[node] = 0
            for pointed in self.graph[node]:
                if pointed not in in_degrees:
                    in_degrees[pointed] = 0
                in_degrees[pointed] += 1
        return in_degrees

    def sort(self):
        in_degrees = self.in_degrees()
        to_visit = deque()
        for node in self.graph:
            if in_degrees[node] == 0:
                to_visit.append(node)

        searched = []
        while to_visit:
            node = to_visit.popleft()
            for pointer in self.graph[node]:
                in_degrees[pointer] -= 1
                if in_degrees[pointer] == 0:
                    to_visit.append(pointer)
            searched.append(node)
        return searched

    def add(self, node, to=None):
        if node not in self.graph:
            self.graph[node] = []
        if to:
            if to not in self.graph:
                self.graph[to] = []
            self.graph[node].append(to)

        if len(self.sort()) != len(self.graph):
            raise Exception


class Pipeline:
    def __init__(self):
        self.tasks = DAG()

    def task(self, depends_on=None):
        def inner(function):
            self.tasks.add(function)
            if depends_on:
                self.tasks.add(depends_on, function)
            return function
        return inner

    def run(self):
        scheduled = self.tasks.sort()
        completed = {}
        for task in scheduled:
            for node, values in self.tasks.graph.items():
                if task in values:
                    completed[task] = task(completed[node])
            if task not in completed:
                completed[task] = task()
        return completed

In [2]:
# Instantiate the pipeline
pipeline = Pipeline()

## Loading the JSON Data
Now we will load a JSON file containing Hacker News stories from 2014 as a Python `dict` object.

In [3]:
@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json', 'r') as f:
        data = json.load(f)
        stories = data['stories']
    return stories

# Filtering the Stories
Now that we have loaded the stories as a list of `dict` objects, we can start filtering them to get the most popular stories of the year. We want to make sure these stories are links (not `Ask HN` posts), have a good number of points, and have at least some comments.

In [4]:
@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def is_popular(story):
        return story['points'] > 50 and story['num_comments'] > 1 and not story['title'].startswith('Ask HN')

    return (
        story for story in stories
        if is_popular(story)
    )

## Converting to CSV
Now that we have our filtered stories, we want to write them to a CSV file so that we can have a consistent data format when running future tasks.

In [5]:
# Create a function to write a file in CSV format
def build_csv(lines, header=None, file=None):
    if header:
        lines = itertools.chain([header], lines)
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file

# Write filtered stories into a CSV file
@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    lines = []
    for story in stories:
        lines.append(
            (story['objectID'], datetime.strptime(story['created_at'],
             "%Y-%m-%dT%H:%M:%SZ"), story['url'], story['points'], story['title'])
        )
    return build_csv(lines, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())

## Extracting Title Column
Now we want to extract the titles of the stories so that we can run a word frequency task.

In [6]:
@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('title')

    return (line[idx] for line in reader)

## Cleaning the Titles
In order to create our word frequency model, we want to make sure the words are consistent throughout each title. For this, we will lower case the titles and remove any punctuation.

In [7]:
@pipeline.task(depends_on=extract_titles)
def clean_title(titles):
    for title in titles:
        title = title.lower()
        title = ''.join(c for c in title if c not in string.punctuation)
        yield title

## Creating the Word Frequency Dictionary
Now we can create a word frequency dictionary that takes in words as keys and their frequencies as values. 

We also want to make sure this dictionary does not include any stop words, which are words that occur frequently (such as "the", "or", etc.) and are commonly rejected in keyword searches. For this, we already imported a module called `stop_words` which includes a tuple of the most commonly used stop words in the English language.

In [8]:
@pipeline.task(depends_on=clean_title)
def build_keyword_dictionary(titles):
    word_freq = {}
    for title in titles:
        for word in title.split(' '):
            if word and word not in stop_words:
                if word not in word_freq:
                    word_freq[word] = 1
                word_freq[word] += 1
    return word_freq

## Sorting the Top Words
Now that we have our dictionary, we are ready to sort it so that we can find the top words used in all of our filtered titles.

In [9]:
@pipeline.task(depends_on=build_keyword_dictionary)
def top_keywords(word_freq):
    freq_tuple = [
        (word, word_freq[word])
        for word in sorted(word_freq, key=word_freq.get, reverse=True)
    ]
    return freq_tuple[:100]

## Conclusion - Running the Pipeline
Finally, we can test our pipeline by running it to return the top keywords from Hacker News stories in 2014.

In [10]:
result = pipeline.run()
for keyword in result[top_keywords]:
    print(keyword)

('new', 186)
('google', 168)
('bitcoin', 102)
('open', 93)
('programming', 91)
('web', 89)
('data', 86)
('video', 80)
('python', 76)
('code', 73)
('facebook', 72)
('released', 72)
('using', 71)
('2013', 66)
('javascript', 66)
('free', 65)
('source', 65)
('game', 64)
('internet', 63)
('microsoft', 60)
('c', 60)
('linux', 59)
('app', 58)
('pdf', 56)
('work', 55)
('language', 55)
('software', 53)
('2014', 53)
('startup', 52)
('apple', 51)
('use', 51)
('make', 51)
('time', 49)
('yc', 49)
('security', 49)
('nsa', 46)
('github', 46)
('windows', 45)
('world', 42)
('way', 42)
('like', 42)
('1', 41)
('project', 41)
('computer', 41)
('heartbleed', 41)
('git', 38)
('users', 38)
('dont', 38)
('design', 38)
('ios', 38)
('developer', 37)
('os', 37)
('twitter', 37)
('ceo', 37)
('vs', 37)
('life', 37)
('big', 36)
('day', 36)
('android', 35)
('online', 35)
('years', 34)
('simple', 34)
('court', 34)
('guide', 33)
('learning', 33)
('mt', 33)
('api', 33)
('says', 33)
('apps', 33)
('browser', 33)
('server'