# Hacker News Pipeline

This investigation looks at a dataset of Hacker News stories from 2014. A pipeline implementation runs a sequence of basic natural language processing tasks over this dataset. The goal is to find the top 100 keywords in the titles of Hacker News posts from 2014. Because Hacker News is one of the most popular technology news communities, this gives a rough picture of the most talked-about tech topics of 2014.

The investigation demonstrates the use of a task-based data pipeline, which orders tasks using a directed acyclic graph. The pipeline is used to **filter, clean, aggregate, and summarize data** in a sequence of tasks that will apply these transformations. Some tasks are written as 'generator' functions that successive tasks can use to iterate through values, instead of passing through all data at once.
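
As a minimal sketch of the generator pattern described above (the names `make_source` and `keep_even` are hypothetical and not part of this notebook), a task can return a generator *function* rather than a list. The next task then pulls values one at a time, and can restart the iteration simply by calling the function again:

```python
def make_source(values):
    """Return a generator function that yields each value lazily."""
    def generator():
        for value in values:
            yield value
    return generator


def keep_even(source):
    """Return a generator function that filters the upstream source."""
    def generator():
        # Calling source() starts a fresh pass over the upstream values.
        for value in source():
            if value % 2 == 0:
                yield value
    return generator


evens = keep_even(make_source(range(10)))
first_pass = list(evens())
second_pass = list(evens())  # calling again restarts the iteration
```

Returning the function instead of a generator object is what allows the second pass; a bare generator object would be exhausted after the first.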

The DAG class and Pipeline class have been implemented here:

In [1]:
# See task_pipeline.py
import task_pipeline
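
The contents of `task_pipeline.py` are not shown in this notebook. The following is only a sketch of how a DAG-ordered pipeline with a similar decorator interface might be written, assuming each task has at most one parent. `SketchDAG` and `SketchPipeline` are hypothetical names, and the real module may differ.

```python
from collections import deque


class SketchDAG:
    """Hypothetical DAG: maps each node to the nodes that depend on it."""

    def __init__(self):
        self.graph = {}

    def add(self, node, to=None):
        self.graph.setdefault(node, [])
        if to is not None:
            self.graph.setdefault(to, [])
            self.graph[node].append(to)

    def sort(self):
        """Return the nodes in topological order (Kahn's algorithm)."""
        in_degree = {node: 0 for node in self.graph}
        for targets in self.graph.values():
            for target in targets:
                in_degree[target] += 1
        queue = deque(node for node, degree in in_degree.items() if degree == 0)
        ordered = []
        while queue:
            node = queue.popleft()
            ordered.append(node)
            for target in self.graph[node]:
                in_degree[target] -= 1
                if in_degree[target] == 0:
                    queue.append(target)
        if len(ordered) != len(self.graph):
            raise ValueError("The task graph contains a cycle")
        return ordered


class SketchPipeline:
    """Hypothetical pipeline: runs tasks in dependency order."""

    def __init__(self):
        self.tasks = SketchDAG()
        self.parents = {}

    def task(self, depends_on=None):
        def register(func):
            self.parents[func] = depends_on
            if depends_on is None:
                self.tasks.add(func)
            else:
                self.tasks.add(depends_on, func)
            return func  # the function stays directly callable
        return register

    def run(self):
        outputs = {}
        for func in self.tasks.sort():
            parent = self.parents.get(func)
            # Root tasks take no input; others receive their parent's output.
            outputs[func] = func() if parent is None else func(outputs[parent])
        return outputs


demo = SketchPipeline()


@demo.task()
def first():
    return 20


@demo.task(depends_on=first)
def double(value):
    return value * 2


results = demo.run()
```

In this sketch `run()` returns a dictionary keyed by task function, so `results[double]` holds the final value.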

## The Data

The starting data comes from a Hacker News (HN) API that returns JSON data of the top stories in 2014. Hacker News is a link aggregator where users vote up stories that interest the community. It is similar to Reddit, but the community revolves around computer science and entrepreneurship posts.

A list of JSON posts from the website is saved to a file: hn_stories_2014.json. The JSON file contains a single key, stories, which holds a list of stories (posts). The following keys are the ones we will examine:

| Label | Description |
| :--- | :--- |
| created_at | A timestamp of the story's creation time. |
| created_at_i | A unix epoch timestamp. |
| url | The URL of the story link. |
| objectID | The ID of the story. |
| author | The story's author (username on HN). |
| points | The number of upvotes the story had. |
| title | The headline of the post. |
| num_comments | The number of comments the post has. |


### Example JSON

```
{
    "story_text": "",
    "created_at": "2014-05-29T08:23:46Z",
    "story_title": null,
    "story_id": null,
    "comment_text": null,
    "created_at_i": 1401351826,
    "url": "http://bits.blogs.nytimes.com/2014/05/28/making-twitter-easier-to-use/",
    "parent_id": null,
    "objectID": "7815285",
    "author": "Leynos",
    "points": 1,
    "title": "Making Twitter Easier to Use",
    "_tags": [
        "story",
        "author_Leynos",
        "story_7815285"
    ],
    "num_comments": 0,
    "_highlightResult": {
        "story_text": {
            "matchedWords": [],
            "value": "",
            "matchLevel": "none"
        },
        "author": {
            "matchedWords": [],
            "value": "Leynos",
            "matchLevel": "none"
        },
        "url": {
            "matchedWords": [],
            "value": "http://bits.blogs.nytimes.com/2014/05/28/making-twitter-easier-to-use/",
            "matchLevel": "none"
        },
        "title": {
            "matchedWords": [],
            "value": "Making Twitter Easier to Use",
            "matchLevel": "none"
        }
    },
    "story_url": null
}
```
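
To illustrate how the fields in the table map onto a record, the sketch below parses a trimmed copy of the example above and checks that `created_at` and `created_at_i` describe the same instant. It assumes both timestamps are in UTC, which the trailing `Z` suggests.

```python
import json
from datetime import datetime, timezone

# A trimmed copy of the example record, keeping only the keys we examine.
raw = """
{
    "created_at": "2014-05-29T08:23:46Z",
    "created_at_i": 1401351826,
    "url": "http://bits.blogs.nytimes.com/2014/05/28/making-twitter-easier-to-use/",
    "objectID": "7815285",
    "author": "Leynos",
    "points": 1,
    "title": "Making Twitter Easier to Use",
    "num_comments": 0
}
"""
record = json.loads(raw)

# Parse the ISO-style string and mark it as UTC (assumption: 'Z' means UTC).
parsed = datetime.strptime(record["created_at"], "%Y-%m-%dT%H:%M:%SZ")
parsed = parsed.replace(tzinfo=timezone.utc)

# Convert the epoch seconds to an aware datetime for comparison.
from_epoch = datetime.fromtimestamp(record["created_at_i"], tz=timezone.utc)
same_instant = parsed == from_epoch
```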

In [2]:
# Set this variable to True to run the tests placed throughout the notebook:
testing = False

## Task 1: Read in the JSON data
The task will output a list of stories from the data.

In [3]:
# Instantiate a Pipeline: 
pipeline = task_pipeline.Pipeline()

# Add the first task - to read the JSON file.
import json
@pipeline.task()
def file_to_json():
    with open('hn_stories_2014.json', 'r', encoding='UTF-8') as file:
        stories = json.loads(file.read())
    return stories

In [4]:
# Test this:
if testing:
    stories = file_to_json()
    for key,value in stories.items():
        print(value[0])
        break

## Task 2: Filter the stories
This task creates a generator function to iterate through the stories which:
- Have more than 0 points/votes from other users
- Have more than 1 comment
- Are not a question (title starting with 'Ask HN')

In [5]:
@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def generator():
        for x in stories['stories']:
            if x['points'] > 0 and x['num_comments'] > 1 and not x['title'].startswith('Ask HN'):
                yield x
        return
    return generator

In [6]:
if testing:
    generator = filter_stories(stories)
# for i, x in enumerate(generator()):
#     print (x)
#     break
# print(i)
# print(len(stories['stories']))

## Task 3: Convert the stories to CSV
Using the generator from task 2, iterate through the stories we are interested in and build them into CSV format. To do this, the ```build_csv``` helper writes the rows to a destination, which might be a file on disk or an ```io.StringIO()``` buffer.

In [7]:
import io
from datetime import datetime
import csv
import itertools
def build_csv(lines, header=None, file=None):

    if header:
        lines = itertools.chain([header], lines)
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file

@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories_enumerator):
    stories2 = []
    for story in stories_enumerator():
        created_at = datetime.strptime(story['created_at'], "%Y-%m-%dT%H:%M:%SZ")
        stories2.append([story['objectID'], created_at, story['url'], story['points'], story['title']])

    #print (stories2[0])
    file = build_csv(stories2,
                     header=['objectID', 'created_at', 'url', 'points', 'title'],
                     file=open('test.csv','w+',newline='',encoding='utf-8'))
    # This file could be implemented through io.StringIO()
    # The disk file works better on my system and allows us to see what the middle step is
    # but memory-based may be better for production (if not too big!)
    return file
    
    

In [8]:
if testing:
    file = json_to_csv(generator)
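
The comments in the task above note that the intermediate CSV could be held in memory instead of on disk. A minimal sketch of that variant follows, assuming the rows fit comfortably in RAM; `build_csv_in_memory` is a hypothetical name, not a function from this notebook.

```python
import csv
import io
import itertools


def build_csv_in_memory(lines, header=None):
    """Write rows to an in-memory text buffer and rewind it for reading."""
    buffer = io.StringIO(newline='')
    if header:
        lines = itertools.chain([header], lines)
    csv.writer(buffer, delimiter=',').writerows(lines)
    buffer.seek(0)  # rewind so the next task reads from the start
    return buffer


rows = [["7815285", "2014-05-29 08:23:46", 1, "Making Twitter Easier to Use"]]
buffer = build_csv_in_memory(rows, header=["objectID", "created_at", "points", "title"])
parsed = list(csv.reader(buffer))
```

Note that the CSV round trip turns every value into a string, so the integer `points` comes back as `"1"`.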

## Task 4: Extract the titles from the stories
The task outputs a generator to iterate through the titles.

In [9]:
@pipeline.task(depends_on=json_to_csv)
def extract_titles(file):
    def titles():
        for row in reader:
            yield row[index]
        return

    reader = csv.reader(file)
    header = next(reader)
    # Locate the 'title' column from the header row.
    index = header.index('title')
    return titles

In [10]:
if testing:
    file.seek(0)
    title_generator = extract_titles(file)

In [11]:
if testing:
    for x in title_generator():
        print(x)
        break
    

## Task 5: Clean up the Titles
Before parsing for the keywords, this task strips out all punctuation and converts everything to lower case. The cleaned text is again returned as a generator function to allow iteration through the titles.

To clean the strings, we will use the ```str.maketrans()``` method, with a useful list of punctuation symbols from the ```string``` module.

In [12]:
import string
if testing:
    print(string.punctuation) # A useful list of punctuation symbols.
    # Create a 'translation' mapping.
    # All items in the 3rd argument are changed to None.
    # This is fast - it's implemented using mapping in a C string.
    strip_punctuation = str.maketrans('', '', string.punctuation)
    print('hello! how are you?'.translate(strip_punctuation))

In [13]:
@pipeline.task(depends_on=extract_titles)
def clean_titles(titles):
    # Generator to return cleaned up titles:
    def cleaned_titles():
        for title in titles():
            yield title.lower().translate(strip_punctuation)
        return

    # Create a 'translation' mapping to be used in the generator above.
    # All items in the 3rd argument are changed to None.
    # This is fast - it's implemented using mapping in a C string.   
    strip_punctuation = str.maketrans('', '', string.punctuation)
    return cleaned_titles


In [14]:
if testing:
    cleaned_titles = clean_titles(title_generator)
    for t in cleaned_titles():
        print(t)
        break

## Task 6: Build a Dictionary of Keywords
We also have a list of 'stop words' from ```stop_words.py``` - which are common words we want to ignore.
The task will output a word frequency dictionary.

In [15]:
from stop_words import stop_words
@pipeline.task(depends_on=clean_titles)
def build_keyword_dictionary(cleaned_titles):
    word_freq_dict = {}
    for title in cleaned_titles():
        for word in title.split(' '):
            # Add to the count if it is not in our list of common words (stop_words)
            # and we test for zero length, as multiple spaces are split this way.
            if word not in stop_words and len(word) > 0:
                word_freq_dict[word] = word_freq_dict.get(word, 0) + 1

    return word_freq_dict

In [16]:
def test_keyword_input():
    yield 'hello you horse exactly one day  today horse   to go before the end the end in nigh and the horse is neigh today '
    return

if testing:
    build_keyword_dictionary(test_keyword_input)

In [17]:
if testing:
    word_freq_dict = build_keyword_dictionary(cleaned_titles)
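
The same counting can be sketched with `collections.Counter` from the standard library. This is an alternative, not the method used in the task above; `count_keywords` and `demo_stop_words` are hypothetical names, and the tiny stop-word set only stands in for the real list in `stop_words.py`.

```python
from collections import Counter

# Hypothetical stand-in for the real stop-word list.
demo_stop_words = {"the", "to", "in", "and", "is"}


def count_keywords(titles, stop_words):
    """Count every word in the titles that is not a stop word."""
    counts = Counter()
    for title in titles:
        # split() with no argument collapses repeated spaces,
        # so no empty strings need to be filtered out.
        counts.update(word for word in title.split() if word not in stop_words)
    return counts


counts = count_keywords(
    ["the horse and the cart", "horse  racing is back", "back to the cart"],
    demo_stop_words,
)
```

Because `Counter` is a `dict` subclass, the result could be passed to a later task in the same way as the plain dictionary.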

## Task 7: List the top 100 Keywords
The task will output a list of the top 100 keywords.

In [18]:
@pipeline.task(depends_on=build_keyword_dictionary)
def top_100_keywords(word_freq_dict):
    # Sort the (word, count) pairs by count, highest first.
    top_100 = sorted(word_freq_dict.items(), key=lambda x: x[1], reverse=True)
    # return only the top 100:
    return top_100[:100]

In [19]:
if testing:
    x = top_100_keywords(build_keyword_dictionary(test_keyword_input))
    print(*x)

In [20]:
if testing:
    x = top_100_keywords(word_freq_dict)
    print(*x)
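
When only the top few entries are needed, `heapq.nlargest` avoids sorting the whole dictionary. The sketch below is an alternative to the full sort used in the task; `top_n_keywords` is a hypothetical helper, and the sample counts match the first four entries of the pipeline's final output.

```python
import heapq


def top_n_keywords(word_freq_dict, n=100):
    """Return the n (word, count) pairs with the highest counts."""
    return heapq.nlargest(n, word_freq_dict.items(), key=lambda item: item[1])


sample = {"app": 295, "new": 498, "bitcoin": 298, "google": 481}
top_two = top_n_keywords(sample, n=2)
full_sort = sorted(sample.items(), key=lambda item: item[1], reverse=True)[:2]
```

For a dictionary of this size the difference is negligible; the approach matters more when the vocabulary is large and `n` is small.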

## Task 8: Add a progress indicator
The task will add a simple 'progress' indication into the pipeline task list after csv creation.

In [21]:
# Add a 'progress' indicator into the pipeline after csv creation:
@pipeline.task(depends_on=json_to_csv)
def update_progress(file):
    print("Finished conversion to temp csv")

## Run the Pipeline
This will now run the whole pipeline. The result (top 100 keywords) is printed.

In [22]:
x = pipeline.run()
print(x[top_100_keywords])

Finished conversion to temp csv
[('new', 498), ('google', 481), ('bitcoin', 298), ('app', 295), ('web', 259), ('startup', 241), ('data', 240), ('open', 227), ('facebook', 219), ('code', 207), ('using', 202), ('programming', 188), ('use', 181), ('video', 180), ('free', 179), ('game', 167), ('time', 162), ('javascript', 156), ('apple', 154), ('ios', 153), ('source', 152), ('microsoft', 149), ('software', 147), ('make', 146), ('like', 146), ('internet', 145), ('2013', 140), ('world', 139), ('2014', 139), ('tech', 138), ('way', 136), ('github', 134), ('c', 132), ('python', 132), ('work', 130), ('people', 129), ('apps', 127), ('windows', 127), ('project', 123), ('twitter', 121), ('security', 120), ('pdf', 119), ('released', 119), ('yc', 117), ('1', 116), ('language', 114), ('dont', 114), ('vs', 113), ('users', 112), ('android', 112), ('email', 112), ('linux', 111), ('mobile', 106), ('startups', 106), ('news', 105), ('ceo', 104), ('better', 102), ('just', 102), ('api', 102), ('does', 101), (

## Commentary
The pipeline generated an interesting list of 'top-of-mind' words from 2014. Notably, the list includes **bitcoin** and **Heartbleed**. **JavaScript** (156 mentions) appeared more often than **Python** (132 mentions).

## Further development ideas

- Rewrite the way the Pipeline class handles task output so that each task's output is saved to a file as a 'checkpoint'. If one task in the pipeline fails, the run can be restarted from the last checkpoint.
- Use spaCy or the [nltk package](http://www.nltk.org/) for more natural language processing tasks.
- Fetch the latest data from Hacker News directly from [a JSON API](https://hn.algolia.com/api) and process this.
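
The checkpoint idea could be sketched as follows. This is a hypothetical helper (`run_with_checkpoint` is not part of `task_pipeline`) that pickles a task's return value and reuses it on the next run. It assumes the return value can be pickled, which is not true of the generator functions returned by several tasks above, so those tasks would need to materialize their output first.

```python
import os
import pickle
import tempfile


def run_with_checkpoint(func, checkpoint_dir, *args):
    """Run func(*args) once; later calls load the saved result instead."""
    path = os.path.join(checkpoint_dir, func.__name__ + ".pkl")
    if os.path.exists(path):
        with open(path, "rb") as handle:
            return pickle.load(handle)
    result = func(*args)
    with open(path, "wb") as handle:
        pickle.dump(result, handle)
    return result


calls = []


def expensive_count():
    # Stand-in for a slow task; records how often it really runs.
    calls.append(1)
    return {"google": 481}


with tempfile.TemporaryDirectory() as tmp:
    first = run_with_checkpoint(expensive_count, tmp)
    second = run_with_checkpoint(expensive_count, tmp)  # loaded from disk
```

Keying the checkpoint on the function name keeps the sketch short, but a real implementation would probably also need to invalidate checkpoints when the input data or the task code changes.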