Each markdown block introduces a new section, and each section is a prerequisite for the one that follows.

First we check the file system for the access keys, then load the keys and mount the S3 bucket.


In [0]:
# Print the ID of the cluster this notebook is attached to, read from the Spark configuration.
print(spark.conf.get("spark.databricks.clusterUsageTags.clusterId")) # we'll need this to configure the DAG in AWS

In [0]:
# NOTE: the star imports are kept on purpose: later cells use the unqualified PySpark
# names they provide (count, max, sum, year, when, pow, first, percent_rank, col, ...).
# Be aware that they shadow Python builtins such as max() and sum() for the rest of the notebook.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import the submodule explicitly; a bare "import urllib" does not guarantee urllib.parse is loaded
import urllib.parse

# Define the path to the Delta table holding the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# check we can access where the access keys are stored
dbutils.fs.ls(delta_table_path)

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Get the AWS access key and secret key from the first row, using a single Spark action
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # fail with a clear message instead of an IndexError on an empty table
    raise ValueError("No AWS credentials found in " + delta_table_path)
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key (safe="" also encodes "/") so it can be embedded in the source URL
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# AWS S3 bucket name
AWS_S3_BUCKET = "user-0a1153066525-bucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/incoming"
# Source url: credentials and bucket name combined into one s3n URL
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

In [0]:
# Mount the drive.
# dbutils.fs.mount fails if the mount point already exists, so only mount when it is
# missing; this keeps the cell safe to re-run while working through the notebook.
# (A list membership test is used rather than any(), to avoid relying on builtins that
# the earlier star import from pyspark.sql.functions may shadow.)
existing_mount_points = [mount_info.mountPoint for mount_info in dbutils.fs.mounts()]
if MOUNT_NAME not in existing_mount_points:
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

# test it mounted: list the directory two levels above the mount point
display(dbutils.fs.ls(MOUNT_NAME + "/../.."))

Now we load the files from each of the three streams into its own Spark DataFrame.

In [0]:
# Reader settings shared by the pin, geo and user loading cells
file_type = "json"        # format of the files in the bucket
infer_schema = "true"     # ask Spark to infer the schema

# The trailing *.json glob picks up every JSON file in the pin topic's partition folder
file_location = MOUNT_NAME + "/topics/0a1153066525.pin/partition=0/*.json"

# Read the JSON files from the mounted S3 bucket into a DataFrame
df_pin = (
    spark.read
    .format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location)
)

# Show the first 10 rows to check the content.
# display suits wide tables: it adds a scrollbar and indicates the type of each column.
display(df_pin.head(10))


In [0]:
# The trailing *.json glob picks up every JSON file in the geo topic's partition folder
file_location = MOUNT_NAME + "/topics/0a1153066525.geo/partition=0/*.json"

# Read the JSON files from the mounted S3 bucket, reusing the reader settings from the pin cell
df_geo = (
    spark.read
    .format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location)
)

# Print the first 10 rows to check the content
df_geo.show(10)


In [0]:
# The trailing *.json glob picks up every JSON file in the user topic's partition folder
file_location = MOUNT_NAME + "/topics/0a1153066525.user/partition=0/*.json"

# Read the JSON files from the mounted S3 bucket, reusing the reader settings from the pin cell
df_user = (
    spark.read
    .format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location)
)

# Print the first 10 rows to check the content
df_user.show(10)


Clean the data in each data frame as per the specification. Note that these cells transform the data in place, so you may need to rerun the data frame loading cells before rerunning a cleanup transformation cell.
The cleaning functions have been moved to a GitHub repository for version control and so they can be shared with other data connections, such as the one in the Kinesis assignment.

In [0]:
# Module providing the cleaning functions; the module name is also used by the geo and user cleaning cells
import PinterestTransformations

# clean the df_pin DataFrame
# NOTE: df_pin is overwritten, so reload it before re-running this cell
df_pin = PinterestTransformations.clean_pin(df_pin)
# show the first 10 cleaned rows
display(df_pin.head(10))


In [0]:
# clean the df_geo DataFrame
# NOTE: df_geo is overwritten, so reload it before re-running this cell
df_geo = PinterestTransformations.clean_geo(df_geo)
# print the first 10 cleaned rows
df_geo.show(10)


