In [None]:
# Milestone 6
# pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.window import Window # used in milestone 7
# URL processing
import urllib

In [None]:
# Load the uploaded AWS credentials file from DBFS into a Spark DataFrame.
# The CSV has a header row and uses a comma as the field delimiter.
# NOTE(review): consider Databricks secrets instead of a credentials file on DBFS.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [None]:
# Pull the databricks-user access key and secret key out of the credentials DataFrame.
# `import urllib` alone does not guarantee that the `urllib.parse` submodule is loaded,
# so import it explicitly before calling urllib.parse.quote below.
import urllib.parse

# Filter and collect once, then read both keys from the same Row.
databricks_user_row = aws_keys_df \
    .where(col('User name') == 'databricks-user') \
    .select('Access key ID', 'Secret access key') \
    .first()
if databricks_user_row is None:
    raise ValueError("No 'databricks-user' row found in authentication_credentials.csv")
ACCESS_KEY = databricks_user_row['Access key ID']
SECRET_KEY = databricks_user_row['Secret access key']
# URL-encode the secret key; safe="" also escapes "/" so the key can be embedded
# in the s3n:// mount URL built from ENCODED_SECRET_KEY.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [None]:
# For pin data: mount the pin topic's S3 prefix at /mnt/df_pin.
# Mounting a mount point that is already in use returns an error, so only mount
# when it is missing; this keeps the cell safe to re-run instead of disabling it.
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0eb84f80c29b-bucket/topics/0eb84f80c29b.pin/partition=0/"
# Mount name for the bucket
MOUNT_NAME = "/mnt/df_pin"
# Source url. It embeds the secret key, so it is deliberately not printed.
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the drive only if nothing is mounted at MOUNT_NAME yet
if not any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [None]:
# For geo data: mount the geo topic's S3 prefix at /mnt/df_geo.
# Mounting a mount point that is already in use returns an error, so only mount
# when it is missing; this keeps the cell safe to re-run instead of disabling it.
# NOTE(review): the files listed under /mnt/df_geo are named 0aedb74d3ba3.geo+...,
# not 0eb84f80c29b.geo+... — confirm this bucket path matches the existing mount.
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0eb84f80c29b-bucket/topics/0eb84f80c29b.geo/partition=0/"
# Mount name for the bucket
MOUNT_NAME = "/mnt/df_geo"
# Source url. It embeds the secret key, so it is deliberately not printed.
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the drive only if nothing is mounted at MOUNT_NAME yet
if not any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [None]:
# For user data: mount the user topic's S3 prefix at /mnt/df_user, which is the
# path the loading cells read from (/mnt/df_user/*.json).
# Mounting a mount point that is already in use returns an error, so only mount
# when it is missing; this keeps the cell safe to re-run instead of disabling it.
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0eb84f80c29b-bucket/topics/0eb84f80c29b.user/partition=0/"
# Mount name for the bucket (must match the df_<topic> naming used by the readers)
MOUNT_NAME = "/mnt/df_user"
# Source url. It embeds the secret key, so it is deliberately not printed.
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the drive only if nothing is mounted at MOUNT_NAME yet
if not any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [None]:
# List the JSON files stored under the pin mount point
pin_files = dbutils.fs.ls("/mnt/df_pin")
display(pin_files)

path,name,size,modificationTime
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000000.json,0eb84f80c29b.pin+0+0000000000.json,444,1701340250000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000001.json,0eb84f80c29b.pin+0+0000000001.json,681,1701340252000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000002.json,0eb84f80c29b.pin+0+0000000002.json,714,1701340254000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000003.json,0eb84f80c29b.pin+0+0000000003.json,444,1701342665000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000004.json,0eb84f80c29b.pin+0+0000000004.json,681,1701342667000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000005.json,0eb84f80c29b.pin+0+0000000005.json,444,1701342728000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000006.json,0eb84f80c29b.pin+0+0000000006.json,444,1701342769000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000007.json,0eb84f80c29b.pin+0+0000000007.json,681,1701342770000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000008.json,0eb84f80c29b.pin+0+0000000008.json,714,1701342772000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000009.json,0eb84f80c29b.pin+0+0000000009.json,640,1701342774000


