## Find most popular category in each country

In [None]:
# Using SQL
# Register the pin, geo and user DataFrames as temp views so that spark.sql
# queries can refer to them by the names "pin", "geo" and "user".
df_128a59195de3_pin.createOrReplaceTempView("pin")
df_128a59195de3_geo.createOrReplaceTempView("geo")
df_128a59195de3_user.createOrReplaceTempView("user")
# SQL query: join geo and pin on ind, count rows per (country, category),
# number the categories inside each country by descending count and keep only
# the row numbered 1, i.e. one most frequent category per country.
result = spark.sql(
    """SELECT country, category, category_count
        FROM (
                SELECT country
                        ,category
                        ,COUNT(*) AS category_count
                        ,ROW_NUMBER() OVER (PARTITION BY country ORDER BY COUNT(*) DESC) AS rank
                FROM geo 
                INNER JOIN pin
                ON pin.ind = geo.ind
                GROUP BY country, category
            ) 
        WHERE rank = 1""")
# Render the resulting DataFrame in the notebook
display(result)


In [None]:
# Using Pyspark
# Window over each country, highest category_count first
country_categorycount_rank = Window.partitionBy("country").orderBy(
    col("category_count").desc()
)

# Count category occurrences per (country, category), number the categories
# inside each country with the window above and keep only the first one.
(
    pin_geo.groupBy("country", "category")
    .agg(count("category").alias("category_count"))
    .withColumn("rank", row_number().over(country_categorycount_rank))
    .filter(col("rank") == 1)
    .drop("rank")
    .show()
)

## Find the most popular category for each year between 2018 and 2022

In [None]:
# Using SQL
# Join the geo and pin views on ind, count rows per (posting year, category),
# number the categories inside each year by descending count, then keep the
# row numbered 1 for the years 2018 to 2022 (BETWEEN is inclusive).
result = spark.sql(
    """SELECT post_year, category, category_count
        FROM (
                SELECT year(timestamp) as post_year
                        ,category
                        ,COUNT(*) AS category_count
                        ,ROW_NUMBER() OVER (PARTITION BY year(timestamp) ORDER BY COUNT(*) DESC) AS rank
                FROM geo 
                INNER JOIN pin 
                ON pin.ind = geo.ind
                GROUP BY year(timestamp), category
            ) 
        WHERE rank = 1 AND post_year BETWEEN 2018 AND 2022""")
# Render the resulting DataFrame in the notebook
display(result)

In [None]:
# Using Pyspark
# Window over each post_year, highest category_count first
year_categorycount_rank = Window.partitionBy("post_year").orderBy(col("category_count").desc())

# Retrieve the most popular category for each year between 2018 and 2022.
# The chain is wrapped in parentheses so it can span several lines with
# comments in between: a backslash continuation cannot be followed by a
# comment line, and a line without any continuation ends the statement.
(
    pin_geo
    # derive the posting year and keep only 2018-2022 (inclusive)
    .withColumn("post_year", year("timestamp"))
    .filter(col("post_year") >= 2018)
    .filter(col("post_year") <= 2022)
    # group data by post_year and category and count each category
    .groupBy("post_year", "category")
    .agg(count("category").alias("category_count"))
    # use the window function to keep the highest ranking category per year
    .withColumn("rank", row_number().over(year_categorycount_rank))
    .filter(col("rank") == 1)
    .drop("rank")
    .show()
)

## User with most followers by country

In [None]:
# Using SQL
# Find user with most followers by country: join pin and geo on ind, take the
# largest follower_count per (country, poster_name), number the posters inside
# each country by that value (descending) and keep the row numbered 1.
# The output is sorted by follower count, largest first.
result1 = spark.sql(
    """ SELECT country
                ,poster_name
                ,max_follower_count
        FROM (
                SELECT country
                        ,poster_name
                        ,MAX(follower_count) as max_follower_count
                        ,ROW_NUMBER() OVER (PARTITION BY country ORDER BY MAX(follower_count) DESC) AS rank
                FROM pin 
                INNER JOIN geo ON geo.ind = pin.ind
                GROUP BY country, poster_name
        )
        WHERE rank = 1
        ORDER BY max_follower_count DESC
    """)

# Render the per-country result in the notebook
display(result1)

