# reddit NLP ETL prototype

This code prototypes a Spark workflow that retrieves data sets of compressed reddit posts and outputs TF-IDF scores for each post. After processing, a sample of records is exported to Amazon's DynamoDB for further analysis by a data science team.

This code has been tested on Google Colab's Spark engine for the years 2006 and 2007. A more thorough test is planned using a five-node Spark cluster provisioned from Amazon's EMR service. The entire workflow, spanning 10 years of posts and approximately 2.5B records, will be run on a 12-node EMR cluster.

In [None]:
# Install Spark and its dependencies and configure environment

import os

spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to security.u0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Hit:3 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
                                                                       

In [None]:
# Set location of AWS credentials (different from default: this is for boto3)
# No longer needed, as files are not saved to S3

# os.environ["AWS_SHARED_CREDENTIALS_FILE"] = "/content/credentials"


In [None]:
# Install application-specific dependencies
# Specific versions are required to resolve dependencies associated
# with boto3 (and *its* dependencies)
# Note the order must be preserved, as the boto3 install will
# also upgrade urllib3 to an incompatible version
# Also no longer needed

#!pip install folium==0.2.1
#!pip install requests==2.23.0
#!pip install boto3
#!pip install urllib3==1.25.4

## Initialize Spark and Import Packages

In [None]:
# Initialize Spark instance
# findspark.init() runs before any pyspark import so that the pyspark
# package can be found (presumably via the SPARK_HOME variable set in
# the setup cell -- confirm if the install location changes)

import findspark
findspark.init()

In [None]:
# Imports

from pyspark.sql import SparkSession

# File handling and NLP

from pyspark import SparkFiles
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

# requests used to retrieve files
# bz2 to unzip them (bzip2 compression)
# time calculates running time of main loop

import requests
import bz2
import time
#import boto3

In [None]:
# Create the Spark session shared by the rest of the notebook
# (getOrCreate reuses a session if one is already running)

spark = (
    SparkSession.builder
    .appName("RedditNlpEtlPoCTest")
    .getOrCreate()
)

## Function definitions

### Data Retrieval and Initial Processing

In [None]:
def generate_urls(url_stub, start_year, end_year):
  """
  Builds the list of monthly Reddit comment-archive URLs to retrieve.

  File names follow the pattern RC_<year>-<two-digit month><extension>,
  where the compression extension depends on the date. The pattern was
  derived from observation, so this code must be updated if it changes.

  Arguments:
    url_stub: the common prefix prepended to every file name
    start_year, end_year: first and last year to retrieve, both
    inclusive. (2010, 2012) yields three years of URLs; use the
    same value for both to get a single year.

  Returns:
    list of URLs, one per month, ordered by year and then month

  """

  def extension_for(year, month):
    # One-off months that switched format ahead of the yearly rule
    if (year, month) == (2017, 12):
      return '.xz'
    if year == 2018 and month >= 11:
      return '.zst'

    # General rule by year
    if year <= 2017:
      return '.bz2'
    if year == 2018:
      return '.xz'
    return '.zst'

  # {month:02d} supplies the leading zero for months 1 - 9
  return [
    url_stub + f'RC_{year}-{month:02d}{extension_for(year, month)}'
    for year in range(start_year, end_year + 1)
    for month in range(1, 13)
  ]

In [None]:
def get_file(url, timeout=(10, 120)):
  """
    Retrieves a file from the supplied URL using the requests library.

    Arguments:
      url: the URL specifying the resource to be retrieved
      timeout: passed straight to requests.get(); the default allows
      10 seconds to connect and 120 seconds between received bytes
      (it is not a limit on the total download time). Pass None to
      wait indefinitely.

    Returns:
      The content of the request response as bytes. In this invocation,
      this will be the compressed file of reddit posts at the supplied URL

    Raises:
      requests.HTTPError: if the server answers with a 4xx/5xx status
      requests.Timeout: if the server stops responding within timeout

  """

  response = requests.get(url, timeout=timeout)

  # Fail here on an HTTP error instead of handing an error page to the
  # decompression step, where it would surface as a confusing bz2 failure
  response.raise_for_status()

  return response.content

