## Cleaning the DataFrames

In [None]:
# Clean the df_pin DataFrame and write the result to the mount point as parquet.
#
# Steps:
#   a) empty strings -> null
#   b) follower_count text such as "46k" / "111" -> integer
#   c) numeric columns -> numeric types
#   d) save_location "Local save in /data/<x>" -> "/data/<x>"
#   e) index -> ind
#   f) fixed column order
from pyspark.sql import functions as F
from pyspark.sql.functions import expr

# a) Replace empty entries with None.
# NOTE(review): only the empty string is treated as "no relevant data" here; any
# placeholder strings the source data may use are not replaced - TODO confirm.
df_pin_cleaned = df_pin.na.replace('', None)

# b) follower_count arrives as text with a magnitude suffix ("46k", "892k", "111").
# A plain cast to int turns every suffixed value into null (or an error under ANSI
# mode), so expand the suffix before casting. Non-numeric text still ends up null.
follower_raw = F.trim(F.col("follower_count"))
df_pin_cleaned = df_pin_cleaned.withColumn(
    "follower_count",
    F.when(follower_raw.endswith("k"),
           F.regexp_replace(follower_raw, "k$", "").cast("double") * 1000)
     .when(follower_raw.endswith("M"),
           F.regexp_replace(follower_raw, "M$", "").cast("double") * 1000000)
     .otherwise(follower_raw.cast("double"))
     .cast("int"),
)

# c) Ensure that each column containing numeric data has a numeric data type.
# index is an identifier, so use an integer type rather than double.
# (downloaded is not part of the final column selection in step f.)
# The loop variable is col_name so it does not shadow pyspark's `col` function.
numeric_columns = ["downloaded", "index"]
for col_name in numeric_columns:
    df_pin_cleaned = df_pin_cleaned.withColumn(col_name, F.col(col_name).cast("int"))

# d) save_location looks like "Local save in /data/christmas". Strip the fixed prefix
# and keep the path; keeping only the last "/" segment would just duplicate `category`.
df_pin_cleaned = df_pin_cleaned.withColumn(
    "save_location", F.regexp_replace("save_location", "^Local save in ", "")
)

# e) Rename the index column to ind (the join key shared with the geo and user tables)
df_pin_cleaned = df_pin_cleaned.withColumnRenamed("index", "ind")

# f) Reorder the DataFrame columns
column_order = ["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list",
                "is_image_or_video", "image_src", "save_location", "category"]

df_pin_cleaned = df_pin_cleaned.select(column_order)

# Save the cleaned DataFrame to the specified mount point (replacing any previous run)
output_path_pin = "/mnt/diane_mount/cleaned_data/df_pin_cleaned"
df_pin_cleaned.write.mode("overwrite").parquet(output_path_pin)

# Show the cleaned DataFrame
df_pin_cleaned.show()


In [None]:
# Clean the geo data: pack latitude/longitude into one array column,
# parse the timestamp string and write the result out as parquet.

from pyspark.sql.functions import col, array, to_timestamp, expr

# Final column layout of the cleaned geo table
column_order_geo = ["ind", "country", "coordinates", "timestamp"]

df_geo_cleaned = (
    df_geo
    # a) coordinates = [latitude, longitude]
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    # b) the separate latitude/longitude columns are now redundant
    .drop("latitude", "longitude")
    # c) timestamp: string -> timestamp type
    .withColumn("timestamp", to_timestamp(col("timestamp")))
    # d) enforce the column order
    .select(column_order_geo)
)

# Persist the cleaned table to the mount point, replacing any previous run
output_path_geo = "/mnt/diane_mount/cleaned_data/df_geo_cleaned"
df_geo_cleaned.write.mode("overwrite").parquet(output_path_geo)

# Preview the cleaned rows
df_geo_cleaned.show()


In [None]:
# Clean the user DataFrame: build a single user_name column, parse date_joined,
# and write the result out as parquet.

from pyspark.sql.functions import col, concat_ws, to_timestamp

# Final column layout of the cleaned user table
column_order_user = ["ind", "user_name", "age", "date_joined"]

