In [1]:
%load_ext lab_black

In [2]:
import requests
import json
import os
import pathlib
import time
import bs4
import sqlite3
import newspaper

import numpy as np

from IPython.display import clear_output
from urllib3.exceptions import LocationParseError
from multiprocessing import Pool
from math import floor, ceil
from subreddit_sql import sqlize

The following cell defines three functions that together act as a handler for the Pushshift API for data aggregation. As we want as many posts as possible, the data_aggregator function starts at the beginning of time (also known as 00:00 January 1st, 1970) and fetches the first 500 posts; the process is then repeated, using the timestamp of the last post fetched as the new starting time. This cycle continues, rate-limited by submission_api_handler, until no new posts are received, at which point the loop automatically terminates. If pushshift_fetcher returns a response with an invalid status code, data_aggregator waits and repeats the query. After ten consecutive failed attempts, the function will raise an error and terminate.

Upon a successful download, the posts are stored loosely as individual JSON files, each named after the post's integer id. I made this choice because not every post will contain the same keys, and because this method is robust against crashes: each request resumes from the most recent post already saved in storage.

In [3]:
def pushshift_fetcher(endpoint, params):
    """Send a GET request to a Pushshift Reddit API endpoint.

    Args:
        endpoint: Path appended to the Pushshift base URL,
            e.g. "search/submission/".
        params: Query parameters forwarded to ``requests.get``.

    Returns:
        The response object from ``requests.get``. The status code is not
        inspected here; that is left to the caller.
    """
    # build the full url and hand the response straight back
    return requests.get("https://api.pushshift.io/reddit/" + endpoint, params=params)


def data_aggregator(subreddit):
    """Fetch the next batch of submissions for a subreddit and save them to disk.

    Each call resumes from the saved post with the largest integer id: its
    ``created_utc`` plus one is sent as the ``after`` parameter. When nothing
    has been saved yet, ``after`` is 0.

    Args:
        subreddit: Name of the subreddit; also used as the name of the
            storage directory under ``../data/raw``.

    Returns:
        True if the response contained at least one post (each one is written
        to ``<integer id>.json``), False if the returned data list was empty.

    Raises:
        RuntimeError: If the request still has a non-200 status code after
            ten retries.
    """
    # create directory to store json files
    data_directory = "../data/raw"
    p = pathlib.Path(f"{data_directory}/{subreddit}")
    p.mkdir(parents=True, exist_ok=True)

    # find the most recent post already on disk (largest integer id)
    # and resume just after its timestamp
    all_files = list(p.glob("*.json"))
    if all_files:
        last_fetched_file = max(all_files, key=lambda f: int(f.stem))
        with open(last_fetched_file, "r") as last:
            content = json.load(last)
        last_utc = content["created_utc"] + 1
    else:
        # if no posts are saved, start from timestamp zero
        last_utc = 0

    # specify params for requests.get
    params = {
        "subreddit": subreddit,
        "after": last_utc,
        "size": 500,
        "sort_type": "id",
    }
    # get content
    r = pushshift_fetcher("search/submission/", params)

    # if response code is not 200, retry up to ten times.
    # The limit is checked *before* each retry so that a retry which
    # finally succeeds is used instead of being discarded.
    fails = 0
    while r.status_code != 200:
        if fails >= 10:
            # RuntimeError is a subclass of BaseException, so any handler that
            # caught the previous BaseException still catches this.
            raise RuntimeError("Request failed ten consecutive times. Exiting.")
        fails += 1
        time.sleep(10)
        r = pushshift_fetcher("search/submission/", params)

    data = r.json()["data"]
    if not data:
        # nothing new was returned
        return False

    # write each post returned to its own json file
    for d in data:
        # convert base 36 id to integer id
        integer_id = int(d["id"], 36)
        dump_path = p / f"{integer_id}.json"
        with dump_path.open(mode="w+") as new_file:
            json.dump(d, new_file)

    return True


def submission_api_handler(subreddit):
    """Keep requesting batches of posts for a subreddit until none are left.

    Calls ``data_aggregator`` repeatedly, sleeping one second after every
    call, and stops once a call reports that no posts were returned.

    Args:
        subreddit: Name of the subreddit passed on to ``data_aggregator``.
    """
    pause_seconds = 1
    while True:
        got_posts = data_aggregator(subreddit)
        # pause after every request, including the final empty one
        time.sleep(pause_seconds)
        if not got_posts:
            break

    return


# Download posts for both subreddits, one after the other. Each call keeps
# requesting batches until a request returns no posts.
submission_api_handler("theonion")
submission_api_handler("nottheonion")

