# Import thư viện.

In [None]:
# Tran Hong Dang.
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Đặt seed ngẫu nhiên.
rnd_seed=23
np.random.seed=rnd_seed
np.random.set_state=rnd_seed

# Khởi tạo Spark Session.

In [None]:
spark = SparkSession.builder.master("local[2]").appName("Youtube-Data-Analysis").getOrCreate()
spark

In [None]:
sc = spark.sparkContext
sc

# Đọc file dữ liệu và tổng quan.

In [None]:
time_df = spark.read.csv("./data/preprocessed_data.csv", header=True, inferSchema=True).cache()
time_df.take(5)

In [None]:
time_df.show(5)

In [None]:
# show the schema of the dataframe
time_df.printSchema()

## Format lại dữ liệu ngày giờ.

In [None]:
youtube_df = time_df.withColumn("publish_time", F.to_timestamp("publish_time"))
# Tách publish_time thành publish_time và publish_time_only
youtube_df = time_df.withColumn("publish_date", F.to_date("publish_time")) \
             .withColumn("publish_time_only", F.date_format("publish_time", "HH:mm:ss"))

In [None]:
# Tách publish_time thành publish_time và publish_time_only
youtube_df = time_df.withColumn("publish_date", F.to_date("publish_time")) \
             .withColumn("publish_time_only", F.date_format("publish_time", "HH:mm:ss"))

In [None]:
youtube_df.show(5)

In [None]:
youtube_df.printSchema()

In [None]:
# Visualization
from IPython.core.interactiveshell import InteractiveShell
import seaborn as sns
import matplotlib.pyplot as plt
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)

from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid', rc={'figure.figsize': (18,4)})
rcParams['figure.figsize'] = 18,4

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [None]:
views_time_df = youtube_df.select("video_id", "publish_date", "views", "likes", "comment_count", "trending_date", "publish_time_only")
views_time_df.show(5)

# Phân tích.

In [None]:
# Xếp hạng số view khi vào trending_date của mỗi video
from pyspark.sql.window import Window
# Tạo một cửa sổ để nhóm theo video_id và sắp xếp theo trending_date tăng dần
window_spec = Window.partitionBy("video_id").orderBy("trending_date")
# Thêm cột rank để xác định hàng đầu tiên cho mỗi video_id
ranked_df = views_time_df.withColumn("rank", F.row_number().over(window_spec))
# Lọc chỉ giữ lại hàng đầu tiên của mỗi video_id
first_trending_df = ranked_df.filter(F.col("rank") == 1).drop("rank")
# df chứa thông tin khi video vừa vào trending
first_trending_df.orderBy(F.desc("views")).limit(100).show(30)

## Lượng views khi bắt đầu trending.

In [None]:
import matplotlib.ticker as mtick
from matplotlib.ticker import FuncFormatter

# Plot giữa số view của các video khi vào trending_date ngày đầu xếp giảm dần lấy 100 cái đầu
first_trending_plot = first_trending_df.orderBy(F.desc("views")).limit(50).select("trending_date", "views").toPandas()
ax = first_trending_plot.plot.bar(x="trending_date", y = 'views', figsize=(20, 9))
ax.yaxis.set_major_formatter(mtick.FuncFormatter(lambda x, _: f'{x:,.0f}')) 
ax.set_xlabel('Trending Date')
ax.set_ylabel('Number of Views')

## Số videos trending ngay trong ngày đăng.

#### Tính thời gian từ khi upload đến khi trending.

In [None]:
first_trending_df = first_trending_df.withColumn("days_since_upload", F.datediff(F.col("trending_date"), F.col("publish_date")))
first_trending_df.show(10)

In [None]:
# Đếm số videos.
first_trending_df.count()

In [None]:
# đếm số lượng video upload được vào trending luôn
publish_trending = first_trending_df.filter(F.col("days_since_upload") == 0).count()
print(publish_trending)

#### Plot biểu đồ.

In [None]:
publish_trending = first_trending_df.filter(F.col("days_since_upload") == 0).orderBy(F.asc("trending_date"))
ax = publish_trending.select("trending_date", "views").toPandas().plot.line(x= "trending_date", y = 'views', figsize=(20, 9))
ax.yaxis.set_major_formatter(mtick.FuncFormatter(lambda x, _: f'{x:,.0f}')) 
ax.set_xlabel('Trending Date')
ax.set_ylabel('Number of Views')

## Giờ đăng tải có số video trending nhiều nhất.

#### Tạo cột giá trị giờ (hour).

In [None]:
from pyspark.sql.functions import substring
hour_only = first_trending_df.withColumn("hour", substring(col("publish_time_only"), 1, 2))
hour_only.show(5)

#### Đếm số videos đăng nhóm theo giờ đăng.

In [None]:
hour_trending = hour_only.groupBy("hour").count().orderBy(F.asc("hour"))
hour_trending.show(10)

#### Plot biểu đồ.

In [None]:
# Plot hour_trending
ax = hour_trending.toPandas().plot.bar(x="hour", y="count", figsize = (20, 9))
ax.set_xlabel('Hours')
ax.set_ylabel('Number of Videos')

## Giờ đăng để trending nhanh nhất.

In [None]:
# Plot upload giờ nào thì vào trending NHANH nhất, bé hơn hoặc bằng 5 ngày
max_days = hour_only.filter(F.col("days_since_upload") <= 5).groupBy("hour").count().orderBy(F.asc("hour"))
ax = max_days.toPandas().plot.bar(x="hour", y="count")
ax.set_xlabel('Hours')
ax.set_ylabel('Number of Fast-trending Videos')

In [None]:
# Phân tích thời gian tồn tại trên trending dựa trên thời điểm upload
from pyspark.sql.functions import countDistinct
trending_duration_df = views_time_df.groupBy("publish_time_only").agg(
    countDistinct("trending_date").alias("days_on_trending")
)
trending_duration_df = trending_duration_df.orderBy(F.desc("days_on_trending")).limit(100)
ax = trending_duration_df.toPandas().plot.line(x="publish_time_only", y="days_on_trending")
ax.set_xlabel('Publish Date')
ax.set_ylabel('Trending Durations')

## Giờ đăng để trending lâu nhất.

In [None]:
# Thời gian nào trong ngày upload thì được trên trending LÂU nhất, dùng hour cho dễ, lâu được tính là các video trên trending > 50 ngày.
hour_trending_duration = trending_duration_df.withColumn("hour", substring(col("publish_time_only"), 1, 2))
hour_trending_duration = hour_trending_duration.filter(F.col("days_on_trending") > 50).groupBy("hour").count()
hour_trending_duration = hour_trending_duration.toPandas().sort_values(by='hour')

ax = hour_trending_duration.plot.bar(x="hour")
ax.set_xlabel('Hours')
ax.set_ylabel('Number of Long-trending Videos')