# Hacker News Pipeline
In this short data engineering focused project, we will use and demonstrate a robust data pipeline that schedules tasks in a 'correct' order (implement a directed acyclic graph), and apply it to a real world data problem. From a JSON API, we will filter, clean, aggregate, and summarize data in a sequence of tasks that will apply these transformations for us. For a more analytical/data science focused approach on projects, check out the Data Analysis and Data Science Github repos respectively.

## Introduction
The data we will use comes from a Hacker News (HN) API that returns JSON data of the top stories in 2014. HN is a link aggregator website that users vote up stories that are interesting to the community. It is similar to Reddit, but the community only revolves around on computer science and entrepreneurship posts. Check out its website here: https://news.ycombinator.com/.

To make things easier, we have already downloaded a list of JSON posts to a file called hn_stories_2014.json. The JSON file contains a single key stories, which contains a list of stories (posts). Each post has a set of keys, but we will deal only with the following keys:
* created_at: A timestamp of the story's creation time.
* created_at_i: A unix epoch timestamp.
* url: The URL of the story link.
* objectID: The ID of the story.
* author: The story's author (username on HN).
* points: The number of upvotes the story had.
* title: The headline of the post.
* num_comments: The number of a comments a post has.

Here's an example of the full list of keys in a story (output):

<br>

{
    "story_text": "",
    "created_at": "2014-05-29T08:23:46Z",
    "story_title": null,
    "story_id": null,
    "comment_text": null,
    "created_at_i": 1401351826,
    "url": "http://bits.blogs.nytimes.com/2014/05/28/making-twitter-easier-to-use/",
    "parent_id": null,
    "objectID": "7815285",
    "author": "Leynos",
    "points": 1,
    "title": "Making Twitter Easier to Use",
    "_tags": [
        "story",
        "author_Leynos",
        "story_7815285"
    ],
    "num_comments": 0,
    "_highlightResult": {
        "story_text": {
            "matchedWords": [],
            "value": "",
            "matchLevel": "none"
        },
        "author": {
            "matchedWords": [],
            "value": "Leynos",
            "matchLevel": "none"
        },
        "url": {
            "matchedWords": [],
            "value": "http://bits.blogs.nytimes.com/2014/05/28/making-twitter-easier-to-use/",
            "matchLevel": "none"
        },
        "title": {
            "matchedWords": [],
            "value": "Making Twitter Easier to Use",
            "matchLevel": "none"
        }
    },
    "story_url": null
}

<br>

Using this dataset, we will run a sequence of basic natural language processing tasks using our Pipeline class. **The goal will be to find the top 100 keywords of Hacker News posts in 2014.** Because Hacker News is the most popular technology social media site, this will give us an understanding of the most talked about tech topics in 2014. Naturally, repeating this process for any year of your choosing will be quite easy and a great way to catch up on current hot topics in the current year ____ (insert here) easily!

First, let's get some basic things out of the way:

In [1]:
# get pipeline module
from pipeline import Pipeline

# instantiate an instance
pipeline = Pipeline()

The entirety of our code from this project will revolve around this pipeline class.

## Loading JSON Data
We'll start the project by loading the JSON file data into Python. Because JSON files resemble a key-value dictionary, the goal is to parse the JSON file into a Python dict object. We can accomplish this using the json module (documentation: https://docs.python.org/3/library/json.html).

An example on how to parse JSON strings (code):

<br>

import json

\# notice that `sample_json` is a string, and
\# NOT a dict.
sample_json = '{"hello": "world"}'
sample_dict = json.loads(sample_json)
print(sample_dict)

\# output:
{'hello': 'world'}

<br>

To load in a file, json exposes a method called json.load() which takes in a Python file object as the first argument. Using this json.load() method, we'll load the hn_stories_2014.json file as a Python dict.

In [2]:
# get more modules
import json

# pipeline.task fcn with no arg
@pipeline.task()
def file_to_json():
    # loads the file into python dict
    with open('hn_stories_2014.json', 'r') as f:
        data = json.load(f)
        stories = data['stories']
    # returns list of stories
    return stories

Note that the first task will have no arguments, but every task following this wil depend on the previous one, thus creating an efficient pipeline.

# Filtering the Stories
Now that we have loaded in all the stories as a list of dict objects, we can now operate on them. Let's start by filtering the list of stories to get the most popular stories of the year.

Like any social link aggregator site, individual users can post whatever content they want. The reason we want the most popular stories is to ensure that we select stories that were the most talked about during the year. We can filter for popular stories by ensuring they are links (not Ask HN posts), have a good number of points, and have some comments.

In [3]:
# new pipeline task that depends on file_to_json
@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    # new fcn that filters popular stories > 50 points, > 1 comment, no "Ask HN"
    def is_popular(story):
        # embedded fcn to return this
        return story['points'] > 50 and story['num_comments'] > 1 and not story['title'].startswith('Ask HN')
    
    return (
        # returns a generator of stories filtered
        story for story in stories
        if is_popular(story)
    )