df_user_cleaned = (
    df_user
    # a) user_name = "<first_name> <last_name>" (concat_ws skips null parts)
    .withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))
    # b) the individual name parts are now redundant
    .drop("first_name", "last_name")
    # c) date_joined: string -> timestamp type
    .withColumn("date_joined", to_timestamp(col("date_joined")))
    # d) enforce the column order
    .select(column_order_user)
)

# Persist the cleaned table to the mount point, replacing any previous run
output_path_user = "/mnt/diane_mount/cleaned_data/df_user_cleaned"
df_user_cleaned.write.mode("overwrite").parquet(output_path_user)

# Preview the cleaned rows
df_user_cleaned.show()


## Checking the DataFrames

In [None]:
# Preview the raw df_geo DataFrame (the input to the geo cleaning cells)
display(df_geo)

country,ind,latitude,longitude,timestamp
Antarctica (the territory South of 60 deg S),2418,-88.4642,-171.061,2022-05-27 11:30:59
Cocos (Keeling) Islands,10794,-89.5236,-154.567,2022-01-01 02:26:50
Central African Republic,2074,-52.3213,-50.11,2019-11-03 05:41:59
British Virgin Islands,2293,-87.7946,-159.647,2022-03-21 10:46:53
Saint Kitts and Nevis,10663,-27.3474,-162.83,2019-07-25 18:53:51
Antigua and Barbuda,7922,-88.0974,-172.052,2021-01-27 09:14:19
Antigua and Barbuda,8606,-88.0974,-172.052,2021-03-28 14:54:07
Netherlands Antilles,603,14.0083,-141.603,2019-06-25 05:13:01
Dominican Republic,9979,14.9967,-120.682,2018-07-18 19:01:46
Christmas Island,1704,-56.8702,-93.6232,2018-11-29 04:46:39


In [None]:
# Preview the raw df_pin DataFrame (the input to the pin cleaning cells)
display(df_pin)

