In [0]:
from pyspark.sql.functions import *
import urllib
import urllib.parse

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
# Keep this import after the wildcard import above: newer PySpark releases also
# export a function named `reduce`, and the later import is the one that wins.
from functools import reduce

In [0]:
# Location of the Delta table that holds the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Load the table into a Spark DataFrame using the Delta reader
aws_keys_df = spark.read.load(delta_table_path, format="delta")

In [0]:
# Read the access key and the secret key from the same row in one Spark job,
# so the two values are guaranteed to belong together.
credentials_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if credentials_row is None:
    # first() returns None on an empty DataFrame; fail with a clear message.
    raise ValueError("No AWS credentials found in the authentication_credentials table")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key (safe="" also escapes "/") so it can be embedded in a URL
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# AWS S3 bucket name
AWS_S3_BUCKET = "0afff69adbe3-bucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/mount_name"
# Source url (embeds the credentials, so never print or display it)
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the bucket only when the mount point does not exist yet. Mounting an
# existing mount point raises an error, so the check keeps this cell re-runnable
# while still making the notebook work on a workspace without the mount.
if MOUNT_NAME not in [mount.mountPoint for mount in dbutils.fs.mounts()]:
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [0]:
%sql
-- Turn off the Delta format check for this Spark session.
-- NOTE(review): presumably needed so a path can be read with a format other than delta - confirm it is still required.
SET spark.databricks.delta.formatCheck.enabled=false

In [0]:
# Source format and reader option (the option asks Spark to infer the schema)
file_type = "json"
infer_schema = "true"
# The trailing glob (*) picks up every .json file inside the partition folder
file_location = "/mnt/mount_name/topics/topics/1209b9ad90a5.pin/partition=0/*.json"

# Load the pin JSON files from the mounted S3 bucket
df_pin = (
    spark.read
    .format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location)
)

# Show the DataFrame to check its content
display(df_pin)

In [0]:
def clean_df_pin(df_pin):
    """
    Cleans and transforms the pin DataFrame by performing the following steps:

    1. Replace empty entries and known placeholder strings in every column with None.
    2. Convert the `follower_count` column to integers. A trailing `k` is expanded
       to thousands and a trailing `M` to millions (e.g. "10k" -> 10000, "1M" -> 1000000)
       before the remaining non-digit characters are removed.
    3. Cast the `downloaded` and `index` columns to integers.
    4. Strip the leading "Local save in " text from `save_location` so only the path remains.
    5. Rename the `index` column to `ind`.
    6. Select the output columns in a fixed order (columns not listed are dropped).

    Parameters:
    df_pin (DataFrame): Input DataFrame containing Pinterest data.

    Returns:
    DataFrame: Cleaned and transformed DataFrame.
    """

    # Placeholder strings that stand for "no data" and must become None
    placeholder_values = [
        "",
        "No description available Story format",
        "User Info Error",
        "Image src error.",
        "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
        "No Title Data Available",
    ]

    # Null out the placeholders in every column; other values pass through unchanged
    df_pin = df_pin.select([
        F.when(F.col(column_name).isin(placeholder_values), None)
         .otherwise(F.col(column_name))
         .alias(column_name)
        for column_name in df_pin.columns
    ])

    # Expand magnitude suffixes first; stripping them blindly would turn "10k" into 10.
    # NOTE(review): assumes the number before the suffix is whole (e.g. "10k");
    # a fractional form such as "1.5k" is not handled - confirm against the source data.
    follower_count = F.regexp_replace(F.col("follower_count"), "[kK]$", "000")
    follower_count = F.regexp_replace(follower_count, "[mM]$", "000000")
    # Drop anything that is still not a digit, then convert to integer
    follower_count = F.regexp_replace(follower_count, "[^0-9]", "")
    df_pin = df_pin.withColumn("follower_count", follower_count.cast(IntegerType()))

    # Cast the remaining numeric columns (follower_count is already an integer)
    for column_name in ["downloaded", "index"]:
        df_pin = df_pin.withColumn(column_name, F.col(column_name).cast(IntegerType()))

    # Keep only the path in the save_location column
    df_pin = df_pin.withColumn(
        "save_location",
        F.regexp_replace(F.col("save_location"), "^Local save in ", ""),
    )

    # Rename the index column to ind
    df_pin = df_pin.withColumnRenamed("index", "ind")

    # Reorder the DataFrame columns
    df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count",
                           "poster_name", "tag_list", "is_image_or_video", "image_src",
                           "save_location", "category")

    return df_pin