In [0]:
# clean the df_user DataFrame
# Fix: this cell previously called clean_geo (copy-paste from the geo cell), applying the
# geo cleaner to the user data.
# NOTE(review): assumes PinterestTransformations exposes clean_user alongside
# clean_pin and clean_geo -- confirm against the module.
# NOTE: df_user is overwritten, so reload it before re-running this cell
df_user = PinterestTransformations.clean_user(df_user)
# print the first 10 cleaned rows
df_user.show(10)


Below are the report requests. The beginning of each code block states the reporting requirements.

In [0]:
# Report: the most popular Pinterest category people post to, per country.
# Output columns:
#     country
#     category
#     category_count, the number of posts in that category for that country

# pair every pin with its geo record so category and country sit on the same row
# (combined_geo_df is reused by later report cells)
combined_geo_df = df_pin.join(df_geo, df_geo["ind"] == df_pin["ind"], how="inner")

# number of posts for each (country, category) pair
grouped_df = (
    combined_geo_df
    .groupBy("country", "category")
    .agg(count("category").alias("category_count"))
)

# highest category count seen in each country; the key is renamed so the join below is unambiguous
max_df = (
    grouped_df
    .groupBy("country")
    .agg(max("category_count"))
    .withColumnRenamed("country", "max_country")
)

# keep only the (country, category) rows whose count equals the country's maximum
same_country = grouped_df["country"] == max_df["max_country"]
is_top_count = grouped_df["category_count"] == max_df["max(category_count)"]
most_popular_category_per_country_df = (
    max_df
    .join(grouped_df, (same_country & is_top_count), how="inner")
    .select("country", "category", "category_count")
)

display(most_popular_category_per_country_df.orderBy("country"))


In [0]:
# Report: how many posts each category had between 2018 and 2022.
# Output columns:
#     post_year, the year extracted from the timestamp column
#     category
#     category_count, the number of posts in that category for that year

# combined_geo_df (built in the per-country report cell) already holds timestamp and category together
# add post_year, an integer year taken from the timestamp
posts_with_year_df = combined_geo_df.withColumn('post_year', year(combined_geo_df["timestamp"]))

# keep the requested years, then count posts per (post_year, category)
grouped_df = (
    posts_with_year_df
    .where("post_year >= 2018 and post_year <= 2022")
    .groupBy("post_year", "category")
    .agg(count("category").alias("category_count"))
)

display(grouped_df.orderBy("post_year", "category"))


In [0]:
# Step 1: For each country find the user with the most followers.
# return a DataFrame that contains the following columns:
#     country
#     poster_name
#     follower_count

# combined_geo_df (built in the per-country report cell) already has country and poster_name together
# NOTE(review): follower_count is summed over all of a poster's rows in a country; if a poster can
# have several posts per country this inflates the figure -- confirm sum() (rather than max()) is intended.
df_pin_sum_followers = (
    combined_geo_df
    .groupBy("country", "poster_name")
    .agg(sum("follower_count").alias("follower_count"))
)

# highest follower count seen in each country; the key is renamed so the join below is unambiguous
df_pin_max_followers = (
    df_pin_sum_followers
    .groupBy("country")
    .agg(max("follower_count"))
    .withColumnRenamed("country", "max_country")
)

# keep only the poster(s) whose follower count equals the country's maximum
same_country = df_pin_sum_followers["country"] == df_pin_max_followers["max_country"]
is_top_count = df_pin_sum_followers["follower_count"] == df_pin_max_followers["max(follower_count)"]
most_followed_per_country_df = (
    df_pin_max_followers
    .join(df_pin_sum_followers, (same_country & is_top_count), how="inner")
    .select("country", "poster_name", "follower_count")
)

# display sorted by country - confirm one poster_name per country
display(most_followed_per_country_df.orderBy("country"))

# Step 2: Based on the above query, find the country with the user with most followers.
# return a DataFrame that contains the following columns:
#     country
#     follower_count
# This DataFrame should have only one entry.
# Fix: the previous .orderBy(...).tail(1) returned a Python list of Row objects, not a DataFrame.
# Sorting descending and taking limit(1) keeps the result a one-row DataFrame.
country_with_user_with_most_followers_df = (
    most_followed_per_country_df
    .select("country", "follower_count")
    .orderBy(col("follower_count").desc())
    .limit(1)
)
display(country_with_user_with_most_followers_df)


