In [0]:
# Required Dictionairies
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

# Load table from Databricks Catalog
df_pin = spark.read.table("workspace.default.kafka_test_1")

# Drop Duplicates
df_pin = df_pin.dropDuplicates()
print(f'{df_pin.count()} rows')

# Replace unwanted values with None
df_pin = df_pin.replace({
    'User Info Error': None,
    'No description available Story format': None,
    'Image src error.': None,
    'N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e': None,
    'No Title Data Available': None
}, subset=['follower_count', 'description', 'image_src', 'tag_list', 'title'])

# Convert follower_count to numeric
df_pin = df_pin.withColumn(
    "follower_count",
    when(col("follower_count").contains("k"), 
         regexp_replace(col("follower_count"), "k", "").cast("double") * 1000)
    .when(col("follower_count").contains("M"), 
         regexp_replace(col("follower_count"), "M", "").cast("double") * 1000000)
    .otherwise(col("follower_count").cast("integer"))
    .cast(IntegerType())
)

# Convert columns to IntegerType
numeric_columns = ["downloaded", "index"]
for col_name in numeric_columns:
    df_pin = df_pin.withColumn(col_name, col(col_name).cast(IntegerType()))

# Extract path from save_location
df_pin = df_pin.withColumn("save_location", split(col("save_location"), "Local save in ").getItem(1))

# Rename index column
df_pin = df_pin.withColumnRenamed("index", "ind")

# Reorder columns
df_pin = df_pin.select(
    "ind", "unique_id", "title", "description", "follower_count", 
    "poster_name", "tag_list", "is_image_or_video", "image_src", 
    "save_location", "category"
)

# Display cleaned DataFrame
clean_pin = df_pin
display(clean_pin)