# Apply the cleaning function to the df_pin DataFrame
df_pin = clean_df_pin(df_pin)
# The %sql cells below query `df_pin` by name. A Python variable is not visible
# to SQL, so expose the cleaned DataFrame as a temporary view with the same name.
df_pin.createOrReplaceTempView("df_pin")

In [0]:
# Source format and reader option (the option asks Spark to infer the schema)
file_type = "json"
infer_schema = "true"
# The trailing glob (*) picks up every .json file inside the partition folder
file_location = "/mnt/mount_name/topics/topics/1209b9ad90a5.geo/partition=0/*.json"

# Load the geo JSON files from the mounted S3 bucket
df_geo = (
    spark.read
    .format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location)
)

# Show the DataFrame to check its content
display(df_geo)

In [0]:
def clean_df_geo(df_geo):
    """
    Tidy the raw geolocation DataFrame.

    The function:
    - builds a `coordinates` array column from `latitude` and `longitude`,
    - removes the `latitude` and `longitude` columns,
    - casts `timestamp` to the timestamp data type,
    - returns the columns in the order `ind`, `country`, `coordinates`, `timestamp`.

    Parameters:
    df_geo (DataFrame): Input DataFrame containing geolocation data.

    Returns:
    DataFrame: Cleaned and transformed DataFrame.
    """
    # Column expressions used by the pipeline below
    lat_lon_pair = F.array("latitude", "longitude")
    parsed_timestamp = F.col("timestamp").cast("timestamp")

    return (
        df_geo
        .withColumn("coordinates", lat_lon_pair)
        .drop("latitude", "longitude")
        .withColumn("timestamp", parsed_timestamp)
        .select("ind", "country", "coordinates", "timestamp")
    )

# Apply the cleaning function to the df_geo DataFrame
df_geo = clean_df_geo(df_geo)
# The %sql cells below query `df_geo` by name. A Python variable is not visible
# to SQL, so expose the cleaned DataFrame as a temporary view with the same name.
df_geo.createOrReplaceTempView("df_geo")

In [0]:
# Source format and reader option (the option asks Spark to infer the schema)
file_type = "json"
infer_schema = "true"
# The trailing glob (*) picks up every .json file inside the partition folder
file_location = "/mnt/mount_name/topics/topics/1209b9ad90a5.user/partition=0/*.json"

# Load the user JSON files from the mounted S3 bucket
df_user = (
    spark.read
    .format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location)
)

# Show the DataFrame to check its content
display(df_user)

In [0]:
def clean_df_user(df_user):
    """
    Tidy the raw user DataFrame.

    The function:
    - builds `user_name` by joining `first_name` and `last_name` with a single space,
    - removes the `first_name` and `last_name` columns,
    - casts `date_joined` to the timestamp data type,
    - returns the columns in the order `ind`, `user_name`, `age`, `date_joined`.

    Parameters:
    df_user (DataFrame): Input DataFrame containing user data.

    Returns:
    DataFrame: Cleaned and transformed DataFrame.
    """
    # Column expressions used by the pipeline below
    full_name = F.concat_ws(" ", F.col("first_name"), F.col("last_name"))
    joined_at = F.col("date_joined").cast("timestamp")

    return (
        df_user
        .withColumn("user_name", full_name)
        .drop("first_name", "last_name")
        .withColumn("date_joined", joined_at)
        .select("ind", "user_name", "age", "date_joined")
    )

# Apply the cleaning function to the df_user DataFrame
df_user = clean_df_user(df_user)
# The %sql cells below query `df_user` by name. A Python variable is not visible
# to SQL, so expose the cleaned DataFrame as a temporary view with the same name.
df_user.createOrReplaceTempView("df_user")

In [0]:
%sql
-- Project the columns this analysis needs from df_pin and df_geo
CREATE OR REPLACE TEMPORARY VIEW pin_view AS
SELECT ind, category
FROM df_pin;

CREATE OR REPLACE TEMPORARY VIEW geo_view AS
SELECT ind, country
FROM df_geo;

-- Join the pin and geo views on the common column ind
CREATE OR REPLACE TEMPORARY VIEW pin_geo_join AS
SELECT g.country, p.category
FROM pin_view p
JOIN geo_view g
ON p.ind = g.ind;

-- Count the posts for every (country, category) pair
CREATE OR REPLACE TEMPORARY VIEW popular_category_per_country AS
SELECT country, category, COUNT(*) AS category_count
FROM pin_geo_join
GROUP BY country, category;