In [0]:
# Report: the most popular category people post to, for each of these age groups:
#     18-24
#     25-35
#     36-50
#     +50
# Output columns:
#     age_group, derived from the original age column
#     category
#     category_count, the number of posts in that category for that age group

# join the pin data (category) to the user data (age)
combined_user_df = df_pin.join(df_user, df_user["ind"] == df_pin["ind"], how="inner")

# derive age_group from age; conditions are checked from the oldest bracket down,
# and anything 17 or under (or with no age) falls into 'other'
# note: combined_user_df with age_group is reused by later report cells
user_age = combined_user_df["age"]
age_group_column = (
    when(user_age > 50, '+50')
    .when(user_age > 35, '36-50')
    .when(user_age > 24, '25-35')
    .when(user_age > 17, '18-24')
    .otherwise('other')
)
combined_user_df = combined_user_df.withColumn("age_group", age_group_column)

# number of posts for each (age_group, category) pair
user_category_count_df = (
    combined_user_df
    .groupBy("age_group", "category")
    .agg(count("category").alias("category_count"))
)

# highest category count per age group; the key is renamed so the join below is unambiguous
user_category_max_df = (
    user_category_count_df
    .groupBy("age_group")
    .agg(max("category_count"))
    .withColumnRenamed("age_group", "max_age_group")
)

# keep only the rows whose count equals the age group's maximum, reduced to the specified columns
same_age_group = user_category_max_df['max_age_group'] == user_category_count_df['age_group']
is_top_count = user_category_max_df['max(category_count)'] == user_category_count_df['category_count']
user_category_max_df = (
    user_category_count_df
    .join(user_category_max_df, same_age_group & is_top_count, how="inner")
    .select("age_group", "category", "category_count")
)

display(user_category_max_df)


In [0]:
# Report: the median follower count for users in each of these age groups:
#     18-24
#     25-35
#     36-50
#     +50
# Output columns:
#     age_group, derived from the original age column
#     median_follower_count

# combined_user_df (built in the age-group category cell) already carries age_group, so it is reused here.
# Databricks runs an older Spark (3.2.1) where median() is not defined; it is available from 3.4.0.
# Instead, the median is taken as the follower_count of the row whose percent_rank is closest to 0.5.
# (Counting users per age_group and picking the middle index was tried first; the window approach appeared faster.)
from pyspark.sql import Window

median_window = Window.partitionBy("age_group")

# rank the rows of each age group by follower_count; percent_rank = 0.5 corresponds to the median
first_window = median_window.orderBy("follower_count")
ranked_followers_df = combined_user_df.withColumn("percent_rank", percent_rank().over(first_window))

# order each age group by (percent_rank - 0.5)^2 ascending, so the row nearest the median comes first
squared_distance_from_median = pow(ranked_followers_df["percent_rank"] - 0.5, 2)
second_window = median_window.orderBy(squared_distance_from_median)

# the first follower_count in that ordering is the median; collapse to one row per age group
user_median_follower_count = (
    ranked_followers_df
    .withColumn("median_follower_count", first("follower_count").over(second_window))
    .select("age_group", "median_follower_count")
    .distinct()
)
display(user_median_follower_count)


In [0]:
# Report: how many users have joined between 2015 and 2020.
# Output columns:
#     post_year, the year extracted from the timestamp column
#     number_users_joined
# note: a user can appear in several years, since they may have joined and posted in one year
# and continued to post in later years

# keep only the users who joined in the requested range; ind is renamed so it does not clash in joins
df_user_2015to2020 = (
    df_user
    .withColumn('joined_year', year(df_user["date_joined"]))
    .where("joined_year >= 2015 and joined_year <= 2020")
    .withColumnRenamed("ind", "ind_user")
)

# join to the geo data to pick up the post timestamp, then add post_year
# note: users_joined_by_year_2015to2020_df is reused by later report cells
geo_with_joined_users_df = df_geo.join(
    df_user_2015to2020,
    df_user_2015to2020["ind_user"] == df_geo["ind"],
    how="inner",
)
users_joined_by_year_2015to2020_df = geo_with_joined_users_df.withColumn(
    'post_year', year(geo_with_joined_users_df["timestamp"])
)

