# Data Cleaning and pre-processing

In [2]:
import sagemaker
sess = sagemaker.Session()
bucket = sess.default_bucket()
# !aws s3 ls s3://{bucket}/project/submissions/ --recursive

In [3]:
# Setup - Run only once per Kernel App
%conda install https://anaconda.org/conda-forge/openjdk/11.0.1/download/linux-64/openjdk-11.0.1-hacce0ff_1021.tar.bz2

# install PySpark
%pip install pyspark==3.4.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")


Downloading and Extracting Packages:

Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [4]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)



:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/sagemaker-user/.ivy2/cache
The jars for the packages stored in: /home/sagemaker-user/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6a3c965c-65b0-448d-8eb6-e18d716c2719;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 209ms :: artifacts dl 7ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	----------------

3.4.0


In [5]:
output_prefix_data_submissions = f"project/submissions/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_submissions}"
submissions = spark.read.parquet(s3_path, header=True)
submissions = submissions.select("subreddit", "author", "title", "selftext", "num_comments", "score", "created_utc", "id").cache()

24/11/13 20:06:23 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [6]:
output_prefix_data_comments = "project/comments/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
comments = spark.read.parquet(s3_path, header=True)
comments = comments.select("subreddit", "author", "body", "parent_id", "link_id", "id", "controversiality","score", "created_utc")

In [7]:
import pandas as pd
import matplotlib.pyplot as plt
import pyspark.pandas as ps
from pyspark.sql import functions as F
# import seaborn as sns



In [8]:
filtered_submissions = submissions \
    .withColumn("title", F.lower(F.col("title"))) \
    .filter(F.col("title").rlike(r"(?i)Trump|(?i)Biden")) \
    .filter(~F.col("selftext").contains("[deleted]")) \
    .withColumn("date", F.date_format(F.from_unixtime(F.col("created_utc")), "yyyy-MM-dd"))

In [9]:
filtered_submissions.show()

[Stage 2:>                                                          (0 + 1) / 1]

+------------+------------------+--------------------+---------+------------+-----+-----------+-------+----------+
|   subreddit|            author|               title| selftext|num_comments|score|created_utc|     id|      date|
+------------+------------------+--------------------+---------+------------+-----+-----------+-------+----------+
|   democrats|  BrianForCongress|every single majo...|         |           0|    1| 1721575517|1e8osif|2024-07-21|
|    politics|         buddyboys|biden asking if h...|         |          76|    0| 1721575870|1e8oxdk|2024-07-21|
|Conservative|         yuri_2022|biden announces h...|         |           5|   56| 1721576100|1e8p0bl|2024-07-21|
|Conservative|         yuri_2022|going full trump:...|         |           0|    6| 1721576177|1e8p1cb|2024-07-21|
|Conservative|         yuri_2022|new poll spells d...|         |           8|   31| 1721576350|1e8p3l4|2024-07-21|
|Conservative|  Muted_Leader_327|proud to vote for...|[removed]|           0|   

                                                                                

In [10]:
output_path = "../data/csv/filtered_submissions.parquet"

# Write the DataFrame to a Parquet file
filtered_submissions.write.parquet(output_path)

                                                                                

In [9]:
a = filtered_submissions.toPandas()

                                                                                

In [10]:
a.to_csv("../data/csv/filtered_submissions.csv")

In [28]:
trump_submissions = submissions \
    .withColumn("title", F.lower(F.col("title"))) \
    .filter(F.col("title").rlike(r"(?i)Trump")) \
    .filter(~F.col("selftext").contains("[deleted]"))

[Stage 14:==>            (24 + 4) / 130][Stage 15:>                 (0 + 0) / 4]

In [29]:
biden_submissions = submissions \
    .withColumn("title", F.lower(F.col("title"))) \
    .filter(F.col("title").rlike(r"(?i)Biden")) \
    .filter(~F.col("selftext").contains("[deleted]"))

In [30]:
harris_submissions = submissions \
    .withColumn("title", F.lower(F.col("title"))) \
    .filter(F.col("title").rlike(r"(?i)Harris")) \
    .filter(~F.col("selftext").contains("[deleted]"))

