In [54]:
#user_data
%%writefile user_data.csv
user_id,series,season,timestamp,genre,duration_mins
521,”Mirzapur”,3,2024-07-30 15:00:00,action,300
672,”Panchayat”,3,2024-07-30 15:00:00,comedy,200
197,”Family Man”,2,2024-07-30 15:00:00,action,500
521,”Mirzapur”,2,2024-07-29 15:00:00,action,280
211,”Queens Gambit”,1,2024-07-30 15:00:00,drama,170
521,”Mirzapur”,1,2024-07-28 15:00:00,action,230
844,”Westworld”,3,2024-07-30 15:00:00,sci-fi,310
672,”Panchayat”,3,2024-07-29 15:00:00,comedy,210
256,”Homecoming”,2,2024-07-30 15:00:00,thriller,310
489,”Outer Range”,1,2024-07-30 15:00:00,sci-fi,340
200,”Black Mirror”,2,2024-07-30 15:00:00,sci-fi,140
256,”Outer Range”,2,2024-07-30 15:00:00,thriller,250
489,”Outer Range”,2,2024-07-28 15:00:00,sci-fi,170
200,”Black Mirror”,3,2024-07-29 15:00:00,sci-fi,190
672,”Panchayat”,2,2024-07-28 15:00:00,comedy,160
672,”Outer Range”,1,2024-07-25 15:00:00,sci-fi,250
200,”Black Mirror”,4,2024-07-28 15:00:00,sci-fi,200
844,”Westworld”,2,2024-07-29 15:00:00,sci-fi,300
672,”Black Mirror”,5,2024-07-28 15:00:00,sci-fi,150
672,”Panchayat”,1,2024-07-27 15:00:00,comedy,190

Overwriting user_data.csv


In [55]:
#Import Libraries
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *



In [56]:
#Create SparkSession for app (Streaming Analysis)
spark=SparkSession.builder.appName('data').getOrCreate()

In [57]:
#Create Dataframe series_df
df=spark.read.option('header',True).csv('/content/user_data.csv')

In [27]:
df.show()

+-------+---------------+------+-------------------+--------+-------------+
|user_id|         series|season|          timestamp|   genre|duration_mins|
+-------+---------------+------+-------------------+--------+-------------+
|    521|     ”Mirzapur”|     3|2024-07-30 15:00:00|  action|          300|
|    672|    ”Panchayat”|     3|2024-07-30 15:00:00|  comedy|          200|
|    197|   ”Family Man”|     2|2024-07-30 15:00:00|  action|          500|
|    521|     ”Mirzapur”|     2|2024-07-29 15:00:00|  action|          280|
|    211|”Queens Gambit”|     1|2024-07-30 15:00:00|   drama|          170|
|    521|     ”Mirzapur”|     1|2024-07-28 15:00:00|  action|          230|
|    844|    ”Westworld”|     3|2024-07-30 15:00:00|  sci-fi|          310|
|    672|    ”Panchayat”|     3|2024-07-29 15:00:00|  comedy|          210|
|    256|   ”Homecoming”|     2|2024-07-30 15:00:00|thriller|          310|
|    489|  ”Outer Range”|     1|2024-07-30 15:00:00|  sci-fi|          340|
|    200| ”B

In [58]:
#Find the user with maximum watchtime
arr1=df.groupBy('user_id').agg({'duration_mins':'sum'}).orderBy('sum(duration_mins)',ascending=False).show()

+-------+------------------+
|user_id|sum(duration_mins)|
+-------+------------------+
|    672|            1160.0|
|    521|             810.0|
|    844|             610.0|
|    256|             560.0|
|    200|             530.0|
|    489|             510.0|
|    197|             500.0|
|    211|             170.0|
+-------+------------------+



In [59]:
#Calculate overall total Watchtime
arr2=df.groupBy('user_id').agg(sum('duration_mins')).show()

+-------+------------------+
|user_id|sum(duration_mins)|
+-------+------------------+
|    521|             810.0|
|    200|             530.0|
|    672|            1160.0|
|    256|             560.0|
|    197|             500.0|
|    211|             170.0|
|    844|             610.0|
|    489|             510.0|
+-------+------------------+



In [60]:
#Find most popular shows (based on watchtime)
arr3=df.groupBy('series','genre').agg({'duration_mins':'sum'}).orderBy('sum(duration_mins)',ascending=False).show()

+---------------+--------+------------------+
|         series|   genre|sum(duration_mins)|
+---------------+--------+------------------+
|     ”Mirzapur”|  action|             810.0|
|  ”Outer Range”|  sci-fi|             760.0|
|    ”Panchayat”|  comedy|             760.0|
| ”Black Mirror”|  sci-fi|             680.0|
|    ”Westworld”|  sci-fi|             610.0|
|   ”Family Man”|  action|             500.0|
|   ”Homecoming”|thriller|             310.0|
|  ”Outer Range”|thriller|             250.0|
|”Queens Gambit”|   drama|             170.0|
+---------------+--------+------------------+



In [61]:
#Find most popular shows (based on user popularity)
arr4=df.groupBy('series').agg(count('user_id')).orderBy('count(user_id)',ascending=False).show()

