In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("YTTrendingAnalysis").getOrCreate()

base_path = "s3://msbx5420-2025/teams/group2/project/"
countries = ['US', 'IN', 'GB']

# Function to load and join video & category data
def load_country_data(country_code):
    videos_path = f"{base_path}{country_code}videos.csv"
    category_path = f"{base_path}{country_code}_category_id.json"
    
    df = spark.read.option("header", True).csv(videos_path)
    cat = spark.read.option("multiline", True).json(category_path)
    
    cat_exploded = cat.withColumn("items", F.explode("items"))
    cat_df = cat_exploded.selectExpr("items.id as category_id", "items.snippet.title as category_name")

    df = df.join(cat_df, on="category_id", how="left")
    df = df.withColumn("country", F.lit(country_code))
    return df

# Load all
df_all = load_country_data("US").union(load_country_data("IN")).union(load_country_data("GB"))
df_all.cache().count()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

129960

In [4]:
df_all.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+-----------+-------------+--------------------+--------------------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+--------------------+-------+
|category_id|   video_id|trending_date|               title|       channel_title|        publish_time|                tags|  views| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|video_error_or_removed|         description|       category_name|country|
+-----------+-----------+-------------+--------------------+--------------------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+--------------------+-------+
|         22|2kyS6SvSYSE|     17.14.11|WE WANT TO TALK A...|        CaseyNeistat|2017-11-13T17:13:...|     SHANtell martin| 748

In [7]:
from pyspark.sql.functions import to_date, col

# Convert numeric columns
df_all= df_all.withColumn("views", col("views").cast("long")) \
       .withColumn("likes", col("likes").cast("long")) \
       .withColumn("dislikes", col("dislikes").cast("long")) \
       .withColumn("comment_count", col("comment_count").cast("long"))

# Convert trending_date to proper format (it’s in dd.mm.yy format)
df_all= df_all.withColumn("trending_date", to_date("trending_date", "yy.dd.MM"))

# Convert publish_time to date (only the date part)
df_all = df_all.withColumn("publish_date", to_date(col("publish_time").substr(1, 10), "yyyy-MM-dd"))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
%%spark
from pyspark.sql.functions import datediff, col, round, avg

# Calculate days to trend
df_all = df_all.withColumn(
    "days_to_trend", datediff("trending_date", "publish_date")
).filter(
    col("days_to_trend").isNotNull() & (col("days_to_trend") >= 0)
)

# ✅ Group by category_name (not category)
avg_days_by_category = df_all.groupBy("category_name").agg(
    round(avg("days_to_trend"), 2).alias("avg_days_to_trend")
).orderBy("avg_days_to_trend")

avg_days_by_category.show()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------------+
|       category_name|avg_days_to_trend|
+--------------------+-----------------+
|              Movies|              3.0|
|               Shows|             4.06|
|Nonprofits & Acti...|             5.39|
|                null|             6.59|
|     News & Politics|             7.61|
|       Howto & Style|              8.7|
|       Entertainment|            10.86|
|      Pets & Animals|            11.71|
|              Gaming|            15.14|
|Science & Technology|            15.19|
|              Comedy|            23.26|
|     Travel & Events|            25.11|
|               Music|            25.33|
|              Sports|            26.01|
|      People & Blogs|            29.44|
|    Film & Animation|            30.57|
|           Education|            39.86|
|    Autos & Vehicles|            39.91|
+--------------------+-----------------+

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("YTTrendingAnalysis").getOrCreate()#.config("spark.executor.memory", "1g").config("spark.driver.memory", "1g").config("spark.executor.instances", "1").getOrCreate()

base_path = "s3://msbx5420-2025/teams/group2/project/"
countries = ['US', 'IN', 'GB']

