## Data Extraction
**This notebook works best an EMR 6.1.0 (Haddop 3.2.1, Spark 3.0.0) cluster with 1 master-node (m5.xlarge) and 3 core-nodes (m5.xlarge).**

The pushshift dataset is available in the `s3a://bigdatateaching` under the `/reddit/parquet/` prefix. This notebook provides code to read this data so that you can extract the subreddits that meet your requirements and ready to work with the filtered data!

In this notebook we do the following:

1. Set up a SparkSession.

2. Read the `submissions` and `comments` data into Pyspark dataframes and do basic exploratory analysis.

3. Filter the subreddits of interest into new PySpark dataframes, save them into an S3 bucket in your account.

After all these steps, you are ready to work with reddit data for your project!

## Set up a SparkSession

In [2]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.2.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 23.3.1
  latest version: 24.3.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=24.3.0



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2024.3.11  |       h06a4308_0         127 KB
    certifi-2024.2.2           |  py310h06a4308_0         159 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    ------------------------------------------------------------
                                           Total:       341.3 MB

The following NEW packages will be INSTALLED:

  openjdk            pkgs/main/linux-64::openjdk-11.0.13-h87a67e

In [3]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)



:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-34985a56-686b-443e-ba38-60fa5b96605a;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 341ms :: artifacts dl 22ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------

3.2.0


## Save the `submissions` and `comments` data in the local path

In [4]:
!mkdir -p ./code

In [8]:
%%writefile ./code/process.py

import os
import logging
import argparse

# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")    
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_prefix", type=str, help="s3 output prefix")
    parser.add_argument("--col_name_for_filtering", type=str, help="Name of the column to filter")
    parser.add_argument("--values_to_keep", type=str, help="comma separated list of values to keep in the filtered set")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    logger.info(f"spark version = {spark.version}")
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

   
    # Downloading the data from S3 into a Dataframe
    logger.info(f"going to read {args.s3_dataset_path}")
    df = spark.read.parquet(args.s3_dataset_path, header=True)
    logger.info(f"finished reading files...")
    

    
    # filter the dataframe to only keep the values of interest
    vals = [s.strip() for s in args.values_to_keep.split(",")]
    df_filtered = df.where(col(args.col_name_for_filtering).isin(vals))
    
    # save the filtered dataframes so that these files can now be used for future analysis
    s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}"
    logger.info(f"going to write data for {vals} in {s3_path}")
    logger.info(f"shape of the df_filtered dataframe is {df_filtered.count():,}x{len(df_filtered.columns)}")
    df_filtered.write.mode("overwrite").parquet(s3_path)
    
    logger.info(f"all done...")
    
if __name__ == "__main__":
    main()

Overwriting ./code/process.py


Now submit this code to SageMaker Processing Job.

In [4]:
%%time
import time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project",
    framework_version="3.3",
    role=role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,
)

# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_logs = f"spark_logs"

# modify this comma separated list to choose the subreddits of interest
subreddits = "technology, Futurology, news"
configuration = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.executor.memory": "12g", "spark.executor.cores": "4"},
    }
]

# the dataset contains data for these 3 years
year_list = [2021,2022,2023]

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
CPU times: user 3.24 s, sys: 521 ms, total: 3.76 s
Wall time: 4.26 s