category,description,downloaded,follower_count,image_src,index,is_image_or_video,poster_name,save_location,tag_list,title,unique_id
christmas,"Christmas decorating ideas for porches. Beautiful holiday decor ideas for front porches both small and large. Outdoor decorations like sleds, lanterns, Christmas trees, wreaths,…",1,46k,https://i.pinimg.com/videos/thumbnails/originals/40/83/f5/4083f5b4971bf235f89a4784ab87271e.0000001.jpg,2482,video,"Life on Summerhill | Home, Holiday Decor & DIY Website",Local save in /data/christmas,"Diy Christmas Decorations For Home,Farmhouse Christmas Decor,Christmas Home,Christmas Holidays,Christmas Front Porches,How To Decorate For Christmas,Christmas Porch Ideas,Christmas Decorating Ideas,Large Outdoor Christmas Decorations",FORNT PORCH CHRISTMAS DECORATING IDEAS,08604f20-fa17-4b9a-9949-781717eca6cd
travel,"This Costa Rica itinerary is the ultimate guide to spending two weeks in Costa Rica. Find out about visiting La Fortuna, Arenal, Monteverde, Naranjo, Corcovado National Park, Or…",1,10k,https://i.pinimg.com/originals/30/93/cb/3093cb01d9de2d125fda8ba5e3e41946.jpg,10138,image,"Wanderlust Chloe ✈️ Travel guides, inspo and adventure travel ✈️",Local save in /data/travel,"Costa Rica Travel,Rio Celeste Costa Rica,Dream Vacations,Vacation Spots,Vacation Travel,Travel Pictures,Travel Photos,Fortuna Costa Rica,Costa Rica Pictures","14 Amazing Things To Do In Costa Rica | Volcanoes, Waterfalls, Wildlife And More",927c4658-cc3f-4b92-9b5c-70743d0c238d
diy-and-crafts,"This post may contain affiliate links, read our Disclosure Policy for more information. As an Amazon Associate I earn from qualifying purchases, thank you! Make some cute handpr…",1,892k,https://i.pinimg.com/originals/ff/fe/38/fffe384f3ec18a0d87cb2d80cc8c1499.jpg,3156,image,Michelle {CraftyMorning.com},Local save in /data/diy-and-crafts,"Christmas Gifts For Parents,Christmas Decorations For Kids,Christmas Crafts For Toddlers,Preschool Christmas,Christmas Crafts For Gifts,Christmas Activities,Toddler Crafts,Kids Christmas,Christmas Feeling",Handprint Reindeer Ornaments - Crafty Morning,fa6e31a4-18c2-4eca-a6d8-e903eee2c2a4
diy-and-crafts,Easy Christmas tree Craft Ideas for toddlers and preschoolers. Engage your kids in these DIY,1,3k,https://i.pinimg.com/originals/69/f0/75/69f075939d4449dffa69519756c30e26.png,3419,image,Kids Crafts & Free Preschool Printables- Sharing Our Experiences,Local save in /data/diy-and-crafts,"Christmas Crafts For Kids To Make,Christmas Tree Painting,Christmas Activities For Kids,Easy Christmas Crafts For Toddlers,Kid Activities,Christmas Handprint Crafts,Christmas Tree Crafts,Christmas Baby,Xmas Tree",Easy Christmas Tree Crafts Ideas for toddlers and preschoolers | Sharing Our Experiences,d0b80187-0171-49b2-8ee4-572984244f65
finance,"If you love budgeting, make sure to give Dave Ramsey's 7 Baby Steps a try. Follow these steps to begin your debt snowball, build an emergency fund, invest and reach riches. I ca…",1,26k,https://i.pinimg.com/originals/1e/9d/90/1e9d906e4e150e3b95187f3b76ea7c71.png,5494,image,"Living Low Key | Save Money, Make Money, & Frugal Living",Local save in /data/finance,"Financial Peace,Financial Tips,Saving Money Quotes,Total Money Makeover,Budgeting Finances,Money Management,Wealth Management,Personal Finance,Making Ideas",Dave Ramsey's 7 Baby Steps: What Are They And Will They Work For You,8fb2af68-543b-4639-8119-de33d28706ed
christmas,Here are the best DIY Christmas Centerpieces ideas perfect for your Christmas & holiday season home decor. From Christmas Vignettes to Table Centerpieces.,1,500k,https://i.pinimg.com/originals/aa/6d/0f/aa6d0f44d7c1c96b998cb9aa6c4446b8.png,2418,image,HikenDip,Local save in /data/christmas,"Farmhouse Christmas Decor,Rustic Christmas,Christmas Time,Vintage Christmas,Xmas,Primitive Christmas Crafts,Christmas Vignette,Indoor Christmas Decorations,Diy Christmas Ornaments",100 DIY Christmas Centerpieces You'll Love To Decorate Your Home With For The Christmas Season - Hike n Dip,da8745a6-5160-46c4-877d-181d50a729fd
quotes,summcoco gives you inspiration for the women fashion trends you want. Thinking about a new look or lifestyle? This is your ultimate resource to get the hottest trends. 45 Top Li…,1,306k,https://i.pinimg.com/originals/bb/c0/e6/bbc0e6a797079505f11ac12bcb0b8c66.jpg,7922,image,"Sumcoco | Decor Ideas, Hairstyles, Nails Fashion Advice",Local save in /data/quotes,"Life Quotes Love,Inspirational Quotes About Love,Mood Quotes,Motivational Quotes,Tears Quotes,Quotes About Sadness,Deep Quotes About Life,Quotes Quotes,Quote Life",45 Top Life Quotes School Did Not Teach You,a584581c-1b38-4731-a1cc-f36115ecf229
art,"Use your mini world figures to create this beautiful African sunset. Your kids will love learning about shadows, angles and distortion in this fun art and STEM activity for kids.",1,4k,https://i.pinimg.com/originals/e3/aa/35/e3aa350f8f104d0e59f26d7f17ea7461.png,771,image,Taming Little Monsters - Fun Activities for Kids,Local save in /data/art,"African Art Projects,Cool Art Projects,Projects For Kids,African Art For Kids,African Crafts Kids,Art Club Projects,Art Education Projects,Tracing Art,African Sunset",African Sunset Shadow Tracing Art - Taming Little Monsters,a5021766-a8aa-4dc7-9857-4da6b8e3dc1a
event-planning,"Personalize your event or shop with a customized neon sign. Make a statement with your own custom vibes! This light is 32 -40 inches (80cm-100cm) if you need something bigger, p…",1,111,https://i.pinimg.com/originals/e9/c0/7c/e9c07cf0cf16cab23764a36718ab76c1.jpg,4508,image,Life of Neon | Custom Neon Light Signs | Home Decor Wall Art,Local save in /data/event-planning,"Our Wedding,Wedding Venues,Dream Wedding,Wedding Cakes,Church Wedding,Wedding Flowers,Lace Wedding,Wedding Rings,Wedding Dresses",Custom Event and Shop Neon Sign Lights - Event & Shop,9064f4a2-2753-476c-815e-db360f45a93e
education,"Hi everyone! As a teacher using the Orton-Gillingham approach, I am constantly looking for phonics activities that my students will find fun and engaging. Using Orton-Gillingham…",1,22k,https://i.pinimg.com/originals/58/8e/38/588e380b19942a71a86a69d9c9973d25.png,4076,image,The Literacy Nest,Local save in /data/education,"Literacy Games,Kindergarten Activities,Literacy Centers,Fun Phonics Activities,Listening Activities,Vocabulary Games,Literacy Stations,Letter Activities,Montessori Activities",Phonics Activities Your Kids Will Love - The Literacy Nest,3a52d364-7c04-47cb-a3e5-56d9e2b77528