In [None]:
# List the JSON files stored under the geo mount point
geo_files = dbutils.fs.ls("/mnt/df_geo")
display(geo_files)

path,name,size,modificationTime
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000034.json,0aedb74d3ba3.geo+0+0000000034.json,108,1701348667000
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000035.json,0aedb74d3ba3.geo+0+0000000035.json,108,1701348670000
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000036.json,0aedb74d3ba3.geo+0+0000000036.json,108,1701348673000
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000037.json,0aedb74d3ba3.geo+0+0000000037.json,113,1701348677000
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000038.json,0aedb74d3ba3.geo+0+0000000038.json,105,1701348679000
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000039.json,0aedb74d3ba3.geo+0+0000000039.json,107,1701348681000
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000040.json,0aedb74d3ba3.geo+0+0000000040.json,113,1701348684000
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000041.json,0aedb74d3ba3.geo+0+0000000041.json,125,1701348687000
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000042.json,0aedb74d3ba3.geo+0+0000000042.json,109,1701348689000
dbfs:/mnt/df_geo/0aedb74d3ba3.geo+0+0000000043.json,0aedb74d3ba3.geo+0+0000000043.json,111,1701348691000


In [None]:
# List the JSON files stored under the user mount point
# (the pin and geo mounts are listed in the cells above).
display(dbutils.fs.ls("/mnt/df_user"))

path,name,size,modificationTime
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000000.json,0eb84f80c29b.pin+0+0000000000.json,444,1701340250000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000001.json,0eb84f80c29b.pin+0+0000000001.json,681,1701340252000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000002.json,0eb84f80c29b.pin+0+0000000002.json,714,1701340254000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000003.json,0eb84f80c29b.pin+0+0000000003.json,444,1701342665000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000004.json,0eb84f80c29b.pin+0+0000000004.json,681,1701342667000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000005.json,0eb84f80c29b.pin+0+0000000005.json,444,1701342728000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000006.json,0eb84f80c29b.pin+0+0000000006.json,444,1701342769000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000007.json,0eb84f80c29b.pin+0+0000000007.json,681,1701342770000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000008.json,0eb84f80c29b.pin+0+0000000008.json,714,1701342772000
dbfs:/mnt/df_pin/0eb84f80c29b.pin+0+0000000009.json,0eb84f80c29b.pin+0+0000000009.json,640,1701342774000


In [None]:
# List the DBFS root directly; "/mnt/df_user/../.." resolves to the same place
# but only works while the user mount exists.
display(dbutils.fs.ls("/"))

path,name,size,modificationTime
dbfs:/FileStore/,FileStore/,0,1701534864800
dbfs:/Volume/,Volume/,0,0
dbfs:/Volumes/,Volumes/,0,0
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/delta/,delta/,0,1701534864800
dbfs:/df_pin.csv/,df_pin.csv/,0,1701534864800
dbfs:/geo_dirty.csv/,geo_dirty.csv/,0,1701534864800
dbfs:/local_disk0/,local_disk0/,0,1701534864800
dbfs:/mnt/,mnt/,0,1701534864800


In [None]:
# Preview the raw user records: read every .json file under the user mount
# (the * glob matches all of them) and let Spark infer the schema.
file_location = "/mnt/df_user/*.json"
file_type = "json"
infer_schema = "true"
df = (
    spark.read
    .format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location)
)
# Render the DataFrame to check its content
display(df)

age,date_joined,first_name,ind,last_name
27,2016-03-08T13:38:37,Christopher,2015,Bradshaw
39,2016-06-29T20:43:59,Christina,6398,Davenport
20,2015-10-23T04:13:23,Alexandria,3599,Alvarado
20,2015-12-01T15:08:31,Christopher,5076,Butler
39,2017-07-19T07:12:04,Michelle,7790,Gutierrez
49,2016-04-22T20:36:02,Brittany,10509,Thompson
43,2016-07-21T15:25:08,Chelsea,10119,Gonzalez
21,2015-11-10T09:27:42,Andrea,8731,Alexander
24,2016-03-31T20:56:39,Austin,8887,Rodriguez
23,2015-12-01T18:15:02,Christine,7768,Cortez


