# HackerNews data analysis challenge with Spark

In this notebook, you will analyse a dataset of (almost) all submitted HackerNews posts with Spark. 

The notebook is a convenient way to explore your data, but you will be asked to implement the corresponding functions in the `spark_rdd.py` file that will be submitted to KATE. Having standalone functions is a good way to automate reporting of the different metrics we will look at in a production environment.

Let's start by importing some of the libraries you will need.

In [18]:
import urllib.request
import zipfile
from io import BytesIO

DATASET_URL = "https://s3-eu-west-1.amazonaws.com/kate-datasets/hackernews/HNStories.zip"
DATA_DIR = "data"

if __name__ == "__main__":

    req = urllib.request.urlopen(DATASET_URL)
    data = BytesIO(req.read())

    with zipfile.ZipFile(data, "r") as zipref:
        zipref.extractall(DATA_DIR)

In [20]:
import json
import numpy as np
import matplotlib.pyplot as plt
import pyspark
from datetime import datetime as dt
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
print(sc)
print("Ready to go!")
%matplotlib inline

<SparkContext master=local[*] appName=pyspark-shell>
Ready to go!



The file has one JSON entry per line. To make accessing it easier, first turn each entry into a dictionary and use `persist()` to cache the resulting RDD.

Remember to download the dataset (instructions in `Readme.md`).

In [3]:
dataset_json = sc.textFile("data/HNStories.json")
dataset = dataset_json.map(lambda x: json.loads(x))
dataset.persist()

PythonRDD[2] at RDD at PythonRDD.scala:53

In [4]:
import re
from datetime import datetime as dt

In [5]:
def get_hour(rec):
    # Hour of the day (UTC) at which the post was created
    return dt.utcfromtimestamp(rec["created_at_i"]).hour

In [6]:
def extract_time(timestamp):
    return dt.utcfromtimestamp(timestamp)

In [7]:
def get_bucket(rec, min_timestamp, max_timestamp):
    interval = (max_timestamp - min_timestamp + 1) /200.0
    return int((rec["created_at_i"] - min_timestamp) / interval)
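
To see how `get_bucket` spreads records over the 200 buckets, here is a small worked example. The timestamps are made up for illustration (they are not the dataset's real minimum and maximum), and the function is repeated so the snippet runs on its own.

```python
def get_bucket(rec, min_timestamp, max_timestamp):
    # Width of one bucket in seconds; the +1 keeps the latest record in bucket 199
    interval = (max_timestamp - min_timestamp + 1) / 200.0
    return int((rec["created_at_i"] - min_timestamp) / interval)


# Hypothetical range spanning 2000 seconds, so each bucket covers 10 seconds
min_ts = 1_000_000
max_ts = 1_001_999

first_bucket = get_bucket({"created_at_i": min_ts}, min_ts, max_ts)
middle_bucket = get_bucket({"created_at_i": min_ts + 1000}, min_ts, max_ts)
last_bucket = get_bucket({"created_at_i": max_ts}, min_ts, max_ts)

print(first_bucket, middle_bucket, last_bucket)
```

Without the `+ 1` in the interval, the record at `max_ts` would fall into a 201st bucket with index 200.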

Finally, Spark has many helper functions on top of the ones we have studied which you will find useful. You can view them at [http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)

### Function `count_elements_in_dataset`

Let's start with some initial analysis.

**How many elements are in your dataset?**


In [8]:
def count_elements_in_dataset(dataset):
    return dataset.count()

In [9]:
count_elements_in_dataset(dataset)

1333789

### Function `get_first_element`

**What does the first element look like? Assign the result to a variable called `first`.**


In [10]:
def get_first_element(dataset):
    return dataset.first()

In [11]:
get_first_element(dataset)

{'author': 'TuxLyn',
 'created_at': '2014-05-29T08:25:40Z',
 'created_at_i': 1401351940,
 'num_comments': 0,
 'objectID': '7815290',
 'points': 1,
 'title': 'DuckDuckGo Settings',
 'url': 'https://duckduckgo.com/settings'}

### Function `get_all_attributes`

Each element is a dictionary of attributes and their values for a post. 
**Can you find the set of all attributes used throughout the RDD?**
The function `dictionary.keys()` gives you the list of attributes of a dictionary.

In [12]:
def get_all_attributes(dataset):
    all_attributes = dataset.flatMap(lambda x: x.keys()).distinct().collect()
    return all_attributes

In [13]:
get_all_attributes(dataset)

['created_at',
 'points',
 'created_at_i',
 'story_text',
 'objectID',
 'title',
 'author',
 'url',
 'story_id',
 'num_comments']

### Function `get_elements_w_same_attributes`

We see that there are more attributes than just the ones used in the first element. Can you filter the dataset to keep only elements that have the same set of attributes as the first element?
Hint: you might want to write a function that compares attributes for two elements and apply it on your dataset.

In [14]:
def get_elements_w_same_attributes(dataset):
    def compare_elems(first, second):
        if len(first) != len(second):
            return False
        for key in first.keys():
            if key not in second:
                return False
        return True
    first = dataset.first()
    return dataset.filter(lambda x: compare_elems(first, x))
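
The comparison can also be written more compactly, because dictionary key views compare like sets. This is only a sketch: `same_attributes` is a hypothetical name, and the records are made up, using attribute names from the list above.

```python
def same_attributes(first, second):
    # dict.keys() views support set-like equality, so order does not matter
    return first.keys() == second.keys()


# Made-up records for illustration
reference = {"author": "a", "title": "t", "url": "u"}
same = {"url": "x", "author": "b", "title": "s"}
different = {"author": "c", "title": "r", "story_text": "..."}

print(same_attributes(reference, same))
print(same_attributes(reference, different))
```

Such a helper could be passed to `filter` in the same way as `compare_elems`.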

How many elements did you find?

### How many posts through time

The field `created_at_i` is very useful: it gives you a UNIX timestamp of the time at which the post was created. The `extract_time` function defined earlier lets you extract a time from a timestamp.

### Function `get_min_max_timestamps`

**Find the minimum and maximum timestamps in the RDD and call them `min_time` and `max_time`.**
These correspond to the first and last post. When did they occur? The function `get_min_max_timestamps` needs to return `min_time` and `max_time` given a dataset.

In [22]:
# Find the minimum and maximum timestamps
def get_min_max_timestamps(dataset):
    timestamps = dataset.map(lambda x: x["created_at_i"])
    min_time = timestamps.reduce(lambda x, y: x if x < y else y)
    max_time = timestamps.reduce(lambda x, y: x if x > y else y)
    return extract_time(min_time), extract_time(max_time)

### Function `get_number_of_posts_per_bucket`

Now let's analyse how the number of elements changes through time. The `get_bucket` function defined earlier assigns a record to one of 200 "buckets" of time. **Use it to count the number of elements that fall within each bucket and call the result `buckets_rdd`.** The result should be such that `buckets` below generates the corresponding output. If you want to use this function in your `spark_rdd.py` you will need to redefine it in the file.

In [23]:
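from datetime import timezone

def get_number_of_posts_per_bucket(dataset, min_time, max_time):
    # min_time and max_time are naive UTC datetimes; attach UTC so timestamp() does not assume local time
    min_timestamp = min_time.replace(tzinfo=timezone.utc).timestamp()
    max_timestamp = max_time.replace(tzinfo=timezone.utc).timestamp()
    return dataset.map(lambda x: (get_bucket(x, min_timestamp, max_timestamp), 1)).reduceByKey(lambda x, y: x + y)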

In [None]:
buckets_rdd = get_number_of_posts_per_bucket(dataset, *get_min_max_timestamps(dataset))
buckets = sorted(buckets_rdd.collect())

We can then use this to plot the number of submitted posts over time.

In [None]:
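from datetime import timedelta
min_time, max_time = get_min_max_timestamps(dataset)
interval = (max_time - min_time + timedelta(seconds=1)) / 200  # same bucket width as get_bucket
bs = [min_time + x[0] * interval for x in buckets]
ts = [x[1] for x in buckets]
plt.plot(bs, ts)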

### Function `get_number_of_posts_per_hour`

The following function gets the hour of the day at which a post was submitted. **Use it to find the number of posts submitted at each hour of the day.** The value of `hours_buckets` should match the one printed below.

In [25]:
def get_hour(rec):
    t = dt.utcfromtimestamp(rec['created_at_i'])
    return t.hour
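
As a quick check of `get_hour`, the first record shown earlier has `created_at_i` equal to 1401351940 and `created_at` equal to `2014-05-29T08:25:40Z`, so its hour should be 8. The sketch below uses a timezone-aware equivalent, since `datetime.utcfromtimestamp` is deprecated as of Python 3.12; `hour_utc` is a hypothetical name, not part of the assignment.

```python
from datetime import datetime, timezone


def hour_utc(rec):
    # Interpret the UNIX timestamp explicitly as UTC and return the hour (0-23)
    return datetime.fromtimestamp(rec["created_at_i"], tz=timezone.utc).hour


# Values copied from the first element of the dataset shown above
first_record = {"created_at": "2014-05-29T08:25:40Z", "created_at_i": 1401351940}
print(hour_utc(first_record))
```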

In [26]:
def get_number_of_posts_per_hour(dataset):
    return dataset.map(lambda x: (get_hour(x), 1)).reduceByKey(lambda x, y: x + y)


In [None]:
hours_buckets_rdd = get_number_of_posts_per_hour(dataset)
hours_buckets = sorted(hours_buckets_rdd.collect())

In [None]:
hrs = [x[0] for x in hours_buckets]
sz = [x[1] for x in hours_buckets]
plt.plot(hrs, sz)

### Function `get_score_per_hour`

The number of points scored by a post is under the attribute `points`. **Use it to compute the average score received by submissions for each hour.**

In [28]:
def get_score_per_hour(dataset):
    scores_per_hour_rdd = (
    dataset.map(lambda x: (get_hour(x), (x["points"], 1))).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .map(lambda x: (x[0], x[1][0] / x[1][1]))
    )
    
    return scores_per_hour_rdd
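
The reason for carrying `(sum, count)` pairs through `reduceByKey` is that averages cannot be merged pairwise, whereas sums and counts can. The plain-Python sketch below mimics that aggregation locally on made-up `(hour, points)` records; it is an illustration of the idea, not Spark code.

```python
from collections import defaultdict

# Made-up (hour, points) records, only for illustration
records = [(8, 1), (8, 5), (9, 10)]

# Accumulate a (sum of points, number of posts) pair per hour,
# which is what the reduceByKey step does across partitions
totals = defaultdict(lambda: (0, 0))
for hour, points in records:
    running_sum, running_count = totals[hour]
    totals[hour] = (running_sum + points, running_count + 1)

# Only at the very end is the pair turned into an average
averages = {hour: s / c for hour, (s, c) in totals.items()}
print(averages)
```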


In [None]:
scores_per_hour_rdd = get_score_per_hour(dataset)
scores_per_hour = sorted(scores_per_hour_rdd.collect())

In [None]:
hrs = [x[0] for x in scores_per_hour]
sz = [x[1] for x in scores_per_hour]
plt.plot(hrs, sz)

### Function `get_proportion_of_scores`

It may be more useful to look at successful posts that get over 200 points. **Find the proportion of posts that get above 200 points per hour.**

In [29]:
def get_proportion_of_scores(dataset):
    prop_per_hour_rdd = (
    dataset.map(lambda x: (get_hour(x), (1 if x["points"] > 200 else 0, 1)))
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .map(lambda x: (x[0], x[1][0] / x[1][1]))
    )
    
    return prop_per_hour_rdd

In [None]:
prop_per_hour_rdd = get_proportion_of_scores(dataset)
prop_per_hour = sorted(prop_per_hour_rdd.collect())

In [None]:
hrs = [x[0] for x in prop_per_hour]
sz = [x[1] for x in prop_per_hour]
plt.plot(hrs, sz)

### Function `get_proportion_of_success`

The following function lists the words in the title. **Use it to count the number of words in the title of each post, and look at the proportion of successful posts for each title length.**

If an element does not have a title, it should count it as a length of 0.

In [30]:
import re
def get_words(line):
    # Raw string so that \w is passed to the regex engine unchanged
    return re.compile(r"\w+").findall(line)

In [31]:
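def get_proportion_of_success(dataset):
    prop_per_title_length_rdd = (
        dataset.map(
            lambda x: (
                # Key: number of words in the title (0 if there is no title)
                len(get_words(x.get("title", ""))),
                # Value: (1 if the post got over 200 points, 1 post counted)
                (1 if x["points"] > 200 else 0, 1),
            )
        )
        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
        .map(lambda x: (x[0], x[1][0] / x[1][1]))
    )

    return prop_per_title_length_rdd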

In [None]:
prop_per_title_length_rdd = get_proportion_of_success(dataset)
prop_per_title_length = sorted(prop_per_title_length_rdd.collect())

In [None]:
hrs = [x[0] for x in prop_per_title_length]
sz = [x[1] for x in prop_per_title_length]
plt.plot(hrs, sz)

### Function `get_title_length_distribution`

Let's compare this with the distribution of the number of words. **Count for each title length the number of submissions with that length.**

In [32]:
def get_title_length_distribution(dataset):
    submissions_per_length_rdd = dataset.map(
        lambda x: (len(get_words(x.get("title", ""))), 1)
    ).reduceByKey(lambda x, y: x + y)
    
    return submissions_per_length_rdd

In [None]:
submissions_per_length_rdd = get_title_length_distribution(dataset)
submissions_per_length = sorted(submissions_per_length_rdd.collect())

In [None]:
hrs = [x[0] for x in submissions_per_length]
sz = [x[1] for x in submissions_per_length]
plt.plot(hrs, sz)

Looks like most people are getting it wrong!

### Optional

For this task, you will need a new function: `takeOrdered()`. Like `take()` it collects elements from an RDD. However, it can be applied to take the smallest elements. For example, `takeOrdered(10)` returns the 10 smallest elements. Furthermore, you can pass it a function to specify the way in which the elements should be ordered. For example, `takeOrdered(10, lambda x: -x)` will return the 10 largest elements.
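
To get a feel for these ordering semantics without starting Spark, the plain-Python sketch below uses `heapq.nsmallest`, which takes a count and an optional key function in a similar way. It is only a local analogy, and the `domain_counts` values are made up.

```python
import heapq

values = [5, 1, 9, 3, 7]

# Analogous to rdd.takeOrdered(2): the two smallest elements, in ascending order
smallest_two = heapq.nsmallest(2, values)

# Analogous to rdd.takeOrdered(2, lambda x: -x): negating the key yields the largest
largest_two = heapq.nsmallest(2, values, key=lambda x: -x)

# With (key, count) pairs, order by the negated count to get the most frequent keys
domain_counts = [("github.com", 3), ("nytimes.com", 1), ("medium.com", 2)]
top_two = heapq.nsmallest(2, domain_counts, key=lambda kv: -kv[1])

print(smallest_two, largest_two, top_two)
```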

The function below extracts the url domain out of a record. **Use it to count the number of distinct domains posted to.**

In [None]:
from urllib.parse import urlparse
def get_domain(rec):
    url = urlparse(rec['url']).netloc
    if url[0:4] == 'www.':
        return url[4:]
    else:
        return url
print(get_domain(dataset.take(1)[0]))
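
Not every record is guaranteed to carry a usable `url`. If some records lack the attribute or hold an empty value (an assumption, not something checked here), `get_domain` would raise an error. A defensive variant could look like the sketch below, where `get_domain_safe` is a hypothetical name.

```python
from urllib.parse import urlparse


def get_domain_safe(rec):
    # Fall back to an empty string when 'url' is missing or None
    url = rec.get("url") or ""
    netloc = urlparse(url).netloc
    # Strip a leading 'www.' so that both forms map to the same domain
    return netloc[4:] if netloc.startswith("www.") else netloc


print(get_domain_safe({"url": "https://duckduckgo.com/settings"}))
print(repr(get_domain_safe({"title": "no url here"})))
```

Records that map to the empty string could then be filtered out before counting domains.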

In [None]:
# Count the number of distinct domains


Using `takeOrdered()` find the 25 most popular domains posted to.

In [None]:
print(top25)

In [None]:
index = np.arange(25)
labels = [x[0] for x in top25]
counts = np.array([x[1] for x in top25]) * 100.0/dataset.count()
plt.xticks(index,labels, rotation='vertical')
plt.bar(index, counts, 0.5)