In [None]:
# Preview the raw df_user DataFrame (the input to the user cleaning cells)
display(df_user)

age,date_joined,first_name,ind,last_name
27,2016-03-08 13:38:37,Christopher,2015,Bradshaw
39,2016-06-29 20:43:59,Christina,6398,Davenport
20,2015-10-23 04:13:23,Alexandria,3599,Alvarado
20,2015-12-01 15:08:31,Christopher,5076,Butler
39,2017-07-19 07:12:04,Michelle,7790,Gutierrez
49,2016-04-22 20:36:02,Brittany,10509,Thompson
21,2015-11-10 09:27:42,Andrea,8731,Alexander
24,2016-03-31 20:56:39,Austin,8887,Rodriguez
23,2015-12-01 18:15:02,Christine,7768,Cortez
36,2015-12-20 16:38:13,Michelle,4315,Prince



## Updated Data Cleaning Section (timestamped outputs)

In [None]:
# Data cleaning for the df_pin DataFrame; each run writes to a new timestamped folder.

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, array, to_timestamp, concat_ws, regexp_replace, trim, when
from pyspark.sql.types import IntegerType

# Create (or reuse) a Spark session
spark = SparkSession.builder.appName("YourAppName").getOrCreate()

# Replace empty entries with Nones.
# NOTE(review): only '' is replaced; other placeholder strings are left as-is - TODO confirm.
df_pin_cleaned = df_pin.na.replace('', None)

# follower_count is text with a magnitude suffix ("46k", "892k", "111"). A plain cast
# to int turns every suffixed value into null, so expand the suffix first.
# Non-numeric text still ends up null after the cast.
follower_raw = trim(col("follower_count"))
df_pin_cleaned = df_pin_cleaned.withColumn(
    "follower_count",
    when(follower_raw.endswith("k"), regexp_replace(follower_raw, "k$", "").cast("double") * 1000)
    .when(follower_raw.endswith("M"), regexp_replace(follower_raw, "M$", "").cast("double") * 1000000)
    .otherwise(follower_raw.cast("double"))
    .cast(IntegerType()),
)

# Ensure that each column containing numeric data has a numeric data type.
# index is an identifier, so it gets an integer type rather than double.
numeric_columns = ["downloaded", "index"]
for col_name in numeric_columns:
    df_pin_cleaned = df_pin_cleaned.withColumn(col_name, df_pin_cleaned[col_name].cast(IntegerType()))

# save_location looks like "Local save in /data/christmas": drop the fixed prefix and
# keep the path (the last path segment alone would just repeat `category`).
df_pin_cleaned = df_pin_cleaned.withColumn("save_location", regexp_replace("save_location", "^Local save in ", ""))

# Rename the index column to ind (join key shared with the geo and user tables)
df_pin_cleaned = df_pin_cleaned.withColumnRenamed("index", "ind")

