## Set up

In [1]:
# INSTALL FINDSPARK
!pip install pyspark findspark -q

In [2]:
# IMPORT LIBRARIES
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from functools import reduce
import warnings
warnings.filterwarnings('ignore')

print("Imports successful")

Imports successful


## Khởi tạo Spark Session

In [3]:
# INITIALIZE SPARK SESSION
import tempfile

spark = SparkSession.builder \
    .appName("YouTubePreprocessing") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.warehouse.dir", tempfile.gettempdir()) \
    .config("spark.ui.enabled", "false") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
print(f"Spark {spark.version} started")

Spark 3.2.1 started


## Đọc dữ liệu

In [4]:
hdfs_path = "hdfs://namenode:9000/data_input" 
    
raw_df = spark.read.csv(
    f"{hdfs_path}/raw_data.csv",
    header=True, inferSchema=True, escape='"', multiLine=True
)

print("RAW DATA OVERVIEW")
raw_df.show(5)

print("VALID VIDEO ROWS (có video_id)")
valid_videos = raw_df.filter(col("video_id").isNotNull() & (col("video_id") != ""))
print(f"Valid videos: {valid_videos.count()} / {raw_df.count()}")
valid_videos.show(5)

=== SAMPLE TRENDING_DATE VALUES ===
+-------------------+
|trending_date      |
+-------------------+
|2021-07-08 00:00:00|
|2021-07-20 00:00:00|
|2021-08-27 00:00:00|
|2022-01-03 00:00:00|
|2022-04-07 00:00:00|
|2023-03-20 00:00:00|
|2023-04-04 00:00:00|
|2023-07-23 00:00:00|
|2021-12-13 00:00:00|
|2022-01-02 00:00:00|
+-------------------+
only showing top 10 rows

RAW DATA OVERVIEW
+-----------+--------------------+-------------------+--------------------+-------------+----------+-------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+--------------------+
|   video_id|               title|        publishedAt|           channelId| channelTitle|categoryId|      trending_date|                tags|view_count| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|         description|
+-----------+--------------------+-------------------+--------------------+-------------+---

## Kiểm tra các giá trị trending_date không hợp lệ