-- Rank the categories within each country, highest count first.
-- The category name breaks ties so repeated runs return the same winner.
CREATE OR REPLACE TEMPORARY VIEW ranked_categories_per_country AS
SELECT country, category, category_count,
       ROW_NUMBER() OVER (PARTITION BY country ORDER BY category_count DESC, category) AS category_rank
FROM popular_category_per_country;

-- Keep only the top-ranked category in each country
CREATE OR REPLACE TEMPORARY VIEW most_popular_category_per_country AS
SELECT country, category, category_count
FROM ranked_categories_per_country
WHERE category_rank = 1;

-- Display the result
SELECT * FROM most_popular_category_per_country;


In [0]:
%sql
-- Project the pin and geo columns this analysis needs
CREATE OR REPLACE TEMPORARY VIEW pin_view AS
SELECT ind, category FROM df_pin;

CREATE OR REPLACE TEMPORARY VIEW geo_view AS
SELECT ind, timestamp FROM df_geo;

-- Pair every pin category with the timestamp of the geo record sharing its ind
CREATE OR REPLACE TEMPORARY VIEW pin_geo_join AS
SELECT pin.category, geo.timestamp
FROM pin_view AS pin
INNER JOIN geo_view AS geo
  ON pin.ind = geo.ind;

-- Keep posts made from 2018 through 2022 inclusive
CREATE OR REPLACE TEMPORARY VIEW filtered_posts AS
SELECT category, timestamp
FROM pin_geo_join
WHERE YEAR(timestamp) >= 2018
  AND YEAR(timestamp) <= 2022;

-- Reduce each timestamp to its year
CREATE OR REPLACE TEMPORARY VIEW posts_with_year AS
SELECT category,
       YEAR(timestamp) AS post_year
FROM filtered_posts;

-- Count the posts of each category within each year
CREATE OR REPLACE TEMPORARY VIEW category_post_count AS
SELECT post_year,
       category,
       COUNT(*) AS category_count
FROM posts_with_year
GROUP BY post_year, category;

-- Show the counts ordered by year
SELECT *
FROM category_post_count
ORDER BY post_year;

In [0]:
%sql
-- Project the columns this analysis needs from df_user, df_geo and df_pin
CREATE OR REPLACE TEMPORARY VIEW user_view AS
SELECT ind, user_name
FROM df_user;

CREATE OR REPLACE TEMPORARY VIEW geo_view AS
SELECT ind, country
FROM df_geo;

CREATE OR REPLACE TEMPORARY VIEW pin_view AS
SELECT ind, poster_name, follower_count
FROM df_pin;

-- Join the views on the common column ind
CREATE OR REPLACE TEMPORARY VIEW join_view AS
SELECT u.user_name, g.country, p.poster_name, p.follower_count
FROM user_view u
JOIN geo_view g ON u.ind = g.ind
JOIN pin_view p ON p.ind = g.ind;

-- Rank the rows within each country by follower_count, highest first.
-- poster_name breaks ties so repeated runs pick the same poster.
CREATE OR REPLACE TEMPORARY VIEW ranked_users_per_country AS
SELECT country, user_name, poster_name, follower_count,
       ROW_NUMBER() OVER (PARTITION BY country ORDER BY follower_count DESC, poster_name) AS user_rank
FROM join_view;

-- Keep only the poster with the most followers in each country
CREATE OR REPLACE TEMPORARY VIEW user_with_most_followers_per_country AS
SELECT country, poster_name, follower_count
FROM ranked_users_per_country
WHERE user_rank = 1;

-- Query the per-country result
SELECT * FROM user_with_most_followers_per_country;

-- Take the highest follower_count per country
CREATE OR REPLACE TEMPORARY VIEW country_with_user_most_followers AS
SELECT country, MAX(follower_count) AS max_follower_count
FROM user_with_most_followers_per_country
GROUP BY country;

-- Attach the poster name back to each per-country maximum
CREATE OR REPLACE TEMPORARY VIEW country_with_most_followers AS
SELECT c.country, u.poster_name, u.follower_count
FROM user_with_most_followers_per_country u
JOIN country_with_user_most_followers c
ON u.country = c.country AND u.follower_count = c.max_follower_count;

-- Display the result
SELECT * FROM country_with_most_followers;

In [0]:
%sql
-- Project the columns this analysis needs from df_user, df_geo and df_pin
CREATE OR REPLACE TEMPORARY VIEW user_view AS
SELECT ind, user_name
FROM df_user;

CREATE OR REPLACE TEMPORARY VIEW geo_view AS
SELECT ind, country
FROM df_geo;

CREATE OR REPLACE TEMPORARY VIEW pin_view AS
SELECT ind, poster_name, follower_count
FROM df_pin;