# Reorder the DataFrame columns
column_order_pin = ["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list",
                    "is_image_or_video", "image_src", "save_location", "category"]

df_pin_cleaned = df_pin_cleaned.select(column_order_pin)

# Timestamp suffix for the output folder. Digits only: str() of a current_timestamp()
# value contains a space and ':' characters, and Hadoop/DBFS path parsing rejects ':'
# inside a path segment ("Relative path in absolute URI").
timestamp_str_pin = datetime.now().strftime("%Y%m%d%H%M%S")

# Create a new output path with a timestamp
output_path_pin = f"/mnt/diane_mount/cleaned_data/df_pin_cleaned_{timestamp_str_pin}"

# Save the cleaned DataFrame to the new output path
df_pin_cleaned.write.mode("overwrite").parquet(output_path_pin)

# Show the cleaned DataFrame
df_pin_cleaned.show()


In [None]:
# Cleaning the df_geo DataFrame; each run writes to a new timestamped folder.

from datetime import datetime

from pyspark.sql.functions import col, array, to_timestamp, expr

# Create a new column 'coordinates' that contains an array based on the latitude and longitude columns
df_geo_cleaned = df_geo.withColumn("coordinates", array(col("latitude"), col("longitude")))

# Drop the latitude and longitude columns from the DataFrame
df_geo_cleaned = df_geo_cleaned.drop("latitude", "longitude")

# Convert the timestamp column from a string to a timestamp data type
df_geo_cleaned = df_geo_cleaned.withColumn("timestamp", to_timestamp(col("timestamp")))

# Reorder the DataFrame columns
column_order_geo = ["ind", "country", "coordinates", "timestamp"]
df_geo_cleaned = df_geo_cleaned.select(column_order_geo)

# Timestamp suffix for the output folder. Digits only: str() of a current_timestamp()
# value contains a space and ':' characters, and Hadoop/DBFS path parsing rejects ':'
# inside a path segment.
timestamp_str_geo = datetime.now().strftime("%Y%m%d%H%M%S")

# Create a new output path with a timestamp
output_path_geo = f"/mnt/diane_mount/cleaned_data/df_geo_cleaned_{timestamp_str_geo}"

# Save the cleaned DataFrame to the new output path
df_geo_cleaned.write.mode("overwrite").parquet(output_path_geo)

# Show the cleaned DataFrame
df_geo_cleaned.show()


In [None]:
# Cleaning the df_user DataFrame; each run writes to a new timestamped folder.

from datetime import datetime

from pyspark.sql.functions import col, concat_ws, to_timestamp

# Create a new column 'user_name' that concatenates the information found in the first_name and last_name columns
df_user_cleaned = df_user.withColumn("user_name", concat_ws(" ", col("first_name"), col("last_name")))

# Drop the first_name and last_name columns from the DataFrame
df_user_cleaned = df_user_cleaned.drop("first_name", "last_name")

# Convert the date_joined column from a string to a timestamp data type
df_user_cleaned = df_user_cleaned.withColumn("date_joined", to_timestamp(col("date_joined")))

# Reorder the DataFrame columns
column_order_user = ["ind", "user_name", "age", "date_joined"]
df_user_cleaned = df_user_cleaned.select(column_order_user)

# Timestamp suffix for the output folder. Digits only: str() of a current_timestamp()
# value contains a space and ':' characters, and Hadoop/DBFS path parsing rejects ':'
# inside a path segment.
timestamp_str_user = datetime.now().strftime("%Y%m%d%H%M%S")

# Create a new output path with a timestamp
output_path_user = f"/mnt/diane_mount/cleaned_data/df_user_cleaned_{timestamp_str_user}"

# Save the cleaned DataFrame to the new output path
df_user_cleaned.write.mode("overwrite").parquet(output_path_user)

# Show the cleaned DataFrame
df_user_cleaned.show()


## Transforming and Querying the DataFrames

In [None]:
# Find the most popular category in each country

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime

# Timestamp suffix for the output folder (digits only, safe in a file path)
current_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

