In [1]:
from datetime import datetime
from dateutil.relativedelta import relativedelta, MO, SU

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql import types as t
from pyspark.sql.window import Window as w

In [2]:
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("SparkDemo")\
    .getOrCreate()

24/08/30 02:38:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Explore data

In [3]:
video_categories_df = spark.read.format("json") \
    .option("multiline","true") \
    .load(f"./data/GB_category_id.json")

In [4]:
video_categories_df.printSchema()

root
 |-- etag: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- etag: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- kind: string (nullable = true)
 |    |    |-- snippet: struct (nullable = true)
 |    |    |    |-- assignable: boolean (nullable = true)
 |    |    |    |-- channelId: string (nullable = true)
 |    |    |    |-- title: string (nullable = true)
 |-- kind: string (nullable = true)



In [6]:
video_categories_df = (
    video_categories_df
        .withColumn("categories", F.explode(F.arrays_zip("items.id", "items.snippet.title")))
        .select(
            col("categories")["id"].alias("id"),
            col("categories")["title"].alias("title"),
        )
)
video_categories_df.limit(5).toPandas().head()

Unnamed: 0,id,title
0,1,Film & Animation
1,2,Autos & Vehicles
2,10,Music
3,15,Pets & Animals
4,17,Sports


In [7]:
videos_df = spark.read.format("csv") \
    .option("multiline", True) \
    .option("sep", ",") \
    .option("header", True) \
    .load("./data/GBvideos.csv")

