# Data Collection

In [1]:
#| output: false
#| echo: false

# Evaluate the `spark` name as the cell's last expression. It is not defined in
# any visible cell — presumably the session object pre-created by the notebook
# runtime; TODO confirm. The later cells call `spark.conf.set` and `spark.read`.
spark

StatementMeta(7c27951d-2bdf-4140-be82-d0dc0c0e7d8e, 20, 6, Finished, Available)

### Setting up Data Configuration

In [2]:
#| output: false
import os

# Source storage account and container that hold the shared parquet data.
blob_account_name = "marckvnonprodblob"
blob_container_name = "bigdata"

# Read-only SAS token for the container.
# The token can be supplied through the BLOB_SAS_TOKEN environment variable so
# that it can be rotated without editing the notebook; the literal below is the
# fallback used when the variable is unset or empty.
# NOTE(review): the fallback literal carries its own expiry field
# (`se=2024-01-02T02:42:00Z`) and is a credential committed to source — prefer
# the environment variable and remove the literal once that is in place.
blob_sas_token = os.environ.get("BLOB_SAS_TOKEN") or (
    "?sv=2021-10-04&st=2023-10-04T01%3A42%3A59Z&se=2024-01-02T02%3A42%3A00Z&sr=c&sp=rlf&sig=w3CH9MbCOpwO7DtHlrahc7AlRPxSZZb8MOgS6TaXLzI%3D"
)

# Base URL for every path read from the source container (ends with "/").
wasbs_base_url = (
    f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/"
)

# Register the SAS token with Spark under the per-container configuration key
# so subsequent wasbs:// reads against this container are authorised.
spark.conf.set(
    f"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net",
    blob_sas_token,
)

StatementMeta(7c27951d-2bdf-4140-be82-d0dc0c0e7d8e, 20, 7, Finished, Available)

### Reading in Reddit Data

In [3]:
#| output: false

# Container-relative folders holding the parquet data; they are appended
# directly to `wasbs_base_url`, which already ends with "/".
comments_path = "reddit-parquet/comments/"
submissions_path = "reddit-parquet/submissions/"

# Read each folder into its own Spark DataFrame.
comments_single_df = spark.read.parquet(wasbs_base_url + comments_path)
submissions_single_df = spark.read.parquet(wasbs_base_url + submissions_path)

StatementMeta(7c27951d-2bdf-4140-be82-d0dc0c0e7d8e, 20, 8, Finished, Available)

### Selecting Desired Subreddits

In [5]:
#| output: false

from pyspark.sql.functions import *
from datetime import datetime

# Subreddits to keep.
subreddit_values = ["CollegeBasketball", "jayhawks", "tarheels"]

# Bounds on `created_utc`, both inclusive.
# NOTE(review): `datetime(2022, 4, 30)` is midnight at the start of that day, so
# rows later on 2022-04-30 fall outside the window — confirm this is intended.
start_timestamp = datetime(2021, 9, 1)
end_timestamp = datetime(2022, 4, 30)

# One combined predicate: subreddit is in the list AND created_utc lies within
# [start_timestamp, end_timestamp].
keep_row = col("subreddit").isin(subreddit_values) & col("created_utc").between(
    start_timestamp, end_timestamp
)

submissions_filtered = submissions_single_df.filter(keep_row)
comments_filtered = comments_single_df.filter(keep_row)

StatementMeta(d3c1b951-094c-48e1-919d-e32bc175397b, 7, 10, Finished, Available)

### Saving Intermediate Data

In [2]:
#| output: false

# Destination storage account and container for the intermediate files.
workspace_default_storage_account = "aml6214727930"
workspace_default_container = "azureml-blobstore-6653633b-3460-4381-9199-d9e0f368353c"

workspace_wasbs_base_url = (
    f"wasbs://{workspace_default_container}@{workspace_default_storage_account}.blob.core.windows.net/"
)

# Columns written out for each dataset.
comment_columns = [
    "subreddit",
    "author",
    "author_flair_text",
    "body",
    "parent_id",
    "link_id",
    "id",
    "created_utc",
]
submission_columns = [
    "subreddit",
    "author",
    "author_flair_text",
    "title",
    "selftext",
    "created_utc",
    "num_comments",
]

# The base URL already ends with "/", so these paths contain "//" before the
# file name; the load step builds its paths the same way.
comments_output_path = f"{workspace_wasbs_base_url}/basketball_comments_sep.parquet"
submissions_output_path = f"{workspace_wasbs_base_url}/basketball_submissions_sep.parquet"

# Write the selected columns as parquet, replacing any existing output.
comments_writer = comments_filtered.select(*comment_columns).write.mode("overwrite")
comments_writer.parquet(comments_output_path)

submissions_writer = submissions_filtered.select(*submission_columns).write.mode("overwrite")
submissions_writer.parquet(submissions_output_path)

StatementMeta(d3c1b951-094c-48e1-919d-e32bc175397b, 11, 7, Finished, Available)

#### Loading Intermediate Data

In [None]:
#| output: false

# Storage account and container the intermediate parquet files are read from.
# These are assigned again here, so this cell does not depend on the save cell
# having run in the same session.
workspace_default_storage_account = "aml6214727930"
workspace_default_container = "azureml-blobstore-6653633b-3460-4381-9199-d9e0f368353c"

workspace_wasbs_base_url = (
    f"wasbs://{workspace_default_container}@{workspace_default_storage_account}.blob.core.windows.net/"
)

# Both paths start with "/" and the base URL ends with "/", so the final URLs
# contain "//" before the file name.
comments_path = "/basketball_comments_sep.parquet"
submissions_path = "/basketball_submissions_sep.parquet"

# Read each intermediate dataset into a Spark DataFrame.
comments = spark.read.parquet(workspace_wasbs_base_url + comments_path)
submissions = spark.read.parquet(workspace_wasbs_base_url + submissions_path)