# Attach each pin's country via the shared 'ind' key. Only pin and geo columns are
# needed: an extra inner join with df_user_cleaned would use none of its columns and
# could only drop pins (ind missing from the user table) or duplicate them (ind repeated
# there), skewing the counts below.
joined_df_pin_geo = df_pin_cleaned.join(df_geo_cleaned, 'ind')

# Group by 'country' and 'category' and count occurrences
category_counts = joined_df_pin_geo.groupBy('country', 'category').count()

# Rank categories within each country by count. rank() gives tied categories the
# same rank, so a country can return more than one row.
window_spec = Window.partitionBy('country').orderBy(F.desc('count'))
ranked_df = category_counts.withColumn('category_rank', F.rank().over(window_spec))

# Keep the top-ranked category (or categories) in each country
most_popular_category = ranked_df.filter('category_rank == 1').drop('category_rank', 'count')

# Save the DataFrame to the specified mount point with a timestamp
output_path_popular_category = f"/mnt/diane_mount/transformed_data/most_popular_category_{current_timestamp}"
most_popular_category.write.mode("overwrite").parquet(output_path_popular_category)

# Show the DataFrame
most_popular_category.show()


In [None]:
# For every year from 2018 to 2022 (inclusive), find the category with the most pins.
# The year comes from the geo record's timestamp.

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime

# Pins joined with their geo rows, tagged with the posting year and restricted to 2018-2022
joined_df = df_pin_cleaned.join(df_geo_cleaned, 'ind')
df_pin_year = joined_df.withColumn("post_year", F.year("timestamp"))
df_pin_filtered = df_pin_year.where(F.col("post_year").between(2018, 2022))

# Number of pins per (year, category)
category_counts_by_year = df_pin_filtered.groupBy("post_year", "category").count()

# Highest count first inside each year; ties share the same rank
window_spec = Window.partitionBy('post_year').orderBy(F.desc('count'))
ranked_df = category_counts_by_year.withColumn('category_rank', F.rank().over(window_spec))

most_popular_category_by_year = (
    ranked_df
    .where(F.col('category_rank') == 1)
    .drop('category_rank', 'count')
)

# Run timestamp used in the output folder name
current_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

output_path_category_counts_by_year = f"/mnt/diane_mount/transformed_data/category_counts_by_year_{current_timestamp}"
most_popular_category_by_year.write.mode("overwrite").parquet(output_path_category_counts_by_year)

most_popular_category_by_year.show()




In [None]:
# Find the user with the most followers in each country, then the country whose
# top user has the most followers overall.

from datetime import datetime
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Timestamp for the output paths. Computed here instead of reusing a value left over
# from another cell, so this cell runs on a fresh kernel and stamps its own run time.
current_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

# Attach each pin's country (pins and geo records share the 'ind' key)
joined_df = df_pin_cleaned.join(df_geo_cleaned.select("ind", "country"), "ind")

# One row per (country, poster, follower_count). A poster with several pins in the same
# country would otherwise show up several times among the rank-1 rows. Rows without a
# follower count cannot be "the most followed" and are excluded.
df_pin_cleaned_with_country = (
    joined_df
    .select("country", "poster_name", "follower_count")
    .where(F.col("follower_count").isNotNull())
    .dropDuplicates()
)

# Rank users within each country based on follower count (ties share rank 1)
window_spec = Window.partitionBy('country').orderBy(F.desc('follower_count'))
ranked_df = df_pin_cleaned_with_country.withColumn('user_rank', F.rank().over(window_spec))

# Get the user with the most followers in each country
top_user_per_country = ranked_df.filter('user_rank == 1').drop('user_rank')

# Save the DataFrame to the specified mount point with timestamp
output_path_top_user_per_country = f"/mnt/diane_mount/transformed_data/top_user_per_country_{current_timestamp}"
top_user_per_country.write.mode("overwrite").parquet(output_path_top_user_per_country)

# Show the DataFrame
top_user_per_country.show()

# Now, find the country with the user with the most followers
# (on a tie, limit(1) keeps an arbitrary one of the tied rows)
most_followed_country = top_user_per_country.select("country", "follower_count").orderBy(F.desc("follower_count")).limit(1)

# Save the DataFrame to the specified mount point with timestamp
output_path_most_followed_country = f"/mnt/diane_mount/transformed_data/most_followed_country_{current_timestamp}"
most_followed_country.write.mode("overwrite").parquet(output_path_most_followed_country)

