In [None]:
import time

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, rank, concat, lit, max, row_number, broadcast


# Create access point for Spark services
# Use defaultParallelism to determine default number of partitions
# Use getNumPartitions() to verify number is appropriate for data size ~1GB
# Set default parallelism to balance processing speed and hardware limits
spark = SparkSession.builder \
    .appName("reddit loader") \
    .config("spark.default.parallelism", "2") \
    .config("spark.sql.debug.maxToStringFields", "2000") \
    .getOrCreate()

# Reducing sampling ratio used to infer schema to 0.1 to reduce resources needed for schema inference.
# submissions_df = spark.read.option("samplingRatio", 0.05).json("submissions_data.ndjson")

# Create parquet files for more efficient reads and writes
# submissions_df.write.option("compression", "snappy").mode("overwrite").parquet("submissions_data.parquet")

# Read and verify num partitions is appropriate
submissions_df = spark.read.parquet("../data/submissions_data.parquet")
# submissions_df.rdd.getNumPartitions() # --> 3
submissions_df.describe().show() 
submissions_df.count()

your 131072x1 screen size is bogus. expect trouble
24/12/11 01:08:19 WARN Utils: Your hostname, BOOK-80CSR0J7NE resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/12/11 01:08:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/11 01:08:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------+---------------+-----------+--------------------+-------------------+-----------------------------+----------------------+------------------------+--------------------+-----------------------+-----------------+---------------+-------------+---------+--------------+--------+------------------+--------------------+-------------------+----------------+---------------+-------------+---------+------------------+--------------------+----+-------+---------+--------------------+--------+-----+---------------------------+--------------------+----------------------+--------------------+---------------------+---------------+--------+-------------+----------------+----------+-----------------+-------------------+-----------+-----------------------+--------------------+---------------+---------+--------------------+-------------------+--------------+----------+-------------------+--------------------+--------------------+--------+------------------+--------------------+--------------------

839181

In [2]:
# Select the columns of data that we find most relevant (based on interest)
submissions_df = submissions_df.select(
    "author", "author_created_utc", 
    "author_fullname", "created_utc", 
    "id", "name", "num_comments", 
    "num_crossposts", "num_reports", 
    "score", "selftext", "subreddit", 
    "subreddit_id", "subreddit_subscribers", 
    "subreddit_type", "title", 
    "total_awards_received", "downs", "ups", 
    "upvote_ratio", "url")
submissions_df.show(5)

# Next, let's perform preprocessing and generate tables. According to Reddit, t1 indicates comment ids, t2 users, t3 submissions, t5 subreddits.a
# We want to enforce that all ids include this information to facilitate joins.
# The users dataframe will house user information: id, username, and time of join. 
# We can formally enforce that all user id and usernames are unique in Postgres, but for now we dropDuplicates().
users = submissions_df.select(
    col("author_fullname").alias("id"),
    col("author").alias("username"),
    col("author_created_utc").alias("joined_utc").cast("timestamp")
    ).dropDuplicates(["id", "username"])
users.show(5)

                                                                                

+-------------+------------------+---------------+-----------+-----+--------+------------+--------------+-----------+-----+--------+------------------+------------+---------------------+--------------+--------------------+---------------------+-----+---+------------+--------------------+
|       author|author_created_utc|author_fullname|created_utc|   id|    name|num_comments|num_crossposts|num_reports|score|selftext|         subreddit|subreddit_id|subreddit_subscribers|subreddit_type|               title|total_awards_received|downs|ups|upvote_ratio|                 url|
+-------------+------------------+---------------+-----------+-----+--------+------------+--------------+-----------+-----+--------+------------------+------------+---------------------+--------------+--------------------+---------------------+-----+---+------------+--------------------+
|ilikebluepens|              NULL|           NULL| 1309564251|iemkn|t3_iemkn|           0|          NULL|       NULL|    4|        |A

[Stage 8:>                                                          (0 + 3) / 3]