videos_df.limit(10).toPandas().head()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,Jw1Y-zhQURU,17.14.11,John Lewis Christmas Ad 2017 - #MozTheMonster,John Lewis,26,2017-11-10T07:38:29.000Z,"""christmas""|""john lewis christmas""|""john lewis...",7224515,55681,10247,9479,https://i.ytimg.com/vi/Jw1Y-zhQURU/default.jpg,False,False,False,Click here to continue the story and make your...
1,3s1rvMFUweQ,17.14.11,Taylor Swift: …Ready for It? (Live) - SNL,Saturday Night Live,24,2017-11-12T06:24:44.000Z,"""SNL""|""Saturday Night Live""|""SNL Season 43""|""E...",1053632,25561,2294,2757,https://i.ytimg.com/vi/3s1rvMFUweQ/default.jpg,False,False,False,Musical guest Taylor Swift performs …Ready for...
2,n1WpP7iowLc,17.14.11,Eminem - Walk On Water (Audio) ft. Beyoncé,EminemVEVO,10,2017-11-10T17:00:03.000Z,"""Eminem""|""Walk""|""On""|""Water""|""Aftermath/Shady/...",17158579,787420,43420,125882,https://i.ytimg.com/vi/n1WpP7iowLc/default.jpg,False,False,False,Eminem's new track Walk on Water ft. Beyoncé i...
3,PUTEiSjKwJU,17.14.11,Goals from Salford City vs Class of 92 and Fri...,Salford City Football Club,17,2017-11-13T02:30:38.000Z,"""Salford City FC""|""Salford City""|""Salford""|""Cl...",27833,193,12,37,https://i.ytimg.com/vi/PUTEiSjKwJU/default.jpg,False,False,False,Salford drew 4-4 against the Class of 92 and F...
4,rHwDegptbI4,17.14.11,Dashcam captures truck's near miss with child ...,Cute Girl Videos,25,2017-11-13T01:45:13.000Z,[none],9815,30,2,30,https://i.ytimg.com/vi/rHwDegptbI4/default.jpg,False,False,False,Dashcam captures truck's near miss with child ...


### Check nulls in each column in dataframes

In [8]:
video_categories_df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in video_categories_df.columns]).toPandas()

Unnamed: 0,id,title
0,0,0


In [9]:
videos_df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in videos_df.columns]).toPandas()

                                                                                

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [10]:
videos_df.select(['trending_date']).distinct().head(10)

[Row(trending_date='18.08.05'),
 Row(trending_date='17.20.11'),
 Row(trending_date='17.09.12'),
 Row(trending_date='18.14.02'),
 Row(trending_date='18.20.03'),
 Row(trending_date='18.04.05'),
 Row(trending_date='18.21.02'),
 Row(trending_date='18.30.04'),
 Row(trending_date='18.06.05'),
 Row(trending_date='18.11.02')]

In [11]:
def df_print(df, n_rows=5):
    return df.limit(n_rows).toPandas().head(n_rows)


def convert_df_to_column_array(old_df, from_col, to_col):
    new_df = (
        old_df
            .agg(F.collect_list(col(from_col)).alias(to_col))
            .select([to_col])
    )
    return new_df

## Query 1

**Description:** Find Top 10 videos that were amongst the trending videos for the highest
number of days (it doesn't need to be a consecutive period of time).
You should also include information about different metrics for each day
the video was trending.

In [12]:
# Create initial dataframe for the query
df = videos_df.select(
    col('video_id'),
    col("title"),
    col("description"),
    F.struct(
        F.to_date(F.from_unixtime(F.unix_timestamp(col("trending_date"), "yy.dd.MM"))).alias("date"), 
        col("likes").cast(t.LongType()).alias("likes"), 
        col("dislikes").cast(t.LongType()).alias("dislikes"),
        col("views").cast(t.LongType()).alias("views")
    ).alias('trending_day')
)

# Get top 10 most trending video ids
most_trending_df = df.groupBy(df.video_id)\
                        .agg(F.count('video_id').alias('num_trending_days'))\
                        .sort(col('num_trending_days').desc()).limit(10)
df_print(most_trending_df, 10)

Unnamed: 0,video_id,num_trending_days
0,2z3EUY1aXdY,38
1,NooW_RbfdWI,38
2,Il-an3K9pjg,38
3,BhIEIO0vaBE,38
4,u_C4onVrr8U,38
5,8h--kFui1JA,37
6,tGRzz0oqgUE,37
7,5GHXEGz3PJg,37
8,tsp7IOr7Q9A,37
9,dzxFdtWmjto,37


In [13]:
# Get title, description and trending_day for each video from top 10
detailed_most_trending_df = most_trending_df.alias('df1').join(
    df.alias('df2'), col('df1.video_id') == col('df2.video_id'), 'inner')\
        .select(
            col('df1.video_id').alias('video_id'),
            col('num_trending_days'),
            col('title'),
            col('description'),
            col('trending_day')
        )
df_print(detailed_most_trending_df)

Unnamed: 0,video_id,num_trending_days,title,description,trending_day
0,BhIEIO0vaBE,38,To Our Daughter,Directed by Tyler Ross @wttyler\nMusic by Jaco...,"(2018-02-05, 0, 0, 20921796)"
1,NooW_RbfdWI,38,Jurassic World: Fallen Kingdom - Official Trai...,Jurassic World: Fallen Kingdom \nIn Theaters J...,"(2018-02-05, 61833, 1416, 1999326)"
2,2z3EUY1aXdY,38,Justin Timberlake’s FULL Pepsi Super Bowl LII ...,Justin Timberlake breaks it down during the Pe...,"(2018-02-05, 50251, 15239, 2027569)"
3,BhIEIO0vaBE,38,To Our Daughter,Directed by Tyler Ross @wttyler\nMusic by Jaco...,"(2018-02-06, 0, 0, 35832484)"
4,2z3EUY1aXdY,38,Justin Timberlake’s FULL Pepsi Super Bowl LII ...,Justin Timberlake breaks it down during the Pe...,"(2018-02-06, 123302, 39422, 8313413)"


In [14]:
# Find the latest day video statistics
window = w.partitionBy("video_id").orderBy(col("trending_day.date").desc())
latest_day_info_df = detailed_most_trending_df.withColumn("rank", F.row_number().over(window)) \
                        .where(col("rank") == 1) \
                        .select(
                            col("video_id"), 
                            col("title"), 
                            col("description"), 
                            col("trending_day.likes").alias("latest_likes"), 
                            col("trending_day.dislikes").alias("latest_dislikes"),
                            col("trending_day.views").alias("latest_views")
                        )
df_print(latest_day_info_df, 15)

Unnamed: 0,video_id,title,description,latest_likes,latest_dislikes,latest_views
0,2z3EUY1aXdY,Justin Timberlake’s FULL Pepsi Super Bowl LII ...,Justin Timberlake breaks it down during the Pe...,169320,54005,14251760
1,5GHXEGz3PJg,Florence + The Machine - Hunger,HungerDirected by AG RojasProduced by Park Pic...,171071,3972,12293782
2,8h--kFui1JA,Sam Smith - Pray (Official Video) ft. Logic,"Stream, Download and Listen to Pray feat. Logi...",370027,9680,22999573
3,BhIEIO0vaBE,To Our Daughter,Directed by Tyler Ross @wttyler\nMusic by Jaco...,0,0,62338362
4,Il-an3K9pjg,Anne-Marie - 2002 [Official Video],Get 2002 by Anne-Marie HERE ▶ http://ad.gt/200...,394830,8892,29641412
5,NooW_RbfdWI,Jurassic World: Fallen Kingdom - Official Trai...,Jurassic World: Fallen Kingdom \nIn Theaters J...,275268,7806,24293173
6,dzxFdtWmjto,VENOM - Official Teaser Trailer (HD),Watch the #Venom teaser trailer now. 10.5.18.\...,219113,25700,16416756
7,tGRzz0oqgUE,Janelle Monáe – Make Me Feel [Official Music V...,“Make Me Feel” and “Django Jane” available now...,150947,9745,8585663
8,tsp7IOr7Q9A,BHAD BHABIE feat. Lil Yachty - Gucci Flip Flop...,BHAD BHABIE Gucci Flip Flops ft. Lil Yachty ⛵️...,873431,119401,50122125
9,u_C4onVrr8U,Miguel - Come Through and Chill (Official Vide...,“Come Through and Chill” ft. J. Cole out now! ...,152765,3266,9657370


In [15]:
# Get all trending days for top 10 videos
trending_days_df = latest_day_info_df.alias('df1')\
                            .join(df.alias('df2'),
                                    col('df1.video_id') == col('df2.video_id'), how='inner')\
                            .select(
                                col('df1.video_id').alias('video_id'),
                                col('trending_day')
                            )
trending_days_df = trending_days_df.groupBy("video_id")\
                                    .agg(F.collect_list(col("trending_day")).alias("trending_days"))
df_print(trending_days_df, 20)

Unnamed: 0,video_id,trending_days
0,2z3EUY1aXdY,"[(2018-03-14, 169320, 54005, 14251760), (2018-..."
1,5GHXEGz3PJg,"[(2018-06-09, 171071, 3972, 12293782), (2018-0..."
2,8h--kFui1JA,"[(2018-06-14, 370027, 9680, 22999573), (2018-0..."
3,BhIEIO0vaBE,"[(2018-03-14, 0, 0, 62338362), (2018-03-13, 0,..."
4,Il-an3K9pjg,"[(2018-06-14, 394830, 8892, 29641412), (2018-0..."
5,NooW_RbfdWI,"[(2018-03-14, 275268, 7806, 24293173), (2018-0..."
6,dzxFdtWmjto,"[(2018-03-17, 219113, 25700, 16416756), (2018-..."
7,tGRzz0oqgUE,"[(2018-03-31, 150947, 9745, 8585663), (2018-03..."
8,tsp7IOr7Q9A,"[(2018-06-07, 873431, 119401, 50122125), (2018..."
9,u_C4onVrr8U,"[(2018-06-01, 152765, 3266, 9657370), (2018-05..."


In [16]:
# Join and get final results
final_df = latest_day_info_df.alias('df1')\
                        .join(trending_days_df.alias('df2'),
                              col('df1.video_id') == col('df2.video_id'))\
                        .select(
                            col('df1.video_id').alias('video_id'),
                            col('title'),
                            col('description'),
                            col('latest_views'),
                            col('latest_likes'),
                            col('latest_dislikes'),
                            col('trending_days')
                        )
df_print(final_df, 20)

Unnamed: 0,video_id,title,description,latest_views,latest_likes,latest_dislikes,trending_days
0,2z3EUY1aXdY,Justin Timberlake’s FULL Pepsi Super Bowl LII ...,Justin Timberlake breaks it down during the Pe...,14251760,169320,54005,"[(2018-03-14, 169320, 54005, 14251760), (2018-..."
1,5GHXEGz3PJg,Florence + The Machine - Hunger,HungerDirected by AG RojasProduced by Park Pic...,12293782,171071,3972,"[(2018-06-09, 171071, 3972, 12293782), (2018-0..."
2,8h--kFui1JA,Sam Smith - Pray (Official Video) ft. Logic,"Stream, Download and Listen to Pray feat. Logi...",22999573,370027,9680,"[(2018-06-14, 370027, 9680, 22999573), (2018-0..."
3,BhIEIO0vaBE,To Our Daughter,Directed by Tyler Ross @wttyler\nMusic by Jaco...,62338362,0,0,"[(2018-03-14, 0, 0, 62338362), (2018-03-13, 0,..."
4,Il-an3K9pjg,Anne-Marie - 2002 [Official Video],Get 2002 by Anne-Marie HERE ▶ http://ad.gt/200...,29641412,394830,8892,"[(2018-06-14, 394830, 8892, 29641412), (2018-0..."
5,NooW_RbfdWI,Jurassic World: Fallen Kingdom - Official Trai...,Jurassic World: Fallen Kingdom \nIn Theaters J...,24293173,275268,7806,"[(2018-03-14, 275268, 7806, 24293173), (2018-0..."
6,dzxFdtWmjto,VENOM - Official Teaser Trailer (HD),Watch the #Venom teaser trailer now. 10.5.18.\...,16416756,219113,25700,"[(2018-03-17, 219113, 25700, 16416756), (2018-..."
7,tGRzz0oqgUE,Janelle Monáe – Make Me Feel [Official Music V...,“Make Me Feel” and “Django Jane” available now...,8585663,150947,9745,"[(2018-03-31, 150947, 9745, 8585663), (2018-03..."
8,tsp7IOr7Q9A,BHAD BHABIE feat. Lil Yachty - Gucci Flip Flop...,BHAD BHABIE Gucci Flip Flops ft. Lil Yachty ⛵️...,50122125,873431,119401,"[(2018-06-07, 873431, 119401, 50122125), (2018..."
9,u_C4onVrr8U,Miguel - Come Through and Chill (Official Vide...,“Come Through and Chill” ft. J. Cole out now! ...,9657370,152765,3266,"[(2018-06-01, 152765, 3266, 9657370), (2018-05..."


In [17]:
top_videos_df = final_df.select(
                            F.struct(
                                col("video_id"),
                                col('title'),
                                col('description'),
                                col('latest_views'),
                                col('latest_likes'),
                                col('latest_dislikes'),
                                col('trending_days')
                            ).alias("video")
                        )
df_print(top_videos_df, 10)

Unnamed: 0,video
0,"(2z3EUY1aXdY, Justin Timberlake’s FULL Pepsi S..."
1,"(5GHXEGz3PJg, Florence + The Machine - Hunger,..."
2,"(8h--kFui1JA, Sam Smith - Pray (Official Video..."
3,"(BhIEIO0vaBE, To Our Daughter, Directed by Tyl..."
4,"(Il-an3K9pjg, Anne-Marie - 2002 [Official Vide..."
5,"(NooW_RbfdWI, Jurassic World: Fallen Kingdom -..."
6,"(dzxFdtWmjto, VENOM - Official Teaser Trailer ..."
7,"(tGRzz0oqgUE, Janelle Monáe – Make Me Feel [Of..."
8,"(tsp7IOr7Q9A, BHAD BHABIE feat. Lil Yachty - G..."
9,"(u_C4onVrr8U, Miguel - Come Through and Chill ..."


In [18]:
q1_videos_df = convert_df_to_column_array(top_videos_df, from_col="video", to_col="videos")
df_print(q1_videos_df)

Unnamed: 0,videos
0,"[(2z3EUY1aXdY, Justin Timberlake’s FULL Pepsi ..."


## Query 2

**Description:** Find what was the most popular category for each week (7 days slices).
Popularity is decided based on the total number of views for videos of
this category. Note, to calculate it you can’t just sum up the number of views.
If a particular video appeared only once during the given period, it shouldn’t be
counted. Only if it appeared more than once you should count the number of new
views. For example, if video A appeared on day 1 with 100 views, then on day 4
with 250 views and again on day 6 with 400 views, you should count it as 400 - 100 = 300.
For our purpose, it will mean that this particular video was watched 300 times
in the given time period.

In [19]:
week_dates_schema = t.StructType([
    t.StructField("start_date", t.DateType()),
    t.StructField("end_date", t.DateType())
])


@F.udf(week_dates_schema)
def get_week_dates_udf(trending_date):
    trending_datetime = datetime.strptime(str(trending_date), "%Y-%m-%d")
    start_date = trending_datetime + relativedelta(weekday=MO(-1))
    end_date = trending_datetime + relativedelta(weekday=SU(1))
    return start_date, end_date


In [20]:
# Split video ids on 7-days chunks
df = videos_df.select(
        col("video_id"),
        F.to_date(F.from_unixtime(F.unix_timestamp(col("trending_date"), "yy.dd.MM"))).alias("trending_date"), 
        col("views").cast(t.LongType()).alias("views"), 
        col("category_id")
    )

chunks_df = df.withColumn("week_dates", get_week_dates_udf(col("trending_date"))) \
                .select(
                    col("video_id"),
                    col("views"),
                    col("category_id"),
                    col("trending_date"),
                    col("week_dates.start_date").alias("start_date"),
                    col("week_dates.end_date").alias("end_date"))

In [21]:
chunks_df.show()

24/08/30 02:39:12 ERROR Executor: Exception in task 0.0 in stage 66.0 (TID 46)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/denys_herasymuk/Programs/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 1104, in main
    "driver_version": str(version),
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 7) than that in driver 3.12, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/denys_herasymuk/Programs/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 1104, in main
    "driver_version": str(version),
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 7) than that in driver 3.12, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.


In [22]:
# Count number of video appearance during this 7 days. 
# And filter video ids, which appeared more than once in its period chunk
weekly_video_views_df = (
    chunks_df
        .groupBy("video_id", "category_id", "start_date", "end_date")
        .agg(
            F.count("video_id").alias("video_count"), 
            F.collect_list(col("views")).alias("views_lst")
        )
        .where(col("video_count") >= 2)
        .select(
            col("video_id"),
            col("start_date"),
            col("end_date"),
            col("category_id"),
            col("views_lst")
        )
)

In [23]:
# Get views_delta for videos, which relates to the same period chunk and category, sum it 
# and find the mpst popular category for each week
window = w.partitionBy("start_date", "end_date").orderBy(col("total_views").desc())
weekly_most_popular_categories_df = (
    weekly_video_views_df
        .withColumn("views_delta", F.array_max(col("views_lst")) - F.array_min(col("views_lst")))
        .groupBy("start_date", "end_date", "category_id")
        .agg(
            F.count("video_id").alias("number_of_videos"), 
            F.sum(col("views_delta")).alias("total_views"),
            F.collect_list(col("video_id")).alias("video_ids_lst")
        )
        .withColumn("rank", F.row_number().over(window))
        .where(col("rank") == 1)
)
weekly_most_popular_categories_df.toPandas().head(10)

24/08/30 02:39:35 ERROR Executor: Exception in task 0.0 in stage 67.0 (TID 47)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/denys_herasymuk/Programs/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 1104, in main
    "driver_version": str(version),
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 7) than that in driver 3.12, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/denys_herasymuk/Programs/spark-3.5.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 1104, in main
    "driver_version": str(version),
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 7) than that in driver 3.12, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.


In [None]:
# Join to get category titles
final_df = (
    weekly_most_popular_categories_df.alias("df1")
        .join(video_categories_df.alias("df2"), col("df1.category_id") == col("df2.id"), how="left")
        .sort(col("df1.start_date").desc())
        .select(
            col("df1.start_date"),
            col("df1.end_date"),
            col("df1.category_id"),
            col("df2.title").alias("category_name"),
            col("df1.number_of_videos"),
            col("df1.total_views"),
            col("df1.video_ids_lst").alias("video_ids")
        )
)
final_df.toPandas().head(50)

In [None]:
final_df = final_df.select(
                            F.struct(
                                col("start_date"),
                                col("end_date"),
                                col("category_id"),
                                col("category_name"),
                                col("number_of_videos"),
                                col("total_views"),
                                col("video_ids")
                            ).alias("week")
                        )
final_df.toPandas().head(10)

In [None]:
q2_weeks_df = convert_df_to_column_array(final_df, from_col="week", to_col="weeks")
df_print(q2_weeks_df)

## Query 3

**Description:** What were the 10 most used tags amongst trending videos for each 30days time period?
Note, if during the specified period the same video appears multiple times,
you should count tags related to that video only once.

In [22]:
# Create 30-days windows and filter periods by distinct video_ids
df = (
    videos_df
        .withColumn('tags', F.regexp_replace(col("tags"), '\"', ''))
        .withColumn('trending_date',
                    F.to_date(F.from_unixtime(F.unix_timestamp(col("trending_date"), "yy.dd.MM"))) )
        .withColumn("30_days_period", F.window("trending_date", "30 days"))
        .select(
            col('video_id'),
            col('tags'),
            col('30_days_period.start').alias('start_date'),
            col('30_days_period.end').alias('end_date')
        ).distinct()
)
df.toPandas().head(10)

TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.

In [None]:
# Check if window function worked correctly
df.select(['start_date', 'end_date']).distinct().sort(col('start_date')).show()

In [None]:
# Group by 30-days periods and count number of tags and video_ids for each period 
period_df = (
      df
        .withColumn('tags_array', F.split(col('tags'), "\|"))
        .withColumn('tag', F.explode(col('tags_array')))
        .groupBy('tag', 'start_date', 'end_date')
        .agg(
            F.count(col('video_id')).alias('number_of_videos'), 
            F.collect_list(col('video_id')).alias('video_ids')
        )
        .select(['tag', 'number_of_videos', 'video_ids', 'start_date', 'end_date'])
)
df_print(period_df, 10)

In [None]:
# Sort number_of_videos in descending order. Take top 10 most used tags
window =  w.partitionBy("start_date", "end_date").orderBy(col("number_of_videos").desc())
top_tags_df = (
    period_df
            .withColumn("rank", F.row_number().over(window))
            .where(col("rank") <= 10)
            .select(
                col("start_date"),
                col("end_date"),
                F.struct(
                    col("tag"),
                    col("number_of_videos"),
                    col("video_ids")
                ).alias("tag_stat")
            )
    )
df_print(top_tags_df, 10)

In [None]:
# Transform to month's schema
month_df = (
    top_tags_df
        .groupBy("start_date", "end_date")
        .agg(F.collect_list(col("tag_stat")).alias("tags"))
        .select(F.struct(
            col("start_date"),
            col("end_date"),
            col("tags")
        ).alias("month"))
)
df_print(month_df)

In [None]:
# Transform to months' schema
months_df = convert_df_to_column_array(month_df, from_col="month", to_col="months")
df_print(months_df)

## Query 4

**Description:** Show the top 20 channels by the number of views for the whole period.
Note, if there are multiple appearances of the same video for some channel,
you should take into account only the last appearance (with the highest
number of views).

In [None]:
# Create initial dataframe for the query
df = videos_df\
        .withColumn("trending_date", F.to_date(F.from_unixtime(F.unix_timestamp(col("trending_date"), "yy.dd.MM"))))\
        .select(
            col("channel_title"),
            col("video_id"),
            col("trending_date"),
            col("views").cast(t.LongType()).alias("views")
        )


# Group by channel_title and video_id. Take video with max number of views for the same channel
window = w.partitionBy("channel_title", "video_id").orderBy(col("views").desc())
filtered_videos_df = (
      df
        .withColumn("rank", F.row_number().over(window))
        .where(col("rank") == 1)
)
df_print(filtered_videos_df)

In [None]:
# Group by channel_title to get a number of views for all popular videos.
# Also get top 20 channels by the number of views for the whole period
most_popular_channels = (
    filtered_videos_df
        .withColumn("video_stat", 
                    F.struct(
                       col("video_id"),
                       col("views")
                   ))
        .groupBy("channel_title")
        .agg(
            F.sum(col("views")).alias("total_views"),
            F.min("trending_date").alias("start_date"),
            F.max("trending_date").alias("end_date"),
            F.collect_list(col("video_stat")).alias("videos_views")
        )
        .orderBy(col("total_views").desc())
        .select(
            F.struct(
                col("channel_title").alias("channel_name"),
                col("start_date"),
                col("end_date"),
                col("total_views"),
                col("videos_views")
            ).alias("channel")
        )
        .limit(20)
)
df_print(most_popular_channels, 20)

In [None]:
q4_channels_df = convert_df_to_column_array(most_popular_channels, from_col="channel", to_col="channels")
df_print(q4_channels_df)

## Query 5

**Description:** Show the top 10 channels with videos trending for the highest number of days
(it doesn't need to be a consecutive period of time) for the whole period.
In order to calculate it, you may use the results from the question No1.
The total_trending_days count will be a sum of the numbers of trending days
for videos from this channel.

In [None]:
# Use detailed_most_trending_df, which was created for query 1
df_print(detailed_most_trending_df, 10)

In [None]:
video_id_to_channel_df = videos_df.select(['video_id', 'channel_title']).distinct()

# Get channel names for each video_id
full_detailed_most_trending_df = (
    detailed_most_trending_df.alias('df1')
        .join(video_id_to_channel_df.alias('df2'),
                col("df1.video_id") == col("df2.video_id"), how='inner')
        .select(
            col("channel_title").alias("channel_name"),
            F.struct(
                col("df1.video_id").alias("video_id"),
                col("df1.title").alias("video_title"),
                col("df1.num_trending_days").alias("trending_days")
            ).alias("video_day")
        )
)


# Find top 10 channels with videos trending for the highest number of days
top_channels_df = (
    full_detailed_most_trending_df
        .groupBy("channel_name")
        .agg(
            F.collect_list(col("video_day")).alias("videos_days"),
            F.sum(col("video_day.trending_days")).alias("total_trending_days")
        )
        .orderBy(col("total_trending_days").desc())
        .select(
            F.struct(
                col("channel_name"),
                col("total_trending_days"),
                col("videos_days")
            ).alias("channel")
        ).limit(10)
)
df_print(top_channels_df, 10)

In [None]:
q5_channels_df = convert_df_to_column_array(top_channels_df, from_col="channel", to_col="channels")
df_print(q5_channels_df)

## Query 6

**Description:** Show the top 10 videos by the ratio of likes/dislikes for each category
for the whole period. You should consider only videos with more than 100K views.
If the same video occurs multiple times you should take the record when
the ratio was the highest.

In [None]:
# Create initial dataframe for the query. Filter videos, which have more than 100K views
df = (
    videos_df
        .where(col("views") > 100_000)
        .select(
            col('video_id'),
            col("category_id"),
            col("title").alias("video_title"),
            col("views").cast(t.LongType()).alias("views"),
            col("likes").cast(t.LongType()).alias("likes"), 
            col("dislikes").cast(t.LongType()).alias("dislikes")
        )
)


# Count the ratio of likes/dislikes for each video. 
# If the same video occurs multiple times you should take the record when the ratio was the highest
video_ratio_window = w.partitionBy("video_id").orderBy(col("ratio_likes_dislikes").desc())
video_ratios_df = (
       df
        .withColumn("ratio_likes_dislikes", col("likes") / col("dislikes"))
        .withColumn("video_ratio_rank", F.row_number().over(video_ratio_window))
        .where(col("video_ratio_rank") == 1)
)


# Group by category and get top 10 videos by the ratio of likes/dislikes for each category
category_ratio_window =  w.partitionBy("category_id").orderBy(col("ratio_likes_dislikes").desc())
category_ratios_df = (
    video_ratios_df.alias("df1")
        .withColumn("category_ratio_rank", F.row_number().over(category_ratio_window))
        .where(col("category_ratio_rank") <= 10)
        .join(video_categories_df.alias("df2"),
             col("df1.category_id") == col("df2.id"))
        .select(
            col("category_id"),
            col("df2.title").alias("category_name"),
            F.struct(
                col("video_id"),
                col("video_title"),
                col("ratio_likes_dislikes"),
                col("views")
            ).alias("video")
        )
)
df_print(category_ratios_df, 20)

In [None]:
top_category_videos_df = (
    category_ratios_df
        .groupBy("category_id", "category_name")
        .agg(
            F.collect_list(col("video")).alias("videos")
        )
        .select(
            F.struct(
                col("category_id"),
                col("category_name"),
                col("videos")
            ).alias("category")
        )
)
df_print(top_category_videos_df, 10)

In [None]:
q6_category_videos_df = convert_df_to_column_array(top_category_videos_df, from_col="category", to_col="categories")
df_print(q6_category_videos_df)