<a href="https://colab.research.google.com/github/Mr-Hackrr/pySpark/blob/main/movies%20pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%writefile movies.csv
user_id,series,season,timestamp,genre,duration_mins
521,”Mirzapur”,3,2024-07-30 15:00:00,action,300
672,”Panchayat”,3,2024-07-30 15:00:00,comedy,200
197,”Family Man”,2,2024-07-30 15:00:00,action,500
521,”Mirzapur”,2,2024-07-29 15:00:00,action,280
211,”Queens Gambit”,1,2024-07-30 15:00:00,drama,170
521,”Mirzapur”,1,2024-07-28 15:00:00,action,230
844,”Westworld”,3,2024-07-30 15:00:00,sci-fi,310
672,”Panchayat”,3,2024-07-29 15:00:00,comedy,210
256,”Homecoming”,2,2024-07-30 15:00:00,thriller,310
489,”Outer Range”,1,2024-07-30 15:00:00,sci-fi,340
200,”Black Mirror”,2,2024-07-30 15:00:00,sci-fi,140
256,”Outer Range”,2,2024-07-30 15:00:00,thriller,250
489,”Outer Range”,2,2024-07-28 15:00:00,sci-fi,170
200,”Black Mirror”,3,2024-07-29 15:00:00,sci-fi,190
672,”Panchayat”,2,2024-07-28 15:00:00,comedy,160
672,”Outer Range”,1,2024-07-25 15:00:00,sci-fi,250
200,”Black Mirror”,4,2024-07-28 15:00:00,sci-fi,200
844,”Westworld”,2,2024-07-29 15:00:00,sci-fi,300
672,”Black Mirror”,5,2024-07-28 15:00:00,sci-fi,150
672,”Panchayat”,1,2024-07-27 15:00:00,comedy,190

Writing movies.csv


In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=cdfb3b05867188e05dd96bedbd5f0c47df34f371504cc1044afdca89d328e27f
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
#Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [4]:
#Create SparkSession for app (Streaming Analysis)
spark = SparkSession.builder.appName('Movie analysis').getOrCreate()

In [5]:
#Create Dataframe series_df
moviesdf = spark.read.csv("movies.csv",header=True)

In [6]:
moviesdf.show()

+-------+---------------+------+-------------------+--------+-------------+
|user_id|         series|season|          timestamp|   genre|duration_mins|
+-------+---------------+------+-------------------+--------+-------------+
|    521|     ”Mirzapur”|     3|2024-07-30 15:00:00|  action|          300|
|    672|    ”Panchayat”|     3|2024-07-30 15:00:00|  comedy|          200|
|    197|   ”Family Man”|     2|2024-07-30 15:00:00|  action|          500|
|    521|     ”Mirzapur”|     2|2024-07-29 15:00:00|  action|          280|
|    211|”Queens Gambit”|     1|2024-07-30 15:00:00|   drama|          170|
|    521|     ”Mirzapur”|     1|2024-07-28 15:00:00|  action|          230|
|    844|    ”Westworld”|     3|2024-07-30 15:00:00|  sci-fi|          310|
|    672|    ”Panchayat”|     3|2024-07-29 15:00:00|  comedy|          210|
|    256|   ”Homecoming”|     2|2024-07-30 15:00:00|thriller|          310|
|    489|  ”Outer Range”|     1|2024-07-30 15:00:00|  sci-fi|          340|
|    200| ”B

In [7]:
#Find the user with maximum watchtime
#moviesdf.filter(moviesdf.duration_mins == moviesdf.duration_mins.max())
moviesdf.groupBy('user_id').agg(sum('duration_mins')).sort(col('sum(duration_mins)').desc()).first()[0]

'672'

In [8]:
#Calculate overall total Watchtime
moviesdf.agg({'duration_mins':'sum'}).collect()[0][0]

4850.0

In [9]:
#Find most popular shows (based on watchtime)
popular = moviesdf.groupBy('series').agg({'duration_mins':'sum'}).orderBy(col('sum(duration_mins)').desc()).limit(3)
popular.rdd.flatMap(lambda x : [x[0]]).collect()

['”Outer Range”', '”Mirzapur”', '”Panchayat”']

In [10]:
#Find most popular shows (based on user popularity)
user_popularity = moviesdf.groupBy('series').agg({'user_id':'count'}).orderBy(col('count(user_id)').desc()).limit(3)
user_popularity.rdd.flatMap(lambda x : [x[0]]).collect()

['”Outer Range”', '”Panchayat”', '”Black Mirror”']

In [11]:
#Find the most popular genre
genre = moviesdf.groupBy('genre').agg(count('user_id'))
genre.rdd.flatMap(lambda x : [x[0]]).collect()[0]

'action'

In [12]:
#Calculate total watchtime per user
moviesdf.groupBy('user_id').agg(sum('duration_mins')).select(col("user_id"),col("sum(duration_mins)").alias('total_watch_time')).show()

+-------+----------------+
|user_id|total_watch_time|
+-------+----------------+
|    521|           810.0|
|    200|           530.0|
|    672|          1160.0|
|    256|           560.0|
|    197|           500.0|
|    211|           170.0|
|    844|           610.0|
|    489|           510.0|
+-------+----------------+



In [13]:
#Find most popular genre (based on engagement count)


In [21]:
# Find most popular genre (based on total watch time)
genre_engagement = moviesdf.groupBy('genre').agg(sum('duration_mins').alias('total_watch_time')) \
                          .orderBy(col('total_watch_time').desc())

