In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=a17cb67c5e4b2176267b5a04cba5af1e38df726699807eb1bc2722e7eb592c2c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
#user_data
data = [
(521,'Mirzapur',3,'2024-07-30 15:00:00','action',300),
(672,'Panchayat',3,'2024-07-30 15:00:00','comedy',200),
(197,'Family Man',2,'2024-07-30 15:00:00','action',500),
(521,'Mirzapur',2,'2024-07-29 15:00:00','action',280),
(211,'Queens Gambit',1,'2024-07-30 15:00:00','drama',170),
(521,'Mirzapur',1,'2024-07-28 15:00:00','action',230),
(844,'Westworld',3,'2024-07-30 15:00:00','sci-fi',310),
(672,'Panchayat',3,'2024-07-29 15:00:00','comedy',210),
(256,'Homecoming',2,'2024-07-30 15:00:00','thriller',310),
(489,'Outer Range',1,'2024-07-30 15:00:00','sci-fi',340),
(200,'Black Mirror',2,'2024-07-30 15:00:00','sci-fi',140),
(256,'Outer Range',2,'2024-07-30 15:00:00','thriller',250),
(489,'Outer Rang',2,'2024-07-28 15:00:00','sci-fi',170),
(200,'Black Mirror',3,'2024-07-29 15:00:00','sci-fi',190),
(672,'Panchayat',2,'2024-07-28 15:00:00','comedy',160),
(672,'Outer Range',1,'2024-07-25 15:00:00','sci-fi',250),
(200,'Black Mirror',4,'2024-07-28 15:00:00','sci-fi',200),
(844,'Westworld',2,'2024-07-29 15:00:00','sci-fi',300),
(672,'Black Mirror',5,'2024-07-28 15:00:00','sci-fi',15),
(672,'Panchayat',1,'2024-07-27 15:00:00','comedy',190)
]

In [4]:
schema = ['user_id','series','season','timestamp','genre','duration_mins']

In [6]:
s1 = SparkSession.builder.appName('Streaming Analysis').getOrCreate()

In [7]:
series_df = s1.createDataFrame(data,schema)

In [8]:
series_df.show()

+-------+-------------+------+-------------------+--------+-------------+
|user_id|       series|season|          timestamp|   genre|duration_mins|
+-------+-------------+------+-------------------+--------+-------------+
|    521|     Mirzapur|     3|2024-07-30 15:00:00|  action|          300|
|    672|    Panchayat|     3|2024-07-30 15:00:00|  comedy|          200|
|    197|   Family Man|     2|2024-07-30 15:00:00|  action|          500|
|    521|     Mirzapur|     2|2024-07-29 15:00:00|  action|          280|
|    211|Queens Gambit|     1|2024-07-30 15:00:00|   drama|          170|
|    521|     Mirzapur|     1|2024-07-28 15:00:00|  action|          230|
|    844|    Westworld|     3|2024-07-30 15:00:00|  sci-fi|          310|
|    672|    Panchayat|     3|2024-07-29 15:00:00|  comedy|          210|
|    256|   Homecoming|     2|2024-07-30 15:00:00|thriller|          310|
|    489|  Outer Range|     1|2024-07-30 15:00:00|  sci-fi|          340|
|    200| Black Mirror|     2|2024-07-

In [27]:
#Find the user with maximum watchtime
series_df.groupBy('user_id').agg({'duration_mins':'sum'}).orderBy('sum(duration_mins)',ascending = False).limit(1).show()

+-------+------------------+
|user_id|sum(duration_mins)|
+-------+------------------+
|    672|              1025|
+-------+------------------+



In [28]:
#Calculate overall total Watchtime
series_df.agg({'duration_mins':'sum'}).show()

+------------------+
|sum(duration_mins)|
+------------------+
|              4715|
+------------------+



In [19]:
#Find most popular shows (based on watchtime)
series_df.groupBy('series').agg({'duration_mins':'sum'}).orderBy('sum(duration_mins)',ascending = False).show()

+-------------+------------------+
|       series|sum(duration_mins)|
+-------------+------------------+
|  Outer Range|               840|
|     Mirzapur|               810|
|    Panchayat|               760|
|    Westworld|               610|
| Black Mirror|               545|
|   Family Man|               500|
|   Homecoming|               310|
|Queens Gambit|               170|
|   Outer Rang|               170|
+-------------+------------------+



In [20]:
#Find most popular shows (based on user popularity)
series_df.groupBy('series').agg({'user_id':'count'}).orderBy('count(user_id)',ascending = False).show()

+-------------+--------------+
|       series|count(user_id)|
+-------------+--------------+
|    Panchayat|             4|
| Black Mirror|             4|
|  Outer Range|             3|
|     Mirzapur|             3|
|    Westworld|             2|
|   Family Man|             1|
|   Homecoming|             1|
|Queens Gambit|             1|
|   Outer Rang|             1|
+-------------+--------------+



In [25]:
#Find the most popular genre
series_df.groupBy('genre').agg({'genre':'count'}).orderBy('count(genre)',ascending = False).show()

