In [77]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
import pyspark.sql
from pyspark.sql.types import *
import datetime
import pyspark.sql.functions as F
import plotly.graph_objects as go


## Duration trends

In [78]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark: SparkSession = SparkSession.builder.getOrCreate()

# Columns declared for video.csv. The file header carries 13 columns (listed
# in the comment above the reads below); only the first seven are declared
# here, which is what triggers Spark's CSVHeaderChecker
# "Header length: 13, schema size: 7" warning whenever the file is scanned.
v_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('video_id', StringType()),
        ('channel_id', StringType()),
        ('title', StringType()),
        ('video_type', StringType()),
        ('published_date', DateType()),
        ('published_time', TimestampType()),
        ('duration', StringType()),
    )
])

# Columns declared for video_log.csv: one row per (video, snapshot) holding
# the engagement counters and the date/time the snapshot row was created.
vl_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('video_id', StringType()),
        ('view_count', IntegerType()),
        ('like_count', IntegerType()),
        ('comment_count', IntegerType()),
        ('created_date', DateType()),
        ('created_at', TimestampType()),
    )
])

# Full header of video.csv, for reference:
# video_id,channel_id,title,video_type,published_date,published_time,duration,thumbnail_url,description,tags,categoryid,created_at,active
# multiLine lets a quoted field span several physical lines of the file.
v = spark.read.csv('../sample_data/video.csv',
                   schema=v_schema, header=True, multiLine=True)
vl = spark.read.csv('../sample_data/video_log.csv',
                    schema=vl_schema, header=True, multiLine=True)

In [118]:
# Attach publish date and duration to every log row. Only rows of `v` whose
# video_type is 'video' take part in the join; because it is a left join,
# log rows without such a match are kept with null published_date/duration.
video = vl.join(
    v.where(F.col('video_type') == 'video')
     .select('video_id', 'published_date', 'duration'),
    'video_id',
    'left',
)

In [119]:
# Keep exactly one row per video: its most recent log snapshot.
# sort() followed by drop_duplicates() does not guarantee which duplicate
# Spark keeps, so rank the rows inside each video_id partition explicitly and
# take the top one. created_at breaks ties between snapshots that share the
# same created_date.
latest_first = pyspark.sql.Window.partitionBy('video_id').orderBy(
    F.col('created_date').desc(), F.col('created_at').desc())
video = video.withColumn('_recency_rank', F.row_number().over(latest_first)
                         ).filter(F.col('_recency_rank') == 1
                         ).drop('_recency_rank')

# Age of the video at snapshot time (date - date); rows whose published_date
# is null produce a null here and are removed by the filter below.
video = video.withColumn('day_since_published',
                         video.created_date-video.published_date)
# Only keep videos that had been published for at least 7 days when logged.
video = video.filter(video.day_since_published >= datetime.timedelta(days=7))

# Split the colon-separated duration string; the first piece is read as
# hours and the second as minutes (presumably "HH:MM:SS" — confirm format).
video = video.withColumn("duration_list", F.split(F.col('duration'),':')).withColumn(
    'duration_hour',F.col('duration_list')[0].cast('int')).withColumn(
    'duration_minute',F.col('duration_list')[1].cast('int'))
video = video.drop('duration_list','duration')
# Restrict the analysis to videos shorter than one hour.
video = video.filter(video.duration_hour ==0)


In [120]:
# Tag every row with the number of videos sharing its duration_minute, then
# keep only the duration buckets that contain more than 20 videos.
video = (
    video
    .join(video.groupBy('duration_minute').count(), on='duration_minute')
    .where(F.col('count') > 20)
)

In [121]:
# Collect the filtered Spark DataFrame to the driver as a pandas DataFrame.
# NOTE(review): pd_video is not referenced by the cells visible below — confirm it is still needed.
pd_video = video.toPandas()

24/03/26 09:40:47 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 13, schema size: 7
CSV file: file:///Users/Harry/Documents/GitHub/top_youtuber_dashboard_tw/sample_data/video.csv


In [126]:
# One row per duration bucket with the number of videos in it. `count` was
# computed per duration_minute, so every duplicate row carries the same value.
video_count = (
    video
    .select('duration_minute', 'count')
    .dropDuplicates(['duration_minute'])
    .toPandas()
)

24/03/26 09:44:44 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 13, schema size: 7
CSV file: file:///Users/Harry/Documents/GitHub/top_youtuber_dashboard_tw/sample_data/video.csv


In [135]:
# Mean view_count per duration bucket, ordered by duration. Spark names the
# aggregated column 'avg(view_count)'.
avg_view = (
    video
    .groupBy('duration_minute')
    .agg(F.avg('view_count'))
    .sort('duration_minute')
    .toPandas()
)

24/03/26 09:48:16 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 13, schema size: 7
CSV file: file:///Users/Harry/Documents/GitHub/top_youtuber_dashboard_tw/sample_data/video.csv


In [137]:
# Bars (left axis): number of videos in each duration bucket.
trace1 = go.Bar(
    x=video_count['duration_minute'],
    y=video_count['count'],
    name='Number of video',
    yaxis='y1',
    marker={'color': '#1D1B86'},
)
# Line (right axis): views per duration bucket. avg_view holds Spark's
# avg(view_count), i.e. the arithmetic mean, so the legend entry is labelled
# as a mean rather than a median.
trace2 = go.Scatter(
    x=avg_view['duration_minute'],
    y=avg_view['avg(view_count)'],
    name='Video views mean',
    yaxis='y2',
    mode='lines+markers',
    marker_size=8,
)
data = [trace1, trace2]

# Two y-axes: y2 is drawn over the primary axis and placed on the right so
# counts and views can use independent scales.
layout = go.Layout(
    title='Video performance base on duration (min)',
    yaxis={'title': 'num'},
    yaxis2={'title': 'views', 'overlaying': 'y', 'side': 'right'},
    template="plotly_dark",
)
fig = go.Figure(data=data, layout=layout)

# Fixed canvas size and a legend lifted above the plot area.
fig.update_layout(
    legend={'y': 1.15, 'x': 0.75},
    autosize=False,
    width=800,
    height=500,
)

fig.show()

In [710]:
# Save the figure as a static PNG; the path is relative to the working directory.
# NOTE(review): presumably requires plotly's static-image export backend and an
# existing images/ directory — confirm before running on a fresh checkout.
fig.write_image('images/duration_view_relation.png')
# Interactive HTML export, currently disabled:
# fig.write_html('images/duration_view_relation.html')