# Identify country with most followers: same inner query as above, but only
# the country and follower count are selected and LIMIT 1 keeps the single
# row with the largest follower count.
result2 = spark.sql(
    """ SELECT country
                ,max_follower_count AS follower_count
        FROM (
                SELECT country
                        ,poster_name
                        ,MAX(follower_count) as max_follower_count
                        ,ROW_NUMBER() OVER (PARTITION BY country ORDER BY MAX(follower_count) DESC) AS rank
                FROM pin 
                INNER JOIN geo ON geo.ind = pin.ind
                GROUP BY country, poster_name
        )
        WHERE rank = 1
        ORDER BY max_follower_count DESC
        LIMIT 1
    """)
# Render the single-row result in the notebook
display(result2)


In [None]:
# Using Pyspark

# Window over each country, largest follower_count first
country_rank_maxfollowers = Window.partitionBy("country").orderBy(
    col("follower_count").desc()
)

# The first row of each country's window is the user with the most followers there
max_followers_by_country = (
    pin_geo.withColumn("rank", row_number().over(country_rank_maxfollowers))
    .filter(col("rank") == 1)
    .select("country", "poster_name", "follower_count")
)

# Largest follower count among the per-country top users, pulled to the driver
max_followers_all_countries = max_followers_by_country.select(
    max("follower_count")
).collect()[0][0]

# Rows of the per-country result whose follower count equals that overall maximum
country_with_max_followers = max_followers_by_country.where(
    col("follower_count") == max_followers_all_countries
)

max_followers_by_country.show()
country_with_max_followers.show()

## Most popular category for different age groups

In [None]:
# Using SQL

# Innermost query: join user and pin on ind, bucket age into the groups
# 18-24, 25-35, 36-50 and +50 (anything else falls into "Below 18") and count
# rows per (age_group, category).
# Middle query: number the categories inside each age group by descending count.
# Outer query: keep the row numbered 1, i.e. one top category per age group.
result = spark.sql("""
    SELECT age_group, category, category_count
    FROM (
            SELECT age_group
                    ,category
                    ,category_count
                    ,ROW_NUMBER() OVER (PARTITION BY age_group ORDER BY category_count DESC) AS rank
            FROM (
                    SELECT
                        CASE 
                            WHEN age BETWEEN 18 AND 24 THEN "18-24"
                            WHEN age BETWEEN 25 AND 35 THEN "25-35"
                            WHEN age BETWEEN 36 AND 50 THEN "36-50"
                            WHEN age > 50 THEN "+50"
                            ELSE "Below 18" 
                        END AS age_group
                        ,category
                        ,COUNT(*) AS category_count
            FROM user
            JOIN pin ON pin.ind = user.ind
            GROUP BY age_group, category
        )
    )
    WHERE rank = 1
""")

# Render the resulting DataFrame in the notebook
display(result)

In [None]:
# Using Pyspark
# Define age groups with the same buckets as the SQL query of this section.
df_user_age = df_128a59195de3_user.withColumn(
    "age_group",
    when(col("age").between(18, 24), "18-24")
    .when(col("age").between(25, 35), "25-35")
    .when(col("age").between(36, 50), "36-50")
    .when(col("age") > 50, "+50")
    # Everything else (under 18 or missing age) gets its own label instead of
    # being counted as "+50".
    .otherwise("Below 18")
)
# Join on 'ind' column
df_user_pin = df_user_age.alias("user").join(df_128a59195de3_pin.alias("pin"), col("user.ind") == col("pin.ind"), 'inner')

# Create partition by age_group and order by category_count descending
age_rank_categorycount = Window.partitionBy("age_group").orderBy(col("category_count").desc())

# Find the most popular category for different age groups: count each
# category per age group, number them with the window defined above
# (age_rank_categorycount) and keep the first one.
(
    df_user_pin.groupBy("age_group", "category")
    .agg(count("category").alias("category_count"))
    .withColumn("rank", row_number().over(age_rank_categorycount))
    .filter(col("rank") == 1)
    .drop("rank")
    .show()
)

## Median follower count for set age groups

In [None]:
# Using SQL
# Join user and pin on ind, bucket age into the groups 18-24, 25-35, 36-50 and
# +50 (anything else is labelled "NONE") and compute the median follower_count
# (PERCENTILE_CONT at 0.5) for each age group.
results = spark.sql("""
                    SELECT CASE 
                                WHEN age BETWEEN 18 AND 24 THEN "18-24"
                                WHEN age BETWEEN 25 AND 35 THEN "25-35"
                            WHEN age BETWEEN 36 AND 50 THEN "36-50"
                            WHEN age > 50 THEN "+50"
                            ELSE "NONE" 
                        END AS age_group, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY follower_count) AS median_follower_count
                    FROM user 
                    INNER JOIN pin on pin.ind = user.ind
                    GROUP BY age_group
                    """)

