In [0]:
# URL processing (urllib.parse must be imported explicitly; "import urllib"
# alone does not guarantee the submodule is loaded)
import urllib
import urllib.parse

# pyspark functions
from pyspark.sql.functions import *

In [0]:
# Reader settings for the credentials file
file_type = "csv"              # the file is a CSV
first_row_is_header = "true"   # the first row holds the column names
delimiter = ","                # fields are separated by commas

# Load the credentials CSV into a Spark DataFrame
aws_keys_df = (
    spark.read
    .format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# Get the AWS access key and secret key from the spark dataframe.
# The databricks-user row is fetched once and both keys are read from it,
# instead of running the same filter + collect twice.
databricks_user_rows = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .collect()
)
# Fail with a clear message rather than a bare IndexError when the row is absent
if not databricks_user_rows:
    raise ValueError("No row with User name 'databricks-user' found in the credentials file")
ACCESS_KEY = databricks_user_rows[0]['Access key ID']
SECRET_KEY = databricks_user_rows[0]['Secret access key']
# URL-encode the secret key; safe="" means "/" is percent-encoded too,
# which is needed because the key is embedded in the mount URL
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# AWS S3 bucket name
AWS_S3_BUCKET = "user-12d4ce482aeb-bucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/aws_data"
# Source url: access key and URL-encoded secret key are embedded in the URL.
# NOTE(review): the s3n:// scheme is kept as-is; confirm whether s3a:// is
# preferred on the runtime in use.
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the drive only when the mount point is not already in use, so that
# re-running this cell (or the whole notebook) does not fail on an existing mount
already_mounted_points = [mount.mountPoint for mount in dbutils.fs.mounts()]
if MOUNT_NAME not in already_mounted_points:
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [0]:
# List the topic folders found under the mounted bucket and render them as a table
topic_entries = dbutils.fs.ls("/mnt/aws_data/topics")
display(topic_entries)

path,name,size,modificationTime
dbfs:/mnt/aws_data/topics/12d4ce482aeb.geo/,12d4ce482aeb.geo/,0,1697639959725
dbfs:/mnt/aws_data/topics/12d4ce482aeb.pin/,12d4ce482aeb.pin/,0,1697639959725
dbfs:/mnt/aws_data/topics/12d4ce482aeb.user/,12d4ce482aeb.user/,0,1697639959725


In [0]:
# Locations of partition 0 of each topic inside the mounted bucket
geo_topic_path = "/mnt/aws_data/topics/12d4ce482aeb.geo/partition=0/"
pin_topic_path = "/mnt/aws_data/topics/12d4ce482aeb.pin/partition=0/"
user_topic_path = "/mnt/aws_data/topics/12d4ce482aeb.user/partition=0/"

# Read every JSON file under each topic path into its own DataFrame
df_geo = spark.read.json(geo_topic_path)
df_pin = spark.read.json(pin_topic_path)
df_user = spark.read.json(user_topic_path)

In [0]:
def get_missing_vals(df):
    """Print the number of null values in every column of ``df``.

    One line is printed per column, in column order, formatted as
    ``<column name> <data type>: <null count>``.

    All null counts are computed in a single aggregation over the DataFrame
    rather than one filter-and-count job per column.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        None. The report is written to standard output.
    """
    from pyspark.sql import functions as F

    column_names = df.columns
    if not column_names:
        # Nothing to report for a DataFrame without columns
        return

    # count() ignores nulls, so counting a value that is only produced when the
    # column is null yields the number of null rows for that column
    null_count_row = df.select(
        [F.count(F.when(df[name].isNull(), 1)).alias(name) for name in column_names]
    ).collect()[0]

    # Read the counts by position so the lookup does not depend on name resolution
    for position, name in enumerate(column_names):
        print(f'{name} {(df.schema[name].dataType)}: {null_count_row[position]}')

get_missing_vals(df_pin)

In [0]:
from pyspark.sql.functions import col,when

def replace_invalid_data(df, col_name, value_to_replace):
    """Null out the values of one column that match a SQL LIKE pattern.

    Args:
        df: Spark DataFrame to clean.
        col_name: name of the column to rewrite.
        value_to_replace: pattern passed to ``Column.like``; rows whose value
            matches it are set to null, all other rows keep their value.

    Returns:
        A DataFrame in which ``col_name`` has been replaced by the cleaned column.
    """
    current_value = col(col_name)
    is_placeholder = current_value.like(value_to_replace)
    cleaned_value = when(is_placeholder, None).otherwise(current_value)
    return df.withColumn(col_name, cleaned_value)