# Show the DataFrame
most_followed_country.show()


In [None]:
# Task 7: Find the most popular category for different age groups

from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
# col, Window and datetime are imported here so the cell does not depend on imports
# made by other cells (it previously failed on a fresh kernel).

# Bucket users by age; ages below 18 (or null) fall into "Unknown"
df_user_age_group = df_user_cleaned.withColumn("age_group", F.when((col("age") >= 18) & (col("age") <= 24), "18-24")
                                               .when((col("age") >= 25) & (col("age") <= 35), "25-35")
                                               .when((col("age") >= 36) & (col("age") <= 50), "36-50")
                                               .when(col("age") > 50, "+50")
                                               .otherwise("Unknown"))

# Join df_pin_cleaned and df_user_age_group on 'ind'
joined_df_age_group = df_pin_cleaned.join(df_user_age_group, 'ind')

# Group by 'age_group' and 'category' and count occurrences
category_counts_by_age_group = joined_df_age_group.groupBy('age_group', 'category').count()

# Rank categories within each age group based on count (ties share rank 1)
window_spec_age_group = Window.partitionBy('age_group').orderBy(F.desc('count'))
ranked_df_age_group = category_counts_by_age_group.withColumn('category_rank', F.rank().over(window_spec_age_group))

# Filter for the top-ranked category in each age group
most_popular_category_by_age_group = ranked_df_age_group.filter('category_rank == 1').drop('category_rank', 'count')

# Get current timestamp
current_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

# Save the DataFrame to the specified mount point
output_path_category_counts_by_age_group = f"/mnt/diane_mount/transformed_data/category_counts_by_age_group_{current_timestamp}"
most_popular_category_by_age_group.write.mode("overwrite").parquet(output_path_category_counts_by_age_group)

# Show the DataFrame
most_popular_category_by_age_group.show()


In [None]:
# Find the median follower count for different age groups

from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
# col and datetime are imported here so the cell runs on a fresh kernel
# (they were previously picked up from other cells).

# Bucket users by age; ages below 18 (or null) fall into "Unknown"
df_user_age_group = df_user_cleaned.withColumn("age_group", F.when((col("age") >= 18) & (col("age") <= 24), "18-24")
                                               .when((col("age") >= 25) & (col("age") <= 35), "25-35")
                                               .when((col("age") >= 36) & (col("age") <= 50), "36-50")
                                               .when(col("age") > 50, "+50")
                                               .otherwise("Unknown"))

# Join df_pin_cleaned and df_user_age_group on 'ind'
joined_df_age_group_follower_count = df_pin_cleaned.join(df_user_age_group, 'ind')

# Group by 'age_group' and calculate the exact median follower count
# (percentile() is exact; the join-year cells use percentile_approx instead)
median_follower_count_by_age_group = joined_df_age_group_follower_count.groupBy('age_group') \
    .agg(F.expr('percentile(follower_count, 0.5)').alias('median_follower_count'))

# Get current timestamp
current_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

# Save the DataFrame to the specified mount point
output_path_median_follower_count_by_age_group = f"/mnt/diane_mount/transformed_data/median_follower_count_by_age_group_{current_timestamp}"
median_follower_count_by_age_group.write.mode("overwrite").parquet(output_path_median_follower_count_by_age_group)

# Show the DataFrame
median_follower_count_by_age_group.show()


In [None]:
# Find out how many users joined each year between 2015 and 2020 (inclusive)

from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.functions import col, year
# F, col and datetime are imported here so the cell runs on a fresh kernel
# (they were previously picked up from other cells).

# Extract the year from the date_joined column in df_user_cleaned
df_user_joined_year = df_user_cleaned.withColumn("join_year", year("date_joined"))

# Filter for users who joined between 2015 and 2020
df_user_filtered_joined_year = df_user_joined_year.filter((col("join_year") >= 2015) & (col("join_year") <= 2020))

# Group by 'join_year' and count the number of users (count() skips null ind values)
number_users_joined_by_year = df_user_filtered_joined_year.groupBy("join_year") \
    .agg(F.count("ind").alias("number_users_joined"))