# Function to load and join video & category data
def load_country_data(country_code):
    videos_path = f"{base_path}{country_code}videos.csv"
    category_path = f"{base_path}{country_code}_category_id.json"
    
    df = spark.read.option("header", True).csv(videos_path)
    cat = spark.read.option("multiline", True).json(category_path)
    
    cat_exploded = cat.withColumn("items", F.explode("items"))
    cat_df = cat_exploded.selectExpr("items.id as category_id", "items.snippet.title as category_name")

    df = df.join(cat_df, on="category_id", how="left")
    df = df.withColumn("country", F.lit(country_code))
    return df

# Load all
df_all = load_country_data("US").union(load_country_data("IN").union(load_country_data("GB")))
df_all.cache().count()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
20,application_1744747065843_0065,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

129960

In [9]:
# 1. Top Trending Channels Per Country (Based on Number of Videos Trending)

from pyspark.sql import functions as F

# Filter nulls and specific countries
filtered_df = df_all.filter(
    (F.col("channel_title").isNotNull()) & 
    (F.col("country").isin("IN", "US", "GB"))
)

# Group by country and channel_title, count, and order
top_creators = (
    filtered_df.groupBy("country", "channel_title")
    .agg(F.count("*").alias("trending_video_count"))
    .orderBy("country", F.desc("trending_video_count"))
)

# Show top creators per country using window function
from pyspark.sql.window import Window

window_spec = Window.partitionBy("country").orderBy(F.desc("trending_video_count"))

ranked_creators = top_creators.withColumn("rank", F.row_number().over(window_spec))
ranked_creators.filter(F.col("rank") <= 3).select("country", "channel_title", "trending_video_count").show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------------------------+--------------------+
|country|channel_title                         |trending_video_count|
+-------+--------------------------------------+--------------------+
|GB     |The Tonight Show Starring Jimmy Fallon|208                 |
|GB     |TheEllenShow                          |207                 |
|GB     |Jimmy Kimmel Live                     |207                 |
|IN     |VikatanTV                             |284                 |
|IN     |etvteluguindia                        |282                 |
|IN     |Flowers Comedy                        |270                 |
|US     |ESPN                                  |203                 |
|US     |The Tonight Show Starring Jimmy Fallon|197                 |
|US     |Vox                                   |193                 |
+-------+--------------------------------------+--------------------+

In [7]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Filter out null categories and include only selected countries
filtered_df = df_all.filter(
    (F.col("category_name").isNotNull()) & 
    (F.col("country").isin("IN", "US", "GB"))
)

# Group by country and category, count how many videos trended
top_categories = (
    filtered_df.groupBy("country", "category_name")
    .agg(F.count("*").alias("trending_video_count"))
    .orderBy("country", F.desc("trending_video_count"))
)

# Rank categories within each country
window_spec = Window.partitionBy("country").orderBy(F.desc("trending_video_count"))

ranked_categories = top_categories.withColumn("rank", F.row_number().over(window_spec))

# Select top 5 categories per country
ranked_categories.filter(F.col("rank") <= 5) \
    .select("country", "category_name", "trending_video_count") \
    .show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------+--------------------+
|country|category_name   |trending_video_count|
+-------+----------------+--------------------+
|GB     |Music           |13754               |
|GB     |Entertainment   |9124                |
|GB     |People & Blogs  |2926                |
|GB     |Film & Animation|2577                |
|GB     |Howto & Style   |1928                |
|IN     |Entertainment   |16712               |
|IN     |News & Politics |5241                |
|IN     |Music           |3858                |
|IN     |Comedy          |3429                |
|IN     |People & Blogs  |2624                |
|US     |Entertainment   |9964                |
|US     |Music           |6472                |
|US     |Howto & Style   |4146                |
|US     |Comedy          |3457                |
|US     |People & Blogs  |3210                |
+-------+----------------+--------------------+

In [8]:
# 2. Compute Engagement Rate per Video (likes + comments / views)

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Step 1: Compute engagement rate
df_engagement = df_all.withColumn(
    "engagement_rate",
    (col("likes").cast("double") + col("comment_count").cast("double")) / col("views").cast("double")
)