We can begin to see here how the pipeline would start to work - as soon as file_to_json is called, filter_stories would naturally just follow in the pipeline.

## Convert to CSV
With a reduced set of stories, it's time to write these dict objects to a CSV file. The purpose of translating the dictionaries to a CSV is that we want to have a consistent data format when running the later summarizations. **By keeping consistent data formats, each of our pipeline tasks will be adaptable with future task requirements.** That last point is especially important for any project.

In [4]:
# get new modules
from datetime import datetime
from pipeline import build_csv
import string
import io

# new pipeline task that depends on filter_stories
@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    # new fcn that writes the filtered JSON stories to CSV
    lines = []
    for story in stories:
        # build it...
        lines.append(
            (story['objectID'], datetime.strptime(story['created_at'], "%Y-%m-%dT%H:%M:%SZ"), story['url'], story['points'], story['title'])
        )
    # return the csv
    return build_csv(lines, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())

Due to the data formats being consistent, if we ever wanted to go back and add or change something to this task or any of the other task, it would be quite easy. Making sure of this now will save headaches in the future.

## Extract Title Column
Using the CSV file format we created in the previous task, we can now extract the title column. Once we have extracted the titles of each popular post, we can then run the next word frequency task.

The steps: 
1. Import csv, and create a csv.reader() object from the file object.
2. Find the index of the title in the header. 
3. Iterate the through the reader, and return each item from the reader in the corresponding title index position.

In [5]:
# get csv
import csv

# new pipeline task that depends on json_to_csv
@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    # steps 1, 2, 3 from above
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('title')
    
    # returns generator of titles
    return (line[idx] for line in reader)

We can then just keep building tasks on top of each other, just like this, and the pipeline would continue to grow, without making it too much more complex. We can still very easily follow the workflow from what we have.

## Clean the Titles
Because we're trying to create a word frequency model of words from Hacker News titles, we need a way to create a consistent set of words to use. For example, words like Google, google, GooGle?, and google., all mean the same keyword: google. If we were to split the title into words, however, they would all be lumped into different categories.

To clean the titles, we should make sure to lower case the titles, and to remove the punctuation. An easy way to rid a string of punctuation is to check each character, determine if it is a letter or punctuation, and only keep the letter. From the string package, we are given a handy string constant that contains all the punctuation needed.

Example (code):

<br>

import string

print(string.punctuation)

\# output:  

