## If using this notebook on AWS/EMR, start here!

In [None]:
## Run this cell in AWS/EMR Only
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("reddit").getOrCreate()

In [None]:
# AWS/EMR
spark

In [None]:
# 150GB File - JSON plain text
reddit_text = spark.read.json("s3://bigdatateaching/reddit/plain-text/RC_2019-04")

In [None]:
# Same dataset in Parquet format
reddit = spark.read.parquet("s3://bigdatateaching/reddit/plain-text/parquet")

## If using this notebook in Databricks, start here!

When using this notebook in Databricks, the SparkSession `spark` already exists.

In [None]:
# Run this cell in Databricks
# Mount the public Azure Blob container using a read/list SAS token
dbutils.fs.mount(
  source = "wasbs://public@bigdatateaching.blob.core.windows.net/reddit",
  mount_point = "/mnt/reddit",
  extra_configs = {"fs.azure.sas.public.bigdatateaching.blob.core.windows.net":
"https://bigdatateaching.blob.core.windows.net/?sv=2020-02-10&ss=b&srt=co&sp=rl&se=2021-08-02T03:58:56Z&st=2021-03-16T15:58:56Z&spr=https&sig=WOWPuCpfSGQaYZu7HWNDm4Pcp%2BCq0VAyt8ieOEt%2FZ0o%3D"})

In [None]:
# 150GB File - JSON plain text
reddit_text = spark.read.json("/mnt/reddit/RC_2019-04")

In [None]:
# Same dataset in Parquet format
reddit = spark.read.parquet("/mnt/reddit/parquet/")
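
Re-running the mount cell in this section raises an error if `/mnt/reddit` is already mounted. A minimal sketch of a guard follows, assuming the entries returned by `dbutils.fs.mounts()` expose a `mountPoint` attribute. The helper name `is_mounted` and the stand-in objects are hypothetical and exist only so the logic can be tried outside Databricks.

```python
from types import SimpleNamespace


def is_mounted(mounts, mount_point):
    """Return True if any entry in `mounts` has the given mountPoint."""
    return any(m.mountPoint == mount_point for m in mounts)


# In Databricks (assumption: dbutils is available in the notebook):
# if not is_mounted(dbutils.fs.mounts(), "/mnt/reddit"):
#     dbutils.fs.mount(...)  # same arguments as the mount cell above

# Stand-in objects that mimic mount entries, for illustration only
fake_mounts = [SimpleNamespace(mountPoint="/mnt/reddit")]
print(is_mounted(fake_mounts, "/mnt/reddit"))  # True
print(is_mounted(fake_mounts, "/mnt/other"))   # False
```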

## Analysis begins here

In [None]:
reddit.printSchema()

In [None]:
reddit_text.printSchema()

In [None]:
reddit.count()

In [None]:
reddit_text.count()
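
Counting the JSON data means reading and parsing the whole 150GB file, whereas the Parquet copy can usually be counted much faster. One way to see the difference is to time both calls. This is a minimal sketch; the helper name `timed` is hypothetical, and the commented usage assumes the `reddit` and `reddit_text` DataFrames from the cells above.

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start


# In the notebook:
# n_parquet, t_parquet = timed(reddit.count)
# n_json, t_json = timed(reddit_text.count)
# print(f"parquet: {n_parquet} rows in {t_parquet:.1f}s")
# print(f"json:    {n_json} rows in {t_json:.1f}s")
```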

In [None]:
# List of columns to drop 

drop_columns = (
 'archived',
 'author',
 'author_cakeday',
 'author_created_utc',
 'author_flair_background_color',
 'author_flair_css_class',
 'author_flair_richtext',
 'author_flair_template_id',
 'author_flair_text',
 'author_flair_text_color',
 'author_flair_type',
 'author_fullname',
 'author_patreon_flair',
 'can_gild',
 'can_mod_post',
 'collapsed_reason',
 'distinguished',
 'edited',
 'id',
 'link_id',
 'parent_id',
 'permalink',
 'removal_reason',
 'send_replies',
 'stickied',
 'subreddit',
 'subreddit_id',
 'subreddit_name_prefixed',
)

In [None]:
# Drop columns and save to a new dataframe
reddit_small = reddit.drop(*drop_columns)

In [None]:
reddit_small.printSchema()
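
`DataFrame.drop` ignores names that are not in the schema, so a typo in `drop_columns` would go unnoticed. A small check like the one below can catch that before dropping. It is a sketch only: `missing_columns` is a hypothetical helper, and the example lists are made up for illustration.

```python
def missing_columns(existing, to_drop):
    """Return the names in to_drop that are absent from existing, in order."""
    present = set(existing)
    return [name for name in to_drop if name not in present]


# In the notebook: missing_columns(reddit.columns, drop_columns)

# Illustrative data, not the real schema
example_columns = ["author", "body", "score", "subreddit"]
example_drop = ("author", "subreddit", "not_a_column")
print(missing_columns(example_columns, example_drop))  # ['not_a_column']
```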

In [None]:
# Register a temporary view and run a SQL query
reddit.createOrReplaceTempView("sql_tbl")
spark.sql("""
    SELECT score, COUNT(score) AS number
    FROM sql_tbl
    GROUP BY score
    ORDER BY number DESC
    LIMIT 10
    """).show()

The following cell may produce an error on AWS/EMR. If it does, run `import pandas as pd` first.


In [None]:
# Querying average score for each subreddit
subreddit_avg_score = spark.sql(""" 
    SELECT subreddit, AVG(score) as avg_score 
    FROM sql_tbl
    GROUP BY subreddit 
    ORDER BY avg_score DESC 
    """)
subreddit_score = subreddit_avg_score.toPandas()
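
`toPandas()` collects every row of the result onto the driver, so it can help to cap the row count first. The sketch below assumes its argument is a `pyspark.sql.DataFrame`; the helper name `to_pandas_capped` and the default of 1000 rows are arbitrary choices for illustration.

```python
def to_pandas_capped(df, max_rows=1000):
    """Collect at most max_rows rows of a Spark DataFrame into pandas.

    Uses the standard DataFrame methods limit() and toPandas().
    """
    if max_rows <= 0:
        raise ValueError("max_rows must be positive")
    return df.limit(max_rows).toPandas()


# In the notebook (query is already ordered by avg_score DESC,
# so this keeps the highest-scoring subreddits):
# subreddit_score = to_pandas_capped(subreddit_avg_score, max_rows=1000)
```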