In [1]:
%load_ext lab_black

In [2]:
import warnings
import pathlib
import newspaper

from psycopg2 import connect
from urllib3.exceptions import LocationParseError
from multiprocessing import Pool
from math import floor, ceil
from IPython.display import clear_output

# Article Aggregation 

There are quite a few source URLs in the SQL table, so this code needs to be able to stop and resume as needed. The function below fetches the article text and title for a given URL and saves them directly to files. No processing is done at this stage, just file aggregation.

In [3]:
def article_content_fetcher(row):
    """Download one article and cache its text and title under ``../data/raw``.

    The text is written to ``<id>.text`` and the title to ``<id>.title``. A row
    is skipped only when both files already exist, so an interrupted run can be
    restarted without re-downloading finished articles and without leaving a
    row that has a text file but no title file.

    Parameters
    ----------
    row : tuple
        ``(_id, url, date, day)``; only ``_id`` and ``url`` are used here.

    Returns
    -------
    None
        The result is the pair of files on disk. If the download or parse
        raises one of the handled exceptions, both files are written empty.
    """
    path = pathlib.Path("../data/raw")
    path.mkdir(parents=True, exist_ok=True)
    _id, url, date, day = row

    # cache files for the text and title of this row
    text_dump = path / f"{_id}.text"
    title_dump = path / f"{_id}.title"

    # the row counts as done only when *both* files are present; checking the
    # text file alone would leave a missing title file unrepaired forever
    if text_dump.is_file() and title_dump.is_file():
        return

    try:
        # there are a lot of things that can prevent successful webscraping
        # if nothing throws an error, the text and title are saved
        article = newspaper.Article(url=url)
        article.download()
        article.parse()

        # falsy values (e.g. None) are normalised to empty strings
        article_text = article.text or ""
        article_title = article.title or ""

    # if something does go wrong, empty strings are written instead
    except (TypeError, newspaper.ArticleException, LocationParseError):
        # TypeError will occur when there is no url
        # ArticleException will occur when the response forbids webscraping
        # not sure what causes LocationParseError, but it's rare
        article_text = ""
        article_title = ""

    # the content is written to files (platform default text encoding, which is
    # also what row_writer's read_text() calls use)
    with text_dump.open(mode="w+") as txt, title_dump.open(mode="w+") as ttl:
        txt.write(article_text)
        ttl.write(article_title)

Next, we need a function which will write the relevant content to a table for storage. As the aggregated files are iterated through, this function will write one row to the new table for each row of the original SQL table it is given.

In [4]:
def row_writer(row, conn):
    """Insert one source row, with its cached title and text, into ``source_text``.

    The title and text are read from ``../data/raw/<id>.title`` and
    ``../data/raw/<id>.text``. Empty file contents are stored as SQL NULL.
    The insert is committed before returning.

    Parameters
    ----------
    row : tuple
        ``(_id, url, date, day)``, inserted as the ``id``, ``source``, ``date``
        and ``day`` columns.
    conn
        Open database connection providing ``cursor()`` and ``commit()``.

    Raises
    ------
    FileNotFoundError
        If either cached file for the row does not exist.
    """
    # unpack the row into named variables
    _id, url, date, day = row
    path = pathlib.Path("../data/raw/")

    # get the text and title; read_text() opens and closes each file itself
    text = (path / f"{_id}.text").read_text()
    title = (path / f"{_id}.title").read_text()

    # write the content into the source_text table
    cmd = "INSERT INTO source_text(id, source, date, day, title, text) VALUES (%s, %s, %s, %s, %s, %s)"
    cursor = conn.cursor()
    try:
        # empty strings become NULL
        cursor.execute(cmd, (_id, url, date, day, title or None, text or None))
    finally:
        # release the cursor even if the insert fails
        cursor.close()

    conn.commit()

Next, we have to select a subset of the articles. The following command will:

1. Create a new table for the article text
2. Group the sources by week to conform with the price data
3. Order them randomly
4. Select a maximum of 1000 articles per week