# specific to this report: reduce to the columns that identify a unique user within a post_year,
# drop duplicates, then count users per post_year
distinct_users_per_year_df = (
    users_joined_by_year_2015to2020_df
    .select("post_year", "age", "date_joined", "user_name")
    .distinct()
)
users_joined_by_year_2015to2020_df_summary = (
    distinct_users_per_year_df
    .groupBy("post_year")
    .agg(count("date_joined").alias("number_users_joined"))
)
display(users_joined_by_year_2015to2020_df_summary.orderBy("post_year"))


In [0]:
# Report: the median follower count of users who have joined between 2015 and 2020.
# Output columns:
#     post_year, the year extracted from the timestamp column
#     median_follower_count

# Databricks runs an older Spark (3.2.1); median() is only available from 3.4.0 (current Spark is 3.5),
# so the median is taken as the follower_count of the row whose percent_rank is closest to 0.5.

# users_joined_by_year_2015to2020_df is already filtered and joined and has post_year;
# join it to the pin data to pick up follower_count
# note: records_combined_year_2015to2020_df is reused by the next report cell
records_combined_year_2015to2020_df = df_pin.join(
    users_joined_by_year_2015to2020_df,
    users_joined_by_year_2015to2020_df["ind_user"] == df_pin["ind"],
    how="inner",
)

median_window = Window.partitionBy("post_year")

# rank the rows of each post_year by follower_count; percent_rank = 0.5 corresponds to the median
first_window = median_window.orderBy("follower_count")
ranked_by_year_df = records_combined_year_2015to2020_df.withColumn(
    "percent_rank", percent_rank().over(first_window)
)

# order each post_year by (percent_rank - 0.5)^2 ascending, so the row nearest the median comes first
squared_distance_from_median = pow(ranked_by_year_df["percent_rank"] - 0.5, 2)
second_window = median_window.orderBy(squared_distance_from_median)

# the first follower_count in that ordering is the median; collapse to one row per post_year
median_user_follow_count_by_year_2015to2020_df = (
    ranked_by_year_df
    .withColumn("median_follower_count", first("follower_count").over(second_window))
    .select("post_year", "median_follower_count")
    .distinct()
)
display(median_user_follow_count_by_year_2015to2020_df.orderBy("post_year"))



In [0]:
# Report: the median follower count of users who joined between 2015 and 2020, per age group.
# Output columns:
#     age_group, derived from the original age column
#     post_year, the year extracted from the timestamp column
#     median_follower_count

# Databricks runs an older Spark (3.2.1); median() is only available from 3.4.0 (current Spark is 3.5),
# so the median is taken as the follower_count of the row whose percent_rank is closest to 0.5.

# records_combined_year_2015to2020_df is already filtered and joined and has post_year;
# only age_group needs adding. Conditions are checked from the oldest bracket down,
# and anything 17 or under (or with no age) falls into 'other'.
record_age = records_combined_year_2015to2020_df["age"]
age_group_column = (
    when(record_age > 50, '+50')
    .when(record_age > 35, '36-50')
    .when(record_age > 24, '25-35')
    .when(record_age > 17, '18-24')
    .otherwise('other')
)
records_combined_age_group_2015to2020_df = records_combined_year_2015to2020_df.withColumn(
    "age_group", age_group_column
)

median_window = Window.partitionBy("post_year", "age_group")

# rank the rows of each (post_year, age_group) by follower_count; percent_rank = 0.5 corresponds to the median
first_window = median_window.orderBy("follower_count")
ranked_by_year_and_age_df = records_combined_age_group_2015to2020_df.withColumn(
    "percent_rank", percent_rank().over(first_window)
)

# order each partition by (percent_rank - 0.5)^2 ascending, so the row nearest the median comes first
squared_distance_from_median = pow(ranked_by_year_and_age_df["percent_rank"] - 0.5, 2)
second_window = median_window.orderBy(squared_distance_from_median)

# the first follower_count in that ordering is the median; collapse to one row per (post_year, age_group)
median_user_follow_count_by_year_2015to2020_df = (
    ranked_by_year_and_age_df
    .withColumn("median_follower_count", first("follower_count").over(second_window))
    .select("post_year", "age_group", "median_follower_count")
    .distinct()
)
display(median_user_follow_count_by_year_2015to2020_df.orderBy("post_year", "age_group"))


Unmount the drive if we are finished with it. While working with the notebook and running selective cells, the filesystem should remain mounted.

In [0]:
# Unmount the S3 bucket from MOUNT_NAME; cells that read from the mount will need it mounted again first.
dbutils.fs.unmount(MOUNT_NAME)