In [None]:
# Topic suffixes to load; each one is read from /mnt/df_<topic>
topics = ["pin", "geo", "user"]

def read_topics_into_dataframe(topic):
    """Load every JSON file of one topic from its DBFS mount into a DataFrame.

    Args:
        topic: topic suffix such as "pin"; files are read from
            ``/mnt/df_<topic>/*.json``.

    Returns:
        A Spark DataFrame whose schema Spark infers from the JSON records.
    """
    json_glob = "/mnt/df_{}/*.json".format(topic)
    reader = spark.read.format("json").option("inferSchema", "true")
    return reader.load(json_glob)

# Read each topic once and bind the result to its topic-specific name;
# any topic other than "pin" or "geo" is treated as the user topic.
for topic in topics:
    loaded_df = read_topics_into_dataframe(topic)
    if topic == 'pin':
        df_pin = loaded_df
    elif topic == 'geo':
        df_geo = loaded_df
    else:
        df_user = loaded_df


In [None]:
# Show the first five raw pin rows as a rendered table rather than a list of Row reprs
display(df_pin.limit(5))

In [None]:
def convert_null_values(df, column, value_to_replace):
    """Null out entries of ``column`` that match a SQL LIKE pattern.

    Args:
        df: Spark DataFrame to transform.
        column: name of the column to clean; it is replaced in the returned DataFrame.
        value_to_replace: SQL LIKE pattern (``%``/``_`` wildcards allowed)
            identifying placeholder values that carry no real data.

    Returns:
        A new DataFrame in which matching entries of ``column`` are null and
        every other entry is unchanged.
    """
    current = col(column)
    is_placeholder = current.like(value_to_replace)
    return df.withColumn(column, when(is_placeholder, None).otherwise(current))


In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 1 : Clean the Dataframe that contains information about Pinterest posts
# Placeholder values (SQL LIKE patterns) that mean "no data" in each column
columns_and_values_to_convert_null = {
    "description": "No description available%",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available"
}
# Replace entries with no relevant data in each column with nulls.
# NOTE(review): the task also asks for empty entries to be nulled; plain "" values
# are not matched by these patterns — confirm whether the raw data contains any.
for column_name, placeholder in columns_and_values_to_convert_null.items():
    df_pin = convert_null_values(df_pin, column_name, placeholder)
# Expand the "k"/"M" suffixes so every follower_count entry is a plain number,
# then cast the column to int so it has a numeric data type.
df_pin = df_pin.withColumn("follower_count", regexp_replace("follower_count", "k", "000"))
df_pin = df_pin.withColumn("follower_count", regexp_replace("follower_count", "M", "000000"))
df_pin = df_pin.withColumn("follower_count", col("follower_count").cast('int'))
# Clean the data in the save_location column to include only the save location path
df_pin = df_pin.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
# Rename the index column to ind
df_pin = df_pin.withColumnRenamed("index", "ind")
# Reorder the DataFrame columns
new_pin_column_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
]
df_pin = df_pin.select(new_pin_column_order)
display(df_pin.limit(5))

In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 2 : Clean the Dataframe that contains information about geolocation
# NOTE: not re-runnable on its own — latitude/longitude are dropped below, so run
# the topic-loading cell again before re-running this one.

# Combine latitude and longitude into one array<double> column "coordinates".
# The built-in array() is evaluated by Spark itself, avoiding a Python UDF that
# would serialise every row to Python; the casts keep the element type double.
df_geo = df_geo.withColumn(
    "coordinates",
    array(col("latitude").cast("double"), col("longitude").cast("double")),
)
# Drop the latitude and longitude columns now that coordinates holds them
df_geo = df_geo.drop("latitude", "longitude")
# Convert the timestamp column from a string to a timestamp data type
df_geo = df_geo.withColumn("timestamp", to_timestamp("timestamp"))
# Reorder the DataFrame columns
new_geo_col_order = [
    "ind",
    "country",
    "coordinates",
    "timestamp",
]
df_geo = df_geo.select(new_geo_col_order)
display(df_geo.limit(5))

In [None]:
# Show the first five cleaned geo rows as a rendered table
display(df_geo.limit(5))