# Render the resulting DataFrame in the notebook
display(results)

In [None]:
# Pyspark
# Approximate median follower_count per age group, listed in age_group order
(
    df_user_pin.groupBy("age_group")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("age_group")
    .show()
)

## Users joined each year between 2015 and 2020

In [None]:
# Using SQL

# Count the non-null date_joined values per joining year in the user view and
# keep only the years 2015 to 2020 (BETWEEN is inclusive).
results = spark.sql("""
                    SELECT year(date_joined) as post_year
                            ,COUNT(date_joined) as numbers_users_joined
                    FROM user 
                    GROUP by post_year
                    HAVING post_year BETWEEN 2015 and 2020
                    """)

# Render the resulting DataFrame in the notebook
display(results)

In [None]:
# Using Pyspark
# Count users per joining year, restricted to 2015-2020 (inclusive) so the
# result covers the same years as the SQL version of this query.
(
    df_128a59195de3_user.withColumn("post_year", year("date_joined"))
    .filter(col("post_year").between(2015, 2020))
    .groupBy("post_year")
    .agg(count("user_name").alias("number_users_joined"))
    .orderBy("post_year")
    .show()
)

## Median follower count of users based on joining year

In [None]:
# Using SQL

# Join user and pin on ind and compute the median follower_count
# (PERCENTILE_CONT at 0.5) per joining year, keeping only the years
# 2015 to 2020 (BETWEEN is inclusive).
results = spark.sql("""
                    SELECT year(date_joined) as join_year
                    ,PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY follower_count) AS median_follower_count
                    FROM user 
                    INNER JOIN pin on pin.ind = user.ind
                    GROUP by join_year
                    HAVING join_year BETWEEN 2015 and 2020
                    """)
# Render the resulting DataFrame in the notebook
display(results)

In [None]:
# Using Pyspark
# Approximate median follower_count per joining year for 2015-2020 (inclusive)
(
    df_user_pin.withColumn("join_year", year("date_joined"))
    .where(col("join_year").between(2015, 2020))
    .groupBy("join_year")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("join_year")
    .show()
)


## Median follower count based on joining year and age group

In [None]:
# Using SQL

# Join user and pin on ind, bucket age into the groups 18-24, 25-35, 36-50 and
# +50 (anything else is labelled "Under 18") and compute the median
# follower_count (PERCENTILE_CONT at 0.5) per (joining year, age group).
# Only the years 2015 to 2020 are kept; rows are sorted by year, then age group.
results = spark.sql("""
                    SELECT CASE 
                                WHEN age BETWEEN 18 AND 24 THEN "18-24"
                                WHEN age BETWEEN 25 AND 35 THEN "25-35"
                                WHEN age BETWEEN 36 AND 50 THEN "36-50"
                                WHEN age > 50 THEN "+50"
                            ELSE "Under 18" 
                        END AS age_group
                        ,year(date_joined) as join_year
                        ,PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY follower_count) AS median_follower_count
                    FROM user 
                    INNER JOIN pin on pin.ind = user.ind
                    GROUP by join_year, age_group
                    HAVING join_year BETWEEN 2015 and 2020
                    ORDER BY join_year, age_group
                    """)
# Render the resulting DataFrame in the notebook
display(results)

In [None]:
# Using Pyspark
# Approximate median follower_count per (joining year, age group) for
# 2015-2020 (inclusive), sorted by year and then age group
(
    df_user_pin.withColumn("join_year", year("date_joined"))
    .where(col("join_year").between(2015, 2020))
    .groupBy("join_year", "age_group")
    .agg(percentile_approx("follower_count", 0.5).alias("median_follower_count"))
    .orderBy("join_year", "age_group")
    .show()
)

In [None]:
# Unmount the bucket from the filestore once the analysis is finished.
# NOTE(review): presumably this mount point was created earlier in the
# notebook; confirm nothing after this cell still reads from it.
dbutils.fs.unmount("/mnt/user-128a59195de3-bucket")