# Get current timestamp
current_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

# Save the DataFrame to the specified mount point
output_path_number_users_joined_by_year = f"/mnt/diane_mount/transformed_data/number_users_joined_by_year_{current_timestamp}"
number_users_joined_by_year.write.mode("overwrite").parquet(output_path_number_users_joined_by_year)

# Show the DataFrame
number_users_joined_by_year.show()




In [None]:
# Find the median follower count of users who joined between 2015 and 2020 (inclusive),
# per join year.

from datetime import datetime

from pyspark.sql.functions import col, year, expr
# col and datetime are imported here so the cell runs on a fresh kernel
# (they were previously picked up from other cells).

# Join df_user_cleaned and df_pin_cleaned on 'ind'
joined_df_user_pin = df_user_cleaned.join(df_pin_cleaned, 'ind')

# Extract the year from the date_joined column in df_user_cleaned
df_user_joined_year = joined_df_user_pin.withColumn("join_year", year("date_joined"))

# Filter for users who joined between 2015 and 2020
df_user_filtered_joined_year = df_user_joined_year.filter((col("join_year") >= 2015) & (col("join_year") <= 2020))

# Group by 'join_year' and calculate the (approximate) median follower count
median_follower_count_by_year = df_user_filtered_joined_year.groupBy("join_year") \
    .agg(expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count"))

# Get current timestamp
current_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

# Save the DataFrame to the specified mount point
output_path_median_follower_count_by_year = f"/mnt/diane_mount/transformed_data/median_follower_count_by_year_{current_timestamp}"
median_follower_count_by_year.write.mode("overwrite").parquet(output_path_median_follower_count_by_year)

# Show the DataFrame
median_follower_count_by_year.show()


In [None]:
# Task 11: Find the median follower count of users based on their joining year and age group

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr, percentile_approx, year

# Get current timestamp
current_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

# Filter for users who joined between 2015 and 2020 (inclusive). Compare on the year:
# date_joined is a timestamp, so between('2015-01-01', '2020-12-31') would cut off
# anyone who joined on 2020-12-31 after midnight.
df_user_filtered_joined_year = df_user_cleaned.filter(year(col("date_joined")).between(2015, 2020))

# Define age groups and corresponding labels
age_groups = [
    (col("age") >= 18) & (col("age") <= 24),
    (col("age") >= 25) & (col("age") <= 35),
    (col("age") >= 36) & (col("age") <= 50),
    (col("age") > 50)
]

age_group_labels = ["18-24", "25-35", "36-50", "+50"]

# Build ONE chained when(...).when(...) expression. Calling withColumn inside the loop
# replaced age_group on every iteration, so only the last condition ("+50") survived
# and every other group became null (and was then removed by na.drop below).
# Ages below 18 (or null) match no branch and stay null.
age_group_col = when(age_groups[0], age_group_labels[0])
for age_condition, label in zip(age_groups[1:], age_group_labels[1:]):
    age_group_col = age_group_col.when(age_condition, label)

df_user_filtered_joined_year = df_user_filtered_joined_year.withColumn("age_group", age_group_col)

# Year the user joined (from date_joined); the column keeps the name post_year
df_user_filtered_joined_year = df_user_filtered_joined_year \
    .withColumn("post_year", year("date_joined"))

# Calculate median follower count for each age group and post_year
median_follower_count_by_age_and_year = df_user_filtered_joined_year \
    .join(df_pin_cleaned, 'ind') \
    .groupBy("age_group", "post_year") \
    .agg(expr("percentile_approx(follower_count, 0.5)").alias("median_follower_count"))

# Drop rows with nulls (e.g. users outside every age group, or no follower counts)
median_follower_count_by_age_and_year = median_follower_count_by_age_and_year.na.drop()

# Save the DataFrame to the specified mount point
output_path_median_follower_count_by_age_and_year = f"/mnt/diane_mount/transformed_data/median_follower_count_by_age_and_year_{current_timestamp}"
median_follower_count_by_age_and_year.write.mode("overwrite").parquet(output_path_median_follower_count_by_age_and_year)

# Show the DataFrame
median_follower_count_by_age_and_year.show()