In [5]:
%%time
for yyyy in year_list:
    print(f"going to filter comments data for year={yyyy}")
    s3_dataset_path_commments = f"s3://bigdatateaching/reddit-parquet/comments/year={yyyy}/month=*/*.parquet" # "s3a://bigdatateaching/reddit/parquet/comments/yyyy=*/mm=*/*comments*.parquet"
    output_prefix_data_comments = f"finalproject/comments/yyyy={yyyy}"
    col_name_for_filtering = "subreddit"
    subreddits = "technology, Futurology, news"

    # run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
    spark_processor.run(
        submit_app="./code/process.py",
        arguments=[
            "--s3_dataset_path",
            s3_dataset_path_commments,
            "--s3_output_bucket",
            bucket,
            "--s3_output_prefix",
            output_prefix_data_comments,
            "--col_name_for_filtering",
            col_name_for_filtering,
            "--values_to_keep",
            subreddits,
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
        logs=False,
        configuration=configuration
    )
    # give some time for resources from this iterations to get cleaned up
    # if we start the job immediately we could get insufficient resources error
    time.sleep(60)

going to filter comments data for year=2021


INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-01-10-48-257


..........................................................................................................................................................................................................!

INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-01-28-59-512


going to filter comments data for year=2022
...................................................................................................................................................................................................................!

INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-01-47-55-902


going to filter comments data for year=2023
...................................................................................................................................................!CPU times: user 2.91 s, sys: 523 ms, total: 3.43 s
Wall time: 50min 39s


In [None]:
%%time
for yyyy in year_list:
    print(f"going to filter submissions data for year={yyyy}")
    s3_dataset_path_submissions = f"s3://bigdatateaching/reddit-parquet/submissions/year={yyyy}/month=*/*.parquet" # "s3a://bigdatateaching/reddit/parquet/submissions/yyyy=*/mm=*/*submissions*.parquet"
    output_prefix_data_submissions = f"finalproject/submissions/yyyy={yyyy}"

    # run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
    spark_processor.run(
        submit_app="./code/process.py",
        arguments=[
             "--s3_dataset_path",
            s3_dataset_path_submissions,
            "--s3_output_bucket",
            bucket,
            "--s3_output_prefix",
            output_prefix_data_submissions,
            "--col_name_for_filtering",
            col_name_for_filtering,
            "--values_to_keep",
            subreddits,
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
        logs=False,
        configuration=configuration
    )
    # give some time for resources from this iterations to get cleaned up
    # if we start the job immediately we could get insufficient resources error
    time.sleep(60)

INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-02-20-34-981


going to filter submissions data for year=2021
...................................................................................................................................................................................................................................!

INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-02-40-52-671


going to filter submissions data for year=2022
..............................................................................................................................................................................................................................................................................................................!

INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-04-01-03-07-31-547


going to filter submissions data for year=2023
.............................................................................................................................!CPU times: user 3.31 s, sys: 585 ms, total: 3.9 s
Wall time: 58min 36s


## Read the filtered data

Now that we have filtered the data to only keep submissions and comments from subreddits of interest. Let us read data from the s3 path where we saved the filtered data.

In [6]:
%%time
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_data_comments = "finalproject/comments/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
#s3_path = "s3a://sagemaker-us-east-1-527047370587/project/comments/yyyy=2021/425151 part-00000-28396f0a-9f66-4e79-bdee-5a0fcc71cf24-c000.snappy.parquet"
print(f"reading comments from {s3_path}")
comments = spark.read.parquet(s3_path, header=True)
print(f"shape of the comments dataframe is {comments.count():,}x{len(comments.columns)}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
reading comments from s3a://sagemaker-us-east-1-527047370587/finalproject/comments/yyyy=*


24/04/01 02:03:23 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties

shape of the comments dataframe is 31,880,148x21
CPU times: user 672 ms, sys: 188 ms, total: 860 ms
Wall time: 7min 13s


                                                                                

In [7]:
# check counts (ensuring all needed subreddits exist)
comments.groupBy('subreddit').count().show()



+----------+--------+
| subreddit|   count|
+----------+--------+
|      news|21525282|
|technology| 7320261|
|Futurology| 3034605|
+----------+--------+



                                                                                

In [8]:
comments.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)



In [9]:
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "link_id", "id", "created_utc").show()

+----------+--------------------+--------------------+----------+---------+-------+-------------------+
| subreddit|              author|                body| parent_id|  link_id|     id|        created_utc|
+----------+--------------------+--------------------+----------+---------+-------+-------------------+
|      news|            Sulaco99|           Why wait?|t1_gid6yr4|t3_krvwkf|gid7nhx|2021-01-07 00:22:43|
|      news|  jeopardy_themesong|The tweet is gone...|t1_gicxxdr|t3_krvwkf|gid7ni1|2021-01-07 00:22:43|
|      news|   West_Incident9552|You should probab...|t1_gid72ei|t3_krvwkf|gid7njz|2021-01-07 00:22:43|
|      news|      StatusReality4|You wouldn't be a...|t1_gid6aea|t3_krvwkf|gid7nm5|2021-01-07 00:22:44|
|      news|              5omkiy|I believe the ter...|t1_gid79xs|t3_krvwkf|gid7nmz|2021-01-07 00:22:44|
|      news|      AnneONymous125|Peep the video I ...|t1_gid705n|t3_krzopk|gid7nnk|2021-01-07 00:22:44|
|      news|       SpeedflyChris|He's sent a viole...|t1_gid77lu

In [11]:
%%time
output_prefix_data_submissions = f"finalproject/submissions/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_submissions}"
print(f"reading submissions from {s3_path}")
submissions = spark.read.parquet(s3_path, header=True)
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}")

reading submissions from s3a://sagemaker-us-east-1-527047370587/finalproject/submissions/yyyy=*


24/04/01 03:22:03 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

shape of the submissions dataframe is 1,094,770x68
CPU times: user 132 ms, sys: 26.4 ms, total: 159 ms
Wall time: 2min 59s


                                                                                

In [12]:
# check counts (ensuring all needed subreddits exist)
submissions.groupBy('subreddit').count().show()



+----------+------+
| subreddit| count|
+----------+------+
|      news|868430|
|technology|181596|
|Futurology| 44744|
+----------+------+



                                                                                

In [13]:
submissions.printSchema()

root
 |-- adserver_click_url: string (nullable = true)
 |-- adserver_imp_pixel: string (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- brand_safe: boolean (nullable = true)
 |-- contest_mode: boolean (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- approved_at_utc: string (nullable = true)
 |    |    |-- approved_by: string (nullable = true)
 |    |    |-- archived: boolean (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- author_flair_css_class: string (nullable = true)
 |    |    |-- author_flair_text: string (nullable = true)
 |    |    

In [14]:
# display a subset of columns
submissions.select("subreddit", "author", "title", "selftext", "created_utc", "num_comments").show()

+----------+--------------------+--------------------+---------+-------------------+------------+
| subreddit|              author|               title| selftext|        created_utc|num_comments|
+----------+--------------------+--------------------+---------+-------------------+------------+
|      news|           [deleted]|New Warp Drive Mo...|[deleted]|2021-03-30 10:52:37|           0|
|      news|First-Situation-1384|UNICAL CES admiss...|         |2021-03-30 10:54:26|           0|
|      news|             bustead|Attack on Asian W...|         |2021-03-30 10:55:12|           0|
|      news|     Som2ny-Official|'Nomadland' wins ...|         |2021-03-30 10:55:50|           0|
|      news|           pm30music|دانلود آهنگ کردی ...|         |2021-03-30 10:56:28|           0|
|      news|    Anon-fickleflake|Ireland to promot...|         |2021-03-30 10:57:10|           0|
|      news|First-Situation-1384|Newspaper Mistake...|         |2021-03-30 10:57:29|           0|
|      news| DPRK_JU