In [95]:
#user_data
%%writefile series.csv
user_id,series,season,timestamp,genre,duration_mins
521,Mirzapur,3,2024-07-30 15:00:00,action,300
672,Panchayat,3,2024-07-30 15:00:00,comedy,200
197,Family Man,2,2024-07-30 15:00:00,action,500
521,Mirzapur,2,2024-07-29 15:00:00,action,280
211,Queens Gambit,1,2024-07-30 15:00:00,drama,170
521,Mirzapur,1,2024-07-28 15:00:00,action,230
844,Westworld,3,2024-07-30 15:00:00,sci-fi,310
672,Panchayat,3,2024-07-29 15:00:00,comedy,210
256,Homecoming,2,2024-07-30 15:00:00,thriller,310
489,Outer Range,1,2024-07-30 15:00:00,sci-fi,340
200,Black Mirror,2,2024-07-30 15:00:00,sci-fi,140
256,Outer Range,2,2024-07-30 15:00:00,thriller,250
489,Outer Range,2,2024-07-28 15:00:00,sci-fi,170
200,Black Mirror,3,2024-07-29 15:00:00,sci-fi,190
672,Panchayat,2,2024-07-28 15:00:00,comedy,160
672,Outer Range,1,2024-07-25 15:00:00,sci-fi,250
200,Black Mirror,4,2024-07-28 15:00:00,sci-fi,200
844,Westworld,2,2024-07-29 15:00:00,sci-fi,300
672,Black Mirror,5,2024-07-28 15:00:00,sci-fi,150
672,Panchayat,1,2024-07-27 15:00:00,comedy,190

Overwriting series.csv


In [1]:
#Import Libraries
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=8738ae9459ba7b7cb06d1b804999e8290e08641bdcd9c9af22afc8d826207cd9
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [96]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *

In [97]:
#Create SparkSession for app (Streaming Analysis)
ls=SparkSession.builder.appName("streaming analysis").getOrCreate()

In [98]:
#Create Dataframe series_df
ser=ls.read.csv("/content/series.csv",header=True)
ser.show()

+-------+-------------+------+-------------------+--------+-------------+
|user_id|       series|season|          timestamp|   genre|duration_mins|
+-------+-------------+------+-------------------+--------+-------------+
|    521|     Mirzapur|     3|2024-07-30 15:00:00|  action|          300|
|    672|    Panchayat|     3|2024-07-30 15:00:00|  comedy|          200|
|    197|   Family Man|     2|2024-07-30 15:00:00|  action|          500|
|    521|     Mirzapur|     2|2024-07-29 15:00:00|  action|          280|
|    211|Queens Gambit|     1|2024-07-30 15:00:00|   drama|          170|
|    521|     Mirzapur|     1|2024-07-28 15:00:00|  action|          230|
|    844|    Westworld|     3|2024-07-30 15:00:00|  sci-fi|          310|
|    672|    Panchayat|     3|2024-07-29 15:00:00|  comedy|          210|
|    256|   Homecoming|     2|2024-07-30 15:00:00|thriller|          310|
|    489|  Outer Range|     1|2024-07-30 15:00:00|  sci-fi|          340|
|    200| Black Mirror|     2|2024-07-

In [99]:
#Find the user with maximum watchtime
ser.select("user_id","duration_mins").sort("duration_mins",ascending=False).limit(1).show()

+-------+-------------+
|user_id|duration_mins|
+-------+-------------+
|    197|          500|
+-------+-------------+



In [100]:
#Calculate overall total Watchtime
ser.agg({"duration_mins":"sum"}).alias("total_time").show()

+------------------+
|sum(duration_mins)|
+------------------+
|            4850.0|
+------------------+



In [101]:
#Find most popular shows (based on watchtime)
#ser.agg(f.max("duration_mins")).show()
ser.select("series","duration_mins").sort("duration_mins",ascending=False).show(1)

+----------+-------------+
|    series|duration_mins|
+----------+-------------+
|Family Man|          500|
+----------+-------------+
only showing top 1 row



In [102]:
#Find most popular shows (based on user popularity)
ser.groupby("series").count().orderBy(f.col("count").desc()).collect()[0][0]

'Outer Range'

In [103]:
#Find the most popular genre
ser.groupby("genre").count().orderBy(f.col("count").desc()).limit(1).show()

+------+-----+
| genre|count|
+------+-----+
|sci-fi|    9|
+------+-----+



In [104]:
#Calculate total watchtime per user
ser.groupby("user_id").agg(f.sum("duration_mins").alias("watch")).show()

+-------+------+
|user_id| watch|
+-------+------+
|    521| 810.0|
|    200| 530.0|
|    672|1160.0|
|    256| 560.0|
|    197| 500.0|
|    211| 170.0|
|    844| 610.0|
|    489| 510.0|
+-------+------+



In [105]:
#Find most popular genre (based on engagement count)
ser.groupby("genre").count().sort("count",ascending=False).show()

+--------+-----+
|   genre|count|
+--------+-----+
|  sci-fi|    9|
|  action|    4|
|  comedy|    4|
|thriller|    2|
|   drama|    1|
+--------+-----+



In [106]:
#Find average watchtime per genre
ser.groupby("genre").agg(f.avg("duration_mins")).show()

+--------+------------------+
|   genre|avg(duration_mins)|
+--------+------------------+
|  action|             327.5|
|   drama|             170.0|
|thriller|             280.0|
|  sci-fi|227.77777777777777|
|  comedy|             190.0|
+--------+------------------+