+--------+------------+
|   genre|count(genre)|
+--------+------------+
|  sci-fi|           9|
|  action|           4|
|  comedy|           4|
|thriller|           2|
|   drama|           1|
+--------+------------+



In [26]:
#Calculate total watchtime per user
series_df.groupBy('user_id').agg({'duration_mins':'sum'}).orderBy('sum(duration_mins)',ascending = False).show()

+-------+------------------+
|user_id|sum(duration_mins)|
+-------+------------------+
|    672|              1025|
|    521|               810|
|    844|               610|
|    256|               560|
|    200|               530|
|    489|               510|
|    197|               500|
|    211|               170|
+-------+------------------+



In [32]:
#Find most popular genre (based on engagement count)
series_df.groupBy(['genre']).agg({'timestamp':'count'}).orderBy('count(timestamp)',ascending = False).show()

+--------+----------------+
|   genre|count(timestamp)|
+--------+----------------+
|  sci-fi|               9|
|  action|               4|
|  comedy|               4|
|thriller|               2|
|   drama|               1|
+--------+----------------+



In [31]:
#Find average watchtime per genre
series_df.groupBy('genre').agg({'duration_mins':'avg'}).show()

+--------+------------------+
|   genre|avg(duration_mins)|
+--------+------------------+
|  action|             327.5|
|   drama|             170.0|
|thriller|             280.0|
|  sci-fi|212.77777777777777|
|  comedy|             190.0|
+--------+------------------+



In [74]:
#(Output 2 = Only Day)
series_df.groupBy('timestamp').agg({'timestamp':'count'}).orderBy('count(timestamp)',ascending = False).limit(1).collect()[0][0]

'2024-07-30 15:00:00'

In [71]:
#Find peak traffic days
#(Output 1 = Full Date)
series_df.groupBy('timestamp').agg({'timestamp':'count'}).orderBy('count(timestamp)',ascending = False).first()[0]


'2024-07-30 15:00:00'

In [75]:
#Find the user with most diverse show preference
series_df.groupBy('user_id').agg(countDistinct('series')).show()

+-------+----------------------+
|user_id|count(DISTINCT series)|
+-------+----------------------+
|    844|                     1|
|    521|                     1|
|    197|                     1|
|    200|                     1|
|    672|                     3|
|    256|                     2|
|    489|                     2|
|    211|                     1|
+-------+----------------------+



In [68]:
series_df.show()

+-------+-------------+------+-------------------+--------+-------------+
|user_id|       series|season|          timestamp|   genre|duration_mins|
+-------+-------------+------+-------------------+--------+-------------+
|    521|     Mirzapur|     3|2024-07-30 15:00:00|  action|          300|
|    672|    Panchayat|     3|2024-07-30 15:00:00|  comedy|          200|
|    197|   Family Man|     2|2024-07-30 15:00:00|  action|          500|
|    521|     Mirzapur|     2|2024-07-29 15:00:00|  action|          280|
|    211|Queens Gambit|     1|2024-07-30 15:00:00|   drama|          170|
|    521|     Mirzapur|     1|2024-07-28 15:00:00|  action|          230|
|    844|    Westworld|     3|2024-07-30 15:00:00|  sci-fi|          310|
|    672|    Panchayat|     3|2024-07-29 15:00:00|  comedy|          210|
|    256|   Homecoming|     2|2024-07-30 15:00:00|thriller|          310|
|    489|  Outer Range|     1|2024-07-30 15:00:00|  sci-fi|          340|
|    200| Black Mirror|     2|2024-07-

In [69]:
#Find the binge-watchers
df1 = series_df.groupBy('series').agg(max('season'))
series_df.where

+-------------+-----------+
|       series|max(season)|
+-------------+-----------+
|  Outer Range|          2|
|    Westworld|          3|
|   Family Man|          2|
|   Homecoming|          2|
|     Mirzapur|          3|
|    Panchayat|          3|
|Queens Gambit|          1|
| Black Mirror|          5|
|   Outer Rang|          2|
+-------------+-----------+



In [46]:
#Find the user with longest watching streak
series_df.orderBy('duration_mins',ascending = False).first()[0]

197

In [47]:
#Total Seasons available
series_df.groupBy('series').agg({'season':'max'}).show()

+-------------+-----------+
|       series|max(season)|
+-------------+-----------+
|  Outer Range|          2|
|    Westworld|          3|
|   Family Man|          2|
|   Homecoming|          2|
|     Mirzapur|          3|
|    Panchayat|          3|
|Queens Gambit|          1|
| Black Mirror|          5|
|   Outer Rang|          2|
+-------------+-----------+



In [24]:
#Fetch a list of all series
series_df.rdd.flatMap(lambda x:[x[1]]).distinct().collect()


['Mirzapur',
 'Family Man',
 'Queens Gambit',
 'Westworld',
 'Homecoming',
 'Panchayat',
 'Outer Range',
 'Black Mirror',
 'Outer Rang']