# Step 2: Create window to rank videos within each country by engagement rate
window_spec = Window.partitionBy("country").orderBy(col("engagement_rate").desc())

# Step 3: Rank videos and filter top N per country
df_top_engaged = df_engagement.withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") <= 5) \
    .select("country", "title", "channel_title", "category_name", "engagement_rate")

# Step 4: Show result
df_top_engaged.show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------------------------------------------------------------------------+----------------+--------------------+-------------------+
|country|title                                                                          |channel_title   |category_name       |engagement_rate    |
+-------+-------------------------------------------------------------------------------+----------------+--------------------+-------------------+
|US     |Bruno Mars - Finesse (Remix) [Feat. Cardi B] [Official Video]                  |Bruno Mars      |Music               |0.3259280997264049 |
|US     |j-hope 'Airplane' MV                                                           |ibighit         |Music               |0.29570488840094683|
|US     |Luis Fonsi, Demi Lovato - Échame La Culpa                                      |LuisFonsiVEVO   |Music               |0.294803838814592  |
|US     |5 Seconds Of Summer - Want You Back (Audio)                                    |5SOSVEVO        |Music 

In [11]:
from pyspark.sql.functions import col, avg, round

# Reuse or compute engagement rate again if needed
df_engagement = df_all.withColumn(
    "engagement_rate",
    (col("likes").cast("double") + col("comment_count").cast("double")) / col("views").cast("double")
)

# Filter out any rows where views = 0 or engagement_rate is null/NaN
df_engagement_clean = df_engagement.filter(
    (col("views").isNotNull()) & (col("views") > 0) & (col("engagement_rate").isNotNull())
)

# Group by country and compute average engagement rate as a percentage
avg_engagement_by_country = df_engagement_clean.groupBy("country") \
    .agg(round(avg("engagement_rate") * 100, 2).alias("avg_engagement_rate_percent"))

# Show result
avg_engagement_by_country.orderBy(col("avg_engagement_rate_percent").desc()).show(truncate=False)



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------------------------+
|country|avg_engagement_rate_percent|
+-------+---------------------------+
|US     |3.89                       |
|GB     |3.8                        |
|IN     |2.46                       |
+-------+---------------------------+

In [9]:
#3. Trend Longevity – How Long Do Videos Stay in Trending?

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Step 1: Compute number of distinct trending days per video per country
trend_duration = df_all.groupBy("video_id", "title", "channel_title", "country") \
    .agg(F.countDistinct("trending_date").alias("days_trending"))

# Step 2: Define window to rank videos by days_trending within each country
window_spec = Window.partitionBy("country").orderBy(F.desc("days_trending"))