In [None]:
def unzip_file(zipped_file):
  """
    Decompresses the supplied file contents and decodes them to text.

    bzip2 and xz data are supported; xz is recognised by its leading
    magic bytes. Anything else is handed to bz2 as before, so other
    formats (including the .zst files used for later years) still fail
    inside bz2.decompress().

    Argument:
      zipped_file: the compressed file contents as bytes

    Returns:
      A (potentially very long) string containing the uncompressed
      file contents, decoded as UTF-8.
  """

  # Imported here because only this function needs lzma
  import lzma

  xz_magic = b'\xfd7zXZ\x00'

  if bytes(zipped_file[:len(xz_magic)]) == xz_magic:
    raw_bytes = lzma.decompress(zipped_file)
  else:
    raw_bytes = bz2.decompress(zipped_file)

  return raw_bytes.decode()

### NLP Pipeline Functions

In [None]:
def tokenize(nlp_df):
  """
    Splits each post into words using Spark's Tokenizer.

    Argument:
      nlp_df: DataFrame with the post text in a "body" column

    Returns:
      The DataFrame with an added "post_words" column
  """

  return Tokenizer(inputCol="body", outputCol="post_words").transform(nlp_df)

In [None]:
def remove_stopwords(tokenized_df):
  """
    Removes English stop words from the tokenized posts.

    Argument:
      tokenized_df: DataFrame with the tokens in a "post_words" column

    Returns:
      The DataFrame with an added "post_filtered" column
  """

  # loadDefaultStopWords() is a static method that returns the word list
  # rather than configuring a remover, so its result has to be passed in
  # explicitly for the call to have any effect.
  english_stopwords = StopWordsRemover.loadDefaultStopWords('english')

  remover = StopWordsRemover(
    inputCol='post_words',
    outputCol='post_filtered',
    stopWords=english_stopwords,
  )
  return remover.transform(tokenized_df)

In [None]:
def hasher(hashable_df):
  """
    Hashes the filtered words of each post into a term-frequency vector.
    The number of features is left at the default (262,144).

    Argument:
      hashable_df: DataFrame with the words in a "post_filtered" column

    Returns:
      The DataFrame with an added "post_hashed" column
  """

  hashing_tf = HashingTF(inputCol='post_filtered', outputCol='post_hashed')
  return hashing_tf.transform(hashable_df)

In [None]:
def tfidf_calc(hashed_df):
  """
    Fits an IDF model on the hashed posts and applies it to the same
    DataFrame.

    Argument:
      hashed_df: DataFrame with term frequencies in a "post_hashed" column

    Returns:
      The DataFrame with an added "post_tfidf" column
  """

  idf_estimator = IDF(inputCol='post_hashed', outputCol='post_tfidf')
  return idf_estimator.fit(hashed_df).transform(hashed_df)

## Main Function

This function retrieves the reddit posts, decompresses them, and constructs a Spark DataFrame from them. The DataFrame is then processed by the NLP pipeline to output estimated TF-IDF scores for each post.

The corpus for this pipeline is the downloaded posts: if posts from 2006 are downloaded, then the TF-IDF scores will be relative to 2006 posts. If 2006 and 2007 are downloaded, the scores will be relative to both years; and so on.

In [None]:

if __name__ == '__main__':

  # Main function for prototype.
  # Generates a list of URLs for the provided url_stub and range of years.
  # Then uses a loop to retrieve each file, decompress it,
  # and generate an RDD.
  # The union of these RDDs generates the DataFrame,
  # and the NLP pipeline outputs TF-IDF scores for each post
  # to a new column.
  #
  # NOTE: every file is downloaded and decompressed in full in this
  # process before being handed to Spark, so memory use here grows with
  # the size of each monthly file.

  start_year = 2006
  end_year = 2006
  url_stub = 'https://files.pushshift.io/reddit/comments/'
  #s3_bucket_prefix = 'reddit-nlp-etl-spark-posts'
  reddit_urls = generate_urls(url_stub, start_year, end_year)

  sc = spark.sparkContext
  #s3 = boto3.resource('s3')

  rdd_list = []

  # Display starting time (current time)
  # The timestamp is interpolated into the f-string; passing
  # {time.asctime()} as its own argument would print a set literal.
  print(f'Main loop starting at {time.asctime()}\n')
  for url in reddit_urls:

    # Get file from website and decompress it to text
    # The name is the last path segment without its extension, taken
    # from the right so it does not depend on the depth of url_stub
    file_name = url.rsplit('/', 1)[-1].split('.')[0]
    print(f'Now processing {file_name}')

    reddit_file = get_file(url)
    unzipped_file = unzip_file(reddit_file)

    # Not saving files during this test run
    # s3.Bucket(s3_bucket_prefix).put_object(Key=file_name, Body=unzipped_file)

    # Make an RDD from the file (one element per line) and append it to a list
    print('Generating RDD from file and appending to rdd_list\n')
    rdd_list.append(sc.parallelize(unzipped_file.splitlines()))


  # Create Spark DataFrame from RDDs
  # (easier than appending each one in succession;
  # not sure if faster or not)
  print(f'Creating DataFrame of posts from years {start_year} to {end_year}\n')
  reddit_df = spark.read.json(sc.union(rdd_list))

  # NLP pipeline
  print('Beginning NLP pipeline')
  print('\t* Tokenizing posts')
  reddit_tokenized_df = tokenize(reddit_df)
  print('\t* Removing stop words')
  reddit_filtered_df = remove_stopwords(reddit_tokenized_df)
  print('\t* Hashing posts')
  reddit_hashed_df = hasher(reddit_filtered_df)
  print('\t* Calculating TF-IDF scores')
  reddit_tfidf_df = tfidf_calc(reddit_hashed_df)

  # Display concluding time
  print(f'\nScript concluded at {time.asctime()}')


In [None]:
# Spot-check the first five rows, one field per line and untruncated,
# to confirm each NLP stage produced its column

cols = [
    'author',
    'body',
    'post_words',
    'post_filtered',
    'post_hashed',
    'post_tfidf',
]
reddit_tfidf_df.select(cols).show(n=5, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 author        | jh99                                                                                                                                                                                                                                                                                          

In [None]:
# Print schema of the final DataFrame to confirm the NLP columns
# (post_words, post_filtered, post_hashed, post_tfidf) were added
# alongside the original post fields

reddit_tfidf_df.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)
 |-- post_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- post_filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- post_hashed: vector (nullable = true)
 |-- post_tfidf: vector (nullable = tr

In [None]:
# Build the export DataFrame by dropping the fields that are not needed:
# the raw text, the intermediate NLP columns, and unused post metadata

reddit_export_df = reddit_tfidf_df.drop(
    'author_flair_css_class',
    'author_flair_text',
    'body',
    'created_utc',
    'edited',
    'link_id',
    'parent_id',
    'retrieved_on',
    'subreddit_id',
    'post_words',
    'post_filtered',
    'post_hashed',
)

In [None]:
# Preview the first rows of the export DataFrame (long values are truncated)
reddit_export_df.show()

+----------------+----------------+-------------+------+-----+-----+--------+----------+---+--------------------+
|          author|controversiality|distinguished|gilded|   id|score|stickied| subreddit|ups|          post_tfidf|
+----------------+----------------+-------------+------+-----+-----+--------+----------+---+--------------------+
|            jh99|               0|         null|     0|c2715|    0|   false|reddit.com|  0|(262144,[90957,13...|
|             jpb|               0|         null|     0|c2717|    0|   false|reddit.com|  0|(262144,[52351,57...|
|       Pichu0102|               0|         null|     0|c2718|    2|   false|reddit.com|  2|(262144,[7987,661...|
|        libertas|               0|         null|     0|c2719|    2|   false|reddit.com|  2|(262144,[2306,171...|
|        mdmurray|               0|         null|     0|c2722|    0|   false|reddit.com|  0|(262144,[8804,218...|
|        mdmurray|               0|         null|     0|c2723|    1|   false|reddit.com|