<a href="https://colab.research.google.com/github/Taqikhan99/ConsoleApp1/blob/master/%5BNAME%5D_%5BID%5D_ADS2_Assignment_2_Data_Handling_and_Processing_with_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ADS2 - Assignment 2 - Data Handling and Processing with PySpark

In this assignment, you will be analysing the popularity of films and TV shows on the streaming platform, Netflix. Using your knowledge of PySpark DataFrames and Spark SQL, you will produce a number of "downstream" data products to analyse trends in global streaming habits.

Download the dataset from this [Kaggle](https://www.kaggle.com/dhruvildave/netflix-top-10-tv-shows-and-films) page. A copy of the `all_weeks_countries.csv` file is also available on the canvas page for this assignment.

Your task is to load in the data and produce a number of "downstream" data products and plots as described below.

The PySpark installation and setup is provided below for conveinience.

In [1]:
# Apache Spark uses Java, so first we must install that
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# get and unpack Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz



In [3]:
!tar xzf /content/spark-3.3.0-bin-hadoop3.tgz

In [4]:
# Set up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

In [5]:
# Install findspark, which helps python locate the psyspark module files
!pip install -q findspark
import findspark
findspark.init()

In [6]:
# Finally, we initialse a "SparkSession", which handles the computations
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Exercise 1 - Data Preparation (5 Marks)


1.   Load the `all_weeks_countries.csv` into your Colab Notebook as a DataFrame.
2.   Create two separate DataFrames for Films and TV.
3.   For the Films data, drop the column containing the season names.
4.   For the TV data, replace any null values in the season name column with the show name.

In [7]:
# pyspark.sql.functions countains all the transformations and actions you will
# need
from pyspark.sql import functions as F
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
### Load the all_weeks_countries.csv into your Colab Notebook as a DataFrame.

df=spark.read.csv('/content/drive/MyDrive/csvFiles/all-weeks-countries.csv',header=True,inferSchema=True)

In [9]:
### Display the table and its schema
df.show(10)
df.printSchema()

+------------+------------+-------------------+--------+-----------+--------------------+------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          1|      Look Both Ways|        null|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          2|           Day Shift|        null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          3|Bank Robbers: The...|        null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          4|   The Next 365 Days|        null|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          5|The Angry Birds M...| 

In [10]:
### Seperate the data into two DataFrames for Films and TV

# df1 for films and df2 for Tv
df1=df.filter(df.category=="Films")
df2=df.filter(df.category=="TV")

df1.show()
df2.show()

+------------+------------+-------------------+--------+-----------+--------------------+------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          1|      Look Both Ways|        null|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          2|           Day Shift|        null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          3|Bank Robbers: The...|        null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          4|   The Next 365 Days|        null|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          5|The Angry Birds M...| 

In [None]:
### Drop the 'season_title' column from the Films DataFrame, display the table
df1=df1.drop('season_title')
df1.show()


+------------+------------+-------------------+--------+-----------+--------------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          1|      Look Both Ways|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          2|           Day Shift|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          3|Bank Robbers: The...|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          4|   The Next 365 Days|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          5|The Angry Birds M...|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|        

In [None]:
### Replace any null values in 'season_name' in the TV DataFrame with the
### corresponding 'show_name', display the table

# using coalesce we can replace null with other column value

    
df2=df2.withColumn("season_title",F.coalesce(df2.season_title,df2.show_title))
df2.show()

+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|        season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          1| Pasión de Gavilanes|Pasión de Gavilan...|                         5|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          2|        Another Self|Another Self: Sea...|                         4|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          3| Pasión de Gavilanes|Pasión de Gavilan...|                        58|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          4|            Manifest|  Manifest: Season 1|                         6|
|   Argentina|          AR|2022-08

# Exercise 2 - Most popular films by country (10 Marks)

1.   For each country in the data, find the highest rank each film reached, and the number of weeks it spent in the top 10. (4 marks)
2.   Produce a DataFrame that contains the film which spent the most time in each country's top 10. (4 marks)
3.   Produce a pie chart which shows how many countries the most popular films occurred in. (2 marks)


In [None]:
### Group the data by 'country_name' and 'show_title' and find the highest
### weekly rank and total weeks spent in the top 10 for each film, display the
### data
# .groupBy, .agg, F.max, F.min, .sort, .show



df1.groupBy("country_name","show_title") \
    .agg(
        F.max("weekly_rank").alias("Highest Weekly Rank"), \
    F.max("cumulative_weeks_in_top_10").alias("Total Weeks in top 10") 
    ).show(100,truncate=False)




+------------------+-----------------------------------------+-------------------+---------------------+
|country_name      |show_title                               |Highest Weekly Rank|Total Weeks in top 10|
+------------------+-----------------------------------------+-------------------+---------------------+
|Australia         |Bloodshot                                |2                  |2                    |
|Australia         |Traffik                                  |9                  |1                    |
|Australia         |Godzilla vs. Kong                        |3                  |2                    |
|Bahamas           |The Healer                               |6                  |1                    |
|Bahamas           |The Hurt Locker                          |8                  |1                    |
|Bahrain           |Spiderhead                               |3                  |2                    |
|Bahrain           |Against The Ice                    

In [None]:
df1.filter((df1.country_name=="Australia")&(df.show_title=="Bloodshot")).show()

+------------+------------+-------------------+--------+-----------+----------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|show_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+----------+--------------------------+
|   Australia|          AU|2022-04-10 00:00:00|   Films|          2| Bloodshot|                         2|
|   Australia|          AU|2022-04-03 00:00:00|   Films|          1| Bloodshot|                         1|
+------------+------------+-------------------+--------+-----------+----------+--------------------------+



Produce a DataFrame which shows, for each country, the most popular film by the number of weeks it spent in the top 10, how many weeks that film was in the top 10, and the highest weekly rank it reached.

In [20]:
### Find the film that spent the most time in each country's top 10 list.
### Produce and display a table showing that information, along with the number
### of weeks spent in the top 10, and the highest weekly ranking

# .groupBy, .sort, .desc, .agg, F.first, .show

# grouped_data = df.groupBy(['country_name', 'show_title']).agg(F.max('cumulative_weeks_in_top_10').alias('total_weeks_in_top_10'))
# grouped_data.show()

# most_popular_movies = grouped_data.groupBy('country_name').agg(
#     {'show_title': 'first', 'total_weeks_in_top_10':'max'}
# )
# most_popular_movies.show()
# df1.groupBy("country_name").count().show()
# df.groupBy("country_name").agg(
#     F.first("show_title").alias("Show_title"),
#     F.max("cumulative_weeks_in_top_10").alias("Weeks_in_top_10"),
#     F.max("weekly_rank").alias("weekly_rank")
# ).sort(F.col("country_name").asc(), F.col("Weeks_in_top_10").desc()).show()
dfTopFilm=df1.groupby("country_name").agg(
    F.max(F.struct("cumulative_weeks_in_top_10", "show_title","weekly_rank")).alias("top_movie"),  
)

dfTopFilm=dfTopFilm.select("country_name","top_movie.show_title","top_movie.weekly_rank","top_movie.cumulative_weeks_in_top_10")
dfTopFilm.show(truncate=False)



+------------------+-------------------------------------+-----------+--------------------------+
|country_name      |show_title                           |weekly_rank|cumulative_weeks_in_top_10|
+------------------+-------------------------------------+-----------+--------------------------+
|Argentina         |Sonic the Hedgehog                   |9          |7                         |
|Australia         |Back to the Outback                  |10         |8                         |
|Austria           |Harry Potter and the Sorcerer's Stone|10         |9                         |
|Bahamas           |A Madea Homecoming                   |9          |9                         |
|Bahrain           |Red Notice                           |9          |8                         |
|Bangladesh        |Sooryavanshi                         |9          |21                        |
|Belgium           |The Adam Project                     |10         |8                         |
|Bolivia           |

In [12]:
df1.filter((df1.country_name=="Bahrain")&(df1.show_title=="Red Notice")).show()
# df1.filter((df1.country_name=="Australia")).sort(df1.cumulative_weeks_in_top_10.desc()).show()

+------------+------------+-------------------+--------+-----------+----------+------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|show_title|season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+----------+------------+--------------------------+
|     Bahrain|          BH|2022-01-02 00:00:00|   Films|          9|Red Notice|        null|                         8|
|     Bahrain|          BH|2021-12-26 00:00:00|   Films|          8|Red Notice|        null|                         7|
|     Bahrain|          BH|2021-12-19 00:00:00|   Films|          5|Red Notice|        null|                         6|
|     Bahrain|          BH|2021-12-12 00:00:00|   Films|          3|Red Notice|        null|                         5|
|     Bahrain|          BH|2021-12-05 00:00:00|   Films|          2|Red Notice|        null|                         4|
|     Bahrain|          BH|2021-11-28 00

Group the DataFrame by `season_title` and count the number of countries where each season is the most popular. Convert the DataFrame to Pandas, and use the `DataFrame.plot.pie` method to produce a pie chart of the most popular shows.

In [15]:
### Convert the DataFrame to a Pandas DataFrame, and display it as a pie chart
# .toPandas, .plot.pie

count_df = df.groupBy("season_title").agg(
    
    F.first("weekly_rank").alias("weekly_rank"),
    F.count(F.when( F.col("weekly_rank")==1,F.col("weekly_rank")))
).filter(F.col("weekly_rank")==5).filter( F.col("season_title").isNotNull())

count_df.orderBy(count_df.season_title).show(100,truncate=False)
# pandasDF = 
# pandasDF.plot.pie(y='MSFT', figsize=(9,6))

+-----------------------------------------------------------+-----------+-------------------------------------------------------+
|season_title                                               |weekly_rank|count(CASE WHEN (weekly_rank = 1) THEN weekly_rank END)|
+-----------------------------------------------------------+-----------+-------------------------------------------------------+
|Animal Kingdom: Season 6                                   |5          |0                                                      |
|Bagman: Season 1                                           |5          |0                                                      |
|Blood Sisters: Limited Series                              |5          |6                                                      |
|Bulgasal: Immortal Souls: Season 1                         |5          |0                                                      |
|Café Minamdang: Season 1                                   |5          |0                

In [None]:
df.filter((df.season_title=="2020: Season 1")).show()

+------------+------------+-------------------+--------+-----------+----------+--------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|show_title|  season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+----------+--------------+--------------------------+
|      Jordan|          JO|2022-07-10 00:00:00|      TV|         10|      2020|2020: Season 1|                         3|
|      Jordan|          JO|2022-06-26 00:00:00|      TV|         10|      2020|2020: Season 1|                         2|
|      Jordan|          JO|2022-06-19 00:00:00|      TV|         10|      2020|2020: Season 1|                         1|
|      Kuwait|          KW|2022-08-07 00:00:00|      TV|          9|      2020|2020: Season 1|                         8|
|      Kuwait|          KW|2022-07-31 00:00:00|      TV|         10|      2020|2020: Season 1|                         7|
|      Kuwait|          

# Exercise 3 - Most popular shows by country (10 Marks)


1.   Calculate the number of weeks each TV show spent in each country's top 10, across all seasons, such that you don't double count a show when two or more seasons appear in one week. (5 marks)
2.   Produce a DataFrame that contains the show which spent the most time in each country's top 10. (5 marks)

In [None]:
### Group the data by 'country_name' and 'show_title' and count the number of
### weeks each show appears in each country's top 10 lists, as well as the
### highest weekly ranking. Display the data.
# .groupBy, .agg, F.count_distinct, F.min, .sort, .show
groupedDf=df2.groupBy("country_name","show_title").agg(
    F.count_distinct(df.cumulative_weeks_in_top_10),
    F.min(df2.weekly_rank)
)


groupedDf.orderBy(groupedDf.show_title).show(30,truncate=False)



+-------------+-------------------+---------------------------------+----------------+
|country_name |show_title         |count(cumulative_weeks_in_top_10)|min(weekly_rank)|
+-------------+-------------------+---------------------------------+----------------+
|South Korea  |18 Again           |3                                |4               |
|Philippines  |2 Good 2 Be True   |15                               |1               |
|Kuwait       |2020               |8                                |7               |
|Lebanon      |2020               |5                                |8               |
|Saudi Arabia |2020               |2                                |9               |
|Jordan       |2020               |3                                |10              |
|Poland       |42 Days of Darkness|1                                |10              |
|Uruguay      |42 Days of Darkness|2                                |6               |
|Chile        |42 Days of Darkness|4       

In [None]:
df2.filter((df2.country_name=="Philippines")&(df2.show_title=="2 Good 2 Be True")).show(20)

+------------+------------+-------------------+--------+-----------+----------------+----------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|      show_title|    season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+----------------+----------------+--------------------------+
| Philippines|          PH|2022-08-21 00:00:00|      TV|          2|2 Good 2 Be True|2 Good 2 Be True|                        15|
| Philippines|          PH|2022-08-14 00:00:00|      TV|          2|2 Good 2 Be True|2 Good 2 Be True|                        14|
| Philippines|          PH|2022-08-07 00:00:00|      TV|          2|2 Good 2 Be True|2 Good 2 Be True|                        13|
| Philippines|          PH|2022-07-31 00:00:00|      TV|          2|2 Good 2 Be True|2 Good 2 Be True|                        12|
| Philippines|          PH|2022-07-24 00:00:00|      TV|          2|2 Good 2 Be True|2 Goo

In [None]:
### Find the show that spent the most time in each country's top 10 list.
### Produce and display a table showing that information, along with the number
### of weeks spent in the top 10, and the highest weekly ranking
# .groupBy, .sort, .desc, .agg, F.first, .show


df2.groupby("country_name").agg(
    
    F.max(df2.cumulative_weeks_in_top_10),
    F.max(df2.weekly_rank)
).sort(df2.country_name.asc()).show(10)

# df1.filter((df1.country_name=="Australia")).describe().show()
# df2.filter((df2.country_name=="Australia")).describe().show()
df2.filter((df2.country_name=="Belgium")).sort(df2.cumulative_weeks_in_top_10.desc()).show(5)

+------------+-------------------------------+----------------+
|country_name|max(cumulative_weeks_in_top_10)|max(weekly_rank)|
+------------+-------------------------------+----------------+
|   Argentina|                             58|              10|
|   Australia|                             13|              10|
|     Austria|                             14|              10|
|     Bahamas|                             33|              10|
|     Bahrain|                             16|              10|
|  Bangladesh|                             22|              10|
|     Belgium|                             13|              10|
|     Bolivia|                             57|              10|
|      Brazil|                             51|              10|
|    Bulgaria|                             16|              10|
+------------+-------------------------------+----------------+
only showing top 10 rows

+------------+------------+-------------------+--------+-----------+----------

In [None]:
# group by country and film name and calculate the sum of weeks in top 10 list
grouped_data = df2.groupBy(['country_name', 'show_title']).agg(
    F.max('cumulative_weeks_in_top_10').alias('total_weeks_in_top_10'),
    F.max('weekly_rank').alias('weekly_rank')
    )
from pyspark.sql.window import Window

window = Window.partitionBy('country_name').orderBy(F.desc('total_weeks_in_top_10'))
most_popular_movies = grouped_data.withColumn('row', F.row_number().over(window)).filter('row == 1').drop('row')
# most_popular_movies = grouped_data.withColumn('weekly_rank', F.row_number().over(window))
most_popular_movies.show()
# most_popular_movies = grouped_data.groupBy('country_name').agg(F.max('total_weeks_in_top_10').alias('max_weeks_in_top_10'),F.max('weekly_rank').alias('weekly_rank'))
# for row in most_popular_movies.collect():
#     country = row['country_name']
#     max_weeks = row['max_weeks_in_top_10']
#     weekRank=row["weekly_rank"]
#     most_popular_movie = grouped_data.filter((grouped_data.country_name == country) & (grouped_data.total_weeks_in_top_10 == max_weeks)).select('country_name', 'show_title', 'total_weeks_in_top_10').first()
#     print(country, most_popular_movie.show_title, max_weeks,weekRank)


+------------------+--------------------+---------------------+-----------+
|      country_name|          show_title|total_weeks_in_top_10|weekly_rank|
+------------------+--------------------+---------------------+-----------+
|         Argentina| Pasión de Gavilanes|                   58|         10|
|         Australia|     Stranger Things|                   13|         10|
|           Austria|     The Good Doctor|                   14|         10|
|           Bahamas|SpongeBob SquareP...|                   33|         10|
|           Bahrain|          Squid Game|                   16|         10|
|        Bangladesh|          Squid Game|                   22|         10|
|           Belgium|     Stranger Things|                   13|          9|
|           Bolivia| Pasión de Gavilanes|                   57|         10|
|            Brazil|         Chiquititas|                   51|         10|
|          Bulgaria|          Squid Game|                   16|          8|
|           