# Step 3: Rank and filter top N per country
trend_duration_ranked = trend_duration.withColumn("rank", F.row_number().over(window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("country", "title", "channel_title", "days_trending")

# Step 4: Show results
trend_duration_ranked.show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------------------------------------------------------------------------------------+------------------+-------------+
|country|title                                                                                    |channel_title     |days_trending|
+-------+-----------------------------------------------------------------------------------------+------------------+-------------+
|GB     |To Our Daughter                                                                          |Kylie Jenner      |38           |
|GB     |Justin Timberlake’s FULL Pepsi Super Bowl LII Halftime Show! | NFL Highlights            |NFL               |38           |
|GB     |Jurassic World: Fallen Kingdom - Official Trailer #2 [HD]                                |Universal Pictures|38           |
|GB     |Janelle Monáe – Make Me Feel [Official Music Video]                                      |Janelle Monáe     |37           |
|GB     |Maroon 5 - Wait                                             

In [10]:
#4. Cross-Country Trending Videos – Which Videos Went Viral Globally?

#(Which video IDs appear in multiple countries)

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Step 1: Count how many countries each video trended in
cross_country_counts = df_all.groupBy("video_id", "title") \
    .agg(F.countDistinct("country").alias("countries_trended_in")) \
    .filter("countries_trended_in > 1")

# Step 2: Join back to original DataFrame to get per-country view
df_viral_per_country = df_all.select("video_id", "title", "country").distinct() \
    .join(cross_country_counts, on=["video_id", "title"], how="inner")

# Step 3: Rank within each country by number of countries a video trended in
window_spec = Window.partitionBy("country").orderBy(F.desc("countries_trended_in"))

ranked_viral = df_viral_per_country.withColumn("rank", F.row_number().over(window_spec)) \
    .filter(F.col("rank") <= 5) \
    .select("country", "title", "countries_trended_in")

# Step 4: Show result
ranked_viral.show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------------------------------------------------------+--------------------+
|country|title                                                    |countries_trended_in|
+-------+---------------------------------------------------------+--------------------+
|GB     |Calvin Harris, Dua Lipa - One Kiss (Official Video)      |3                   |
|GB     |Westworld Season 2 | Official Super Bowl LII Ad | HBO    |3                   |
|GB     |Ellen Makes 'Friends' with BTS!                          |3                   |
|GB     |SPIDER-MAN: INTO THE SPIDER-VERSE - Official Trailer (HD)|3                   |
|GB     |MOWGLI - Official 1st Trailer                            |3                   |
|IN     |Calvin Harris, Dua Lipa - One Kiss (Official Video)      |3                   |
|IN     |Westworld Season 2 | Official Super Bowl LII Ad | HBO    |3                   |
|IN     |Ellen Makes 'Friends' with BTS!                          |3                   |
|IN     |SPIDER-MAN: 

In [15]:
from pyspark.sql.functions import avg, max

# First, aggregate the metrics per unique video ID
video_stats = df_all.groupBy("video_id", "title", "channel_title", "country") \
    .agg(
        avg("likes").cast("double").alias("avg_likes"),
        avg("dislikes").cast("double").alias("avg_dislikes"),
        avg("comment_count").cast("double").alias("avg_comments"),
        avg("views").cast("double").alias("avg_views")
    )

# Then, calculate toxicity indicators
df_toxicity = video_stats.withColumn("dislike_like_ratio",
                                     (F.col("avg_dislikes") + 1) / (F.col("avg_likes") + 1)) \
                         .withColumn("comments_per_view",
                                     (F.col("avg_comments") + 1) / (F.col("avg_views") + 1)) \
                         .orderBy(F.desc("dislike_like_ratio"))

# Show top potentially controversial videos
df_toxicity.select("title", "channel_title", "country",
                   "dislike_like_ratio") \
           .show(5, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------------------------+--------------------+-------+------------------+
|title                                            |channel_title       |country|dislike_like_ratio|
+-------------------------------------------------+--------------------+-------+------------------+
|RollerCoaster Tycoon Start Engine Campaign       |RollerCoaster Tycoon|GB     |29.046770601336306|
|PSA from Chairman of the FCC Ajit Pai            |Daily Caller        |GB     |24.182069302152726|
|PSA from Chairman of the FCC Ajit Pai            |Daily Caller        |US     |23.942588168170314|
|The FCC repeals its net neutrality rules         |Washington Post     |US     |22.354766688993077|
|LuLaRoe Consultant Mocks Those With Special Needs|J S                 |US     |20.71186440677966 |
+-------------------------------------------------+--------------------+-------+------------------+
only showing top 5 rows

In [17]:
#6. Fastest Trending Videos – Which Videos Reached Trending List the Quickest?

#(How fast does a video break into the trending list after being published)
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Step 1: Compute publish and trending dates, and calculate days to trend
df_speed = df_all.withColumn(
    "publish_date", F.to_date(F.col("publish_time").substr(1, 10), "yyyy-MM-dd")
).withColumn(
    "trending_date", F.to_date("trending_date", "yy.dd.MM")
).withColumn(
    "days_to_trend", F.datediff("trending_date", "publish_date")
)

# Step 2: Filter out any incorrect negative values
df_speed_filtered = df_speed.filter("days_to_trend >= 1")

# Step 3: Define window by country to rank fastest-trending videos
window_spec = Window.partitionBy("country").orderBy("days_to_trend")

# Step 4: Rank and select top N fastest-trending videos per country
fastest_trenders = df_speed_filtered.withColumn("rank", F.row_number().over(window_spec)) \
    .filter(F.col("rank") <= 3) \
    .select("country", "category_name", "channel_title", "days_to_trend")

# Step 5: Show result
fastest_trenders.show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------------+--------------------------+-------------+
|country|category_name  |channel_title             |days_to_trend|
+-------+---------------+--------------------------+-------------+
|US     |People & Blogs |CaseyNeistat              |1            |
|US     |Entertainment  |LastWeekTonight           |1            |
|US     |Entertainment  |Good Mythical Morning     |1            |
|GB     |Sports         |Salford City Football Club|1            |
|GB     |News & Politics|Cute Girl Videos          |1            |
|GB     |Comedy         |Jimmy Kimmel Live         |1            |
|IN     |News & Politics|HJ NEWS                   |1            |
|IN     |Entertainment  |Filmylooks                |1            |
|IN     |Entertainment  |Top Telugu Media          |1            |
+-------+---------------+--------------------------+-------------+

In [17]:
#CTA
#Use PySpark to detect and quantify promotional tactics used by creators across countries — like asking users to subscribe, click links, buy now, or watch full content. Think of it as identifying calls to action (CTAs) like an AI detective.

from pyspark.sql.functions import lower, col, when, sum as _sum

# Lowercase the text for uniformity
df_cta = df_all.withColumn("desc_lower", lower(col("description")))

# Define multiple CTA signals
df_cta = df_cta \
    .withColumn("cta_subscribe", when(col("desc_lower").contains("subscribe"), 1).otherwise(0)) \
    .withColumn("cta_link", when(col("desc_lower").contains("http"), 1).otherwise(0)) \
    .withColumn("cta_watch_full", when(col("desc_lower").contains("watch full"), 1).otherwise(0)) \
    .withColumn("cta_shop", when(col("desc_lower").rlike("buy now|shop here|order now|get yours"), 1).otherwise(0)) \
    .withColumn("cta_comment", when(col("desc_lower").contains("comment below"), 1).otherwise(0)) \
    .withColumn("cta_like", when(col("desc_lower").contains("smash that like"), 1).otherwise(0))

# Aggregate CTA signals by country
cta_insights = df_cta.groupBy("country").agg(
    _sum("cta_subscribe").alias("Subscribe Mentions"),
    _sum("cta_link").alias("Links to External Sites"),
    _sum("cta_watch_full").alias("Watch Full CTA"),
    _sum("cta_shop").alias("Shop or Buy CTA"),
    _sum("cta_comment").alias("Comment Callouts"),
    _sum("cta_like").alias("Smash That Like CTA")
)

cta_insights.show(10,truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+-----------------------+--------------+---------------+----------------+-------------------+
|country|Subscribe Mentions|Links to External Sites|Watch Full CTA|Shop or Buy CTA|Comment Callouts|Smash That Like CTA|
+-------+------------------+-----------------------+--------------+---------------+----------------+-------------------+
|US     |18749             |35369                  |1090          |424            |256             |0                  |
|IN     |21144             |30565                  |1357          |49             |205             |0                  |
|GB     |13269             |32731                  |933           |381            |109             |0                  |
+-------+------------------+-----------------------+--------------+---------------+----------------+-------------------+

In [18]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Add total CTA score per video
df_cta = df_cta.withColumn(
    "total_ctas",
    col("cta_subscribe") + col("cta_link") + col("cta_watch_full") +
    col("cta_shop") + col("cta_comment") + col("cta_like")
)

# Rank videos with highest CTA usage per country
window_spec = Window.partitionBy("country").orderBy(col("total_ctas").desc())

cta_ranked = df_cta.withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") <= 5) \
    .select("country", "title", "channel_title", "total_ctas")

cta_ranked.show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------------------------------------------------------------------------------+------------------------------------+----------+
|country|title                                                                                |channel_title                       |total_ctas|
+-------+-------------------------------------------------------------------------------------+------------------------------------+----------+
|US     |Flinch w/ BTS                                                                        |The Late Late Show with James Corden|3         |
|US     |Kristen Wiig Struggles with 'Hallelujah'                                             |The Late Late Show with James Corden|3         |
|US     |Saoirse Ronan Knows Why You Love 'Lady Bird'                                         |The Late Show with Stephen Colbert  |3         |
|US     |The Talk - Hilary Duff Dishes on Boyfriend Matthew Koma: 'third times a charm'       |The Talk                            |3   

In [19]:
#CTA heatmap code

from pyspark.sql.functions import lower, col, when, sum as _sum

# Recreate or reuse CTA features
df_cta = df_all.withColumn("desc_lower", lower(col("description")))

df_cta = df_cta \
    .withColumn("cta_subscribe", when(col("desc_lower").contains("subscribe"), 1).otherwise(0)) \
    .withColumn("cta_link", when(col("desc_lower").contains("http"), 1).otherwise(0)) \
    .withColumn("cta_watch_full", when(col("desc_lower").contains("watch full"), 1).otherwise(0)) \
    .withColumn("cta_shop", when(col("desc_lower").rlike("buy now|shop here|order now|get yours"), 1).otherwise(0)) \
    .withColumn("cta_comment", when(col("desc_lower").contains("comment below"), 1).otherwise(0)) \
    .withColumn("cta_like", when(col("desc_lower").contains("smash that like"), 1).otherwise(0))

# Aggregate by country (matrix-style)
cta_heatmap = df_cta.groupBy("country").agg(
    _sum("cta_subscribe").alias("Subscribe"),
    _sum("cta_link").alias("External Link"),
    _sum("cta_watch_full").alias("Watch Full"),
    _sum("cta_shop").alias("Shop"),
    _sum("cta_comment").alias("Comment"),
    _sum("cta_like").alias("Like")
)

# Show or export
cta_heatmap.show(truncate=False)
# Optionally: .toPandas().to_csv("cta_heatmap.csv", index=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------+-------------+----------+----+-------+----+
|country|Subscribe|External Link|Watch Full|Shop|Comment|Like|
+-------+---------+-------------+----------+----+-------+----+
|US     |18749    |35369        |1090      |424 |256    |0   |
|IN     |21144    |30565        |1357      |49  |205    |0   |
|GB     |13269    |32731        |933       |381 |109    |0   |
+-------+---------+-------------+----------+----+-------+----+

In [26]:
#country leaderboard
from pyspark.sql.functions import count, col

# Remove null values from relevant columns
df_filtered = df_all.filter(
    col("country").isNotNull() &
    col("category_name").isNotNull() &
    col("channel_title").isNotNull()
)

# Group by country and channel to get leaderboard
df_leaderboard = df_filtered.groupBy("country", "channel_title") \
    .agg(count("video_id").alias("trending_count"))

# Rank creators within each country
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("country").orderBy(col("trending_count").desc())

df_ranked_leaderboard = df_leaderboard.withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") <= 10) \
    .select("country", "channel_title", "trending_count")

# Show or export
df_ranked_leaderboard.show(20, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------------------------+--------------+
|country|channel_title                         |trending_count|
+-------+--------------------------------------+--------------+
|GB     |The Tonight Show Starring Jimmy Fallon|208           |
|GB     |TheEllenShow                          |207           |
|GB     |Jimmy Kimmel Live                     |207           |
|GB     |Saturday Night Live                   |206           |
|GB     |WWE                                   |205           |
|GB     |The Late Late Show with James Corden  |202           |
|GB     |Late Night with Seth Meyers           |194           |
|GB     |Breakfast Club Power 105.1 FM         |193           |
|GB     |The Late Show with Stephen Colbert    |189           |
|GB     |Netflix                               |187           |
|IN     |VikatanTV                             |284           |
|IN     |etvteluguindia                        |282           |
|IN     |Flowers Comedy                 

In [29]:
#Videos that spark most conversations
from pyspark.sql.functions import col

# Filter and compute comments per view
df_convo = df_all.filter(
    col("comment_count").isNotNull() & col("views").isNotNull()
).withColumn(
    "comments_per_view",
    (col("comment_count").cast("double") + 1) / (col("views").cast("double") + 1)
)

# Top conversation-sparking videos per country
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec_convo = Window.partitionBy("country").orderBy(col("comments_per_view").desc())

df_top_convo = df_convo.withColumn("rank", row_number().over(window_spec_convo)) \
    .filter(col("rank") <= 5) \
    .select("country", "title", "channel_title", "comments_per_view", "category_name")

df_top_convo.show(10, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------------------------------------------------+---------------+--------------------+--------------+
|country|title                                                     |channel_title  |comments_per_view   |category_name |
+-------+----------------------------------------------------------+---------------+--------------------+--------------+
|US     |FULL FACE OF MAKEUP IM THROWING OUT 2018                  |Laura Lee      |0.11764423640050747 |Howto & Style |
|US     |FULL FACE USING 7-ELEVEN MAKEUP!                          |Manny Mua      |0.11348988218654066 |Howto & Style |
|US     |NEW WET N WILD 'GOTH-O-GRAPHIC' COLLECTION | HIT OR MISS?!|jeffreestar    |0.11157184015397043 |Howto & Style |
|US     |FULL FACE OF MAKEUP IM THROWING OUT 2018                  |Laura Lee      |0.11045803133151602 |Howto & Style |
|US     |NEW WET N WILD 'GOTH-O-GRAPHIC' COLLECTION | HIT OR MISS?!|jeffreestar    |0.10737782394669362 |Howto & Style |
|GB     |A Critique of Star Wars

In [31]:
#Most Polarizing videos
from pyspark.sql.functions import col

# Calculate both toxicity indicators
df_polarizing = df_all.filter(
    col("likes").isNotNull() & col("dislikes").isNotNull() & col("comment_count").isNotNull() & col("views").isNotNull()
).withColumn(
    "dislike_like_ratio",
    (col("dislikes").cast("double") + 1) / (col("likes").cast("double") + 1)
).withColumn(
    "comments_per_view",
    (col("comment_count").cast("double") + 1) / (col("views").cast("double") + 1)
)

# Combine both metrics by summing or multiplying (you can experiment)
df_polarizing = df_polarizing.withColumn(
    "polarization_score",
    col("dislike_like_ratio") * col("comments_per_view")
)

# Top polarizing videos per country
window_spec_polar = Window.partitionBy("country").orderBy(col("polarization_score").desc())

df_top_polarizing = df_polarizing.withColumn("rank", row_number().over(window_spec_polar)) \
    .filter(col("rank") <= 5) \
    .select("country", "title", "dislike_like_ratio", "comments_per_view", "polarization_score")

df_top_polarizing.show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------------------------------------------------------------------------------------------+------------------+--------------------+--------------------+
|country|title                                                                                               |dislike_like_ratio|comments_per_view   |polarization_score  |
+-------+----------------------------------------------------------------------------------------------------+------------------+--------------------+--------------------+
|US     |PSA from Chairman of the FCC Ajit Pai                                                               |23.924965311132457|0.024649873247882023|0.5897473623793894  |
|US     |PSA from Chairman of the FCC Ajit Pai                                                               |24.045929018789145|0.024518066911374724|0.5895596966288394  |
|US     |PSA from Chairman of the FCC Ajit Pai                                                               |23.861589888227307|0.024579429