# Mount S3 bucket to Databricks

The S3 bucket containing the data for the Pinterest posts, users and geolocation will first have to be mounted to Databricks.

Once mounted, the data within each topic will then be read into Spark dataframes so that we can clean and query this data.

In [None]:
dbutils.fs.ls("/FileStore/tables")

In [None]:
# The following libraries are required:
# pyspark functions
from pyspark.sql.functions import *
# URL processing (importing the 'parse' submodule explicitly, as urllib.parse.quote is used below)
import urllib.parse

In [None]:
# To read the authentication_credentials.csv file:
# Specifying the file type to be csv:
file_type = "csv"
# Indicating the file's first row is the header:
first_row_is_header = 'true'
# Indicating the delimiter is a comma:
delimiter = ","
# Reading the csv file to a Spark dataframe:
aws_keys_df = spark.read.format(file_type)\
    .option("header", first_row_is_header)\
    .option("sep", delimiter)\
    .load("/FileStore/tables/authentication_credentials.csv")

In [None]:
# Extracting the AWS access key and secret access key from the Spark dataframe created above:
ACCESS_KEY = aws_keys_df.where(col("User name")=='databricks-user').select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.where(col('User name')=='databricks-user').select('Secret access key').collect()[0]['Secret access key']
# URL-encoding the secret key so that special characters (such as "/") do not break the source URL:
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")
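
The encoding step matters because a secret key may contain characters such as `/` or `+`, which would otherwise be read as URL separators once the key is embedded in the source URL. A minimal sketch of what `urllib.parse.quote` does with `safe=""`, using a made-up key (`example_secret` is a hypothetical value, not a real credential):

```python
import urllib.parse

# Hypothetical secret key, for illustration only (not a real credential).
example_secret = "abc/def+ghi=="

# safe="" percent-encodes every reserved character, including "/".
encoded = urllib.parse.quote(example_secret, safe="")
print(encoded)  # abc%2Fdef%2Bghi%3D%3D

# The encoding is reversible, so no information is lost.
print(urllib.parse.unquote(encoded) == example_secret)  # True
```

Note that this is an encoding, not encryption: the key is still readable by anyone who can see the URL.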

In [None]:
# Mounting the AWS S3 bucket to Databricks:
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0e1f6d6285c1-bucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/pinterest_project1"
# Source url
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mounting the drive
dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [None]:
# Checking if the S3 bucket was mounted successfully:
display(dbutils.fs.ls("/mnt/pinterest_project1/"))

path,name,size,modificationTime
dbfs:/mnt/pinterest_project/kafka-connect-s3/,kafka-connect-s3/,0,1696529048803
dbfs:/mnt/pinterest_project/topics/,topics/,0,1696529048803


In [None]:
# The data in the S3 bucket is in JSON format. Reading the Pinterest posts data into Databricks:
# Specifying file location and type:
file_location = "/mnt/pinterest_project1/topics/0e1f6d6285c1.pin/partition=0/*.json" # The asterisk (*) matches every file in the partition=0 directory that has the .json extension
file_type = "json"
# Asking Spark to infer the schema:
infer_schema = "true"
# Reading in JSONs from mounted S3 bucket:
df_pin_0e1f6d6285c1 = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .load(file_location)
# Displaying the Spark dataframe to check its content:
display(df_pin_0e1f6d6285c1)

In [None]:
# The data in the S3 bucket is in JSON format. Reading the geolocation data into Databricks:
# Specifying file location and type:
file_location = "/mnt/pinterest_project1/topics/0e1f6d6285c1.geo/partition=0/*.json" # The asterisk (*) matches every file in the partition=0 directory that has the .json extension
file_type = "json"
# Asking Spark to infer the schema:
infer_schema = "true"
# Reading in JSONs from mounted S3 bucket:
df_geo_0e1f6d6285c1 = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .load(file_location)
# Displaying the Spark dataframe to check its content:
display(df_geo_0e1f6d6285c1)

country,ind,latitude,longitude,timestamp
British Indian Ocean Territory (Chagos Archipelago),9455,-82.9272,-150.346,2022-03-15T01:46:32
British Indian Ocean Territory (Chagos Archipelago),6814,-86.5675,-149.565,2022-09-02T11:34:28
British Indian Ocean Territory (Chagos Archipelago),9455,-82.9272,-150.346,2022-03-15T01:46:32
British Indian Ocean Territory (Chagos Archipelago),6814,-86.5675,-149.565,2022-09-02T11:34:28
British Indian Ocean Territory (Chagos Archipelago),5111,-83.7472,8.65953,2021-04-01T00:56:57
British Indian Ocean Territory (Chagos Archipelago),5111,-83.7472,8.65953,2021-04-01T00:56:57
British Indian Ocean Territory (Chagos Archipelago),2989,-87.013,133.062,2020-01-09T19:18:54
Antarctica (the territory South of 60 deg S),10073,-32.8885,-170.295,2021-06-29T19:56:04
Antarctica (the territory South of 60 deg S),10073,-32.8885,-170.295,2021-06-29T19:56:04
Antarctica (the territory South of 60 deg S),10073,-32.8885,-170.295,2021-06-29T19:56:04


In [None]:
# The data in the S3 bucket is in JSON format. Reading the user data into Databricks:
# Specifying file location and type:
file_location = "/mnt/pinterest_project1/topics/0e1f6d6285c1.user/partition=0/*.json" # The asterisk (*) matches every file in the partition=0 directory that has the .json extension
file_type = "json"
# Asking Spark to infer the schema:
infer_schema = "true"
# Reading in JSONs from mounted S3 bucket:
df_user_0e1f6d6285c1 = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .load(file_location)
# Displaying the Spark dataframe to check its content:
display(df_user_0e1f6d6285c1)

age,date_joined,first_name,ind,last_name
42,2017-02-18T00:31:22,Christopher,6353,Hernandez
27,2016-03-08T13:38:37,Christopher,2015,Bradshaw
59,2017-05-12T21:22:17,Alexander,10673,Cervantes
48,2016-02-27T16:57:44,Christopher,1857,Hamilton
45,2016-09-15T06:02:53,Christopher,10020,Hawkins
35,2015-10-22T22:42:23,Christopher,2041,Campbell
48,2016-06-13T17:09:14,Christopher,7031,Anderson
27,2016-03-08T13:38:37,Christopher,2015,Bradshaw
59,2017-05-12T21:22:17,Alexander,10673,Cervantes
48,2016-02-27T16:57:44,Christopher,1857,Hamilton


In [None]:
# Listing the topic folders in the mounted bucket to confirm that all three topics are present:
dbutils.fs.ls("/mnt/pinterest_project1/topics/")

# Batch data processing: Spark on Databricks

The S3 bucket has been successfully mounted to Databricks, and the JSON data within the 3 topics has been read into 3 Spark dataframes.

We will now clean the data within each of these dataframes using Spark, a unified engine for large-scale distributed data processing on computer clusters. Among other benefits, it offers:
- Distributed Processing
- Scalability
- In-Memory Processing
- Fault Tolerance

## Cleaning the Pinterest posts data

In [None]:
# Importing pyspark:
import pyspark
# Importing the required pyspark functions:
from pyspark.sql.functions import when, array
# Importing the window:
from pyspark.sql.window import Window

In [None]:
# Renaming the 'index' column to 'ind':
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.withColumnRenamed("index", "ind")

In [None]:
# Reordering the columns in the dataframe (whilst omitting the 'downloaded' column):
# new desired order of columns:
desired_pin_column_order = ["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"]

df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.select(desired_pin_column_order)

In [None]:
# A count of distinct values in the 'follower_count' column returned 38 hits for 'User Info Error'. On further investigation, these rows contain no useful data, so we drop them from the dataframe:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.filter(col("follower_count") != "User Info Error")

In [None]:
# A count of distinct values in the 'ind' column shows that there are several identical rows of data, so we drop these duplicates:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.dropDuplicates(["ind"])

In [None]:
# During EDA, the 'follower_count' column was found to contain values such as '10k', '500', '100k' and '2M', so these need to be converted to integers.

# Defining a custom user-defined function (UDF) that expands the "k" (thousands) and "M" (millions) suffixes and returns the integer equivalent:
def convert_follower_count(value):
    # Passing nulls through unchanged so that the UDF does not fail on missing values:
    if value is None:
        return None
    if 'M' in value:
        # float() is used in case a value is fractional (for example '1.5M'):
        return int(round(float(value.strip('M')) * 1000000))
    elif 'k' in value:
        return int(round(float(value.strip('k')) * 1000))
    else:
        return int(value)

# Wrapping the function as a Spark UDF (the default return type is string):
convert_follower_count_udf = udf(convert_follower_count)

# Applying the UDF to the 'follower_count' column:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.withColumn("follower_count", convert_follower_count_udf(col("follower_count")))

# Casting the 'follower_count' column to the integer data type:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.withColumn("follower_count", col("follower_count").cast("integer"))
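
The suffix-expansion logic can be checked outside Spark before it is wrapped in a UDF. The following is a minimal plain-Python sketch of that idea; `parse_follower_count` is a hypothetical helper written for this illustration, not the function used in the notebook, and it assumes that unparseable values should become `None`:

```python
def parse_follower_count(value):
    """Convert strings such as '10k' or '2M' to an int; return None if unparseable."""
    if value is None:
        return None
    multipliers = {"k": 1_000, "M": 1_000_000}
    text = value.strip()
    if not text:
        return None
    # Look at the last character to decide which multiplier applies.
    has_suffix = text[-1] in multipliers
    factor = multipliers[text[-1]] if has_suffix else 1
    number = text[:-1] if has_suffix else text
    try:
        return int(round(float(number) * factor))
    except ValueError:
        return None

samples = ["10k", "500", "2M", "User Info Error", None]
print([parse_follower_count(v) for v in samples])
# [10000, 500, 2000000, None, None]
```

Returning `None` for bad input is one possible design choice; it would let Spark keep the row with a null `follower_count` instead of failing the whole job.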

In [None]:
# Casting the 'ind' column to an integer:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.withColumn("ind", col("ind").cast("integer"))

In [None]:
# The 'save_location' column should contain only the save path, so the "Local save in " prefix needs to be removed.
# Creating a custom UDF to remove the "Local save in " prefix from each row in the column:

def remove_prefix_save_location(value):
    # Passing nulls through unchanged so that the UDF does not fail on missing values:
    if value is None:
        return None
    return value.replace('Local save in ', '')

# Wrapping the function as a Spark UDF:
remove_prefix_save_location_udf = udf(remove_prefix_save_location)

# Applying the UDF to the 'save_location' column:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.withColumn("save_location", remove_prefix_save_location_udf(col("save_location")))

In [None]:
# During EDA, several columns were found to contain placeholder values that are empty or carry no relevant data.
# We will now replace all of these values with None:

# Replacing the "Image src error." with None in the image_src column:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.withColumn("image_src", when(col("image_src") == "Image src error.", None).otherwise(col("image_src")))

# Replacing the "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e" with None in the tag_list column:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.withColumn("tag_list", when(col("tag_list") == "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e", None).otherwise(col("tag_list")))

# Replacing the "No description available" & the "No description available Story format" with None from the description column:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.withColumn("description", when((col("description") == "No description available") | (col("description") == "No description available Story format"), None).otherwise(col("description")))

# Replacing blank values and "Loading..." with None in the 'title' column:
df_pin_0e1f6d6285c1 = df_pin_0e1f6d6285c1.withColumn("title", when((col("title") == "") | (col("title") == "Loading..."), None).otherwise(col("title")))
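
The four replacements above all follow one pattern: a column has a known set of placeholder strings, and any value in that set becomes `None`. A minimal plain-Python sketch of that pattern is shown here on a single row represented as a dict; `PLACEHOLDERS`, `null_placeholders` and the sample row are hypothetical names and data introduced only for illustration:

```python
# Placeholder strings taken from the cleaning steps above, keyed by column name.
PLACEHOLDERS = {
    "image_src": {"Image src error."},
    "tag_list": {"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e"},
    "description": {"No description available", "No description available Story format"},
    "title": {"", "Loading..."},
}

def null_placeholders(row):
    """Return a copy of the row with known placeholder strings replaced by None."""
    return {
        column: None if value in PLACEHOLDERS.get(column, set()) else value
        for column, value in row.items()
    }

# Made-up sample row, for illustration only.
sample = {"title": "Loading...", "description": "A cosy reading nook", "image_src": "Image src error."}
print(null_placeholders(sample))
# {'title': None, 'description': 'A cosy reading nook', 'image_src': None}
```

Keeping the placeholders in one mapping makes it easy to add a newly discovered placeholder without writing another `withColumn` call by hand.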


## Cleaning the geolocation data

In [None]:
# Creating a new column called "coordinates" that contains an array based on the latitude and longitude columns:
df_geo_0e1f6d6285c1 = df_geo_0e1f6d6285c1.withColumn("coordinates", array(col("latitude"), col("longitude")))

In [None]:
# Dropping the latitude and longitude columns and reordering the columns:
desired_geo_columns = ["ind", "country", "coordinates", "timestamp"]

df_geo_0e1f6d6285c1 = df_geo_0e1f6d6285c1.select(desired_geo_columns)

In [None]:
# Casting the 'timestamp' column from a string to a timestamp data type:
df_geo_0e1f6d6285c1 = df_geo_0e1f6d6285c1.withColumn("timestamp", col("timestamp").cast("timestamp"))

In [None]:
# Casting the 'ind' column from long to an integer data type:
df_geo_0e1f6d6285c1 = df_geo_0e1f6d6285c1.withColumn("ind", col("ind").cast("integer"))

In [None]:
# A count of distinct values in the 'ind' column shows that there are several identical rows of data, so we drop these duplicates:
df_geo_0e1f6d6285c1 = df_geo_0e1f6d6285c1.dropDuplicates(["ind"])


## Cleaning the user data

In [None]:
# Creating a new column 'user_name' which concatenates the information found in the 'first_name' and 'last_name' columns:
df_user_0e1f6d6285c1 = df_user_0e1f6d6285c1.withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))

In [None]:
# Casting the 'ind' column from long to an integer data type:
df_user_0e1f6d6285c1 = df_user_0e1f6d6285c1.withColumn("ind", col("ind").cast("integer"))

In [None]:
# Casting the 'age' column from long to an integer data type:
df_user_0e1f6d6285c1 = df_user_0e1f6d6285c1.withColumn("age", col("age").cast("integer"))

In [None]:
# Casting the 'date_joined' column from a string to a timestamp data type:
df_user_0e1f6d6285c1 = df_user_0e1f6d6285c1.withColumn("date_joined", col("date_joined").cast("timestamp"))

In [None]:
# Dropping the first_name and last_name columns and reordering the columns:
desired_user_columns = ["ind", "user_name", "age", "date_joined"]

df_user_0e1f6d6285c1 = df_user_0e1f6d6285c1.select(desired_user_columns)

In [None]:
# A count of distinct values in the 'ind' column shows that there are several identical rows of data, so we drop these duplicates:
df_user_0e1f6d6285c1 = df_user_0e1f6d6285c1.dropDuplicates(["ind"])


## Analysis on the created dataframes

In [None]:
# Finding the most popular category in each country:
# Creating a temporary dataframe which joins the geo and pin dataframes on ind:
temp_df1 = df_geo_0e1f6d6285c1.join(df_pin_0e1f6d6285c1, on=['ind'])

# Grouping by 'country' and 'category', and counting the number of pins in each category within each country:
grouped_df1 = temp_df1.groupBy('country', 'category').agg(count('*').alias('category_count'))

# Using a window function to rank the categories within each country
window_spec1 = Window.partitionBy('country').orderBy(desc('category_count'))

# Creating a dataframe where there is a ranking column which ranks over the above specified window:
ranked_df1 = grouped_df1.withColumn('rank', rank().over(window_spec1))

# Filtering for rows where rank is 1 (top category)
max_categories_df = ranked_df1.filter(col('rank') == 1)

# Selecting the relevant columns for the final result
top_categories_per_country_df = max_categories_df.select('country', 'category', 'category_count')

# Showing the result
top_categories_per_country_df.display()

country,category,category_count
Afghanistan,education,12
Albania,art,19
Algeria,quotes,27
American Samoa,tattoos,8
Andorra,tattoos,9
Angola,diy-and-crafts,4
Angola,education,4
Anguilla,diy-and-crafts,5
Antarctica (the territory South of 60 deg S),tattoos,4
Antigua and Barbuda,art,4
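
Angola appears twice in this output because `rank()` assigns the same rank to tied rows, so both categories with a count of 4 receive rank 1. A minimal plain-Python sketch of the same "keep every row that ties for the maximum" behaviour follows; `top_categories` is a hypothetical helper, and the third Angola row is made up for illustration:

```python
from collections import defaultdict

def top_categories(rows):
    """Keep every (country, category, count) row whose count equals the country's maximum."""
    best = defaultdict(int)
    for country, _category, count in rows:
        best[country] = max(best[country], count)
    return [row for row in rows if row[2] == best[row[0]]]

rows = [
    ("Angola", "diy-and-crafts", 4),
    ("Angola", "education", 4),
    ("Angola", "art", 2),  # made-up row, for illustration only
    ("Albania", "art", 19),
]
print(top_categories(rows))
# [('Angola', 'diy-and-crafts', 4), ('Angola', 'education', 4), ('Albania', 'art', 19)]
```

If exactly one row per country were required, `row_number()` could be used in place of `rank()` in the Spark code, at the cost of picking one of the tied categories arbitrarily.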


In [None]:
# Finding the most popular category in each year:

# Creating a temporary geo dataframe which also has a 'post_year' column:
temp_geo_df = df_geo_0e1f6d6285c1.withColumn("post_year", year("timestamp"))

# Filtering for years between 2018 and 2022
temp_geo_df = temp_geo_df.filter((col("post_year") >= 2018) & (col("post_year") <= 2022))

# Creating a temporary dataframe which joins the geo and pin dataframes on ind:
temp_df2 = temp_geo_df.join(df_pin_0e1f6d6285c1, on=['ind'])

# Grouping by 'post_year' and 'category', and counting the number of pins in each category within each year:
grouped_df2 = temp_df2.groupBy('post_year', 'category').agg(count('*').alias('category_count'))

# Using a window function to rank the categories within each year:
window_spec2 = Window.partitionBy('post_year').orderBy(desc('category_count'))

# Creating a dataframe where there is a ranking column which ranks over the above specified window:
ranked_df2 = grouped_df2.withColumn('rank', rank().over(window_spec2))

# Filtering for rows where rank is 1 (top category)
max_year_categories_df = ranked_df2.filter(col('rank') == 1)

# Selecting the relevant columns for the final result
top_categories_per_year_df = max_year_categories_df.select('post_year', 'category', 'category_count')

# Showing the result
top_categories_per_year_df.display()

post_year,category,category_count
2018,christmas,34
2019,art,26
2020,finance,26
2021,education,28
2022,christmas,32


In [None]:
# Finding the user with the most followers in each country:

# Creating a temporary dataframe which joins the geo and pin dataframes on ind:
temp_df3 = df_geo_0e1f6d6285c1.join(df_pin_0e1f6d6285c1, on=['ind'])

# Using a window function to rank the follower_count within each country:
window_spec4 = Window.partitionBy('country').orderBy(desc('follower_count'))

# Creating a dataframe where there is a ranking column which ranks over the above specified window:
ranked_df3 = temp_df3.withColumn('rank', rank().over(window_spec4))

# Filtering for rows where rank is 1 (top follower_count)
top_followers_df = ranked_df3.filter(col('rank') == 1)

# Selecting the relevant columns for the final result
top_followers_per_country_df = top_followers_df.select('country', 'poster_name', 'follower_count')

# Removing the duplicates on the country:
top_followers_per_country_df = top_followers_per_country_df.dropDuplicates(["country"])

# Showing the result
top_followers_per_country_df.display()

country,poster_name,follower_count
Afghanistan,9GAG,3000000
Albania,The Minds Journal,5000000
Algeria,Apartment Therapy,5000000
American Samoa,Mamas Uncut,8000000
Andorra,Teachers Pay Teachers,1000000
Angola,Tastemade,8000000
Anguilla,"Kristen | Lifestyle, Mom Tips & Teacher Stuff Blog",92000
Antarctica (the territory South of 60 deg S),Refinery29,1000000
Antigua and Barbuda,Country Living Magazine,1000000
Argentina,Cheezburger,2000000


In [None]:
# Finding the country with the user with the most followers:

# Ordering the above resulting dataframe by follower_count:
ordered_top_followers_per_country_df = top_followers_per_country_df.orderBy(col('follower_count').desc())

# Selecting only the country and follower_count columns, and keeping the top two rows because two countries tie for the highest follower count:
ordered_top_followers_per_country_df = ordered_top_followers_per_country_df.select('country', 'follower_count').limit(2)

# Showing result:
ordered_top_followers_per_country_df.display()


country,follower_count
Angola,8000000
American Samoa,8000000


In [None]:
# Finding the most popular category for different age groups:

# Defining the age groups that are required in a new column:
temp_user_df = df_user_0e1f6d6285c1.withColumn(
    "age_group",
    (when((col("age") >= 18) & (col("age") <= 24), "18-24")
     .when((col("age") >= 25) & (col("age") <= 35), "25-35")
     .when((col("age") >= 36) & (col("age") <= 50), "36-50")
     .when(col("age") > 50, "50+")
     .otherwise("unknown age"))
)

# Joining the users DF to the pins DF:
temp_df4 = temp_user_df.join(df_pin_0e1f6d6285c1, on=["ind"])

# Grouping the age_group and category and counting the number of posts per category per age group:
grouped_df4 = temp_df4.groupBy("age_group", "category").agg(count("*").alias("category_count"))

# Using a window function to rank the categories within each age group:
window_spec5 = Window.partitionBy("age_group").orderBy(desc('category_count'))

# Creating a dataframe where there is a ranking column which ranks over the above specified window:
ranked_df5  = grouped_df4.withColumn("rank", rank().over(window_spec5))

# Filtering for rows where the rank is 1:
top_categories_age_group_df = ranked_df5.filter(col("rank") == 1)

# Selecting the relevant columns for the final output:
top_categories_age_group_df1 = top_categories_age_group_df.select("age_group", "category", "category_count")

# Displaying the result:
top_categories_age_group_df1.display()

age_group,category,category_count
18-24,tattoos,66
25-35,christmas,41
36-50,finance,31
50+,vehicles,15
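
The chained `when()` conditions used to build `age_group` behave like an if/elif ladder: the first matching condition wins, and anything unmatched falls through to "unknown age". A minimal plain-Python sketch of the same bucketing is given below; `age_group` is a hypothetical helper written only to make the boundaries easy to check:

```python
def age_group(age):
    """Map an age to the same buckets as the when() chain above."""
    if age is None:
        return "unknown age"
    if 18 <= age <= 24:
        return "18-24"
    if 25 <= age <= 35:
        return "25-35"
    if 36 <= age <= 50:
        return "36-50"
    if age > 50:
        return "50+"
    return "unknown age"  # ages below 18

for age in (17, 24, 35, 50, 51):
    print(age, age_group(age))
```

One detail this makes visible: an age of exactly 50 lands in "36-50", so the "50+" bucket effectively starts at 51.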


In [None]:
# Finding the median follower count for different age groups:
# The temporary dataframe temp_df4 created above can be reused, as it already contains the age_group column and the join of the users and pins dataframes.

# Using the SQL 'percentile_approx' function (via 'expr'), we group by age_group and take the 50th percentile of follower_count, which approximates the median, and give it the alias median_follower_count:
median_followers_age_group_df = temp_df4.groupBy("age_group").agg(expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count"))

# Showing resulting df:
median_followers_age_group_df.display()

age_group,median_follower_count
50+,1000
36-50,6000
18-24,108000
25-35,24000
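
The medians above are all whole values because, as documented, `percentile_approx` returns a value taken from the column, not an interpolation between two values. As a rough analogy only (this is not how Spark computes the result), Python's `statistics` module shows the difference between an interpolated median and one that picks an existing element; the sample values are made up for illustration:

```python
import statistics

# Made-up follower counts, for illustration only.
follower_counts = [500, 1000, 6000, 24000]

# Interpolated median: the mean of the two middle values.
print(statistics.median(follower_counts))  # 3500.0

# "Low" median: always one of the values actually present in the data.
print(statistics.median_low(follower_counts))  # 1000
```

With an even number of rows the two definitions can differ noticeably, which is worth keeping in mind when comparing these results with medians computed elsewhere.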


In [None]:
# Finding how many users joined each year:

# Creating a temporary user dataframe which also has a 'joined_year' column:
temp_user_df1 = df_user_0e1f6d6285c1.withColumn("joined_year", year("date_joined"))

# Grouping by the joined year and doing a count:
num_users_year_df = temp_user_df1.groupBy("joined_year").agg(count("*").alias("number_users_joined"))

# Showing result:
num_users_year_df.display()

joined_year,number_users_joined
2015,532
2016,604
2017,220


In [None]:
# Finding the median follower count of users based on their joining year:

# Joining the temp_user_df1 dataframe created above to the pins dataframe:
temp_df5 = temp_user_df1.join(df_pin_0e1f6d6285c1, on =["ind"])

# Using the SQL 'percentile_approx' function (via 'expr'), we group by joined_year and take the 50th percentile of follower_count, which approximates the median, and give it the alias median_follower_count:
median_followers_joined_date_df = temp_df5.groupBy("joined_year").agg(expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count"))

# Showing resulting df:
median_followers_joined_date_df.display()

joined_year,median_follower_count
2015,128000
2016,19000
2017,2000


In [None]:
# Finding the median follower count of users based on their age group and joining year:

# Reusing temp_user_df, which already has the age_group column, and adding a column for the joined year:
temp_user_df2 = temp_user_df.withColumn("joined_year", year("date_joined"))

# Joining the temp user df to the pins dataframe:
temp_df6 = temp_user_df2.join(df_pin_0e1f6d6285c1, on = ["ind"])

# Using the SQL 'percentile_approx' function (via 'expr'), we group by age_group and joined_year and take the 50th percentile of follower_count, which approximates the median, and give it the alias median_follower_count:
median_followers_age_group_joined_date_df = temp_df6.groupBy("age_group", "joined_year").agg(expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count"))

# Showing result:
median_followers_age_group_joined_date_df.orderBy("joined_year").display()

age_group,joined_year,median_follower_count
25-35,2015,44000
18-24,2015,228000
50+,2015,14000
36-50,2015,13000
50+,2016,908
25-35,2016,22000
36-50,2016,8000
18-24,2016,46000
25-35,2017,2000
50+,2017,1000


# Unmount S3 bucket

Finally, we can unmount the S3 bucket from Databricks. This step is optional: if the bucket stays mounted, new batches of data added to the S3 bucket remain available to read into the Spark dataframes on Databricks.

In [None]:
# To unmount the S3 bucket from Databricks:
dbutils.fs.unmount("/mnt/pinterest_project1")