-- Join the views on the common column ind
CREATE OR REPLACE TEMPORARY VIEW join_view AS
SELECT u.user_name, g.country, p.poster_name, p.follower_count
FROM user_view u
JOIN geo_view g ON u.ind = g.ind
JOIN pin_view p ON p.ind = g.ind;

-- Rank the rows within each country by follower_count, highest first
CREATE OR REPLACE TEMPORARY VIEW ranked_users_per_country AS
SELECT country, user_name, poster_name, follower_count,
       ROW_NUMBER() OVER (PARTITION BY country ORDER BY follower_count DESC) AS user_rank
FROM join_view;

-- Keep only the poster with the most followers in each country
CREATE OR REPLACE TEMPORARY VIEW user_with_most_followers_per_country AS
SELECT country, poster_name, follower_count
FROM ranked_users_per_country
WHERE user_rank = 1;

-- Take the highest follower_count per country
CREATE OR REPLACE TEMPORARY VIEW country_with_user_most_followers AS
SELECT country, MAX(follower_count) AS max_follower_count
FROM user_with_most_followers_per_country
GROUP BY country;

-- Keep the country and follower_count of each per-country maximum
CREATE OR REPLACE TEMPORARY VIEW country_with_most_followers AS
SELECT c.country, u.follower_count
FROM user_with_most_followers_per_country u
JOIN country_with_user_most_followers c
ON u.country = c.country AND u.follower_count = c.max_follower_count;

-- Select the country whose top poster has the most followers.
-- country breaks ties so repeated runs return the same row.
CREATE OR REPLACE TEMPORARY VIEW country_with_highest_followers AS
SELECT country, follower_count
FROM country_with_most_followers
ORDER BY follower_count DESC, country
LIMIT 1;

-- Display the result with only the desired columns
SELECT country, follower_count FROM country_with_highest_followers;

In [0]:
%sql
-- Project the columns this analysis needs from df_pin and df_user
CREATE OR REPLACE TEMPORARY VIEW pin_view AS
SELECT ind, category
FROM df_pin;

CREATE OR REPLACE TEMPORARY VIEW user_view AS
SELECT ind, age
FROM df_user;

-- Join the views on the common column ind
CREATE OR REPLACE TEMPORARY VIEW join_view AS
SELECT p.category, u.age
FROM pin_view p
JOIN user_view u
ON p.ind = u.ind;

-- Create age groups based on the age column.
-- Ages outside every range (NULL or below 18) get a NULL age_group.
CREATE OR REPLACE TEMPORARY VIEW age_groups AS
SELECT *,
       CASE 
           WHEN age >= 18 AND age <= 24 THEN '18-24'
           WHEN age >= 25 AND age <= 35 THEN '25-35'
           WHEN age >= 36 AND age <= 50 THEN '36-50'
           WHEN age >= 51 THEN '+50'
       END AS age_group
FROM join_view;

-- Count the posts for every (age_group, category) pair
CREATE OR REPLACE TEMPORARY VIEW popular_category_by_age_group AS
SELECT age_group, category, COUNT(*) AS category_count
FROM age_groups
GROUP BY age_group, category;

-- Rank the categories within each age group, highest count first.
-- The category name breaks ties so repeated runs return the same winner.
CREATE OR REPLACE TEMPORARY VIEW most_popular_category_per_age_group AS
SELECT age_group, category, category_count,
       ROW_NUMBER() OVER (PARTITION BY age_group ORDER BY category_count DESC, category) AS category_rank
FROM popular_category_by_age_group;

-- Keep only the top-ranked category in each age group
CREATE OR REPLACE TEMPORARY VIEW result AS
SELECT age_group, category, category_count
FROM most_popular_category_per_age_group
WHERE category_rank = 1;

-- Display the result
SELECT * FROM result;


In [0]:
%sql
-- Project the columns this analysis needs from df_pin and df_user
CREATE OR REPLACE TEMPORARY VIEW pin_view AS
SELECT ind, follower_count
FROM df_pin;

CREATE OR REPLACE TEMPORARY VIEW user_view AS
SELECT ind, age
FROM df_user;

-- Join the views on the common column ind
CREATE OR REPLACE TEMPORARY VIEW join_view AS
SELECT p.follower_count, u.age
FROM pin_view p
JOIN user_view u
ON p.ind = u.ind;