+-----------+---------------+-------------------+
|         id|       username|         joined_utc|
+-----------+---------------+-------------------+
|t2_2ev8r79a|      FGCUPsych|2018-10-14 17:14:07|
|   t2_qwu96|       tedSquee|2015-10-03 07:30:58|
|   t2_lctgx|AngelinaLawmeno|2015-02-14 09:05:23|
|  t2_15it1p|  LightSquirrel|2017-02-18 08:11:15|
|t2_47mr1qdn|    jamielearns|               NULL|
+-----------+---------------+-------------------+
only showing top 5 rows



                                                                                

In [3]:
# The subreddit dataframe identifies subreddit information. We want to enforce that id is unique.
subreddits = submissions_df.select(
    col("subreddit_id").alias("id"),
    col("subreddit").alias("name"),
    col("subreddit_type")
).dropDuplicates(["id"])
subreddits.show(5)

# With subscriber counts we want to make sure that only valid timestamps are present so we drop all rows with an na value.
subscriber_counts = submissions_df.select(
    col("subreddit_id"),
    col("subreddit_subscribers").alias("subscriber_count").cast("int"),
    col("author_created_utc").alias("date").cast("timestamp")
).dropna()
subscriber_counts.show(5)

                                                                                

+--------+-------------+--------------+
|      id|         name|subreddit_type|
+--------+-------------+--------------+
|t5_2qh0k|       cogsci|        public|
|t5_2qjfk|       stocks|        public|
|t5_2qqpg|          NLP|        public|
|t5_2r5zc|       edtech|        public|
|t5_2rawx|climatechange|        public|
+--------+-------------+--------------+
only showing top 5 rows

+------------+----------------+-------------------+
|subreddit_id|subscriber_count|               date|
+------------+----------------+-------------------+
|    t5_2sluh|           18442|2017-05-16 18:55:05|
|    t5_2sluh|           18442|2017-02-12 06:52:13|
|    t5_2sluh|           18442|2014-10-17 09:42:41|
|    t5_2sluh|           18442|2018-04-14 20:33:44|
|    t5_2sluh|           18442|2018-03-12 18:40:44|
+------------+----------------+-------------------+
only showing top 5 rows



In [4]:
# For the submissions dataframe, we format submission id to include the t3_ prefix and select relevant columns.
submissions = submissions_df.select(
    concat(lit("t3_"), col("id")).alias("id"),
    col("title"),
    col("created_utc").cast("timestamp"),
    col("author"),
    col("score").cast("int"),
    col("num_comments").cast("int"),
    col("num_crossposts").cast("int"),
    col("num_reports").cast("int"),
    col("selftext"),
    col("upvote_ratio"),
    col("url"),
    col("total_awards_received"),
    col("downs"),
    col("ups")
)
submissions.show(5)

[Stage 15:>                                                         (0 + 1) / 1]

+--------+--------------------+-----------+-------------+-----+------------+--------------+-----------+--------+------------+--------------------+---------------------+-----+---+
|      id|               title|created_utc|       author|score|num_comments|num_crossposts|num_reports|selftext|upvote_ratio|                 url|total_awards_received|downs|ups|
+--------+--------------------+-----------+-------------+-----+------------+--------------+-----------+--------+------------+--------------------+---------------------+-----+---+
|t3_iemkn|Sister subreddit ...|       NULL|ilikebluepens|    4|           0|          NULL|       NULL|        |        NULL|http://reddit.com...|                 NULL|    2|  6|
|t3_iellt|Decline of Fluid ...|       NULL| inquilinekea|    5|           1|          NULL|       NULL|        |        NULL|http://www.quora....|                 NULL|    2|  7|
|t3_ielfu|Ecological Moment...|       NULL|       drooze|    6|           3|          NULL|       NULL|  

                                                                                

In [5]:
# We can probably impute upvote_ratio, num_reports, and num_crossposts.
# For now, we will remove any columns that are more than 50% missing.
threshold = int(0.50 * submissions.count())
columns_to_keep = [column for column in submissions.columns
    if submissions.filter(submissions[column].isNotNull()).count() >= threshold]
