In an interactive notebook, the `spark` object is already created.
Instructors tested this notebook with 1 driver and 6 executors of size small E4 (24 cores, 192 GB memory).

### Launch Spark environment

In [1]:
spark


### Set up data configuration

In [2]:
blob_account_name = "marckvnonprodblob"
blob_container_name = "bigdata"
# read-only SAS token (the `se` field sets expiry to 2024-01-02)
blob_sas_token = "?sv=2021-10-04&st=2023-10-04T01%3A42%3A59Z&se=2024-01-02T02%3A42%3A00Z&sr=c&sp=rlf&sig=w3CH9MbCOpwO7DtHlrahc7AlRPxSZZb8MOgS6TaXLzI%3D"

wasbs_base_url = (
    f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/"
)
spark.conf.set(
    f"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net",
    blob_sas_token,
)


In [3]:
comments_path = "reddit-parquet/comments/"
submissions_path = "reddit-parquet/submissions/"

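The configuration cells above follow a fixed naming pattern:

- The `wasbs://` base URL embeds the container and the storage account.
- The Spark config key that carries the SAS token is `fs.azure.sas.<container>.<account>.blob.core.windows.net`.
- The read paths are the base URL plus a relative folder.

The plain-Python sketch below rebuilds these strings. It also reads the token's `se` (signed expiry) field, which is worth checking because reads stop working once a shared token expires. Some details are illustrative only:

- The helper names `wasbs_paths` and `sas_expiry` are hypothetical.
- The token in the usage lines is a shortened stand-in with the signature removed, not the real credential.

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs


def wasbs_paths(account: str, container: str) -> tuple[str, str]:
    """Hypothetical helper: return (base_url, spark_conf_key) for SAS access to one container."""
    host = f"{account}.blob.core.windows.net"
    base_url = f"wasbs://{container}@{host}/"
    conf_key = f"fs.azure.sas.{container}.{host}"
    return base_url, conf_key


def sas_expiry(sas_token: str) -> datetime:
    """Hypothetical helper: parse the `se` (signed expiry) field of a SAS query string as UTC."""
    params = parse_qs(sas_token.lstrip("?"))  # parse_qs also URL-decodes %3A to ':'
    return datetime.strptime(params["se"][0], "%Y-%m-%dT%H:%M:%SZ").replace(
        tzinfo=timezone.utc
    )


# Usage with a shortened stand-in token (signature omitted on purpose).
base_url, conf_key = wasbs_paths("marckvnonprodblob", "bigdata")
comments_url = f"{base_url}reddit-parquet/comments/"
expires = sas_expiry("?sv=2021-10-04&se=2024-01-02T02%3A42%3A00Z&sr=c&sp=rlf")
still_valid = datetime.now(timezone.utc) < expires

print(comments_url)
print(expires.isoformat(), "valid:", still_valid)
```

In the notebook, `conf_key` is the first argument to `spark.conf.set` and `comments_url` is what `spark.read.parquet` receives.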

### Reading in all of the Reddit data

In [4]:
# Read in the data
comments_df = spark.read.parquet(f"{wasbs_base_url}{comments_path}")
submissions_df = spark.read.parquet(f"{wasbs_base_url}{submissions_path}")


### Filter the data to only include AskReddit and NoStupidQuestions from 2021 through 2023

In [5]:
# Filter for AskReddit/NSQ and 2021-2023
## Comments
questionsComments_df = comments_df.filter(
    ((comments_df.subreddit == "AskReddit") | (comments_df.subreddit == "NoStupidQuestions")) & ((comments_df.year >= 2021) & (comments_df.year <= 2023))
).cache()


In [6]:
## Submissions
questionsSubmissions_df = submissions_df.filter(
    ((submissions_df.subreddit == "AskReddit") | (submissions_df.subreddit == "NoStupidQuestions")) & ((submissions_df.year >= 2021) & (submissions_df.year <= 2023))
).cache()

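Both filters above keep a row only when `subreddit` matches and `year` falls between 2021 and 2023 inclusive. PySpark's `Column.isin` and `Column.between` (inclusive on both ends) express the same condition more compactly, for example `comments_df.filter(comments_df.subreddit.isin("AskReddit", "NoStupidQuestions") & comments_df.year.between(2021, 2023))`.

The plain-Python model below runs on invented rows and also shows how nulls behave. `filter()` keeps a row only when the predicate is TRUE. A null `subreddit` or `year` makes the predicate NULL or FALSE, never TRUE, so that row is dropped.

The helper `keep_row` and its parameters are hypothetical. Passing `subs={"TaylorSwift"}` mirrors the r/TaylorSwift filter.

```python
from typing import Iterable, Optional

QUESTION_SUBS = frozenset({"AskReddit", "NoStupidQuestions"})


def keep_row(
    subreddit: Optional[str],
    year: Optional[int],
    subs: Iterable[str] = QUESTION_SUBS,
    start: int = 2021,
    end: int = 2023,
) -> bool:
    """Hypothetical plain-Python model of the subreddit/year filter predicate.

    Spark's filter() keeps a row only when the predicate is TRUE. A NULL in
    either column can only make it NULL or FALSE, so we return False.
    """
    if subreddit is None or year is None:
        return False
    return subreddit in set(subs) and start <= year <= end


# Invented (subreddit, year) rows, not real Reddit data.
rows = [
    ("AskReddit", 2021),
    ("NoStupidQuestions", 2023),
    ("AskReddit", 2020),    # outside the year range
    ("TaylorSwift", 2022),  # different subreddit
    (None, 2022),           # NULL subreddit
]

kept = [r for r in rows if keep_row(*r)]
swift = [r for r in rows if keep_row(*r, subs={"TaylorSwift"})]
print(kept)
print(swift)
```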

In [7]:
# Print the schema for both
## Comments
questionsComments_df.printSchema()


root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [8]:
## Submissions
questionsSubmissions_df.printSchema()


root
 |-- adserver_click_url: string (nullable = true)
 |-- adserver_imp_pixel: string (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- brand_safe: boolean (nullable = true)
 |-- contest_mode: boolean (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- approved_at_utc: string (nullable = true)
 |    |    |-- approved_by: string (nullable = true)
 |    |    |-- archived: boolean (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- author_flair_css_class: string (nullable = true)
 |    |    |-- author_flair_text: string (nullable = true)
 |    |    

In [9]:
# Find the number of comments pulled from each subreddit
questionsComments_df.groupBy("subreddit").count().show()


In [10]:
# Find the number of submissions pulled from each subreddit
questionsSubmissions_df.groupBy("subreddit").count().show()

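`groupBy("subreddit").count()` returns one row per distinct subreddit, together with its number of rows. Because `.cache()` is lazy, an action such as this count is what actually computes the filtered DataFrames and fills the cache.

A plain-Python analogue using `collections.Counter` over a few invented rows is sketched below. The extra grouping by `(subreddit, year)` is an added illustration, not something the notebook does.

```python
from collections import Counter

# Invented (subreddit, year) rows standing in for the filtered comments.
rows = [
    ("AskReddit", 2021),
    ("AskReddit", 2022),
    ("NoStupidQuestions", 2022),
    ("AskReddit", 2023),
]

# Analogue of df.groupBy("subreddit").count(): one count per distinct key.
per_subreddit = Counter(sub for sub, _year in rows)

# Analogue of df.groupBy("subreddit", "year").count() for a finer breakdown.
per_sub_year = Counter(rows)

for sub, n in sorted(per_subreddit.items()):
    print(f"{sub}: {n}")
```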

### Pull in r/TaylorSwift data

In [11]:
# Filter for TaylorSwift from 2021-2023
## Comments
swiftComments_df = comments_df.filter(
    (comments_df.subreddit == "TaylorSwift") & ((comments_df.year >= 2021) & (comments_df.year <= 2023))
).cache()

## Submissions
swiftSubmissions_df = submissions_df.filter(
    (submissions_df.subreddit == "TaylorSwift") & ((submissions_df.year >= 2021) & (submissions_df.year <= 2023))
).cache()


### Data Cleaning

In [12]:
# View the first five questions comments
questionsComments_df.show(5)


+------------------+--------------+----------------------+-----------------+--------------------+--------+----------------+-------------------+-------------+------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+---------+------------+----+-----+
|            author|author_cakeday|author_flair_css_class|author_flair_text|                body|can_gild|controversiality|        created_utc|distinguished|edited|gilded|     id|is_submitter|  link_id| parent_id|           permalink|retrieved_on|score|stickied|subreddit|subreddit_id|year|month|
+------------------+--------------+----------------------+-----------------+--------------------+--------+----------------+-------------------+-------------+------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+---------+------------+----+-----+
|         cuminpant|          null|                  null|             null|It's more comfort...|    true|   

In [13]:
# View the first five questions submissions
questionsSubmissions_df.show(5)


+------------------+------------------+--------+------------+--------------+----------------------+-----------------+---------+----------+------------+-------------------+----------------+---------------------+----------------+-------------+--------------+---------------+------+----------+---------+------+------+----------+--------+-------+---------+----------------+----------------------+-------+--------+--------------------+---------------+------+-----+--------------------+-------------+------------+--------------+-------------+-------+-----------------------+--------------------+------+---------+-------+--------+-----------+---------------------+------------+-------------------+-----+------------+--------------------+---------+-------+--------+-----------------+------------+--------------+--------------------+--------------------+----------------------+---------+----------------+---------------+--------------------+--------------------+----------------+----+-----+
|adserver_click_ur

In [14]:
# View the first five TS comments
swiftComments_df.show(5)


+--------------------+--------------+----------------------+--------------------+--------------------+--------+----------------+-------------------+-------------+------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------+------------+----+-----+
|              author|author_cakeday|author_flair_css_class|   author_flair_text|                body|can_gild|controversiality|        created_utc|distinguished|edited|gilded|     id|is_submitter|  link_id| parent_id|           permalink|retrieved_on|score|stickied|  subreddit|subreddit_id|year|month|
+--------------------+--------------+----------------------+--------------------+--------------------+--------+----------------+-------------------+-------------+------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------+------------+----+-----+
| twentyyeardarknight|          null|                  null|                null|State o

In [15]:
# View the first five TS submissions
swiftSubmissions_df.show(5)


+------------------+------------------+--------+-------------+--------------+----------------------+-----------------+---------+----------+------------+-------------------+----------------+---------------------+----------------+-------------+----------------+---------------+-------------+----------+---------+------+------+----------+--------+-------+---------+----------------+----------------------+-------+--------+--------------------+--------------------+------+-----+--------------------+-------------+------------+--------------+-------------+-------+-----------------------+--------------------+------+---------+--------------------+--------+-----------+---------------------+------------+-------------------+-----+------------+--------------------+--------------------+-------+--------+-----------+------------+--------------+--------------------+--------------------+----------------------+--------------------+----------------+---------------+--------------------+--------------------+---

Many rows in our data contain null values. To get cleaner, more useful data, we first take a small random sample of each DataFrame. We then drop every row that contains a null.

In [16]:
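# Subset the data: each row is kept independently with probability `fraction`,
# so sample sizes are approximate and vary between runs (no seed is set)
questionsComments_small = questionsComments_df.sample(
    withReplacement=False, fraction=0.0001  # keep ~0.01% of rows
)

questionsSubmissions_small = questionsSubmissions_df.sample(
    withReplacement=False, fraction=0.0001  # keep ~0.01% of rows
)

swiftComments_small = swiftComments_df.sample(
    withReplacement=False, fraction=0.1  # keep ~10% of rows
)

swiftSubmissions_small = swiftSubmissions_df.sample(
    withReplacement=False, fraction=0.1  # keep ~10% of rows
)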

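`DataFrame.sample` without replacement runs an independent per-row trial. The result therefore contains roughly, not exactly, `fraction` times the input rows: a fraction of 0.0001 keeps about 1 row in 10,000. The sample also changes on every run unless a `seed` is passed, for example `questionsComments_df.sample(withReplacement=False, fraction=0.0001, seed=42)`.

The sketch below models that behaviour in plain Python. The function name `bernoulli_sample`, the seed value 42 and the toy row count are arbitrary assumptions.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Hypothetical model of sampling without replacement.

    Each row is kept independently with probability `fraction`, so the
    output size is random with expected value len(rows) * fraction.
    """
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(100_000))
sample_a = bernoulli_sample(rows, fraction=0.1, seed=42)
sample_b = bernoulli_sample(rows, fraction=0.1, seed=42)  # same seed, same sample
expected = len(rows) * 0.1  # 10,000 rows on average, not exactly

print(len(sample_a), "rows sampled; expected about", int(expected))
```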

In [17]:
# Drop every row that has a null in any column (dropna() defaults to how='any')
## Comments
questionsComments_small = questionsComments_small.dropna()
swiftComments_small = swiftComments_small.dropna()

## Submissions
questionsSubmissions_small = questionsSubmissions_small.dropna()
swiftSubmissions_small = swiftSubmissions_small.dropna()

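With no arguments, `DataFrame.dropna()` uses `how="any"`, so a row is removed if any column is null. Columns such as `author_cakeday`, `author_flair_css_class` and `author_flair_text` are null in the sample rows shown earlier, so this step can discard most rows.

Passing `subset=` restricts the check to the columns the analysis needs. For example, `dropna(subset=["author", "body"])` checks only those two columns; that column choice is an assumption made here.

The plain-Python model below runs on invented rows to show the difference. The helper name `drop_null_rows` is hypothetical.

```python
from typing import Iterable, Optional


def drop_null_rows(rows: list, how: str = "any", subset: Optional[Iterable[str]] = None) -> list:
    """Hypothetical model of DataFrame.dropna(how=..., subset=...) on dict rows."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    subset_cols = list(subset) if subset is not None else None
    kept = []
    for row in rows:
        cols = subset_cols if subset_cols is not None else list(row)
        nulls = [row[c] is None for c in cols]
        # how="any": drop if any checked column is null; how="all": only if all are.
        drop = any(nulls) if how == "any" else all(nulls)
        if not drop:
            kept.append(row)
    return kept


# Invented rows (not real Reddit data).
rows = [
    {"author": "a1", "body": "text one", "author_cakeday": None},
    {"author": "a2", "body": "text two", "author_cakeday": True},
    {"author": None, "body": "text three", "author_cakeday": None},
]

any_kept = drop_null_rows(rows)                               # like df.dropna()
subset_kept = drop_null_rows(rows, subset=["author", "body"])  # like df.dropna(subset=[...])
all_kept = drop_null_rows(rows, how="all")                     # like df.dropna(how="all")
print(len(any_kept), len(subset_kept), len(all_kept))
```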

## Saving data to Azure Storage

Sometimes we may want to save intermediate data, especially if it is large and took significant time to generate. The following cells save the first 10,000 rows of the filtered AskReddit/NoStupidQuestions comments as Parquet so that we can read them at a later stage.


In [22]:
# Take the first 10,000 filtered comments and cache them; they are written as Parquet, which is faster than CSV for analytics
qCSmall_save = questionsComments_df.limit(10000).cache()
# qSSmall_save = questionsSubmissions_small.cache()
# tsCSmall_save = swiftComments_small.cache()
# tsSSmall_save = swiftSubmissions_small.cache()


In [26]:
# Write the parquet files
qCSmall_save.write.parquet("Users/Mjv38/fall-2023-reddit-project-team-21/data/filtered_parquets/questionsComments1.parquet")
# qSSmall_save.write.parquet("Users/Mjv38/data/filtered_parquets/questionsSubmissions.parquet")
# tsCSmall_save.write.parquet("Users/Mjv38/data/filtered_parquets/swiftComments.parquet")
# tsSSmall_save.write.parquet("Users/Mjv38/data/filtered_parquets/swiftSubmissions.parquet")


### Saving intermediate data

The intermediate outputs go into the storage attached to the Azure ML workspace, using the URI `azureml://datastores/workspaceblobstore/paths/<PATH-TO_STORE>`, which is the same for all workspaces. To reload the data, read from the same URI.
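Every workspace exposes the same `workspaceblobstore` datastore, so the save and reload cells differ only in the relative path appended to the URI. A small helper keeps the two in sync.

The sketch below is a minimal version under these assumptions:

- `datastore_uri` is a hypothetical name.
- It only builds strings and does not check that the path exists.
- The nested example reuses folder and file names from the earlier write cell purely for illustration.

```python
DATASTORE_ROOT = "azureml://datastores/workspaceblobstore/paths"


def datastore_uri(*parts: str) -> str:
    """Hypothetical helper: join path parts onto the workspace blob store URI without doubled slashes."""
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return "/".join([DATASTORE_ROOT, *cleaned])


memes_uri = datastore_uri("memes.parquet")
nested_uri = datastore_uri("/filtered_parquets/", "questionsComments1.parquet")

# In the notebook, the same string would be passed to both calls, e.g.
# df.write.parquet(memes_uri) and spark.read.parquet(memes_uri).
print(memes_uri)
print(nested_uri)
```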

In [8]:
datastore = 'azureml://datastores/workspaceblobstore/paths'
comments_single_df.write.parquet(f"{datastore}/memes.parquet")


In [9]:
comments_single_df_load = spark.read.parquet(f"{datastore}/memes.parquet")


In [11]:
print(f'data saved {comments_single_df.count()} - {len(comments_single_df.columns)}')
print(f'data loaded {comments_single_df_load.count()} - {len(comments_single_df_load.columns)}')


data saved 97410 - 21
data loaded 97410 - 21
