In [2]:
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import pandas as pd

In [3]:
# Build (or reuse) the Spark session for this analysis. The S3A connector is
# pointed at an HTTP endpoint on host "localstack" (presumably a LocalStack
# container - confirm against the deployment) with path-style bucket access.
spark = (
    SparkSession.builder
    .appName("YouTubeTrendAnalysis")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localstack:4566")
    .config("spark.hadoop.fs.s3a.access.key", "test")
    .config("spark.hadoop.fs.s3a.secret.key", "test")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

In [4]:
# Load the transformed parquet dataset from the bucket and preview 12 rows.
df = spark.read.parquet("s3a://youtube-trend-data/transformed/")
df.show(n=12)

+-----------+--------------------------+-----------+-------------+--------+------+--------+------+--------------------+--------------------+
|         id|                     title|category_id|category_name|   views| likes|comments|region|           timestamp|    engagement_ratio|
+-----------+--------------------------+-----------+-------------+--------+------+--------+------+--------------------+--------------------+
|de6UvFKbuZQ|      Em Xinh Say Hi Tậ...|         10|      Unknown| 7304635| 80209|    8624|    VN|2025-06-20T06:52:...|0.010980562341581748|
|Ry1rYSCIfBg|      Cách để uống trà ...|         22|      Unknown| 6454374| 81020|     206|    VN|2025-06-20T06:52:...|0.012552727809079548|
|TuZpmKmF7Ps|      Nắng nóng mà gặp ...|         24|      Unknown| 1966387| 31776|     136|    VN|2025-06-20T06:52:...|0.016159586083512555|
|KjflhsdN170|      $1 vs $1000 Water...|         22|      Unknown|45967089|697859|     801|    VN|2025-06-20T06:52:...|0.015181709679288153|
|OZuZhGxkvD0|

In [5]:
# Print every distinct (category_id, category_name) pair, ordered by id,
# without truncating long cell values.
(
    df.select("category_id", "category_name")
    .distinct()
    .orderBy("category_id")
    .show(truncate=False)
)

NameError: name 'categoryname' is not defined

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when, col

In [None]:
# Lookup of category_id -> real category name, taken from rows whose name is
# already populated (neither NULL nor the "Unknown" placeholder).
# Reads from `df`: it is the frame loaded above that carries both columns
# (the name `categoryname` is not defined anywhere in this notebook).
# NOTE(review): if one category_id carries several distinct valid names this
# lookup has several rows for it and a later join will duplicate records.
category_mapping = (
    df.filter(
        F.col("category_name").isNotNull()
        & (F.col("category_name") != "Unknown")
    )
    .select(
        "category_id",
        # Renamed so it cannot clash with category_name when joined back.
        F.col("category_name").alias("valid_category_name"),
    )
    .distinct()
)

In [None]:
# Back-fill missing category names on `df` using the category_mapping lookup.
# A name counts as missing when it is NULL or the "Unknown" placeholder.
# Reads from `df` (the name `categoryname` is not defined in this notebook).
category_filled = (
    df.join(category_mapping, on="category_id", how="left")
    .withColumn(
        "category_name_cleaned",
        F.when(
            F.col("category_name").isNull() | (F.col("category_name") == "Unknown"),
            # Fall back to the existing value when the lookup has no entry, so
            # an "Unknown" placeholder is not silently replaced by NULL.
            F.coalesce(F.col("valid_category_name"), F.col("category_name")),
        ).otherwise(F.col("category_name")),
    )
    # Swap the cleaned column in under the original column name.
    .drop("category_name")
    .withColumnRenamed("category_name_cleaned", "category_name")
)

In [None]:
# Inspect the distinct (category_id, category_name) pairs after back-filling.
(
    category_filled
    .select("category_id", "category_name")
    .distinct()
    .show(truncate=False)
)

In [None]:
# Static lookup table: category id -> display name.
# NOTE(review): ids are Python ints here; confirm they match the type of
# df.category_id so the join on that column does not rely on implicit casts.
category_mapping_df = spark.createDataFrame(
    [
        (28, "Science & Technology"),
        (24, "Entertainment"),
        (19, "Travel & Events"),
        (1, "Film & Animation"),
        (20, "Gaming"),
        (25, "News & Politics"),
        (10, "Music"),
        (27, "Education"),
        (23, "Comedy"),
        (17, "Sports"),
        (22, "People & Blogs"),
        (26, "Howto & Style"),
        (15, "Pets & Animals"),
        (2, "Autos & Vehicles"),
    ],
    schema=["category_id", "category_name_mapped"],
)

In [None]:
# Replace missing category names in `df` with the names from the static
# category_mapping_df lookup.
# - A name is treated as missing when it is NULL or the "Unknown" placeholder.
# - coalesce keeps the existing value when the lookup has no entry for the id,
#   so unmapped "Unknown" rows stay "Unknown" instead of turning into NULL.
# Re-running this cell is safe: already-filled names hit the otherwise()
# branch and the helper column is dropped again.
df = (
    df.join(category_mapping_df, on="category_id", how="left")
    .withColumn(
        "category_name",
        when(
            col("category_name").isNull() | (col("category_name") == "Unknown"),
            F.coalesce(col("category_name_mapped"), col("category_name")),
        ).otherwise(col("category_name")),
    )
    .drop("category_name_mapped")
)

In [None]:
# Check the distinct (category_id, category_name) pairs now present in df.
(
    df.select("category_id", "category_name")
    .distinct()
    .show(truncate=False)
)

In [None]:
# Preview df with the default display settings spelled out
# (first 20 rows, long values truncated).
df.show(n=20, truncate=True)

In [None]:
# Register df as the temp view "youtube_trends" so the SQL cells can query it.
df.createOrReplaceTempView(name="youtube_trends")

In [None]:
# Number of videos per region.
# Keep the DataFrame in the variable and display it in a separate call:
# DataFrame.show() returns None, so chaining it onto the assignment would
# leave count_videos_region unusable afterwards.
count_videos_region = spark.sql("""SELECT region, COUNT(*) as video_count
FROM youtube_trends
GROUP BY region""")
count_videos_region.show()

In [None]:
# Mean engagement_ratio per region, highest first; rows with views <= 0 are
# excluded by the WHERE clause.
engagement_by_region = spark.sql(
    sqlQuery="""
    SELECT region, AVG(engagement_ratio) as avg_engagement_ratio
    FROM youtube_trends
    WHERE views > 0
    GROUP BY region
    ORDER BY avg_engagement_ratio DESC
"""
)

In [None]:
# The five categories with the highest mean view count.
top_categories = spark.sql(
    sqlQuery="""
    SELECT category_name, AVG(views) as avg_views
    FROM youtube_trends
    GROUP BY category_name
    ORDER BY avg_views DESC
    LIMIT 5
"""
)

In [None]:
# Collect the two small aggregate results to the driver as pandas DataFrames
# (the two conversions are independent of each other).
engagement_by_region_pd = engagement_by_region.toPandas()
top_categories_pd = top_categories.toPandas()

In [None]:
# Bar chart of the top categories by mean view count.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(top_categories_pd['category_name'], top_categories_pd['avg_views'], color='skyblue')
ax.set_xlabel('Category')  # the x axis shows category names, not ids
ax.set_ylabel('Average Views')
ax.set_title('Top 5 Categories by Average Views')
ax.tick_params(axis='x', labelrotation=45)
# Annotate each bar with its value, formatted with thousands separators.
for i, v in enumerate(top_categories_pd['avg_views']):
    ax.text(i, v, f'{v:,.0f}', ha='center', va='bottom')
fig.tight_layout()
plt.show()

In [None]:
# Average views for each (category, region) pair, highest first.
avg_views = spark.sql(
    sqlQuery="""
    SELECT category_name, region, AVG(views) as avg_views
    FROM youtube_trends
    GROUP BY category_name, region
    ORDER BY avg_views DESC
"""
)