## Building a data pipeline: Hacker News Social Media Data

In this project, we will build our own data pipeline class in Python. This class will be constructed around a directed acyclic graph as the scheduler for it. From a JSON API, we will filter, clean, aggregate, and summarize data in a sequence of tasks that will apply these transformations for us.

The data we will use comes from a Hacker News (HN) API that returns JSON data of the top stories in 2014. The list of JSON posts can be downloaded from the file called `hn_stories_2014.json`. The JSON file contains a single key `stories`, which contains a list of stories (posts). Each post has a set of keys, but we will deal only with the following keys:

- `created_at`: A timestamp of the story's creation time.
- `created_at_i`: A unix epoch timestamp.
- `url`: The URL of the story link.
- `objectID`: The ID of the story.
- `author`: The story's author (username on HN).
- `points`: The number of upvotes the story had.
- `title`: The headline of the post.
- `num_comments`: The number of a comments a post has.

Using this dataset, we will run a sequence of basic natural language processing tasks using our `Pipeline` class. The goal will be to find the top 100 keywords of Hacker News posts in 2014.

### 1. Building the `Pipeline` class

#### 1.1 Constructing the directed acyclic graph (DAG) class

The pipeline exhibits the requirements of a DAG. First, there are a set of vertices (tasks) and edges (links between tasks), second, there is a direction of each task, and finally, there are no cycles.

The DAG structure is built in a way that naturally creates an efficient ordering of dependent tasks. There is a DAG sorting algorithm for exposing this order that we can take advantage of when scheduling our tasks.