# Column name -> LIKE pattern of the placeholder text that stands for "no data"
# in that column (a trailing % matches any remaining characters)
obj = {
    "description": "No description available%",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available"
}

# Apply the replacement column by column, feeding each result into the next call
for pin_column, placeholder_pattern in obj.items():
    df_pin = replace_invalid_data(df_pin, pin_column, placeholder_pattern)

In [0]:
from pyspark.sql.functions import regexp_replace, regexp_extract

## Expanding the "k" / "M" abbreviations into zeros ("k" first, then "M")
## NOTE(review): this assumes abbreviated counts have no decimal part; a value
## such as "1.5k" would become "1.5000" before the cast -- confirm against the data
follower_count_expanded = regexp_replace(
    regexp_replace("follower_count", "k", "000"),
    "M",
    "000000",
)

df_pin = (
    df_pin
    ## Converting the expanded text to an integer column
    .withColumn("follower_count", follower_count_expanded.cast("int"))
    ## Stripping the "Local save in " prefix so only the path remains
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    ## Renaming the index column
    .withColumnRenamed("index", "ind")
)

## Selecting the columns in their new order
new_pin_column_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category",
]
df_pin = df_pin.select(new_pin_column_order)

In [0]:
# Print the schema of df_pin to check its column names and data types
df_pin.printSchema()

In [0]:
# Report the null count of every column in df_geo
get_missing_vals(df_geo)

In [0]:
## Creating new column holding latitude and longitude as a two-element array.
## array() is given the two columns directly: wrapping them in concat_ws(",", ...)
## would produce a one-element array containing a single "lat,lon" string.
df_geo = df_geo.withColumn("coordinates", array(col("latitude"), col("longitude")))

## Dropping longitude and latitude columns, now carried by "coordinates"
df_geo = df_geo.drop("latitude","longitude")

## Converting timestamp column datatype
df_geo = df_geo.withColumn("timestamp",col("timestamp").cast("timestamp"))

## Re-ordering Dataframe
new_geo_column_order = [
    "ind",
    "country",
    "coordinates",
    "timestamp"
]

df_geo = df_geo.select(new_geo_column_order)

## Printing the resulting schema to check column names and data types
df_geo.printSchema()

In [0]:
# Report the null count of every column in df_user
get_missing_vals(df_user)

In [0]:
## Full name built from first_name and last_name, separated by a space
full_user_name = concat_ws(" ", col("first_name"), col("last_name"))

df_user = (
    df_user
    ## Adding the combined name column
    .withColumn("user_name", full_user_name)
    ## Removing the two source name columns
    .drop("first_name", "last_name")
    ## Casting date_joined to a timestamp
    .withColumn("date_joined", col("date_joined").cast("timestamp"))
)

## Selecting the columns in their new order
new_user_column_order = ["ind", "user_name", "age", "date_joined"]
df_user = df_user.select(new_user_column_order)

## Printing the resulting schema
df_user.printSchema()

In [0]:
# import for performing window functions
from pyspark.sql.window import Window
# join df_pin and df_geo dataframes on index; joining on the column name keeps a
# single "ind" column in the result instead of two identically named ones, which
# would make any later reference to "ind" ambiguous
pin_geo = df_pin.join(df_geo, "ind")
# join df_pin and df_user the same way and create temp view for SQL query
df_pin.join(df_user, "ind").createOrReplaceTempView("category_age")
# SQL query to create age group column. The CASE has no ELSE branch, so rows
# whose age is below 18 or NULL get a NULL age_group.
pin_user_age_group = spark.sql(
    """
    SELECT
        CASE
            WHEN age BETWEEN 18 AND 24 THEN '18-24'
            WHEN age BETWEEN 25 AND 35 THEN '25-35'
            WHEN age BETWEEN 36 AND 50 THEN '36-50'
            WHEN age > 50 THEN '50+'
        END AS age_group,
        *
    FROM category_age
    """
)




In [0]:
# window over each country, rows ordered from highest to lowest category_count
windowCountryByCatCount = (
    Window
    .partitionBy("country")
    .orderBy(col("category_count").desc())
)

# number of posts per (country, category) pair
category_counts_by_country = pin_geo.groupBy("country", "category").agg(
    count("category").alias("category_count")
)

# find the most popular category in each country: keep only the first-ranked row
(
    category_counts_by_country
    .withColumn("rank", row_number().over(windowCountryByCatCount))
    .filter(col("rank") == 1)
    .drop("rank")
    .show()
)



In [0]:


# window over each post_year, rows ordered from highest to lowest category_count
windowYearByCatCount = (
    Window
    .partitionBy("post_year")
    .orderBy(col("category_count").desc())
)

