In [4]:
#user_data
data=[
(521,'Mirzapur',3,'2024-07-30 15:00:00','action',300),
(672,'Panchayat',3,'2024-07-30 15:00:00','comedy',200),
(197,'Family Man',2,'2024-07-30 15:00:00','action',500),
(521,'Mirzapur',2,'2024-07-29 15:00:00','action',280),
(211,'Queens Gambit',1,'2024-07-30 15:00:00','drama',170),
(521,'Mirzapur',1,'2024-07-28 15:00:00','action',230),
(844,'Westworld',3,'2024-07-30 15:00:00','sci-fi',310),
(672,'Panchayat',3,'2024-07-29 15:00:00','comedy',210),
(256,'Homecoming',2,'2024-07-30 15:00:00','thriller',310),
(489,'Outer Range',1,'2024-07-30 15:00:00','sci-fi',340),
(200,'Black Mirror',2,'2024-07-30 15:00:00','sci-fi',140),
(256,'Outer Range',2,'2024-07-30 15:00:00','thriller',250),
(489,'Outer Range',2,'2024-07-28 15:00:00','sci-fi',170),
(200,'Black Mirror',3,'2024-07-29 15:00:00','sci-fi',190),
(672,'Panchayat',2,'2024-07-28 15:00:00','comedy',160),
(672,'Outer Range',1,'2024-07-25 15:00:00','sci-fi',250),
(200,'Black Mirror',4,'2024-07-28 15:00:00','sci-fi',200),
(844,'Westworld',2,'2024-07-29 15:00:00','sci-fi',300),
(672,'Black Mirror',5,'2024-07-28 15:00:00','sci-fi',150),
(672,'Panchayat',1,'2024-07-27 15:00:00','comedy',190)]

In [10]:
sch = ('user_id','series','season','timestamp','genre','duration_mins')

In [3]:
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *



In [9]:
#Create SparkSession for app (Streaming Analysis)
spark = SparkSession.builder.appName('Streaming Analysis').getOrCreate()





In [13]:
#Create Dataframe series_df
df = spark.createDataFrame(data,schema = sch)
df.show()

+-------+-------------+------+-------------------+--------+-------------+
|user_id|       series|season|          timestamp|   genre|duration_mins|
+-------+-------------+------+-------------------+--------+-------------+
|    521|     Mirzapur|     3|2024-07-30 15:00:00|  action|          300|
|    672|    Panchayat|     3|2024-07-30 15:00:00|  comedy|          200|
|    197|   Family Man|     2|2024-07-30 15:00:00|  action|          500|
|    521|     Mirzapur|     2|2024-07-29 15:00:00|  action|          280|
|    211|Queens Gambit|     1|2024-07-30 15:00:00|   drama|          170|
|    521|     Mirzapur|     1|2024-07-28 15:00:00|  action|          230|
|    844|    Westworld|     3|2024-07-30 15:00:00|  sci-fi|          310|
|    672|    Panchayat|     3|2024-07-29 15:00:00|  comedy|          210|
|    256|   Homecoming|     2|2024-07-30 15:00:00|thriller|          310|
|    489|  Outer Range|     1|2024-07-30 15:00:00|  sci-fi|          340|
|    200| Black Mirror|     2|2024-07-

In [18]:
#Find the user with maximum watchtime
df1 = df.groupBy('user_id').agg({'duration_mins': 'max'}).orderBy('max(duration_mins)',ascending=False).show(1)

+-------+------------------+
|user_id|max(duration_mins)|
+-------+------------------+
|    197|               500|
+-------+------------------+
only showing top 1 row



In [26]:
#Calculate overall total Watchtime
df2 = df.groupBy('user_id').agg({'duration_mins' : 'sum'}).show(1)



+-------+------------------+
|user_id|sum(duration_mins)|
+-------+------------------+
|    844|               610|
+-------+------------------+
only showing top 1 row



In [28]:
#Find most popular shows (based on watchtime)
df3 = df.groupBy('series').agg({'duration_mins':'sum'}).orderBy('sum(duration_mins)',ascending = False).show(1)


+-----------+------------------+
|     series|sum(duration_mins)|
+-----------+------------------+
|Outer Range|              1010|
+-----------+------------------+
only showing top 1 row



In [30]:
#Find most popular shows (based on user popularity)
df4 = df.groupBy('series').agg({'user_id':'count'}).show(1)


+-----------+--------------+
|     series|count(user_id)|
+-----------+--------------+
|Outer Range|             4|
+-----------+--------------+
only showing top 1 row



In [54]:
#Find the most popular genre
df.groupBy('genre').agg({'user_id': 'count'}).orderBy('count(user_id)',ascending = False).show()