The limit is implemented to ensure that some weeks don't have an absurd amount of data compared to others, especially because the later years have much more data than earlier years. Additionally, both the formatted date and unformatted date are returned. This is because, due to some error, some dates are returned in the year 1920, which is far earlier than GDELT actually maintains. I wanted to see if the error was on my end or GDELT's (spoiler: it was GDELT).

In [5]:
# Seed passed to PostgreSQL's SETSEED in the selection query below, so that the
# RANDOM() ordering can be repeated between runs.
SQL_RANDOM_SEED = 0.42
# Cut-off applied to the per-week rank in the selection query below.
ENTRIES_PER_WEEK = 1000

In [6]:
# Multi-statement PostgreSQL script, sent to the server in a single execute() call:
#   1. drop and recreate the source_text table (any earlier contents are lost),
#   2. seed RANDOM() with SQL_RANDOM_SEED,
#   3. select id, source, date and day for the events whose random per-week rank
#      is at most ENTRIES_PER_WEEK.
# NOTE(review): DISTINCT ON (SOURCEURL) has no ORDER BY, and PostgreSQL evaluates
# window functions before DISTINCT. The rank therefore appears to be computed over
# all event rows rather than over distinct URLs, and which duplicate row survives
# is not pinned down by the seed -- confirm this is the intended sampling.
cmd = f"""
      DROP TABLE IF EXISTS source_text;
      CREATE TABLE source_text(id integer PRIMARY KEY, source text, date timestamp, day text, title text, text text); -- create table for source text 
      SELECT SETSEED ({SQL_RANDOM_SEED}); -- set a seed for reproducability
      WITH grouped_by_week AS (
          SELECT
              DISTINCT ON (SOURCEURL) SOURCEURL AS source,
              id,
              TO_DATE(Day::text, 'YYYYMMDD') AS date,
              Day as day,
              RANK() OVER (PARTITION BY date_trunc('week', TO_DATE(Day::text, 'YYYYMMDD')) ORDER BY RANDOM()) as row
          FROM events
      )
      SELECT
          id,
          source,
          date,
          day
      FROM grouped_by_week
      WHERE row <= {ENTRIES_PER_WEEK};
      """

Finally, a cell to execute all of the above. The article downloads are run in a multiprocessed fashion; the rows are then written to the table one at a time.

In [7]:
# Driver cell: select the article rows, download them in parallel, then copy
# the cached text and titles into the source_text table.

# The password file is only needed long enough to read it.
# NOTE(review): the contents are used verbatim, including any trailing newline
# -- confirm the file holds exactly the password.
with open("../etc/postgres.password") as psql_pass_file:
    postgres_password = psql_pass_file.read()

conn = None
try:
    # The pool is created before the database connection is opened (same order
    # as before), so the workers are started while no connection exists yet.
    # Leaving the ``with`` block shuts the workers down instead of leaking them.
    with Pool() as pool:
        conn = connect(
            f"host='localhost' dbname='gdelt' user='postgres' password='{postgres_password}'"
        )
        cursor = conn.cursor()
        cursor.execute(cmd)
        conn.commit()
        rows = cursor.fetchall()
        cursor.close()
        total_rows = len(rows)

        print("Source article download progress 0% complete")
        # the task is broken down into 100 blocks so that progress can be tracked
        for percent in range(100):
            # integer floor on both bounds makes consecutive blocks tile the
            # rows exactly; floor/ceil bounds overlapped by one row whenever
            # the row count was not a multiple of 100
            bot = percent * total_rows // 100
            top = (percent + 1) * total_rows // 100

            pool.map(article_content_fetcher, rows[bot:top])

            clear_output(wait=True)
            print(f"Source article download progress {percent + 1}% complete")

    # every download has finished; write the cached files to the table serially
    for n, row in enumerate(rows):
        row_writer(row, conn)

        clear_output(wait=True)
        print("Source article download progress 100% complete")
        print(f"{n + 1}/{total_rows} rows written to postgreSQL table")
finally:
    # close the connection whether or not the run completed
    if conn is not None:
        conn.close()

Source article download progress 100% complete
112475/112475 rows written to postgreSQL table