submissions = submissions.select(columns_to_keep)
submissions.describe().show()



+-------+----------+--------------------+--------------------+------------------+-----------------+-------------------+--------------------+------+---------------------+
|summary|        id|               title|              author|             score|     num_comments|     num_crossposts|            selftext|   url|total_awards_received|
+-------+----------+--------------------+--------------------+------------------+-----------------+-------------------+--------------------+------+---------------------+
|  count|    839181|              839181|              839181|            839181|           839181|             762691|              839181|838159|               451933|
|   mean|      NULL|8.555564974470297...|1.357251799493589...|19.061613644732184|7.622629682988533|0.01758641441947001|   5.050050055055E11|  NULL|   0.0723138164285414|
| stddev|      NULL|3.081412198113064...|2.694542489993745...| 268.3973863642429| 52.8010183274944|0.22683376522460375|7.141849278507634E11|  NULL|   

                                                                                

In [8]:
# Repeat the same steps with comments data
# comments_df = spark.read.option("samplingRatio", 0.05).json("comments_data.ndjson")
# comments_df.write.option("compression", "snappy").mode("overwrite").parquet("comments_data.parquet")
comments_df = spark.read.parquet("../data/comments_data.parquet")
comments_df.describe().show()
comments_df.count()

                                                                                

+-------+---------------+-----------+----------------+-------------+--------------------+-----------------------------+----------------------+------------------------+-----------------+-----------------------+-----------------+---------------+-------------+---------+--------------------+--------------------+--------------------+-------------------------------+--------------------+---------------------+------------+--------------------+-------------------+--------------------+-------------+------+--------------------+--------------------+--------------------+-----+----------+--------+-------------+----------------+-------+----------+-----------+--------------------+--------------------+--------------+-------+-------------------+--------------------+--------+------------------+------------------+------------+-----------------------+--------------+----------------+---------------------+------------------+-------------------+------------------+
|summary|approved_at_utc|approved_by|associat

1000833

In [9]:
comments_df = comments_df.select("author", "author_created_utc", "author_fullname", "body", "controversiality", "created_utc", "distinguished", "downs", "id", "name", "num_reports", "parent_id", "permalink", "replies", "score", "subreddit", "subreddit_id", "subreddit_name_prefixed", "subreddit_type", "total_awards_received", "ups")

users = users.union(comments_df.select(
    col("author_fullname").alias("id"),
    col("author").alias("username"),
    col("author_created_utc").alias("joined_utc").cast("timestamp")
    )).dropDuplicates()
users.show(5)

+-----------+----------------+-------------------+
|         id|        username|         joined_utc|
+-----------+----------------+-------------------+
|t2_1v4hef9h|          Iper91|2018-07-27 01:46:33|
|t2_qb41mdeu|          rccaad|2022-07-26 12:27:34|
|t2_t6n4l0nk|Senior-Storm-727|2022-10-08 07:43:01|
|  t2_128krh|        Tyler119|2016-10-19 07:59:11|
|t2_mbom6ns2|  choprakunaleth|2022-04-24 00:36:40|
+-----------+----------------+-------------------+
only showing top 5 rows



In [10]:
# We select the appropriate columns for the comments dataframe and format the id accordingly.
comments = comments_df.select(
    concat(lit("t1_"), col("id")).alias("comment_id"),
    col("parent_id"),
    col("subreddit_id"),
    col("author"),
    col("created_utc").alias("created_at").cast("timestamp"),
    col("body"),
    col("score").cast("int"),
    col("ups").cast("int"),
    col("downs").cast("int"),
    col("controversiality").cast("int"),
    col("distinguished"),
    col("num_reports").cast("int"),
    col("total_awards_received").cast("int"),
    col("permalink")
).dropna(subset=["comment_id", "body"])
comments.show(5)

[Stage 90:>                                                         (0 + 1) / 1]

