# ADS2 - Assignment 2 - Data Handling and Processing with PySpark

In this assignment, you will be analysing the popularity of films and TV shows on the streaming platform, Netflix. Using your knowledge of PySpark DataFrames and Spark SQL, you will produce a number of "downstream" data products to analyse trends in global streaming habits.

Download the dataset from this [Kaggle](https://www.kaggle.com/dhruvildave/netflix-top-10-tv-shows-and-films) page. A copy of the `all_weeks_countries.csv` file is also available on the Canvas page for this assignment.

Your task is to load in the data and produce a number of "downstream" data products and plots as described below.

The PySpark installation and setup are provided below for convenience.

In [None]:
# Apache Spark uses Java, so first we must install that
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Mount Google Drive and unpack Spark
from google.colab import drive
drive.mount('/content/drive')
!tar xzf /content/drive/MyDrive/spark-3.3.0-bin-hadoop3.tgz

In [None]:
# Set up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

In [None]:
# Install findspark, which helps Python locate the pyspark module files
!pip install -q findspark
import findspark
findspark.init()

In [None]:
# Finally, we initialise a "SparkSession", which handles the computations
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Exercise 1 - Data Preparation (5 Marks)


1.   Load the `all_weeks_countries.csv` into your Colab Notebook as a DataFrame.
2.   Create two separate DataFrames for Films and TV.
3.   For the Films data, drop the column containing the season names.
4.   For the TV data, replace any null values in the season name column with the show name.

In [2]:
# pyspark.sql.functions contains all the transformations and actions you will
# need
from pyspark.sql import functions as F

In [53]:
import pandas as pd
import numpy as np

In [44]:
df=pd.read_csv("all-weeks-countries.csv")

In [52]:
df["season_title"].isnull()

0          True
1          True
2          True
3          True
4          True
          ...  
112295    False
112296    False
112297    False
112298    False
112299    False
Name: season_title, Length: 112300, dtype: bool

In [64]:
type(np.nan)

float

In [68]:
# Use pd.isna to detect missing season titles; np.float was deprecated in NumPy 1.20
df.apply(lambda row: row["show_title"] if pd.isna(row["season_title"]) else row["season_title"], axis=1)

0                             Look Both Ways
1                                  Day Shift
2         Bank Robbers: The Last Great Heist
3                          The Next 365 Days
4                    The Angry Birds Movie 2
                         ...                
112295                  Reply 1988: Season 1
112296               Nevertheless,: Season 1
112297           Too Hot to Handle: Season 2
112298          Record of Ragnarok: Season 1
112299        Crash Landing on You: Season 1
Length: 112300, dtype: object

In [57]:
df

Unnamed: 0,country_name,country_iso2,week,category,weekly_rank,show_title,season_title,cumulative_weeks_in_top_10,A
0,Argentina,AR,2022-08-21,Films,1,Look Both Ways,,1,
1,Argentina,AR,2022-08-21,Films,2,Day Shift,,2,
2,Argentina,AR,2022-08-21,Films,3,Bank Robbers: The Last Great Heist,,2,
3,Argentina,AR,2022-08-21,Films,4,The Next 365 Days,,1,
4,Argentina,AR,2022-08-21,Films,5,The Angry Birds Movie 2,,1,
...,...,...,...,...,...,...,...,...,...
112295,Vietnam,VN,2021-07-04,TV,6,Reply 1988,Reply 1988: Season 1,1,Reply 1988: Season 1
112296,Vietnam,VN,2021-07-04,TV,7,"Nevertheless,","Nevertheless,: Season 1",1,"Nevertheless,: Season 1"
112297,Vietnam,VN,2021-07-04,TV,8,Too Hot to Handle,Too Hot to Handle: Season 2,1,Too Hot to Handle: Season 2
112298,Vietnam,VN,2021-07-04,TV,9,Record of Ragnarok,Record of Ragnarok: Season 1,1,Record of Ragnarok: Season 1


In [69]:
### Load the all_weeks_countries.csv into your Colab Notebook as a DataFrame.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ReadCSV").getOrCreate()

# Read CSV file
df = spark.read.csv("all-weeks-countries.csv", header=True, inferSchema=True)

# Show the first 20 rows
df.show(20)


+------------+------------+----------+--------+-----------+--------------------+--------------------+--------------------------+
|country_name|country_iso2|      week|category|weekly_rank|          show_title|        season_title|cumulative_weeks_in_top_10|
+------------+------------+----------+--------+-----------+--------------------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21|   Films|          1|      Look Both Ways|                null|                         1|
|   Argentina|          AR|2022-08-21|   Films|          2|           Day Shift|                null|                         2|
|   Argentina|          AR|2022-08-21|   Films|          3|Bank Robbers: The...|                null|                         2|
|   Argentina|          AR|2022-08-21|   Films|          4|   The Next 365 Days|                null|                         1|
|   Argentina|          AR|2022-08-21|   Films|          5|The Angry Birds M...|                n

In [70]:
### Display the table and its schema
df.printSchema()
# Display the table
df.show()

root
 |-- country_name: string (nullable = true)
 |-- country_iso2: string (nullable = true)
 |-- week: date (nullable = true)
 |-- category: string (nullable = true)
 |-- weekly_rank: integer (nullable = true)
 |-- show_title: string (nullable = true)
 |-- season_title: string (nullable = true)
 |-- cumulative_weeks_in_top_10: integer (nullable = true)

+------------+------------+----------+--------+-----------+--------------------+--------------------+--------------------------+
|country_name|country_iso2|      week|category|weekly_rank|          show_title|        season_title|cumulative_weeks_in_top_10|
+------------+------------+----------+--------+-----------+--------------------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21|   Films|          1|      Look Both Ways|                null|                         1|
|   Argentina|          AR|2022-08-21|   Films|          2|           Day Shift|                null|                         2

In [71]:
df.columns

['country_name',
 'country_iso2',
 'week',
 'category',
 'weekly_rank',
 'show_title',
 'season_title',
 'cumulative_weeks_in_top_10']

In [72]:
### Separate the data into two DataFrames for Films and TV
films_df = df.filter(df["category"] == "Films")
tv_df = df.filter(df["category"] == "TV")

# Display the two DataFrames
films_df.show()
tv_df.show()

+------------+------------+----------+--------+-----------+--------------------+------------+--------------------------+
|country_name|country_iso2|      week|category|weekly_rank|          show_title|season_title|cumulative_weeks_in_top_10|
+------------+------------+----------+--------+-----------+--------------------+------------+--------------------------+
|   Argentina|          AR|2022-08-21|   Films|          1|      Look Both Ways|        null|                         1|
|   Argentina|          AR|2022-08-21|   Films|          2|           Day Shift|        null|                         2|
|   Argentina|          AR|2022-08-21|   Films|          3|Bank Robbers: The...|        null|                         2|
|   Argentina|          AR|2022-08-21|   Films|          4|   The Next 365 Days|        null|                         1|
|   Argentina|          AR|2022-08-21|   Films|          5|The Angry Birds M...|        null|                         1|
|   Argentina|          AR|2022-

In [73]:
### Drop the 'season_title' column from the Films DataFrame, display the table

films_df = films_df.drop("season_title")

# Display the updated Films DataFrame
films_df.show()

+------------+------------+----------+--------+-----------+--------------------+--------------------------+
|country_name|country_iso2|      week|category|weekly_rank|          show_title|cumulative_weeks_in_top_10|
+------------+------------+----------+--------+-----------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21|   Films|          1|      Look Both Ways|                         1|
|   Argentina|          AR|2022-08-21|   Films|          2|           Day Shift|                         2|
|   Argentina|          AR|2022-08-21|   Films|          3|Bank Robbers: The...|                         2|
|   Argentina|          AR|2022-08-21|   Films|          4|   The Next 365 Days|                         1|
|   Argentina|          AR|2022-08-21|   Films|          5|The Angry Birds M...|                         1|
|   Argentina|          AR|2022-08-21|   Films|          6|       Purple Hearts|                         4|
|   Argentina|          AR|2

In [103]:
# Count the TV rows whose season_title is null
null_count = tv_df.filter(tv_df["season_title"].isNull()).count()

In [104]:
null_count

0


In [102]:
# Save the table in CSV format


In [106]:
### Replace any null values in 'season_title' in the TV DataFrame with the
### corresponding 'show_title', display the table

# F.coalesce returns the first non-null value among its arguments, so rows
# with a null 'season_title' fall back to 'show_title'
tv_df = tv_df.withColumn('season_title', F.coalesce(tv_df['season_title'], tv_df['show_title']))

# Show the updated table
tv_df.show(1)


+------------+------------+----------+--------+-----------+-------------------+--------------------+--------------------------+
|country_name|country_iso2|      week|category|weekly_rank|         show_title|        season_title|cumulative_weeks_in_top_10|
+------------+------------+----------+--------+-----------+-------------------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21|      TV|          1|Pasión de Gavilanes|Pasión de Gavilan...|                         5|
+------------+------------+----------+--------+-----------+-------------------+--------------------+--------------------------+
only showing top 1 row



# Exercise 2 - Most popular films by country (10 Marks)

1.   For each country in the data, find the highest rank each film reached, and the number of weeks it spent in the top 10. (4 marks)
2.   Produce a DataFrame that contains the film which spent the most time in each country's top 10. (4 marks)
3.   Produce a pie chart which shows how many countries the most popular films occurred in. (2 marks)


In [None]:
### Group the data by 'country_name' and 'show_title' and find the highest
### weekly rank and total weeks spent in the top 10 for each film, display the
### data
# .groupBy, .agg, F.max, F.min, .sort, .show



In [82]:
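# Largest weekly_rank value per (country, show) in the TV data.
# Rank 1 is the top position, so F.max returns the lowest position a show
# reached; F.min would return its best position.
highest_rating_df = tv_df.groupBy("country_name", "show_title").agg(F.max("weekly_rank"))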

# Display the resulting DataFrame
highest_rating_df.show()

+--------------+--------------------+----------------+
|  country_name|          show_title|max(weekly_rank)|
+--------------+--------------------+----------------+
|       Austria|            For Life|               9|
|       Austria|Demon Slayer: Kim...|               8|
|       Bahamas|        Virgin River|              10|
|       Bahamas|   The Longest Night|               4|
|       Bahrain|       L.A.’s Finest|               9|
|       Belgium|          After Life|               9|
|       Bolivia|      Inventing Anna|               9|
|       Bolivia|The King's Affection|              10|
|       Bolivia|The Snitch Cartel...|               9|
|        Brazil|Cúmplices de um R...|              10|
|      Bulgaria|          True Story|               6|
|      Bulgaria|           Big Mouth|               7|
|         Chile|            Intimacy|               4|
|      Colombia|              Arcane|               7|
|      Colombia|               Lupin|               9|
|        C

In [88]:
highest_rating_df = highest_rating_df.sort("max(weekly_rank)", ascending=False)
highest_rating_df.show()

+--------------+--------------------+----------------+
|  country_name|          show_title|max(weekly_rank)|
+--------------+--------------------+----------------+
|    Bangladesh|         Thirty-Nine|              10|
|      Honduras|My ID is Gangnam ...|              10|
|        Canada|Hunter X Hunter (...|              10|
|       Bolivia|The King's Affection|              10|
|         Chile|     Stranger Things|              10|
|Czech Republic|Love, Death & Robots|              10|
|      Colombia|     Stranger Things|              10|
|       Ecuador|         Rebelde Way|              10|
|        Cyprus|       The Blacklist|              10|
|       Iceland|      Disenchantment|              10|
|         Egypt|                Alba|              10|
|     Lithuania|       Love Is Blind|              10|
|       Finland|     The Good Doctor|              10|
|     Mauritius|          Good Girls|              10|
|       Hungary|       Downton Abbey|              10|
|     Maur

Produce a DataFrame which shows, for each country, the most popular film by the number of weeks it spent in the top 10, how many weeks that film was in the top 10, and the highest weekly rank it reached.

In [None]:
### Find the film that spent the most time in each country's top 10 list.
### Produce and display a table showing that information, along with the number
### of weeks spent in the top 10, and the highest weekly ranking
# .groupBy, .sort, .desc, .agg, F.first, .show



Group the DataFrame by `show_title` and count the number of countries where each film is the most popular. Convert the DataFrame to Pandas, and use the `DataFrame.plot.pie` method to produce a pie chart of the most popular films.

In [None]:
### Convert the DataFrame to a Pandas DataFrame, and display it as a pie chart
# .toPandas, .plot.pie



# Exercise 3 - Most popular shows by country (10 Marks)


1.   Calculate the number of weeks each TV show spent in each country's top 10, across all seasons, such that you don't double count a show when two or more seasons appear in one week. (5 marks)
2.   Produce a DataFrame that contains the show which spent the most time in each country's top 10. (5 marks)

In [None]:
### Group the data by 'country_name' and 'show_title' and count the number of
### weeks each show appears in each country's top 10 lists, as well as the
### highest weekly ranking. Display the data.
# .groupBy, .agg, F.count_distinct, F.min, .sort, .show



In [None]:
### Find the show that spent the most time in each country's top 10 list.
### Produce and display a table showing that information, along with the number
### of weeks spent in the top 10, and the highest weekly ranking
# .groupBy, .sort, .desc, .agg, F.first, .show