355 rows


ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
8753,13b1bc17-0abf-44b2-b8d5-8557a24fe623,100+ Absolutely Gorgeous Birth Flower Tattoos,These are such a pretty way to honor the month you were born.,5000000.0,POPSUGAR,"Dainty Flower Tattoos,Narcissus Flower Tattoos,Little Flower Tattoos,Realistic Flower Tattoo,Colorful Flower Tattoo,Vintage Flower Tattoo,Simple Flower Tattoo,Daffodil Tattoo,Birth Flower Tattoos",image,https://i.pinimg.com/originals/6f/7f/de/6f7fdef66089357af14fe031822f9e5f.webp,/data/tattoos,tattoos
10573,d6036dcc-042f-47b7-aec2-5ee02e17ce92,MOTOR,The best luxury cars - Los mejores coches de lujo #cochesdelujo #superdeportivo #supercars #autos #superdeportivos #cars #luxurycars #lujos #coches #carros #supercar #luxurycar…,121000.0,MyLuxePoint,"Luxury Sports Cars,New Luxury Cars,Sport Cars,Bmw Sports Car,Best Sports Cars,Luxury Vehicle,Motor Sport,Allroad Audi,Rs6 Audi",image,https://i.pinimg.com/originals/35/38/2b/35382b507d8b10c3be0ad2f50f49c1c8.jpg,/data/vehicles,vehicles
4779,4de3d8d3-d03f-4f83-a9d1-569b046b574c,ME CASO ¿Y AHORA QUÉ?,"Hoy voy a hablaros de ‘Me Caso y ¿Ahora Qué?’, una iniciativa de Raquel Canseco, creadora del blog Dorothy’s Red Shoes que nació con el objetivo de ayudar a las novias a organiz…",28000.0,Una Boda Original UBO,"Hotel Conference Rooms,Events Place,Virtuous Woman,Creative Workshop,Outdoor Venues,Learning Letters,New Crafts,Event Styling,Beach Themes",image,https://i.pinimg.com/originals/5c/d5/bf/5cd5bfc7a108361a36292959238a3344.jpg,/data/event-planning,event-planning
8472,c55d27cb-b707-46c4-bd1b-bce0e7fe7867,Inspirational Quote,"Fred Rogers would have been 87 today. Happy Birthday, Mr. Rogers!",338000.0,Yahoo Life,"Great Quotes,Quotes To Live By,Me Quotes,Inspirational Quotes,Friend Quotes,Motivational,Gospel Quotes,Food Quotes,Faith Quotes",image,https://i.pinimg.com/originals/49/a5/8c/49a58cfdd860a43c155c246c90f848f9.jpg,/data/quotes,quotes
3972,689146e7-c78a-4f37-b307-ec809ac2f239,"Do you Understand? Umm, I think so...","A free, visual self assessment rubric to help students communicate understanding.",457.0,Ceci Summerall,"Student Self Assessment,Assessment For Learning,Learning Targets,Formative Assessment,Student Self Evaluation,Teaching Strategies,Teaching Tools,Teacher Resources,Teaching Art",image,https://i.pinimg.com/originals/8a/d6/66/8ad6665a53186f35520ed2779d7c688e.png,/data/education,education
4946,b420fea7-fba1-4714-a3ab-5d1c0ee20d22,Robot Challenge Screen,Inspire Others...83430 864SharesDid you know that the Internet is full of Free Stuff for Brides? It’s true. Wedding Freebies are everywhere and we’ve tracked them all down for y…,967.0,Green Eyed Girl Productions,"Super Hero Training,Event Planning,Wedding Planning,Wedding Freebies,Girl With Green Eyes,Graduation Party Themes,Buy Milk,Milk Glass Vase,It Gets Better",image,https://i.pinimg.com/originals/83/f1/9c/83f19c8007050b48bb13cea8cfa833c2.jpg,/data/event-planning,event-planning
3110,f63c2acf-f08e-4b59-98d6-b14fcb31a307,Wine Cork Crafts • What to do with Wine Corks,Save those wine corks for a rainy day craft project! Upcycle wine corks to make cute craft decorations with your kids.,12000.0,PROJECT KID,"Projects For Kids,Diy For Kids,Crafts For Kids,Craft Projects,Welding Projects,Foam Crafts,Decor Crafts,Craft Decorations,Shell Crafts",image,https://i.pinimg.com/originals/62/03/59/6203592be8361d2e675d451669c83f7a.jpg,/data/diy-and-crafts,diy-and-crafts
9879,4921b6e7-af3d-422c-b326-dd718618de1c,Norway On A Budget -Locals' Secret Tips For Cheap Travel in Norway,"How to travel Norway on a budget. Secret local tips for cheap travel in Norway, cheap accommodation in Norway, budget transport, cheap food & more",8000.0,Worldering Around | Travel Blog,"Top Travel Destinations,Europe Travel Tips,European Travel,Travel Advice,Budget Travel,Travel Deals,Free Travel,Oslo,Lofoten",image,https://i.pinimg.com/originals/70/a7/4d/70a74d3d91f8974ced16b015968e5558.jpg,/data/travel,travel
2805,e9c4ba1d-58dd-490f-bca0-6ab6ea1498c2,An Inexpensive and Easy Gift that Parents will Actually Love,"Make a simple parent gift with your classroom. Use this free printable Christmas ornament to make a meaningful Christmas gift for kids to give to parents. Easy, DIY, and inexpensive...",50000.0,Early Learning Ideas,"Printable Christmas Ornaments,Preschool Christmas Crafts,Preschool Gifts,Free Christmas Printables,Christmas Presents For Parents,Meaningful Christmas Gifts,Student Christmas Gifts,Best Gifts For Kids,Christmas Ornaments For Students",image,https://i.pinimg.com/originals/e4/07/14/e40714337928d277f05f57f15cad85c9.jpg,/data/diy-and-crafts,diy-and-crafts
8857,bf3d3376-d73d-410a-bacb-b29f061d5f16,Petite Tattoo Gallery for Women,Wrist Tattoo,457.0,Jennifer W.,"Cute Tattoos,Beautiful Tattoos,New Tattoos,Body Art Tattoos,Heart Wrist Tattoos,Arm Tattoo,Tattoo Cake,Facial Tattoos,Henna Tattoos",image,https://i.pinimg.com/originals/e2/71/6a/e2716af1d8648315497149936f759242.jpg,/data/tattoos,tattoos


In [0]:
# Load table from Databricks Catalog
df_geo = spark.read.table("workspace.default.geo_mo")

# drop dulpicate rows
df_geo = df_geo.dropDuplicates()
Drop_dup = df_geo.count() 
print(f'{Drop_dup} rows')

# Create the coordinates column
df_geo = df_geo.withColumn("coordinates", F.array("latitude", "longitude"))

# Drop the latitude and longitude columns
df_geo = df_geo.drop("latitude", "longitude")

#Convert the timestamp column to TimestampType
df_geo = df_geo.withColumn("timestamp", F.col("timestamp").cast(TimestampType())) 

# Reorder columns
df_geo = df_geo.select("ind", "country", "coordinates", "timestamp")
clean_geo = df_geo
display(clean_geo)

718 rows


ind,country,coordinates,timestamp
8420,Algeria,"List(-74.408, -178.774)",2021-04-23T17:50:27.000Z
473,India,"List(85.9766, -124.826)",2022-01-23T15:14:58.000Z
1475,American Samoa,"List(-88.5252, -172.436)",2018-08-29T19:39:53.000Z
10539,Barbados,"List(-73.8461, -167.426)",2020-04-15T02:59:15.000Z
5634,Reunion,"List(19.9974, -83.5076)",2021-02-27T17:22:34.000Z
1809,Seychelles,"List(28.9974, -10.4279)",2018-03-07T09:42:26.000Z
5520,Pakistan,"List(-76.2835, -1.1006)",2021-01-03T08:17:55.000Z
10128,Mozambique,"List(-65.6932, -32.7373)",2018-09-06T06:49:45.000Z
9961,Algeria,"List(13.0591, 155.233)",2021-02-01T23:14:30.000Z
720,Denmark,"List(30.6428, 132.596)",2019-06-17T03:05:42.000Z