+----------+---------+------------+---------+----------+--------------------+-----+---+-----+----------------+-------------+-----------+---------------------+---------+
|comment_id|parent_id|subreddit_id|   author|created_at|                body|score|ups|downs|controversiality|distinguished|num_reports|total_awards_received|permalink|
+----------+---------+------------+---------+----------+--------------------+-----+---+-----+----------------+-------------+-----------+---------------------+---------+
|t1_c233p63| t3_iejzu|    t5_2sluh|  amayain|      NULL|I agree that it i...|    3|  3|    0|               0|         NULL|       NULL|                 NULL|     NULL|
|t1_c233q0r| t3_iekel|    t5_2sluh|  amayain|      NULL|I thought I might...|    2|  2|    0|               0|         NULL|       NULL|                 NULL|     NULL|
|t1_c233vof| t3_iejzu|    t5_2sluh|nicson123|      NULL|Yes, the issue of...|    3|  3|    0|               0|         NULL|       NULL|                 NU

                                                                                

In [11]:
# For now, we will remove any columns that are more than 50% missing.
threshold = int(0.50 * comments.count())
columns_to_keep = [column for column in comments.columns
    if comments.filter(comments[column].isNotNull()).count() >= threshold]
comments = comments.select(columns_to_keep)
comments.describe().show()



+-------+----------+--------------------+------------+-------------+--------------------+------------------+--------------------+---------------------+--------------------+
|summary|comment_id|           parent_id|subreddit_id|       author|                body|             score|    controversiality|total_awards_received|           permalink|
+-------+----------+--------------------+------------+-------------+--------------------+------------------+--------------------+---------------------+--------------------+
|  count|   1000833|             1000833|     1000833|      1000833|             1000833|           1000833|             1000833|               738536|              802745|
|   mean|      NULL|4.336460094351968...|        NULL|     Infinity|5.406422518721979E75|2.5126129933765173|0.010223483837962977| 0.001618066011677...|                NULL|
| stddev|      NULL|1.1127094067500521E8|        NULL|          NaN|7.393171601096885E76| 6.075691118144939| 0.10059311272015548| 0.051

                                                                                

In [34]:
# Query the controversial comment with the most replies for each subreddit
# First get the number of replies for each controversial comment
controversial_comments = comments.filter(col("controversiality") == 1)
reply_counts = controversial_comments.alias("cc").join(comments.alias("c"), col("cc.comment_id")==col("c.parent_id"), "left") \
                .groupBy(col("cc.subreddit_id").alias("subreddit_id"), col("cc.comment_id").alias("comment_id")).agg(count("c.comment_id").alias("num_replies"))

# Rank the comments for each subreddit by creating windows and ranking the rows
window_spec = Window.partitionBy("subreddit_id").orderBy(col("num_replies").desc())
ranked_comments = reply_counts.withColumn("row_number", row_number().over(window_spec))

# Grab the top-ranked comment for each subreddit
most_engaging_controversial_comments = ranked_comments.filter(col("row_number") == 1) \
                                                      .select("comment_id", "num_replies")

# Retrieve the comment information for the most engaging comments
most_engaging_controversial_comments = broadcast(most_engaging_controversial_comments).join(
    comments, "comment_id").orderBy(col("num_replies").desc())

# Run the query and evaluate runtime. Observe that Spark runs every action when results are requested
# without storing intermediate results. 
most_engaging_controversial_comments.explain()
start = time.time()
most_engaging_controversial_comments.show(13)
end = time.time()