# Extract the most popular genre
most_popular_genre = genre_engagement.first()[0]

print(f"Most popular genre based on engagement: {most_popular_genre}")


Most popular genre based on engagement: sci-fi


In [32]:
#Find average watchtime per genre
moviesdf.groupBy('genre').agg(avg('duration_mins').alias('avg_duration')).withColumn('avg_duration',round(col("avg_duration"),2)).show()

+--------+------------+
|   genre|avg_duration|
+--------+------------+
|  action|       327.5|
|   drama|       170.0|
|thriller|       280.0|
|  sci-fi|      227.78|
|  comedy|       190.0|
+--------+------------+



In [50]:
# prompt: #Find peak traffic days
# #(Output 1 = Full Date) print first date print like yyyy-mm-dd

from pyspark.sql.functions import to_date

# Assuming 'timestamp' column is a string representing date and time
moviesdf_with_date = moviesdf.withColumn('date', to_date(col('timestamp')))

# Find peak traffic days
peak_traffic_days = moviesdf_with_date.groupBy('date').agg(count('user_id').alias('user_count')) \
                                    .orderBy(col('user_count').desc())

# Extract the first peak traffic day (full date)
first_peak_day = peak_traffic_days.first()[0]

print(f"First peak traffic day (full date): {first_peak_day.strftime('%Y-%m-%d')}")


First peak traffic day (full date): 2024-07-30


In [47]:
#(Output 2 = Only Day)
moviesdf = moviesdf.withColumn('day',dayofmonth("timestamp"))
daydf = moviesdf.groupBy('day').agg(count('day').alias("cnt_day")).orderBy(col('cnt_day').desc()).limit(1)
daydf.rdd.flatMap(lambda x : [x[0]]).collect()[0]

30

In [45]:
#Find the user with most diverse show preference
moviesdf.groupBy('user_id').agg(countDistinct('genre').alias('cnt')).orderBy(col('cnt').desc()).first()[0]

'672'

In [58]:
moviesdf.show()

+-------+---------------+------+-------------------+--------+-------------+----------+---+
|user_id|         series|season|          timestamp|   genre|duration_mins|      date|day|
+-------+---------------+------+-------------------+--------+-------------+----------+---+
|    521|     ”Mirzapur”|     3|2024-07-30 15:00:00|  action|          300|2024-07-30| 30|
|    672|    ”Panchayat”|     3|2024-07-30 15:00:00|  comedy|          200|2024-07-30| 30|
|    197|   ”Family Man”|     2|2024-07-30 15:00:00|  action|          500|2024-07-30| 30|
|    521|     ”Mirzapur”|     2|2024-07-29 15:00:00|  action|          280|2024-07-29| 29|
|    211|”Queens Gambit”|     1|2024-07-30 15:00:00|   drama|          170|2024-07-30| 30|
|    521|     ”Mirzapur”|     1|2024-07-28 15:00:00|  action|          230|2024-07-28| 28|
|    844|    ”Westworld”|     3|2024-07-30 15:00:00|  sci-fi|          310|2024-07-30| 30|
|    672|    ”Panchayat”|     3|2024-07-29 15:00:00|  comedy|          210|2024-07-29| 29|

In [63]:
#Find the binge-watchers
moviesdf.groupBy('user_id','series','date').agg(count('season')).show()

+-------+---------------+----------+-------------+
|user_id|         series|      date|count(season)|
+-------+---------------+----------+-------------+
|    672|    ”Panchayat”|2024-07-30|            1|
|    672|    ”Panchayat”|2024-07-28|            1|
|    211|”Queens Gambit”|2024-07-30|            1|
|    521|     ”Mirzapur”|2024-07-29|            1|
|    200| ”Black Mirror”|2024-07-29|            1|
|    256|  ”Outer Range”|2024-07-30|            1|
|    200| ”Black Mirror”|2024-07-28|            1|
|    200| ”Black Mirror”|2024-07-30|            1|
|    256|   ”Homecoming”|2024-07-30|            1|
|    672|    ”Panchayat”|2024-07-29|            1|
|    489|  ”Outer Range”|2024-07-28|            1|
|    521|     ”Mirzapur”|2024-07-28|            1|
|    672|    ”Panchayat”|2024-07-27|            1|
|    197|   ”Family Man”|2024-07-30|            1|
|    844|    ”Westworld”|2024-07-30|            1|
|    672|  ”Outer Range”|2024-07-25|            1|
|    844|    ”Westworld”|2024-0

In [60]:
#Find the user with longest watching streak


In [70]:
#Total Seasons available
series_df = moviesdf.groupBy('series').agg(max('season').alias('no_of_seasons'))
series_df.show()

+---------------+-------------+
|         series|no_of_seasons|
+---------------+-------------+
| ”Black Mirror”|            5|
|   ”Family Man”|            2|
|   ”Homecoming”|            2|
|     ”Mirzapur”|            3|
|  ”Outer Range”|            2|
|    ”Panchayat”|            3|
|”Queens Gambit”|            1|
|    ”Westworld”|            3|
+---------------+-------------+



In [82]:
moviesdf.show()

AttributeError: 'NoneType' object has no attribute 'show'

In [83]:
pattern = r"([a-zA-Z\s]+)"
moviesdf = moviesdf.withColumn('series',regexp_extract('series',pattern,0))

AttributeError: 'NoneType' object has no attribute 'withColumn'

AttributeError: 'NoneType' object has no attribute 'show'

In [71]:
#Fetch a list of all series
from pyspark.sql.window import Window

In [None]:
moviesdf.groupBy('series','date')