In [0]:
# Load table from Databricks Catalog
df_user = spark.read.table("workspace.default.user_mo")

# drop dulpicate rows
df_user = df_user.dropDuplicates()
Drop_dup = df_user.count() 
print(f'{Drop_dup} rows')

# Create a new column user_name that concatenates the information found in the first_name and last_name columns
df_user = df_user.withColumn("user_name", concat("first_name", lit(" "), "last_name"))

# Drop the first_name and last_name columns
df_user = df_user.drop("first_name", "last_name")

# Convert the date_joined column from a string to a timestamp data type
df_user = df_user.withColumn("date_joined", F.col("date_joined").cast(TimestampType()))

# Reorder columns
df_user = df_user.select("ind", "user_name", "age", "date_joined")
clean_user = df_user
display(clean_user)

699 rows


ind,user_name,age,date_joined
3574,Dana Mckinney,25,2015-10-25T14:27:30.000Z
9116,Roberta Perez,20,2016-04-20T19:21:36.000Z
6798,Jonathan Hill,29,2017-06-16T11:39:01.000Z
3714,Mary Perkins,54,2016-12-16T04:26:21.000Z
3017,Christina Allen,47,2016-09-10T01:16:03.000Z
7562,April Anderson,30,2016-06-27T09:35:51.000Z
3194,Amanda Bishop,20,2015-11-10T10:10:30.000Z
10762,Alejandra Acevedo,20,2015-11-24T21:01:23.000Z
2352,Chelsea Williams,39,2017-09-02T18:46:06.000Z
7504,Abigail Ali,20,2015-10-24T12:23:51.000Z


In [0]:
#  Task 4
from pyspark.sql.window import Window

# Joining geo and pin cleaned data 
joined_df = clean_geo.join(clean_pin, "ind")
# display(joined_df)

# Group by country and category, count occurrences, and find the most popular category
window = Window.partitionBy("country").orderBy(F.desc("category_count"))

final_df = joined_df.groupBy("country", "category") \
                    .agg(F.count("*").alias("category_count")) \
                    .withColumn("rank", F.rank().over(window)) \
                    .filter(F.col("rank") == 1) \
                    .drop("rank")

# Display the final result
display(final_df)

country,category,category_count
Afghanistan,vehicles,1
Albania,quotes,1
Algeria,quotes,2
Bahamas,art,1
British Indian Ocean Territory (Chagos Archipelago),beauty,1
Burundi,mens-fashion,1
Cambodia,tattoos,1
Cocos (Keeling) Islands,travel,1
Cook Islands,christmas,1
Dominican Republic,home-decor,1


In [0]:
#  Task 5
df_result = (clean_pin.join(clean_geo, 'ind', 'inner')
             .withColumn("timestamp", F.col("timestamp").cast("timestamp"))
             .filter((F.year("timestamp") >= 2018) & (F.year("timestamp") <= 2022))
             .withColumn("post_year", F.year("timestamp"))
             .groupBy("post_year", "category")
             .agg(F.count("*").alias("category_count"))
             .orderBy("post_year", "category"))


# Display the final result
display(df_result)

post_year,category,category_count
2018,art,1
2018,quotes,2
2018,vehicles,1
2019,christmas,1
2019,diy-and-crafts,1
2019,home-decor,1
2020,education,1
2020,mens-fashion,1
2020,quotes,1
2021,beauty,1


In [0]:
#  Task 6 step 1
window_spec = Window.partitionBy("country")

# Filter users with the maximum follower count per country
country_followers = (
    joined_df.withColumn("max_follower_count", max("follower_count").over(window_spec))
    .filter(col("follower_count") == col("max_follower_count"))
    .select("country", "poster_name", "follower_count")
    .dropDuplicates()
    .orderBy(col("follower_count").desc())
)

display(country_followers)

country,poster_name,follower_count
Algeria,YourTango,942000
Burundi,Macho Moda,620000
Cambodia,Colossal,294000
Bahamas,The Kitchen Table Classroom,221000
Niger,MYCREATIVELOOK | Men’s Outfit Grids,146000
Dominican Republic,Wonder Forest,104000
Guatemala,Menucha - Moms and Crafters,85000
Kazakhstan,Christina Maria - DIY Home Improvement + Decor,75000
Afghanistan,Ethnic Earring,14000
Germany,Across the Blvd,10000