In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 3 : Clean the Dataframe that contains information about users
# Build user_name as "first_name last_name" (concat_ws skips null parts), drop the
# two source columns, parse date_joined into a timestamp and fix the column order.
drop_cols = ("first_name", "last_name")
new_user_column_order = [
    "ind",
    "user_name",
    "age",
    "date_joined",
]
df_user = (
    df_user
    .withColumn("user_name", concat_ws(" ", "first_name", "last_name"))
    .drop(*drop_cols)
    .withColumn("date_joined", to_timestamp("date_joined"))
    .select(new_user_column_order)
)

In [None]:
# printSchema() prints the schema itself and returns None, so call it directly;
# wrapping it in print() would also print a stray "None" after each schema.
df_pin.printSchema()
df_user.printSchema()
df_geo.printSchema()

In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 4 : Find the most popular category in each country
"""Find the most popular Pinterest category people post to based on their country. 

Your query should return a DataFrame that contains the following columns:
country
category
category_count, a new column containing the desired query output
"""
# Join posts to their geolocation. Joining on the column name (rather than an
# equality expression) keeps a single `ind` column in pin_geo, which Task 5 reuses.
pin_geo = df_pin.join(df_geo, on="ind")
# Count posts per (country, category), rank categories within each country by
# that count and keep the top one. row_number() returns exactly one row per
# country; ties are broken arbitrarily.
windowTask4 = Window.partitionBy("country").orderBy(col("category_count").desc())
pin_geo.groupBy("country", "category") \
    .agg(count("category").alias("category_count")) \
    .withColumn("rank", row_number().over(windowTask4)) \
    .filter(col("rank") == 1) \
    .drop("rank") \
    .show()

In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 5 : Find which was the most popular category in each year
"""
Find how many posts each category had between 2018 and 2022. 

Your query should return a DataFrame that contains the following columns:
post_year, a new column that contains only the year from the timestamp column
category
category_count, a new column containing the desired query output
"""
# Derive post_year from the post timestamp, keep 2018-2022 (inclusive), count
# posts per (year, category) and keep the highest-counted category of each year.
windowTask5 = Window.partitionBy("post_year").orderBy(col("category_count").desc())
(
    pin_geo
    .withColumn("post_year", year("timestamp"))
    .where(col("post_year").between(2018, 2022))
    .groupBy("post_year", "category")
    .agg(count("category").alias("category_count"))
    .withColumn("rank", row_number().over(windowTask5))
    .where(col("rank") == 1)
    .drop("rank")
    .show()
)

In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 6 : find the user with the most followers in each country
"""
Step 1: For each country find the user with the most followers. 

Your query should return a DataFrame that contains the following columns:
country
poster_name
follower_count
Step 2: Based on the above query, find the country with the user with most followers. 

Your query should return a DataFrame that contains the following columns:
country
follower_count
This DataFrame should have only one entry.
"""
# Step 1: follower_count lives in df_pin and country in df_geo, so join them on
# `ind`, rank posters within each country by follower_count (desc() sorts nulls
# last) and keep the top-ranked row per country.
windowTask6 = Window.partitionBy("country").orderBy(col("follower_count").desc())
user_with_most_follower_count = \
    df_pin.join(df_geo, on="ind") \
    .withColumn("rank", row_number().over(windowTask6)) \
    .filter(col("rank") == 1) \
    .select("country", "poster_name", "follower_count")
user_with_most_follower_count.show()

# Step 2: the spec asks for a single row with only country and follower_count.
# Sorting and taking the first row guarantees one entry even when several
# countries tie on the maximum (filtering on the max would return all of them).
final_table = \
    user_with_most_follower_count \
    .orderBy(col("follower_count").desc()) \
    .limit(1) \
    .select("country", "follower_count")
final_table.show()