+--------+--------------+
|   genre|count(user_id)|
+--------+--------------+
|  sci-fi|             9|
|  action|             4|
|  comedy|             4|
|thriller|             2|
|   drama|             1|
+--------+--------------+



In [53]:
#Find most popular genre (based on engagement count)
df.groupBy('genre').agg({'timestamp' : 'count'}).show()


+--------+----------------+
|   genre|count(timestamp)|
+--------+----------------+
|  action|               4|
|   drama|               1|
|thriller|               2|
|  sci-fi|               9|
|  comedy|               4|
+--------+----------------+



In [52]:
#Find average watchtime per genre
df.groupBy('genre').agg({'duration_mins' : 'avg'}).show()



+--------+------------------+
|   genre|avg(duration_mins)|
+--------+------------------+
|  action|             327.5|
|   drama|             170.0|
|thriller|             280.0|
|  sci-fi|227.77777777777777|
|  comedy|             190.0|
+--------+------------------+



In [56]:
#Find peak traffic days
#(Output 1 = Full Date)
df.groupBy('timestamp').agg({'user_id' : 'count'}).orderBy('count(user_id)',ascending=False).first()[0]



#(Output 2 = Only Day)





'2024-07-30 15:00:00'

In [72]:
#Find the user with most diverse show preference
df.groupBy('user_id').agg(countDistinct('genre')).orderBy(countDistinct('genre'), ascending=False).show()


+-------+---------------------+
|user_id|count(DISTINCT genre)|
+-------+---------------------+
|    672|                    2|
|    844|                    1|
|    521|                    1|
|    197|                    1|
|    200|                    1|
|    256|                    1|
|    489|                    1|
|    211|                    1|
+-------+---------------------+



In [None]:
#Find the binge-watchers




In [78]:
#Find the user with longest watching streak
df.groupBy('user_id','timestamp').agg({'duration_mins' : 'sum'}).orderBy('sum(duration_mins)',ascending=False).show()


+-------+-------------------+------------------+
|user_id|          timestamp|sum(duration_mins)|
+-------+-------------------+------------------+
|    256|2024-07-30 15:00:00|               560|
|    197|2024-07-30 15:00:00|               500|
|    489|2024-07-30 15:00:00|               340|
|    844|2024-07-30 15:00:00|               310|
|    672|2024-07-28 15:00:00|               310|
|    521|2024-07-30 15:00:00|               300|
|    844|2024-07-29 15:00:00|               300|
|    521|2024-07-29 15:00:00|               280|
|    672|2024-07-25 15:00:00|               250|
|    521|2024-07-28 15:00:00|               230|
|    672|2024-07-29 15:00:00|               210|
|    672|2024-07-30 15:00:00|               200|
|    200|2024-07-28 15:00:00|               200|
|    200|2024-07-29 15:00:00|               190|
|    672|2024-07-27 15:00:00|               190|
|    211|2024-07-30 15:00:00|               170|
|    489|2024-07-28 15:00:00|               170|
|    200|2024-07-30 

In [73]:
df.show()

+-------+-------------+------+-------------------+--------+-------------+
|user_id|       series|season|          timestamp|   genre|duration_mins|
+-------+-------------+------+-------------------+--------+-------------+
|    521|     Mirzapur|     3|2024-07-30 15:00:00|  action|          300|
|    672|    Panchayat|     3|2024-07-30 15:00:00|  comedy|          200|
|    197|   Family Man|     2|2024-07-30 15:00:00|  action|          500|
|    521|     Mirzapur|     2|2024-07-29 15:00:00|  action|          280|
|    211|Queens Gambit|     1|2024-07-30 15:00:00|   drama|          170|
|    521|     Mirzapur|     1|2024-07-28 15:00:00|  action|          230|
|    844|    Westworld|     3|2024-07-30 15:00:00|  sci-fi|          310|
|    672|    Panchayat|     3|2024-07-29 15:00:00|  comedy|          210|
|    256|   Homecoming|     2|2024-07-30 15:00:00|thriller|          310|
|    489|  Outer Range|     1|2024-07-30 15:00:00|  sci-fi|          340|
|    200| Black Mirror|     2|2024-07-

In [81]:
#Total Seasons available
df.groupBy('series').agg({'season':'max'}).show()


+-------------+-----------+
|       series|max(season)|
+-------------+-----------+
|  Outer Range|          2|
|    Westworld|          3|
|   Family Man|          2|
|   Homecoming|          2|
|     Mirzapur|          3|
|    Panchayat|          3|
|Queens Gambit|          1|
| Black Mirror|          5|
+-------------+-----------+



In [92]:
#Fetch a list of all series
df.rdd.flatMap(lambda x:[x[1]]).distinct().collect()

['Mirzapur',
 'Family Man',
 'Queens Gambit',
 'Westworld',
 'Homecoming',
 'Panchayat',
 'Outer Range',
 'Black Mirror']