[Stage 14:====>          (36 + 4) / 130][Stage 15:>                 (0 + 0) / 4]

In [32]:
trump_submissions.toPandas().to_csv("../data/csv/trump_submissions.csv")
biden_submissions.toPandas().to_csv("../data/csv/biden_submissions.csv")
harris_submissions.toPandas().to_csv("../data/csv/harris_submissions.csv")

                                                                                

In [33]:
def filter_comments(filtered_submission):
    # Collect the unique IDs from filtered_submissions into a set
    post_ids = filtered_submission.select("id").rdd.flatMap(lambda x: x).collect()
    
    # Prefix each post_id with "t3_" and update the post_ids set
    prefixed_post_ids = {"t3_" + post_id for post_id in post_ids}
    
    # Filter the comments DataFrame
    filtered_comments = comments.filter(F.col("link_id").isin(prefixed_post_ids))
    
    return filtered_comments

In [34]:
# trump_comments = filter_comments(trump_submissions)
# biden_comments = filter_comments(biden_submissions)
# harris_comments = filter_comments(harris_submissions)

                                                                                

In [35]:
# trump_comments.toPandas().to_csv("../data/csv/trump_comments.csv")
# biden_comments.toPandas().to_csv("../data/csv/biden_comments.csv")
# harris_comments.toPandas().to_csv("../data/csv/harris_comments.csv")

24/11/08 22:45:40 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
ERROR:root:KeyboardInterrupt while sending command.               (3 + 5) / 130]
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
[Stage 26:=>                                                      (4 + 4) / 130]

KeyboardInterrupt: 

In [48]:
# post_ids = trump_submissions.select("id").rdd.flatMap(lambda x: x).collect()

In [47]:
# trump_submissions.filter(F.col("num_comments") != 0).select("id").rdd.flatMap(lambda x: x).collect()

# trump_submissions.filter(F.col("num_comments") != 0).selectExpr("id").rdd.map(lambda row: row[0]).collect()

In [42]:
harris_ids = submissions \
    .withColumn("title", F.lower(F.col("title"))) \
    .filter(F.col("title").rlike(r"(?i)Harris")) \
    .filter(~F.col("selftext").contains("[deleted]")) \
    .filter(F.col("num_comments") != 0) \
    .selectExpr("id").rdd.map(lambda row: row[0]).collect()

                                                                                

In [46]:
submission_ids_df = spark.createDataFrame([(id,) for id in harris_ids], ["id"])

# Step 2: Modify the IDs to match the `link_id` format
submission_ids_df = submission_ids_df.withColumn("link_id", F.concat(F.lit("t3_"), submission_ids_df["id"]))

# Step 3: Join and filter the `comments` DataFrame based on the `link_id`
filtered_comments = comments.join(submission_ids_df, on="link_id", how="inner")

# Now `filtered_comments` contains only comments with `link_id`s matching the selected IDs.
filtered_comments.show()



+----------+---------+--------------------+--------------------+----------+-------+----------------+-----+-----------+-------+
|   link_id|subreddit|              author|                body| parent_id|     id|controversiality|score|created_utc|     id|
+----------+---------+--------------------+--------------------+----------+-------+----------------+-----+-----------+-------+
|t3_1dswta6| politics|       AutoModerator|\nAs a reminder, ...|t3_1dswta6|lb586l9|               0|    1| 1719850125|1dswta6|
|t3_1dswta6| politics|       Brian24jersey|That’s right lol ...|t3_1dswta6|lb58gyy|               1|   -4| 1719850222|1dswta6|
|t3_1dswta6| politics|           Tbone2797|I'm sure Republic...|t3_1dswta6|lb59hgr|               0|   27| 1719850555|1dswta6|
|t3_1dswta6| politics|          bl3ckm3mba|Probably many sta...|t3_1dswta6|lb5a04i|               1|   -3| 1719850724|1dswta6|
|t3_1dswta6| politics|           fairoaks2|As usual the Repu...|t3_1dswta6|lb5a1hl|               0|    8| 1719

                                                                                

In [49]:
filtered_comments.toPandas().to_csv("../data/csv/harris_comments.csv")

                                                                                