In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 7 : find the most popular category for different age groups
"""What is the most popular category people post to based on the following age groups:
18-24
25-35
36-50
+50
Your query should return a DataFrame that contains the following columns:
age_group, a new column based on the original age column
category
category_count, a new column containing the desired query output
"""
# category comes from df_pin and age from df_user, so join them on `ind` (a
# name-based join keeps a single ind column) and expose the result to SQL as
# the category_age view. df_age_group is reused by Tasks 8 and 11.
df_pin.join(df_user, on="ind").createOrReplaceTempView("category_age")
# NOTE: ages below 18 match no WHEN branch and get a NULL age_group.
df_age_group = spark.sql(
    "SELECT CASE \
        WHEN age between 18 and 24 then '18-24' \
        WHEN age between 25 and 35 then '25-35' \
        WHEN age between 36 and 50 then '36-50' \
        WHEN age > 50 then '50+' \
        END as age_group, * FROM category_age")
# Count posts per (age_group, category), rank categories within each age_group
# and keep the top one; only age_group, category and category_count remain.
windowTask7 = Window.partitionBy("age_group").orderBy(col("category_count").desc())
new_age_group = \
    df_age_group.groupBy("age_group", "category") \
        .agg(count("category").alias("category_count")) \
        .withColumn("rank", row_number().over(windowTask7)) \
        .filter(col("rank") == 1) \
        .drop("rank")
# show() returns None, so keep the DataFrame in new_age_group and show it separately
new_age_group.show()

In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 8 : find the median follower count for the different age groups
"""
What is the median follower count for users in the following age groups:
18-24
25-35
36-50
+50
Your query should return a DataFrame that contains the following columns:
age_group, a new column based on the original age column
median_follower_count, a new column containing the desired query output
"""
# Reuse df_age_group from Task 7. percentile_approx(col, 0.5) returns an
# approximate median of follower_count within each age_group.
new_age_group = \
    df_age_group \
        .select("age_group", "follower_count") \
        .groupBy("age_group") \
        .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count")) \
        .orderBy("age_group")
# show() returns None, so keep the DataFrame in new_age_group and show it separately
new_age_group.show()

In [None]:
# Print the user schema, then the pin schema
for frame in (df_user, df_pin):
    frame.printSchema()

In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 9 : find how many users have joined each year
"""
Find how many users have joined between 2015 and 2020. 

Your query should return a DataFrame that contains the following columns:
post_year, a new column that contains only the year from the timestamp column
number_users_joined, a new column containing the desired query output
"""
# date_joined (df_user) records when each user joined: group by its year, keep
# only 2015-2020 (inclusive) as the spec requires, and count users per year.
df_user.groupBy(year("date_joined").alias("post_year")) \
    .agg(count("user_name").alias("number_users_joined")) \
    .filter(col("post_year").between(2015, 2020)) \
    .orderBy("post_year") \
    .show()

In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 10 : find the median follower count of users based on their joining year
"""Find the median follower count of users have joined between 2015 and 2020. 

Your query should return a DataFrame that contains the following columns:
post_year, a new column that contains only the year from the timestamp column
median_follower_count, a new column containing the desired query output
"""
# df_user has no follower_count, so join it with df_pin on `ind` (a name-based
# join keeps a single ind column). Then keep join years 2015-2020 (inclusive)
# and take the approximate median follower_count per year.
df_new = df_pin.join(df_user, on="ind")
df_new.select(year("date_joined").alias("post_year"), "follower_count") \
    .filter(col("post_year").between(2015, 2020)) \
    .groupBy("post_year") \
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count")) \
    .orderBy("post_year") \
    .show()

In [None]:
# Milestone 7 Batch Processing: Spark on Databricks
# Task 11 : find  the median follower count of users based on their joining year and age group
"""Find the median follower count of users that have joined between 2015 and 2020, based on which age group they are part of. 

Your query should return a DataFrame that contains the following columns:
age_group, a new column based on the original age column
post_year, a new column that contains only the year from the timestamp column
median_follower_count, a new column containing the desired query output"""
# Reuse df_age_group from Task 7 (the age-banded join of df_pin and df_user)
# instead of re-running the same SQL. Keep join years 2015-2020 (inclusive),
# then take the approximate median follower_count per (post_year, age_group).
df_age_group \
    .select("age_group", year("date_joined").alias("post_year"), "follower_count") \
    .filter(col("post_year").between(2015, 2020)) \
    .groupBy("post_year", "age_group") \
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count")) \
    .orderBy("age_group", "post_year") \
    .show()