'!"#%&'()*+,-./:;<=>?@[\\]^_\`{|}~'

<br>

This is very useful so that we don't have to type out each and every possible distracting piece of characters we can think of.

In [6]:
# new pipeline task that depends on extract_titles
@pipeline.task(depends_on=extract_titles)
def clean_title(titles):
    # ensure lowercase and remove characters
    for title in titles:
        title = title.lower()
        title = ''.join(c for c in title if c not in string.punctuation)
        # yield (return) a generator of cleaned titles
        yield title

Note that yield is used here instead of return. They are both very similar, but yield is needed to return a generator (iterable) and keep local calculations.

## Create the Word Frequency Dictionary
With a cleaned title, we can now build the word frequency dictionary. A word frequency dictionary are key value pairs that connects a word to the number of times it is used in a text. 

Furthermore, to find actual keywords, we should enforce the word frequency dictionary to not include stop words. Stop words are words that occur frequently in language like "the", "or", etc., and are commonly rejected in keyword searches.

Included in the project folder is a module called stop_words with a tuple of the most common used stop words in the English language. This will help with the task below.

In [7]:
# get new modules
from stop_words import stop_words

# new pipeline task that depends on clean_title
@pipeline.task(depends_on=clean_title)
def build_keyword_dictionary(titles):
    # new fcn called that returns dict of word freq of all HN titles
    word_freq = {}
    # account for stop words - split titles dict on ' ' char
    for title in titles:
        for word in title.split(' '):
            # account for 'empty' words too
            if word and word not in stop_words:
                if word not in word_freq:
                    word_freq[word] = 1
                word_freq[word] += 1
    return word_freq

There are a bunch of other things we can do inside this function - like only taking certain words that are n characters long - depending on our needs. To beat a dead horse - this is why we need a strong, robust foundation and to keep things consistent, so that changes can happen easily in the future.

## Sort Top Words
Finally, we're ready to sort the top words used in all the titles. The goal is to output a list of tuples with (word, frequency) as the entries sorted from most used, to least most used. We can sort in any way we wish, but we'll just use the basic sorted() built in function for now.

In [8]:
# new pipeline task that depends on build_keyword_dictionary
@pipeline.task(depends_on=build_keyword_dictionary)
def top_words(word_freq):
    # returns a list of
    freq_tuple = [
        (word, word_freq[word])
        for word in sorted(word_freq, key=word_freq.get, reverse=True)
    ]
    return freq_tuple[:100]

In [10]:
# let's run the pipeline and see our work in action!
run = pipeline.run()

# print results
print(run[top_words])

[('new', 186), ('google', 168), ('bitcoin', 102), ('open', 93), ('programming', 91), ('web', 89), ('data', 86), ('video', 80), ('python', 76), ('code', 73), ('facebook', 72), ('released', 72), ('using', 71), ('2013', 66), ('javascript', 66), ('free', 65), ('source', 65), ('game', 64), ('internet', 63), ('c', 60), ('microsoft', 60), ('linux', 59), ('app', 58), ('pdf', 56), ('work', 55), ('language', 55), ('software', 53), ('2014', 53), ('startup', 52), ('use', 51), ('make', 51), ('apple', 51), ('security', 49), ('time', 49), ('yc', 49), ('github', 46), ('nsa', 46), ('windows', 45), ('world', 42), ('way', 42), ('like', 42), ('computer', 41), ('project', 41), ('heartbleed', 41), ('1', 41), ('dont', 38), ('design', 38), ('users', 38), ('git', 38), ('ios', 38), ('twitter', 37), ('developer', 37), ('vs', 37), ('life', 37), ('os', 37), ('ceo', 37), ('big', 36), ('day', 36), ('online', 35), ('android', 35), ('years', 34), ('simple', 34), ('court', 34), ('apps', 33), ('browser', 33), ('mt', 33)

It looks like the top 5 words are new, google, bitcoin, open, and programming, at least in 2014. Note that we basically just found all of this out from 2 lines of code! Once we have our pipeline setup, any time we want to redo this process for another year or a similar dataset, the output will be as simple as 1 2.

## Further Analysis / Next Steps
Even though this was a basic natural language processing task, it did provide some interesting insights into conversations from 2014. Nonetheless, now that you have created the pipeline, there are additional tasks you can perform with the data.

While this was just a natural pipeline for a data engineering focused project, there are always nitpicky little implementations you can do to make your code run more efficiently and be more flexibile. However, since these changes and analysis are not really in the scope of a robust pipeline, we will discuss the tasks only. 

There are endless things you can now do with a pipeline, and again, they usually just pop up as you go along towards your intended goal. Everyone's foundation will likely look the same, but no one's code will be exact.

## Possible Additional Tasks
A few changes to be made to this project:

* Rewrite the Pipeline class' output to save a file of the output for each task. This will allow you to "checkpoint" tasks so they don't have to be run twice. (This is very important and to not be underestimated - more on this in the conclusion)
* Use the nltk package for more advanced natural language processing tasks. (There are many advanced language processing tasks being developed right this second (unless this is the year 2200) that will vastly improve results past the basic package - check them out!)
* Convert to a CSV before filtering, so you can keep all the stories from 2014 in a raw file. (This goes along with the first bullet point)
* Fetch the data from Hacker News directly from a JSON API. Instead of reading from the file we gave, you can perform additional data processing using newer data. (You can get a new file from a different year for HN, or implement an API directly. For more on this - look at the Movie Review project in the Data Analysis Github repo (a full JSON API analysis is done))

While these are just a few suggestions, they are enough such that the project can be re-worked in many different angles. It also should be noted that ALL of these changes shouldn't really be done together just because they are there. You should pick which change applies best to your situation and go from there. No need to implement something that won't be of use.

Finally, it is also important to test each change as it happens. Don't just do everything at once. That's why running individual cells in the notebook is a thing!

## Conclusion
In this project, we engineered a basic robust pipeline that can easily be followed from the first function to last, ending with pulling the top words from headlines for a given year. This pipeline enabled us to not only read the code with ease, but it also allowed us to execute it with ease (2 lines)!

A couple of things to be said about this pipeline and pipelines in general:
* A pro: The good thing about a pipeline speaks for itself: You don't need to call functions multiple times for a task. If you know that you will always follow up task A with task B, perhaps a pipeline will make things more efficient. A good schema goes a long way.
* A con: If not done correctly, pipelines can have some roadblocks. You should always test each part of your pipeline as you implement it (not done in this project, but we should have). If you get to the end of your pipeline without testing and there is an error - yes, you will know at which point the error is thrown because the output will tell you so due to the nature of the pipeline, but this does not mean a fix in that spot will result in successful execution. In the worst case, your code will be wrong somewhere in the first function, and you will have to adjust the the entire rest of the pipeline to account for that change! 

With the data pipeline complete, this not only supplements data analysis and data science projects, but in some cases even makes ideas that wouldn't be viable before doable. With a proper data engineering design, even the most basic data science projects can have many different angles. As always, data engineering is often overlooked and viewed as dull in relation to its "more interesting" siblings of analysis and predictions, but it is just as important.