In [5]:
print("INVALID TRENDING_DATE VALUES")
raw_df.select("trending_date").filter(
    col("trending_date").isNotNull() & 
    ~col("trending_date").rlike(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$")
).distinct().show(20, False)

print("COUNT COMPARISON")
valid_dates = raw_df.filter(col("trending_date").rlike(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$")).count()
total_with_dates = raw_df.filter(col("trending_date").isNotNull()).count()
print(f"Valid dates: {valid_dates} / {total_with_dates}")

INVALID TRENDING_DATE VALUES
+-------------------+
|trending_date      |
+-------------------+
|2021-07-08 00:00:00|
|2021-07-20 00:00:00|
|2021-08-27 00:00:00|
|2022-01-03 00:00:00|
|2022-04-07 00:00:00|
|2023-03-20 00:00:00|
|2023-04-04 00:00:00|
|2023-07-23 00:00:00|
|2021-12-13 00:00:00|
|2022-01-02 00:00:00|
|2023-09-02 00:00:00|
|2023-11-27 00:00:00|
|2020-09-05 00:00:00|
|2021-07-06 00:00:00|
|2022-05-01 00:00:00|
|2022-05-18 00:00:00|
|2022-02-25 00:00:00|
|2022-08-18 00:00:00|
|2023-04-29 00:00:00|
|2023-10-10 00:00:00|
+-------------------+
only showing top 20 rows

COUNT COMPARISON
Valid dates: 0 / 268787


## Khai báo hàm dataframe_info

In [6]:
# Helper function
def dataframe_info(df):
    print(f"{'-'*40}")
    print(f"Số dòng: {df.count()}, Số cột: {len(df.columns)}")
    print(f"{'-'*40}")
    df.printSchema()
    print(f"{'-'*40}")
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

dataframe_info(raw_df)

----------------------------------------
Số dòng: 268787, Số cột: 16
----------------------------------------
root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- description: string (nullable = true)

----------------------------------------
+--------+-----+-----------+---------+------------+----------+-------------+----+----------+-----+--------+-------------+--------------+----------------

## Tiền xử lý dữ liệu

### 1, Xóa các cột không cần thiết

In [7]:
preprocessed_data = raw_df.drop('thumbnail_link', 'comments_disabled', 'video_error_or_removed', 'ratings_disabled')
dataframe_info(preprocessed_data)

----------------------------------------
Số dòng: 268787, Số cột: 13
----------------------------------------
root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- description: string (nullable = true)

----------------------------------------
+--------+-----+-----------+---------+------------+----------+-------------+----+----------+-----+--------+-------------+-----------+
|video_id|title|publishedAt|channelId|channelTitle|categoryId|trending_date|tags|view_count|likes|dislikes|comment_count|description|
+--------+-----+-----------+-

### 2, Xóa các hàng có tất cả giá trị là Null

In [8]:
preprocessed_data = preprocessed_data.filter(
    reduce(lambda a, b: a | b, (col(c).isNotNull() for c in preprocessed_data.columns))
)
dataframe_info(preprocessed_data)

----------------------------------------
Số dòng: 268787, Số cột: 13
----------------------------------------
root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- description: string (nullable = true)

----------------------------------------
+--------+-----+-----------+---------+------------+----------+-------------+----+----------+-----+--------+-------------+-----------+
|video_id|title|publishedAt|channelId|channelTitle|categoryId|trending_date|tags|view_count|likes|dislikes|comment_count|description|
+--------+-----+-----------+-

In [9]:
dataframe_info(preprocessed_data)

----------------------------------------
Số dòng: 268787, Số cột: 13
----------------------------------------
root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- description: string (nullable = true)

----------------------------------------
+--------+-----+-----------+---------+------------+----------+-------------+----+----------+-----+--------+-------------+-----------+
|video_id|title|publishedAt|channelId|channelTitle|categoryId|trending_date|tags|view_count|likes|dislikes|comment_count|description|
+--------+-----+-----------+-

### 3, Xóa các hàng có trending_date sai định dạng

In [10]:
# Lọc dữ liệu video hợp lệ - chỉ giữ những dòng có trending_date đúng format
print("Before filtering:")
print(f"Total rows: {preprocessed_data.count()}")

# Lọc chỉ những dòng có trending_date đúng format ISO timestamp
preprocessed_data = preprocessed_data.filter(
    col("trending_date").rlike(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$")
)

print("After filtering:")
print(f"Valid rows: {preprocessed_data.count()}")
dataframe_info(preprocessed_data)

Before filtering:
Total rows: 268787
After filtering:
Valid rows: 268787
----------------------------------------
Số dòng: 268787, Số cột: 13
----------------------------------------
root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- description: string (nullable = true)

----------------------------------------
+--------+-----+-----------+---------+------------+----------+-------------+----+----------+-----+--------+-------------+-----------+
|video_id|title|publishedAt|channelId|channelTitle|categoryId|trending_date|tags|view_coun

### 4, Điền giá trị Null cho description

In [11]:
preprocessed_data = preprocessed_data.fillna({"description": "No description"})
dataframe_info(preprocessed_data)

----------------------------------------
Số dòng: 268787, Số cột: 13
----------------------------------------
root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- description: string (nullable = false)

----------------------------------------
+--------+-----+-----------+---------+------------+----------+-------------+----+----------+-----+--------+-------------+-----------+
|video_id|title|publishedAt|channelId|channelTitle|categoryId|trending_date|tags|view_count|likes|dislikes|comment_count|description|
+--------+-----+-----------+

### 5, Chuẩn hóa dữ liệu

# Chuyển đổi timestamp - SỬA FORMAT CHO ĐÚNG
preprocessed_data = preprocessed_data.withColumn('trending_date', to_timestamp('trending_date', "yyyy-MM-dd'T'HH:mm:ss'Z'"))
preprocessed_data = preprocessed_data.withColumn('publishedAt', to_timestamp('publishedAt', "yyyy-MM-dd'T'HH:mm:ss'Z'"))

dataframe_info(preprocessed_data)

### 6, Thêm cột category_name

In [23]:
df_json = spark.read.json(f"{hdfs_path}/category_id.json", multiLine=True)
df_categories = df_json.select(explode(col("items")).alias("item")) \
                        .select(col("item.id").alias("categoryId"), 
                                col("item.snippet.title").alias("category_name"))
    
preprocessed_data = preprocessed_data.join(df_categories, preprocessed_data.categoryId == df_categories.categoryId, "left") \
            .drop(df_categories.categoryId)

In [24]:
preprocessed_data.show(5)

+-----------+--------------------+-------------------+--------------------+-------------+----------+-------------------+--------------------+----------+------+--------+-------------+--------------------+--------------+
|   video_id|               title|        publishedAt|           channelId| channelTitle|categoryId|      trending_date|                tags|view_count| likes|dislikes|comment_count|         description| category_name|
+-----------+--------------------+-------------------+--------------------+-------------+----------+-------------------+--------------------+----------+------+--------+-------------+--------------------+--------------+
|3C66w5Z0ixs|I ASKED HER TO BE...|2020-08-11 19:20:14|UCvtRTOMP2TqYqu51...|     Brawadis|        22|2020-08-12 00:00:00|brawadis|prank|ba...|   1514614|156908|    5855|        35313|SUBSCRIBE to BRAW...|People & Blogs|
|M9Pmf9AB4Mo|Apex Legends | St...|2020-08-11 17:00:10|UC0ZV6M2THA81QT9h...| Apex Legends|        20|2020-08-12 00:00:00|Apex

## Lưu dữ liệu đã xử lý

In [None]:
# # Convert timestamps to string để tránh lỗi khi save
# preprocessed_save = preprocessed_data.withColumn('trending_date', 
#     date_format('trending_date', 'yyyy-MM-dd HH:mm:ss')) \
#     .withColumn('publishedAt', 
#     date_format('publishedAt', 'yyyy-MM-dd HH:mm:ss'))

# ML_save = ML_data.withColumn('trending_date', 
#     date_format('trending_date', 'yyyy-MM-dd HH:mm:ss')) \
#     .withColumn('publishedAt', 
#     date_format('publishedAt', 'yyyy-MM-dd HH:mm:ss'))

# # Lưu files
# preprocessed_save.toPandas().to_csv('./data/preprocessed_data.csv', index=False)
# ML_save.toPandas().to_csv('./data/ml_data.csv', index=False)
# print("Dữ liệu đã được lưu")

In [27]:
output_path = "hdfs://namenode:9000/data_input/preprocessed_data.csv"

print(f"Đang lưu dữ liệu đã xử lý vào HDFS tại: {output_path}...")

preprocessed_data.write \
  .mode('overwrite') \
  .parquet(output_path)

print("✅ Đã lưu thành công file Parquet!")

Đang lưu dữ liệu đã xử lý vào HDFS tại: hdfs://namenode:9000/data_input/preprocessed_data.csv...
✅ Đã lưu thành công file Parquet!


In [None]:
# Stop Spark
spark.stop()