# posts from 2018 to 2022 (both years included), tagged with their posting year
posts_2018_to_2022 = (
    pin_geo
    .withColumn("post_year", year("timestamp"))
    .filter((col("post_year") >= 2018) & (col("post_year") <= 2022))
)

# find which was the most popular category each year between 2018 and 2022
(
    posts_2018_to_2022
    .groupBy("post_year", "category")
    .agg(count("category").alias("category_count"))
    .withColumn("rank", row_number().over(windowYearByCatCount))
    .filter(col("rank") == 1)
    .drop("rank")
    .show()
)



In [0]:


# window over each country, rows ordered from highest to lowest follower_count
windowCountryByFollowers = (
    Window
    .partitionBy("country")
    .orderBy(col("follower_count").desc())
)

# pin rows joined to their geo rows, ranked by follower count within each country
pins_ranked_by_followers = (
    df_pin
    .join(df_geo, df_pin.ind == df_geo.ind)
    .withColumn("rank", row_number().over(windowCountryByFollowers))
)

# find the user with the most followers in each country (first-ranked row only)
max_followers_by_country = (
    pins_ranked_by_followers
    .filter(col("rank") == 1)
    .select("country", "poster_name", "follower_count")
)

# get highest number of followers from all countries
# (max here is the pyspark aggregate brought in by the star import)
overall_max_row = max_followers_by_country.select(max("follower_count")).collect()[0]
max_followers_all_countries = overall_max_row[0]

# find the country with the user with most followers
country_with_max_followers = max_followers_by_country.where(
    col("follower_count") == max_followers_all_countries
)

max_followers_by_country.show()
country_with_max_followers.show()



In [0]:


# window over each age_group, rows ordered from highest to lowest category_count
windowAgeGroup = (
    Window
    .partitionBy("age_group")
    .orderBy(col("category_count").desc())
)

# number of posts per (age_group, category) pair
category_counts_by_age_group = pin_user_age_group.groupBy("age_group", "category").agg(
    count("category").alias("category_count")
)

# find the most popular category for different age groups
(
    category_counts_by_age_group
    .withColumn("rank", row_number().over(windowAgeGroup))
    .filter(col("rank") == 1)
    .drop("rank")
    .show()
)



In [0]:


# one row per distinct (user_name, date_joined, age_group, follower_count) combination
followers_by_age_group = (
    pin_user_age_group
    .select("user_name", "date_joined", "age_group", "follower_count")
    .distinct()
)

# find the median follower count for different age groups
# (approximate median: the 0.5 percentile from percentile_approx)
(
    followers_by_age_group
    .groupBy("age_group")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("age_group")
    .show()
)



In [0]:


# distinct user rows tagged with the year they joined; "ind" is dropped first
# so that rows differing only in ind collapse into one
users_with_join_year = (
    df_user
    .withColumn("post_year", year("date_joined"))
    .drop("ind")
    .distinct()
)

# find out how many users joined each year
(
    users_with_join_year
    .groupBy("post_year")
    .agg(count("user_name").alias("number_users_joined"))
    .orderBy("post_year")
    .show()
)



In [0]:


# one row per distinct (user_name, date_joined, follower_count) combination,
# tagged with the year the user joined
followers_with_join_year = (
    pin_user_age_group
    .select("user_name", "date_joined", "follower_count")
    .distinct()
    .withColumn("post_year", year("date_joined"))
)

# find the median follower count of users based on their joining year
# (approximate median: the 0.5 percentile from percentile_approx)
(
    followers_with_join_year
    .groupBy("post_year")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("post_year")
    .show()
)



In [0]:


# find the median follower count of users that have joined between 2015 and 2020, based on which age group they are part of
(
    pin_user_age_group
    # one row per distinct user / age group / join date / follower count combination
    .select("user_name", "age_group", "date_joined", "follower_count")
    .distinct()
    .withColumn("post_year", year("date_joined"))
    # keep only users who joined from 2015 to 2020 (both bounds inclusive);
    # without this filter every joining year would be reported
    .filter(col("post_year").between(2015, 2020))
    .groupBy("post_year", "age_group")
    # approximate median: the 0.5 percentile from percentile_approx
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("post_year", "age_group")
    .show()
)



In [0]:
# Unmount the bucket only if it is currently mounted, so this cell can be
# re-run (or run after a failed mount) without raising
if "/mnt/aws_data" in [mount.mountPoint for mount in dbutils.fs.mounts()]:
    dbutils.fs.unmount("/mnt/aws_data")