In [0]:
# Step 2 (Tried sql spark to show results)
country_followers.createOrReplaceTempView("country_followers")

# Run the Spark SQL query
most_followers = spark.sql("""
    SELECT country, MAX(follower_count) AS follower_count
    FROM country_followers
    GROUP BY country
    ORDER BY follower_count DESC
    LIMIT 1
""")

# Display the result
display(most_followers)


country,follower_count
Algeria,942000


In [0]:
# Task 7
clean_pin.createOrReplaceTempView("clean_pin")
clean_user.createOrReplaceTempView("clean_user")

age_group = spark.sql("""
WITH AGERANGE AS (
    SELECT 
        u.age, 
        p.category,
        CASE 
            WHEN u.age BETWEEN 18 AND 24 THEN '18-24'
            WHEN u.age BETWEEN 25 AND 35 THEN '25-35'
            WHEN u.age BETWEEN 36 AND 50 THEN '36-50'
            ELSE '+50'
        END AS age_group
    FROM clean_user u
    INNER JOIN clean_pin p 
        ON u.ind = p.ind
),

Categories AS (
    SELECT 
        age_group, 
        category, 
        COUNT(*) AS category_count,
        RANK() OVER (PARTITION BY age_group ORDER BY COUNT(*) DESC) AS rank
    FROM AGERANGE
    GROUP BY age_group, category
)

SELECT age_group, category, category_count 
FROM Categories 
""")

display(age_group)

age_group,category,category_count
36-50,home-decor,1
18-24,diy-and-crafts,2
+50,education,1
25-35,finance,2
18-24,education,1
18-24,christmas,1
18-24,mens-fashion,1
+50,beauty,1
18-24,tattoos,1
18-24,home-decor,1


In [0]:
# Task 8 
median_followers = spark.sql("""
WITH AgeGroups AS (
    SELECT
        u.age,
        p.follower_count,
        CASE
            WHEN u.age BETWEEN 18 AND 24 THEN '18-24'
            WHEN u.age BETWEEN 25 AND 35 THEN '25-35'
            WHEN u.age BETWEEN 36 AND 50 THEN '36-50'
            ELSE '+50'
        END AS age_group
    FROM clean_user u
    JOIN clean_pin p 
    ON u.ind = p.ind
)

SELECT
    age_group, percentile(follower_count, 0.5) AS median_followers_count
FROM AgeGroups
GROUP BY age_group
ORDER BY median_followers_count DESC
""")

display(median_followers)

age_group,median_followers_count
18-24,135000.0
25-35,17000.0
+50,748.5
36-50,27.0


In [0]:
# Task 9
user_joined = spark.sql("""
SELECT
    YEAR(date_joined) AS post_year,
    COUNT(*) AS number_users_joined
FROM clean_user
WHERE YEAR(date_joined) BETWEEN 2015 AND 2020
GROUP BY post_year
ORDER BY post_year
""")

display(user_joined)


post_year,number_users_joined
2015,271
2016,309
2017,119


In [0]:
# Task 10
median_count = spark.sql("""
SELECT
    YEAR(CAST(u.date_joined AS DATE)) AS post_year, 
    percentile_approx(p.follower_count, 0.5, 100) AS median_followers_count  
FROM clean_user u
JOIN clean_pin p
ON u.ind = p.ind
WHERE YEAR(CAST(u.date_joined AS DATE)) BETWEEN 2015 AND 2020
GROUP BY post_year
ORDER BY post_year ASC
""")

display(median_count)


post_year,median_followers_count
2015,1000000
2016,17000
2017,497


In [0]:
# Task 11
median_follower_age = spark.sql("""
WITH AGEGROUPS AS (
    SELECT CASE
        WHEN age BETWEEN 18 AND 24 THEN '18-24'
        WHEN age BETWEEN 25 AND 35 THEN '25-35'
        WHEN age BETWEEN 36 AND 50 THEN '36-50'
        ELSE '+50' 
    END AS age_group,
    YEAR(CAST(u.date_joined AS DATE)) AS post_year,
    p.follower_count
    FROM clean_user u
    JOIN clean_pin p 
    ON u.ind = p.ind
    WHERE YEAR(CAST(u.date_joined AS DATE)) BETWEEN 2015 AND 2020
)
SELECT age_group, post_year, percentile_approx(follower_count, 0.5, 100) AS median_follower_count
FROM AGEGROUPS
GROUP BY age_group, post_year
ORDER BY age_group, post_year
""")

display(median_follower_age)


age_group,post_year,median_follower_count
+50,2016,1000
+50,2017,497
18-24,2015,1000000
18-24,2016,69000
18-24,2017,86000
25-35,2015,77000
25-35,2016,5000
36-50,2016,27