runtime = (end - start) * 1000
print(f"Total execution time: {runtime} miliseconds") 

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [num_replies#25713L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(num_replies#25713L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=7556]
      +- Project [comment_id#25693, num_replies#25713L, parent_id#25781, subreddit_id#25797, author#25731, body#25748, score#19503, controversiality#19506, total_awards_received#19508, permalink#25782]
         +- BroadcastHashJoin [comment_id#25693], [comment_id#19501], Inner, BuildLeft, false
            :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=7552]
            :  +- Project [comment_id#25693, num_replies#25713L]
            :     +- Filter ((row_number#25718 = 1) AND isnotnull(comment_id#25693))
            :        +- Window [row_number() windowspecdefinition(subreddit_id#14306, num_replies#25713L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#25718], [subre



+----------+-----------+----------+------------+------------------+--------------------+-----+----------------+---------------------+--------------------+
|comment_id|num_replies| parent_id|subreddit_id|            author|                body|score|controversiality|total_awards_received|           permalink|
+----------+-----------+----------+------------+------------------+--------------------+-----+----------------+---------------------+--------------------+
|t1_kf1q6ou|         14|t1_kf1j4jb|    t5_2rawx|TransitionProof625|China emits 26% o...|    1|               1|                    0|/r/climatechange/...|
|t1_kax8i0d|         13|t1_kax83jo|    t5_3crzr| newExperience2020|But why do you ne...|  -14|               1|                    0|/r/ArtificialInte...|
|t1_hl247bc|          9| t3_qw7l65|    t5_2tf7t|        VerumJerum|It's a shame Star...|   13|               1|                    0|/r/ImaginaryTechn...|
|t1_dg95chg|          8| t3_65byl6|    t5_2qh0k|         [deleted]|   

                                                                                

In [None]:
# Query the highest scoring comment for each subreddit and its number of replies
reply_counts = comments.withColumn("row_number", row_number().over(Window.partitionBy("subreddit_id").orderBy(col("score").desc())))
reply_counts = reply_counts.filter(col("row_number") == 1).select("comment_id", "num_replies")
reply_counts = reply_counts.alias("r").join(comments.alias("c"), col("r.comment_id") == col("c.parent_id")).groupBy(col("r.subreddit_id").alias("subreddit_id"), col("r.comment_id").alias("comment_id")).agg(count("c.comment_id").alias("num_replies"))
top_scoring_comment_engagement = broadcast(reply_counts).join(comments, "comment_id").orderBy(col("num_replies").desc())

# Run the query and evaluate runtime. Observe that Spark runs every action when results are requested
# without storing intermediate results. 
top_scoring_comment_engagement.explain()
start = time.time()
top_scoring_comment_engagement.show(13)
end = time.time()

runtime = (end - start) * 1000
print(f"Total execution time: {runtime} miliseconds") 

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [num_replies#25414L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(num_replies#25414L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=7020]
      +- Project [comment_id#25393, subreddit_id#14306, num_replies#25414L, parent_id#25474, subreddit_id#25490, author#25424, body#25441, score#19503, controversiality#19506, total_awards_received#19508, permalink#25475]
         +- BroadcastHashJoin [comment_id#25393], [comment_id#19501], Inner, BuildLeft, false
            :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [plan_id=7016]
            :  +- HashAggregate(keys=[subreddit_id#14306, comment_id#19501], functions=[count(comment_id#25345)])
            :     +- Exchange hashpartitioning(subreddit_id#14306, comment_id#19501, 200), ENSURE_REQUIREMENTS, [plan_id=7013]
            :        +- HashAggregate(keys=[subreddit_id#14306, comment_id#19501], functions=[partial_count(c



+----------+------------+-----------+----------+------------+--------------------+--------------------+-----+----------------+---------------------+--------------------+
|comment_id|subreddit_id|num_replies| parent_id|subreddit_id|              author|                body|score|controversiality|total_awards_received|           permalink|
+----------+------------+-----------+----------+------------+--------------------+--------------------+-----+----------------+---------------------+--------------------+
|t1_kch9kvt|    t5_2rawx|         45|t3_18dgxau|    t5_2rawx|            burrwati|I’m a human right...|  320|               0|                    0|/r/climatechange/...|
|t1_c0noh6g|    t5_2qh0k|         31|  t3_bnr31|    t5_2qh0k|           theonusta|&gt; Mark Jaffe, ...|  723|               0|                 NULL|                NULL|
|t1_kcxfofc|   t5_7ipnaj|         26|t3_18g0cha|   t5_7ipnaj|             pete_68|>  People think t...|  186|               0|                    0|/r

                                                                                