r/TheOnion and r/nottheonion are "news" aggregate subreddits. Users will share links to articles they find online, and occasionally videos. As such, there are very few text posts from users. This left me with three options: use only the titles of posts, incorporate the comments, or follow the links to get the text from the actual articles. I chose the last option. The function below takes the path to a single post's json file, gets the url to the article if it's available, and uses automatic webscraping with Newspaper3k to extract the article content. The article text is then saved as a txt file with the same name as its corresponding post json.

In [4]:
def article_content_fetcher(path):
    """Scrape the article linked by a saved post and store its text.

    The text is written next to the post's json file as ``<stem>.txt``. If
    that file already exists the post is skipped, so the function can be
    re-run safely over the same files.

    Args:
        path: ``pathlib.Path`` of the post's json file.

    Returns:
        None. An empty txt file is written when the post has no url or the
        article could not be downloaded/parsed.
    """
    text_dump = path.parent / f"{path.stem}.txt"

    if text_dump.is_file():
        # already scraped on a previous run
        return

    # read the post inside a context manager so the handle is always closed
    with path.open(mode="r") as js:
        json_content = json.load(js)
    url = json_content.get("url")

    content = ""
    if url is not None:
        try:
            article = newspaper.Article(url=url)
            article.download()
            article.parse()

            content = article.text or ""

        except (TypeError, newspaper.ArticleException, LocationParseError):
            # TypeError is kept for urls of an unexpected type
            # ArticleException will occur when the response forbids webscraping
            # not sure what causes LocationParseError, but it's rare
            content = ""

    with text_dump.open(mode="w+") as txt:
        txt.write(content)

    return

Webscraping like this is an incredibly long process, and would have taken well over a week on its own. Therefore, multiprocessing is used to reduce this time to under 24 hours.

In [None]:
# Collect the saved post files for both subreddits.
onion_directory = "../data/raw/theonion"
onion_path = pathlib.Path(onion_directory)
onion_glob = list(onion_path.glob("*.json"))

not_onion_directory = "../data/raw/nottheonion"
not_onion_path = pathlib.Path(not_onion_directory)
not_onion_glob = list(not_onion_path.glob("*.json"))

# Each file list is processed in 100 slices so progress can be printed as a
# percentage. The slice bounds use integer floor division on both ends, so
# consecutive slices never share a file and the last slice ends exactly at
# the end of the list.
# The pool is used as a context manager so the worker processes are cleaned
# up even if one of the slices raises.
with Pool() as pool:
    not_onion_percent = 0
    for onion_percent in range(100):
        bot = onion_percent * len(onion_glob) // 100
        top = (onion_percent + 1) * len(onion_glob) // 100

        pool.map(article_content_fetcher, onion_glob[bot:top])

        # prints progress as percents
        clear_output(wait=True)
        print(f"Onion harvesting {onion_percent + 1}% complete")
        print(f"Not Onion harvesting {not_onion_percent}% complete")

    for not_onion_percent in range(100):
        bot = not_onion_percent * len(not_onion_glob) // 100
        top = (not_onion_percent + 1) * len(not_onion_glob) // 100

        pool.map(article_content_fetcher, not_onion_glob[bot:top])

        clear_output(wait=True)
        print(f"Onion harvesting {onion_percent + 1}% complete")
        print(f"Not Onion harvesting {not_onion_percent + 1}% complete")

Finally, up until now, all files have been stored loosely, with one example per file. The sheer number of posts (over 500,000) makes estimating total data volume tough. During data aggregation, I was worried that the magnitude of data could exceed what pandas could practically handle. Therefore, I created a SQL database in storage that I could write formatted data to. This would allow me to do the first round of data cleaning without having to load all of the data into memory (this fear turned out to be mostly unfounded, with only about 1.1 GB of data being stored total).

The following function gets all possible keys found in any json file, creates a table with the same keys, and formats each file as a row.

In [6]:
# Write the saved "theonion" posts to the database file ../data/reddit.db.
# NOTE(review): sqlize comes from the project module subreddit_sql, which is
# not shown here; per the notebook text it creates a table and stores one row per post.
sqlize("../data/reddit.db", "theonion")

Posts SQLized: 17644/17644

In [7]:
# Write the saved "nottheonion" posts to the database file ../data/reddit.db.
# NOTE(review): sqlize comes from the project module subreddit_sql, which is
# not shown here; per the notebook text it creates a table and stores one row per post.
sqlize("../data/reddit.db", "nottheonion")

Posts SQLized: 487364/487364