-- Create age groups based on the age column.
-- The last bucket is an explicit condition rather than ELSE, so a NULL age or
-- an age below 18 yields a NULL age_group instead of being counted as +50.
CREATE OR REPLACE TEMPORARY VIEW age_groups AS
SELECT *,
       CASE 
           WHEN age >= 18 AND age <= 24 THEN '18-24'
           WHEN age >= 25 AND age <= 35 THEN '25-35'
           WHEN age >= 36 AND age <= 50 THEN '36-50'
           WHEN age > 50 THEN '+50'
       END AS age_group
FROM join_view;

-- Calculate the approximate median follower count for each age group
CREATE OR REPLACE TEMPORARY VIEW median_follower_count_per_age_group AS
SELECT age_group,
       percentile_approx(follower_count, 0.5) AS median_follower_count
FROM age_groups
GROUP BY age_group;

-- Display the result
SELECT * FROM median_follower_count_per_age_group;

In [0]:
%sql
-- Expose the join date of every user
CREATE OR REPLACE TEMPORARY VIEW user_view AS
SELECT date_joined FROM df_user;

-- Reduce each join date to its year, keeping only 2015 through 2020 inclusive
CREATE OR REPLACE TEMPORARY VIEW filtered_users AS
SELECT YEAR(date_joined) AS join_year
FROM user_view
WHERE YEAR(date_joined) >= 2015
  AND YEAR(date_joined) <= 2020;

-- Tally the users that joined in each year
CREATE OR REPLACE TEMPORARY VIEW users_joined_per_year AS
SELECT join_year,
       COUNT(*) AS number_users_joined
FROM filtered_users
GROUP BY join_year;

-- Show the per-year totals
SELECT *
FROM users_joined_per_year;

In [0]:
%sql
-- Project the user and pin columns this analysis needs
CREATE OR REPLACE TEMPORARY VIEW user_view AS
SELECT ind, date_joined FROM df_user;

CREATE OR REPLACE TEMPORARY VIEW pin_view AS
SELECT ind, follower_count FROM df_pin;

-- Pair every follower count with the join date of the user sharing its ind
CREATE OR REPLACE TEMPORARY VIEW join_view AS
SELECT pin.follower_count, usr.date_joined
FROM pin_view AS pin
INNER JOIN user_view AS usr
  ON pin.ind = usr.ind;

-- Keep users who joined from 2015 through 2020 inclusive, reduced to the join year
CREATE OR REPLACE TEMPORARY VIEW filtered_users AS
SELECT follower_count,
       YEAR(date_joined) AS join_year
FROM join_view
WHERE YEAR(date_joined) >= 2015
  AND YEAR(date_joined) <= 2020;

-- Approximate median follower count for each join year
CREATE OR REPLACE TEMPORARY VIEW median_follower_count AS
SELECT join_year,
       percentile_approx(follower_count, 0.5) AS median_follower_count
FROM filtered_users
GROUP BY join_year;

-- Show the per-year medians
SELECT *
FROM median_follower_count;


In [0]:
%sql
-- Project the columns this analysis needs from df_user and df_pin
CREATE OR REPLACE TEMPORARY VIEW user_view AS
SELECT ind, date_joined, age
FROM df_user;

CREATE OR REPLACE TEMPORARY VIEW pin_view AS
SELECT ind, follower_count
FROM df_pin;

-- Join the views on the common column ind
CREATE OR REPLACE TEMPORARY VIEW join_view AS
SELECT p.follower_count, u.date_joined, u.age
FROM pin_view p
JOIN user_view u
ON p.ind = u.ind;

-- Keep only users who joined between 2015 and 2020, reduced to the join year
CREATE OR REPLACE TEMPORARY VIEW filtered_users AS
SELECT follower_count, YEAR(date_joined) AS join_year, age
FROM join_view
WHERE YEAR(date_joined) BETWEEN 2015 AND 2020;

-- Create age groups based on the age column.
-- The last bucket is an explicit condition rather than ELSE, so a NULL age or
-- an age below 18 yields a NULL age_group instead of being counted as +50.
CREATE OR REPLACE TEMPORARY VIEW age_groups AS
SELECT *,
       CASE 
           WHEN age >= 18 AND age <= 24 THEN '18-24'
           WHEN age >= 25 AND age <= 35 THEN '25-35'
           WHEN age >= 36 AND age <= 50 THEN '36-50'
           WHEN age > 50 THEN '+50'
       END AS age_group
FROM filtered_users;

-- Calculate the approximate median follower count for each age group and join year
CREATE OR REPLACE TEMPORARY VIEW median_follower_count_per_age_group AS
SELECT age_group, join_year,
       percentile_approx(follower_count, 0.5) AS median_follower_count
FROM age_groups
GROUP BY age_group, join_year;

-- Display the result
SELECT * FROM median_follower_count_per_age_group;
