# Introduction

We will build a robust data pipeline that schedules our tasks in the correct order and we will apply it to a real world data !! We will filter, clean, aggregate and summarize the data in a sequence of tasks.

The data we will use comes from a Hacker News (HN) API that returns JSON DATA of the top stories in 2014, it looks like the one used in [this project](https://github.com/abdouch96/Exploring-Hackers-News-Posts-/blob/main/Exploring%20Hackers%20News%20Posts%20.ipynb).

If you're unfamiliar with Hacker News, it's a link aggregator website that users vote up stories that are interesting to the community. It is similar to [Reddit](https://www.reddit.com/), but the community only revolves around on computer science and entrepreneurship posts.

The JSON file contains a single key stories, which contains a list of stories (posts). Each post has a set of keys, but we will deal only with the following keys:

  -  title: The headline of the post.
  -  created_at: A timestamp of the story's creation time.
  -  url: The URL of the story link.
  -  objectID: The ID of the story.
  -  points: The number of upvotes the story had.


Using this dataset, we will run a sequence of basic natural language processing tasks using our Pipeline class. The goal will be to find the top 100 keywords of Hacker News posts in 2014. Because Hacker News is the most popular technology social media site, this will give us an understanding of the most talked about tech topics in 2014 !

In [36]:
import csv
from collections import deque
import itertools
from datetime import datetime
import json
import io
import string
from nltk.corpus import stopwords
import re

In [7]:
class DAG:
    def __init__(self):
        self.graph = {}

    def in_degrees(self):
        in_degrees = {}
        for node in self.graph:
            if node not in in_degrees:
                in_degrees[node] = 0
            for pointed in self.graph[node]:
                if pointed not in in_degrees:
                    in_degrees[pointed] = 0
                in_degrees[pointed] += 1
        return in_degrees

    def sort(self):
        in_degrees = self.in_degrees()
        to_visit = deque()
        for node in self.graph:
            if in_degrees[node] == 0:
                to_visit.append(node)

        searched = []
        while to_visit:
            node = to_visit.popleft()
            for pointer in self.graph[node]:
                in_degrees[pointer] -= 1
                if in_degrees[pointer] == 0:
                    to_visit.append(pointer)
            searched.append(node)
        return searched

    def add(self, node, to=None):
        if node not in self.graph:
            self.graph[node] = []
        if to:
            if to not in self.graph:
                self.graph[to] = []
            self.graph[node].append(to)
        if len(self.sort()) != len(self.graph):
            raise Exception

In [8]:
class Pipeline:
    def __init__(self):
        self.tasks = DAG()

    def task(self, depends_on=None):
        def inner(f):
            self.tasks.add(f)
            if depends_on:
                self.tasks.add(depends_on, f)
            return f
        return inner

    def run(self):
        scheduled = self.tasks.sort()
        completed = {}

        for task in scheduled:
            for node, values in self.tasks.graph.items():
                if task in values:
                    completed[task] = task(completed[node])
            if task not in completed:
                completed[task] = task()
        return completed

In [9]:
def build_csv(lines, header=None, file=None):
    if header:
        lines = itertools.chain([header], lines)
    writer = csv.writer(file, delimiter=',')
    writer.writerows(lines)
    file.seek(0)
    return file

In [51]:
pipeline = Pipeline()

@pipeline.task()
def file_to_json():
    with open('OneDrive\Documents\my_datasets\hn_stories_2014.json', 'r') as f:
        data = json.load(f)
        stories = data['stories']
    return stories

@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    def is_popular(story):
        return story['points'] > 50 and story['num_comments'] > 1 and not story['title'].startswith('Ask HN')

    return (story for story in stories if is_popular(story))

@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    lines = []
    for story in stories:
        lines.append(
            (story['objectID'], 
             datetime.strptime(story['created_at'], "%Y-%m-%dT%H:%M:%SZ"), 
             story['url'], 
             story['points'], 
             story['title'])
        )
    return build_csv(lines, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())

@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('title')
    
    return (line[idx] for line in reader)

@pipeline.task(depends_on=extract_titles)
def clean_title(titles):
    for title in titles:
        title = title.lower()
        title = ''.join(w for w in title if w not in string.punctuation)
        title = re.sub(r'[^a-z]', ' ', title)
        yield title
        
@pipeline.task(depends_on=clean_title)
def build_keyword_dictionary(titles):
    stopword = stopwords.words('english')
    word_freq = {}
    for title in titles:
        for word in title.split(' '):
            if word and word not in stopword:
                if word not in word_freq:
                    word_freq[word] = 1
                word_freq[word] += 1
    return word_freq

@pipeline.task(depends_on=build_keyword_dictionary)
def top_keywords(word_freq):
    freq_tuple = [ (word, word_freq[word]) for word in sorted(word_freq, key=word_freq.get, reverse=True) ]
    return freq_tuple[:100]

top_100 = pipeline.run()
print(top_100[top_keywords])


[('hn', 211), ('show', 193), ('new', 187), ('google', 179), ('bitcoin', 104), ('open', 94), ('web', 92), ('programming', 91), ('data', 87), ('us', 86), ('video', 80), ('python', 77), ('facebook', 73), ('code', 73), ('using', 72), ('released', 72), ('c', 68), ('javascript', 66), ('internet', 65), ('free', 65), ('game', 65), ('source', 65), ('first', 63), ('go', 62), ('microsoft', 60), ('linux', 60), ('one', 60), ('app', 59), ('pdf', 56), ('work', 55), ('language', 55), ('apple', 53), ('software', 53), ('startup', 53), ('use', 51), ('make', 51), ('world', 50), ('nsa', 50), ('yc', 50), ('time', 49), ('security', 49), ('get', 46), ('github', 46), ('system', 45), ('x', 45), ('windows', 45), ('way', 42), ('like', 42), ('heartbleed', 42), ('project', 41), ('computer', 41), ('ios', 40), ('dont', 39), ('git', 38), ('users', 38), ('back', 38), ('twitter', 38), ('design', 38), ('day', 38), ('developer', 37), ('os', 37), ('ceo', 37), ('vs', 37), ('big', 37), ('life', 37), ('android', 35), ('simple

In [43]:
graph = pipeline.tasks.graph
graph

{<function __main__.file_to_json()>: [<function __main__.filter_stories(stories)>],
 <function __main__.filter_stories(stories)>: [<function __main__.json_to_csv(stories)>],
 <function __main__.json_to_csv(stories)>: [<function __main__.extract_titles(csv_file)>],
 <function __main__.extract_titles(csv_file)>: [<function __main__.clean_title(titles)>],
 <function __main__.clean_title(titles)>: [<function __main__.build_keyword_dictionary(titles)>],
 <function __main__.build_keyword_dictionary(titles)>: [<function __main__.top_keywords(word_freq)>],
 <function __main__.top_keywords(word_freq)>: []}

In [53]:
dependencies = [func.__name__ for func in pipeline.tasks.sort()]
print(dependencies)

['file_to_json', 'filter_stories', 'json_to_csv', 'extract_titles', 'clean_title', 'build_keyword_dictionary', 'top_keywords']