In [107]:
#Find peak traffic days
#(Output 1 = Full Date)
ser.select(f.to_date("timestamp")).groupby("to_date(timestamp)").count().sort("count",ascending=False).show(1)

#(Output 2 = Only Day)

ser.select(f.dayofmonth("timestamp").alias("timestamp")).groupby("timestamp").count().sort("count",ascending=False).collect()[0][0]

+------------------+-----+
|to_date(timestamp)|count|
+------------------+-----+
|        2024-07-30|    9|
+------------------+-----+
only showing top 1 row



30

In [108]:
#Find the user with most diverse show preference
#ser.groupby("user_id").count().sort("count",ascending=False).show()
ser.groupby("user_id").agg(f.countDistinct("series").alias("ss")).sort("ss",ascending=False).show()

+-------+---+
|user_id| ss|
+-------+---+
|    672|  3|
|    256|  2|
|    521|  1|
|    200|  1|
|    197|  1|
|    211|  1|
|    844|  1|
|    489|  1|
+-------+---+



In [123]:
#Find the binge-watch
b=ser.groupby("user_id","series","timestamp").agg(f.count("season").alias("cnt"))
b.show()

+-------+-------------+-------------------+---+
|user_id|       series|          timestamp|cnt|
+-------+-------------+-------------------+---+
|    200| Black Mirror|2024-07-30 15:00:00|  1|
|    521|     Mirzapur|2024-07-30 15:00:00|  1|
|    489|  Outer Range|2024-07-28 15:00:00|  1|
|    672|    Panchayat|2024-07-29 15:00:00|  1|
|    489|  Outer Range|2024-07-30 15:00:00|  1|
|    197|   Family Man|2024-07-30 15:00:00|  1|
|    672|  Outer Range|2024-07-25 15:00:00|  1|
|    844|    Westworld|2024-07-29 15:00:00|  1|
|    200| Black Mirror|2024-07-28 15:00:00|  1|
|    844|    Westworld|2024-07-30 15:00:00|  1|
|    256|   Homecoming|2024-07-30 15:00:00|  1|
|    200| Black Mirror|2024-07-29 15:00:00|  1|
|    521|     Mirzapur|2024-07-28 15:00:00|  1|
|    521|     Mirzapur|2024-07-29 15:00:00|  1|
|    672|    Panchayat|2024-07-27 15:00:00|  1|
|    672|    Panchayat|2024-07-30 15:00:00|  1|
|    672| Black Mirror|2024-07-28 15:00:00|  1|
|    256|  Outer Range|2024-07-30 15:00:

In [130]:
c=ser.groupby("series").agg({"season":"max"})
c.show()

+-------------+-----------+
|       series|max(season)|
+-------------+-----------+
| Black Mirror|          5|
|   Family Man|          2|
|   Homecoming|          2|
|     Mirzapur|          3|
|  Outer Range|          2|
|    Panchayat|          3|
|Queens Gambit|          1|
|    Westworld|          3|
+-------------+-----------+



In [147]:
d=b.join(c,'series','inner')
d.show()

+-------------+-------+-------------------+---+-----------+
|       series|user_id|          timestamp|cnt|max(season)|
+-------------+-------+-------------------+---+-----------+
| Black Mirror|    200|2024-07-30 15:00:00|  1|          5|
|     Mirzapur|    521|2024-07-30 15:00:00|  1|          3|
|  Outer Range|    489|2024-07-28 15:00:00|  1|          2|
|    Panchayat|    672|2024-07-29 15:00:00|  1|          3|
|  Outer Range|    489|2024-07-30 15:00:00|  1|          2|
|   Family Man|    197|2024-07-30 15:00:00|  1|          2|
|  Outer Range|    672|2024-07-25 15:00:00|  1|          2|
|    Westworld|    844|2024-07-29 15:00:00|  1|          3|
| Black Mirror|    200|2024-07-28 15:00:00|  1|          5|
|    Westworld|    844|2024-07-30 15:00:00|  1|          3|
|   Homecoming|    256|2024-07-30 15:00:00|  1|          2|
| Black Mirror|    200|2024-07-29 15:00:00|  1|          5|
|     Mirzapur|    521|2024-07-28 15:00:00|  1|          3|
|     Mirzapur|    521|2024-07-29 15:00:

In [148]:
d.filter(d["cnt"]==d["max(season)"]).show()

+-------------+-------+-------------------+---+-----------+
|       series|user_id|          timestamp|cnt|max(season)|
+-------------+-------+-------------------+---+-----------+
|Queens Gambit|    211|2024-07-30 15:00:00|  1|          1|
+-------------+-------+-------------------+---+-----------+



In [111]:
#Find the user with longest watching streak
#ser.select(f.max("duration_mins")).show()
ser.select("user_id","duration_mins").sort("duration_mins",ascending=False).collect()[0][0]

'197'

In [112]:
#Total Seasons available
ser.groupby("series").agg(f.max("season").alias("tot_cou")).agg(f.sum("tot_cou")).show()

+------------+
|sum(tot_cou)|
+------------+
|        21.0|
+------------+



In [113]:
#Fetch a list of all series
a=ser.select("series").distinct()
a.show()

+-------------+
|       series|
+-------------+
|  Outer Range|
| Black Mirror|
|    Westworld|
|   Family Man|
|   Homecoming|
|     Mirzapur|
|    Panchayat|
|Queens Gambit|
+-------------+



In [114]:
a.rdd.flatMap(lambda x:x).collect()

['Outer Range',
 'Black Mirror',
 'Westworld',
 'Family Man',
 'Homecoming',
 'Mirzapur',
 'Panchayat',
 'Queens Gambit']