+---------------+--------------+
|         series|count(user_id)|
+---------------+--------------+
|    ”Panchayat”|             4|
|  ”Outer Range”|             4|
| ”Black Mirror”|             4|
|     ”Mirzapur”|             3|
|    ”Westworld”|             2|
|   ”Family Man”|             1|
|   ”Homecoming”|             1|
|”Queens Gambit”|             1|
+---------------+--------------+



In [62]:

#Find the most popular genre
arr5=df.groupBy('genre').agg(count('genre')).orderBy('count(genre)',ascending=False).show()

+--------+------------+
|   genre|count(genre)|
+--------+------------+
|  sci-fi|           9|
|  action|           4|
|  comedy|           4|
|thriller|           2|
|   drama|           1|
+--------+------------+



In [63]:
#Calculate total watchtime per user
arr6=df.groupBy('user_id').agg({'duration_mins':'sum'}).show()

+-------+------------------+
|user_id|sum(duration_mins)|
+-------+------------------+
|    521|             810.0|
|    200|             530.0|
|    672|            1160.0|
|    256|             560.0|
|    197|             500.0|
|    211|             170.0|
|    844|             610.0|
|    489|             510.0|
+-------+------------------+



In [64]:
#Find most popular genre (based on engagement count)
arr7=df.groupBy('genre').agg(count('duration_mins')).orderBy('count(duration_mins)',ascending=False).show(1)

+------+--------------------+
| genre|count(duration_mins)|
+------+--------------------+
|sci-fi|                   9|
+------+--------------------+
only showing top 1 row



In [65]:
arr7=df.groupBy('genre').agg(count('genre')).show()

+--------+------------+
|   genre|count(genre)|
+--------+------------+
|  action|           4|
|   drama|           1|
|thriller|           2|
|  sci-fi|           9|
|  comedy|           4|
+--------+------------+



In [66]:
#Find average watchtime per genre
arr8=df.groupBy('genre').agg({'duration_mins':'avg'}).show()

+--------+------------------+
|   genre|avg(duration_mins)|
+--------+------------------+
|  action|             327.5|
|   drama|             170.0|
|thriller|             280.0|
|  sci-fi|227.77777777777777|
|  comedy|             190.0|
+--------+------------------+



In [89]:
#Find peak traffic days
#(Output 1 = Full Date)
arr10=df.groupBy(to_date('timestamp')).agg(count('timestamp')).orderBy('count(timestamp)',ascending=False)
arr10.select('to_date(timestamp)').show(1)

+------------------+
|to_date(timestamp)|
+------------------+
|        2024-07-30|
+------------------+
only showing top 1 row



In [90]:
#(Output 2 = Only Day)

arr10=df.groupBy(dayofmonth('timestamp')).agg(count('timestamp')).orderBy('count(timestamp)',ascending=False)
arr10.select('dayofmonth(timestamp)').show(1)

+---------------------+
|dayofmonth(timestamp)|
+---------------------+
|                   30|
+---------------------+
only showing top 1 row



In [114]:
#Find the user with most diverse show preference
df.groupBy('user_id').agg(countDistinct('series')).orderBy('count(DISTINCT series)',ascending=False).show(1)

+-------+----------------------+
|user_id|count(DISTINCT series)|
+-------+----------------------+
|    672|                     3|
+-------+----------------------+
only showing top 1 row



In [None]:
#Find the binge-watchers
df.groupBy()

In [96]:
#Find the user with longest watching streak


+-------+---------------+------+-------------------+--------+-------------+
|user_id|         series|season|          timestamp|   genre|duration_mins|
+-------+---------------+------+-------------------+--------+-------------+
|    521|     ”Mirzapur”|     3|2024-07-30 15:00:00|  action|          300|
|    672|    ”Panchayat”|     3|2024-07-30 15:00:00|  comedy|          200|
|    197|   ”Family Man”|     2|2024-07-30 15:00:00|  action|          500|
|    521|     ”Mirzapur”|     2|2024-07-29 15:00:00|  action|          280|
|    211|”Queens Gambit”|     1|2024-07-30 15:00:00|   drama|          170|
|    521|     ”Mirzapur”|     1|2024-07-28 15:00:00|  action|          230|
|    844|    ”Westworld”|     3|2024-07-30 15:00:00|  sci-fi|          310|
|    672|    ”Panchayat”|     3|2024-07-29 15:00:00|  comedy|          210|
|    256|   ”Homecoming”|     2|2024-07-30 15:00:00|thriller|          310|
|    489|  ”Outer Range”|     1|2024-07-30 15:00:00|  sci-fi|          340|
|    200| ”B

In [112]:
#Total Seasons available

df.groupBy('series').agg(max('season')).show()

+---------------+-----------+
|         series|max(season)|
+---------------+-----------+
| ”Black Mirror”|          5|
|   ”Family Man”|          2|
|   ”Homecoming”|          2|
|     ”Mirzapur”|          3|
|  ”Outer Range”|          2|
|    ”Panchayat”|          3|
|”Queens Gambit”|          1|
|    ”Westworld”|          3|
+---------------+-----------+



In [95]:
#Fetch a list of all series
arr11=df.select('series').distinct()
arr11.rdd.flatMap(lambda x:[x[0]]).collect()

['”Family Man”',
 '”Homecoming”',
 '”Panchayat”',
 '”Outer Range”',
 '”Black Mirror”',
 '”Mirzapur”',
 '”Westworld”',
 '”Queens Gambit”']