Specifically, the algorithm we will implement is called [Kahn's Algorithm](https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm), a famous DAG sorting algorithm. An interesting property about this sorting algorithm is that it can also determine if a graph has a cycle or not.

In [1]:
from collections import deque

class DAG:
    
    def __init__(self):
        self.graph = {}
            
    def in_degrees(self):
        self.degrees = {}
        for node in self.graph:
            if node not in self.degrees:
                self.degrees[node] = 0
            for pointer in self.graph[node]:
                if pointer not in self.degrees:
                    self.degrees[pointer] = 1
                else:
                    self.degrees[pointer] += 1
                    
    def sort(self):
        self.in_degrees()
        d = deque()
        searched = []
        for node in self.degrees:
            if self.degrees[node] == 0:
                d.append(node)
        while d:
            node_i = d.popleft()
            for pointer in self.graph[node_i]:
                self.degrees[pointer] -= 1
                if self.degrees[pointer] == 0:
                    d.append(pointer)
            searched.append(node_i)
        return searched
        
    def add(self, node, to=None):
        if not node in self.graph:
            self.graph[node] = []
        if to:
            if not to in self.graph:
                self.graph[to] = []
            self.graph[node].append(to)
        if len(self.sort()) > len(self.graph):
            raise Exception

With a robust DAG, we now have the scheduler to build our intended pipeline. To start, let's add in the DAG class to the Pipeline. Ee then can create a general purpose pipeline with single argument tasks.

In [2]:
class Pipeline():
    
    def __init__(self):
        self.tasks = DAG()
        
    def task(self, depends_on=None):
        def inner(f):
            self.tasks.add(f)
            if depends_on:
                self.tasks.add(depends_on, f)
            return f
        return inner
    
    def run(self):
        scheduled = self.tasks.sort()
        completed = {}
        for task in scheduled:
            for node, values in self.tasks.graph.items():
                if task in values:
                    completed[task] = task(completed[node])
            if task not in completed:
                completed[task] = task()
        return completed

In [3]:
pipeline = Pipeline()

### 2. Establishing the data pipeline

#### 2.1. Loading the data

Because JSON files resemble a key-value dictionary, the goal is to parse the JSON file into a Python `dict` object.

In [4]:
import json

@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json', 'r') as f:
        data = json.load(f)
        stories = data['stories']
    return stories

#### 2.2 Filtering the stories

Now that we have loaded in all the stories as a list of dict objects, we can now operate on them. Let's start by filtering the list of stories to get the most popular stories of the year.

Like any social link aggregator site, individual users can post whatever content they want. The reason we want the most popular stories is to ensure that we select stories that were the most talked about during the year. We can filter for popular stories by ensuring they are links (not Ask HN posts), have a good number of points, and have some comments.

In [5]:
@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def is_popular(story):
        return story['points'] > 50 and story['num_comments'] > 1 and not story['title'].startswith('Ask HN')
    return (story for story in stories if is_popular(story)) #generator

#### 2.3 Converting to CSV

With a reduced set of stories, it's time to write these `dict` objects to a CSV file. The purpose of translating the dictionaries to a CSV is that we want to have a consistent data format when running the later summarizations. By keeping consistent data formats, each of our pipeline tasks will be adaptable with future task requirements.

In [6]:
import io
import itertools
from datetime import datetime

def build_csv(lines, header=None, file=None):
    def open_file(f):
        if isinstance(f, str):
            f = open(f, 'w')
        return f
    
    file = open_file(file)
    if header:
        lines = itertools.chain([header], lines)
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file


@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    lines = []
    for story in stories:
        lines.append(
            (story['objectID'],
             datetime.strptime(story['created_at'], "%Y-%m-%dT%H:%M:%SZ"),
             story['url'],
             story['points'],
             story['title'])
        )
    return build_csv(
        lines,
        header=['objectID', 'created_at', 'url', 'points', 'title'],
        file=io.StringIO()
    )

#### 2.4 Extracting the title column

Using the CSV file format we created in the previous task, we can now extract the title column. Once we have extracted the titles of each popular post, we can then run the next word frequency task. To extract them, we can follow these steps:

- 1. Import `csv`, and create a `csv.reader()` object from the file object.
- 2. Find the index of the `title` in the header.
- 3. Iterate the through the reader, and return each item from the reader in the corresponding title index position.

In [7]:
import csv

@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('title')
    return (line[idx] for line in reader)#generator

#### 2.5 Cleaning the titles

Because we're trying to create a word frequency model of words from Hacker News titles, we need a way to create a consistent set of words to use. To clean the titles, we should make sure to lower case the titles, and to remove the punctuation. An easy way to rid a string of punctuation is to check each character, determine if it is a letter or punctuation, and only keep the letter.

In [8]:
import string

@pipeline.task(depends_on=extract_titles)
def clean_titles(titles):
    for title in titles:
        title = title.lower()
        title = ''.join(c for c in title if c not in string.punctuation)
        yield title

#### 2.6 Creating the word frequency dictionary

A word frequency dictionary are key value pairs that connects a word to the number of times it is used in a text. To find actual keywords, we should enforce the word frequency dictionary to not include stop words.

In [9]:
from stop_words import stop_words

@pipeline.task(depends_on=clean_titles)
def build_keyword_dictionary(titles):
    word_freq = {}
    for title in titles:
        for word in title.split(' '):
            if word and word not in stop_words: #ignore empty words
                if word not in word_freq:
                    word_freq[word] = 1
                word_freq[word] += 1
    return word_freq

#### 2.7 Sorting the top words

The goal of this task is to output a list of tuples with `(word, frequency)` as the entries sorted from most used, to least most used.

In [10]:
@pipeline.task(depends_on=build_keyword_dictionary)
def top_keywords(word_freq):
    freq_tuple = [
        (word, word_freq[word])
        for word in sorted(word_freq, key=word_freq.get, reverse=True)
    ]
    return freq_tuple[:100]

### 3. Running the data pipeline

We can finally run the pipeline and print the output of the task.

We can represent our tasks, and their outputs during a run, by using a dictionary that maps `function: output`. With this dictionary, we can store outputs after task completion so we can use them as inputs for the next tasks that require them.

With that output dictionary, we can then choose to see the outputs of any function.

In [11]:
output = pipeline.run()

In [12]:
print(output[top_keywords])

[('new', 186), ('google', 168), ('bitcoin', 102), ('open', 93), ('programming', 91), ('web', 89), ('data', 86), ('video', 80), ('python', 76), ('code', 73), ('facebook', 72), ('released', 72), ('using', 71), ('2013', 66), ('javascript', 66), ('free', 65), ('source', 65), ('game', 64), ('internet', 63), ('microsoft', 60), ('c', 60), ('linux', 59), ('app', 58), ('pdf', 56), ('work', 55), ('language', 55), ('software', 53), ('2014', 53), ('startup', 52), ('apple', 51), ('use', 51), ('make', 51), ('time', 49), ('yc', 49), ('security', 49), ('nsa', 46), ('github', 46), ('windows', 45), ('world', 42), ('way', 42), ('like', 42), ('1', 41), ('project', 41), ('computer', 41), ('heartbleed', 41), ('git', 38), ('users', 38), ('dont', 38), ('design', 38), ('ios', 38), ('developer', 37), ('os', 37), ('twitter', 37), ('ceo', 37), ('vs', 37), ('life', 37), ('big', 36), ('day', 36), ('android', 35), ('online', 35), ('years', 34), ('simple', 34), ('court', 34), ('guide', 33), ('learning', 33), ('mt', 3

The final result yielded some interesting keywords. There were terms like `bitcoin` (the cryptocurrency), `heartbleed` (the 2014 hack), and many others. Even though this was a basic natural language processing task, it did provide some interesting insights into conversations from 2014.