<a href="https://colab.research.google.com/github/DevAdedoyin/Applied-Data-Science-Project/blob/master/PySpark_Assignment_Sep23.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ADS2 - Assignment 1 - Data Handling and Processing with PySpark

**STUDENT NAME -**

**STUDENT ID -**

In this assignment, you will be analysing the popularity of films and TV shows on the streaming platform, Netflix. Using your knowledge of PySpark DataFrames and Spark SQL, you will produce a number of "downstream" data products to analyse trends in global streaming habits.

Download the dataset from this [Kaggle](https://www.kaggle.com/dhruvildave/netflix-top-10-tv-shows-and-films) page. A copy of the `all_weeks_countries.csv` file is also available on the Canvas page for this assignment.

Your task is to load in the data and produce a number of "downstream" data products and plots as described below.

The PySpark installation and setup are provided below for convenience.

IMPORTANT: DO NOT EDIT OR REMOVE THE COMMENT TAGS IN THE CODE CELLS

In [None]:
# CodeGrade Tag Init1

# Apache Spark uses Java, so first we must install that
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# CodeGrade Tag Init2
# Mount Google Drive and unpack Spark
from google.colab import drive
drive.mount('/content/drive')
!tar xzf /content/drive/MyDrive/spark-3.3.0-bin-hadoop3.tgz

Mounted at /content/drive


In [None]:
# CodeGrade Tag Init3
# Set up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

In [None]:
# CodeGrade Tag Init4
# Install findspark, which helps python locate the pyspark module files
!pip install -q findspark
import findspark
findspark.init()

In [None]:
# Finally, we initialise a "SparkSession", which handles the computations
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
# Load the all_weeks_countries.csv into your Colab Notebook as a DataFrame.
netflixcsvpath = '/content/all-weeks-countries.csv'

# Data is loaded with header: True and an inferred schema
netflixDF = (spark
           .read
           .option('header', 'True')
           .option('inferSchema', 'True')
           .csv(netflixcsvpath)
          )

In [None]:
# pyspark.sql.functions contains all the transformations and actions you will
# need
from pyspark.sql import functions as F

# Exercise 1 - Data Preparation


1.   Create two separate DataFrames for Films and TV.
2.   For the Films data, drop the column containing the season names.
3.   For the TV data, replace any null values in the season name column with the show name.
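
The three steps above can be checked by hand on a tiny sample before running them in Spark. The sketch below uses plain Python lists of dictionaries in place of DataFrames; the rows and titles are made up for illustration, and `None` stands in for a Spark null.

```python
# Hypothetical sample rows (not taken from the dataset); None plays the role of null.
rows = [
    {"category": "Films", "show_title": "Film A", "season_title": None},
    {"category": "TV", "show_title": "Show B", "season_title": "Show B: Season 1"},
    {"category": "TV", "show_title": "Show C", "season_title": None},
]

# Step 1: split the rows into films and TV by category.
films = [r for r in rows if r["category"] == "Films"]
tv = [r for r in rows if r["category"] == "TV"]

# Step 2: drop the season_title key from every film row.
films = [{k: v for k, v in r.items() if k != "season_title"} for r in films]

# Step 3: where season_title is missing, fall back to show_title.
tv = [
    {
        **r,
        "season_title": r["season_title"]
        if r["season_title"] is not None
        else r["show_title"],
    }
    for r in tv
]

print(films)
print(tv)
```

In PySpark, step 3 is the `F.when(...).otherwise(...)` pattern; `F.coalesce` on the two columns is one possible shorter equivalent.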

In [None]:
# CodeGrade Tag Ex1a

### Display the table and its schema

# Display the dataframe table
netflixDF.show()

# Display the schema
netflixDF.printSchema()


+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|        season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          1|      Look Both Ways|                null|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          2|           Day Shift|                null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          3|Bank Robbers: The...|                null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          4|   The Next 365 Days|                null|                         1|
|   Argentina|          AR|2022-08

In [None]:
# CodeGrade Tag Ex1b

### Separate the data into two DataFrames for Films and TV
### Call the dataframes tvDF and filmsDF

# Filters for "Films" category in the dataframe
filmsDF = netflixDF.filter(netflixDF["category"] == "Films")

# Filters for "TV" category in the dataframe
tvDF = netflixDF.filter(netflixDF["category"] == "TV")

# Display the first 20 rows of the Films (filmsDF) and TV (tvDF) DataFrames
filmsDF.show()
tvDF.show()

+------------+------------+-------------------+--------+-----------+--------------------+------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          1|      Look Both Ways|        null|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          2|           Day Shift|        null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          3|Bank Robbers: The...|        null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          4|   The Next 365 Days|        null|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          5|The Angry Birds M...| 

In [None]:
# CodeGrade Tag Ex1c

### Drop the 'season_title' column from the Films DataFrame, display the table

# Drop the 'season_title' column from the 'filmsDF' DataFrame
filmsDF = filmsDF.drop("season_title")

# Display the Films(filmsDF) dataframe
filmsDF.show()

+------------+------------+-------------------+--------+-----------+--------------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          1|      Look Both Ways|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          2|           Day Shift|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          3|Bank Robbers: The...|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          4|   The Next 365 Days|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          5|The Angry Birds M...|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|        

In [None]:
# CodeGrade Tag Ex1d

### Use the F.isnull function to create a column showing where there are null
### values in the 'season_title' column. Replace the null values with the
### corresponding value from the 'show_title' column, then replace the
### 'season_title' column in the tvDF DataFrame.

# Check for null values in the 'season_title' column and create a new column
# 'season_title_is_null'
tvDF = tvDF.withColumn("season_title_is_null", F.isnull(tvDF["season_title"]))

# Replace null values in 'season_title' with values from 'show_title'
tvDF = tvDF.withColumn("season_title",
    F.when(tvDF["season_title_is_null"], tvDF["show_title"])\
    .otherwise(tvDF["season_title"]))

# Display the modified 'tvDF' DataFrame
tvDF.show()


+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+--------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|        season_title|cumulative_weeks_in_top_10|season_title_is_null|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+--------------------+
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          1| Pasión de Gavilanes|Pasión de Gavilan...|                         5|               false|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          2|        Another Self|Another Self: Sea...|                         4|               false|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          3| Pasión de Gavilanes|Pasión de Gavilan...|                        58|               false|
|   Argentina|          AR|2022-08-21 00:00:00

# Exercise 2

Making use of the "groupBy" and "where" methods, find the number of weeks the show "Stranger Things" was in the Top 10 for the United Kingdom across all seasons. Store your result in a variable named "STWeeks".
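
"Across all seasons" leaves some room for interpretation. The plain-Python sketch below assumes the intended figure is the number of distinct weeks, so a week in which two seasons chart at once is counted once. The sample rows are made up for illustration.

```python
# Hypothetical sample rows: (country_name, show_title, season_title, week).
rows = [
    ("United Kingdom", "Stranger Things", "Stranger Things 4", "2022-07-03"),
    ("United Kingdom", "Stranger Things", "Stranger Things 3", "2022-07-03"),
    ("United Kingdom", "Stranger Things", "Stranger Things 4", "2022-07-10"),
    ("Ireland", "Stranger Things", "Stranger Things 4", "2022-07-17"),
    ("United Kingdom", "Ozark", "Ozark: Season 4", "2022-07-03"),
]

# Equivalent of the "where" step: keep only Stranger Things in the UK.
uk_st = [r for r in rows if r[0] == "United Kingdom" and r[1] == "Stranger Things"]

# Counting rows counts 2022-07-03 twice (two seasons charted that week).
row_count = len(uk_st)

# Counting distinct weeks counts it once.
st_weeks = len({week for _, _, _, week in uk_st})

print(row_count, st_weeks)
```

The gap between `row_count` and `st_weeks` is the difference between a plain count and a distinct count of the `week` column.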


In [None]:
# CodeGrade Tag Ex2
### Use the "where" method to create a new dataframe containing the data for
### the show Stranger Things in the United Kingdom. Call this dataframe STDF.


### Using "groupBy" method and "F.count_distinct" function, find the total number of weeks
### Stranger Things spent in the top 10 of the UK, across all seasons. Show the
### result.

# Create a DataFrame "STDF" for "Stranger Things" in the United Kingdom
STDF = netflixDF.where(
    (F.col("show_title") == "Stranger Things") &
     (F.col("country_name") == "United Kingdom"))

# Group by the show_title and count the distinct weeks in the top 10,
# storing the result in STWeeks as the exercise asks
STWeeks = STDF.groupBy("show_title")\
              .agg(F.countDistinct("week").alias("total_weeks"))

# Show the result
STWeeks.show()

+---------------+-----------+
|     show_title|total_weeks|
+---------------+-----------+
|Stranger Things|         13|
+---------------+-----------+



# Exercise 3

Produce a dataframe containing only the Top 25 TV seasons in the UK, based on the number of weeks they spent in the Top 10.
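
The ranking can be sketched in plain Python on a small sample: filter to UK TV rows, count rows per (show, season) pair, then keep the largest counts. The rows are made up for illustration, and `top_n` is set to 2 instead of 25 so the result fits on screen.

```python
from collections import Counter

# Hypothetical sample rows: (country_name, category, show_title, season_title).
# Each row represents one week in the Top 10.
rows = (
    [("United Kingdom", "TV", "Show A", "Show A: Season 1")] * 3
    + [("United Kingdom", "TV", "Show B", "Show B: Season 1")] * 2
    + [("United Kingdom", "TV", "Show A", "Show A: Season 2")] * 1
    + [("United Kingdom", "Films", "Film X", None)] * 4
    + [("Ireland", "TV", "Show B", "Show B: Season 1")] * 5
)

# Keep UK TV rows only, then count the weeks per (show, season) pair.
counts = Counter(
    (show, season)
    for country, category, show, season in rows
    if country == "United Kingdom" and category == "TV"
)

# Sort by count, largest first, and keep the first top_n entries.
top_n = 2
top = counts.most_common(top_n)

print(top)
```

Film X has the most rows in the sample but is excluded by the category filter, which is why the choice of starting DataFrame matters for this exercise.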

In [None]:
# CodeGrade Tag Ex3
### Produce a dataframe containing the top 25 seasons by number of weeks in the
### top 10 of the United Kingdom, sorted by number of weeks. Store the dataframe
### in a variable called Top25

# Create a DataFrame "Top25" containing the top 25 seasons in the United Kingdom.
# netflixDF still includes films, so rows with a null season_title can appear.
Top25 = netflixDF.filter(netflixDF["country_name"] == "United Kingdom") \
    .groupBy("show_title", "season_title") \
    .agg(F.count("week").alias("total_weeks")) \
    .orderBy(F.col("total_weeks"), ascending=False) \
    .limit(25)

# Show the "Top25" DataFrame
Top25.show()

+------------------+--------------------+-----------+
|        show_title|        season_title|total_weeks|
+------------------+--------------------+-----------+
|   Stranger Things|   Stranger Things 4|         13|
|   Stranger Things|   Stranger Things 3|         12|
|             Ozark|     Ozark: Season 4|         12|
|   Stranger Things|   Stranger Things 2|         11|
|  Better Call Saul|Better Call Saul:...|         10|
|        Squid Game|Squid Game: Season 1|         10|
|   Stranger Things|                null|         10|
|              Maid|Maid: Limited Series|          9|
|        PAW Patrol|PAW Patrol: Season 6|          9|
|        Bridgerton|Bridgerton: Season 2|          9|
|Sonic the Hedgehog|                null|          8|
|  The Adam Project|                null|          8|
|    Inventing Anna|Inventing Anna: L...|          8|
|       Money Heist| Money Heist: Part 5|          7|
|     PIECES OF HER|PIECES OF HER: Se...|          7|
|          Sex/Life|  Sex/Li

# Exercise 4

For the show "Young Sheldon", find the country where each season spent the most time in the Top 10.
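
This is a "maximum per group, plus the row that holds it" problem. The plain-Python sketch below shows one way to reason about it, assuming ties should be kept: every country that reaches a season's maximum is reported. The rows are made up for illustration.

```python
from collections import defaultdict

# Hypothetical sample rows: (season_title, country_name, cumulative_weeks_in_top_10).
rows = [
    ("Season 1", "Canada", 5),
    ("Season 1", "Canada", 6),
    ("Season 1", "Ireland", 4),
    ("Season 2", "Ireland", 6),
    ("Season 2", "Australia", 6),
    ("Season 2", "Canada", 2),
]

# Step 1: the longest run for each (season, country) pair.
longest = defaultdict(int)
for season, country, weeks in rows:
    longest[(season, country)] = max(longest[(season, country)], weeks)

# Step 2: the maximum of those values for each season.
season_max = defaultdict(int)
for (season, _), weeks in longest.items():
    season_max[season] = max(season_max[season], weeks)

# Step 3: keep every country whose value equals the season's maximum.
result = {
    season: (
        best,
        sorted(c for (s, c), w in longest.items() if s == season and w == best),
    )
    for season, best in season_max.items()
}

print(result)
```

Step 3 is the part a single aggregation cannot express. In PySpark it usually means joining the per-season maximum back to the per-country table, or ranking rows within a `Window.partitionBy("season_title")`. `F.first` inside `agg` does not do this, because its result depends on row order.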

In [None]:
# CodeGrade Tag Ex4
### For each season of the show "Young Sheldon" find the countries where it spent
### the most time in the Top 10

# Filter the DataFrame to include only "Young Sheldon" rows
young_sheldon_df = netflixDF.filter(netflixDF["show_title"] == "Young Sheldon")

# Group the data by season and country
grouped_df = young_sheldon_df.groupBy("season_title", "country_name")

# Take the largest cumulative_weeks_in_top_10 value for each season in each country
max_weeks_df = grouped_df.agg(
    F.max("cumulative_weeks_in_top_10").alias("max_weeks"))

max_time_df = max_weeks_df.groupBy("season_title", "country_name").\
              agg(F.max("max_weeks").alias("max_weeks"))

# Take the largest value per season. F.first returns the country from an
# arbitrary row of the group, which may not be the row holding the maximum
result_df = max_time_df.groupBy("season_title").\
              agg(F.max("max_weeks").alias("max_weeks"),
                  F.first("country_name").alias("country_with_max_time"))

# Display result
result_df.show()


+--------------------+---------+---------------------+
|        season_title|max_weeks|country_with_max_time|
+--------------------+---------+---------------------+
|Young Sheldon: : ...|        6|               Canada|
|Young Sheldon: : ...|        6|              Ireland|
|Young Sheldon: : ...|        4|            Australia|
|Young Sheldon: Se...|        1|          New Zealand|
+--------------------+---------+---------------------+



# Exercise 5

For each country, find the film that spent the most time in the Top 10.
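
The `cumulative_weeks_in_top_10` column is a running counter, so the choice of aggregate matters. The plain-Python sketch below, on made-up rows, compares counting weekly rows with summing the counter, then picks the top film per country from the counts.

```python
from collections import Counter, defaultdict

# Hypothetical sample rows: (country_name, show_title, cumulative_weeks_in_top_10).
rows = [
    ("Argentina", "Film A", 1),
    ("Argentina", "Film A", 2),
    ("Argentina", "Film A", 3),
    ("Argentina", "Film B", 1),
    ("Argentina", "Film B", 2),
    ("Brazil", "Film B", 1),
    ("Brazil", "Film C", 1),
    ("Brazil", "Film C", 2),
]

# Weeks in the Top 10 = number of weekly rows per (country, film).
weeks = Counter((country, film) for country, film, _ in rows)

# Summing the running counter instead gives 1 + 2 + 3 = 6 for a 3-week run.
cumulative_sum = defaultdict(int)
for country, film, cumulative in rows:
    cumulative_sum[(country, film)] += cumulative

# Pick the film with the most weeks in each country (first one wins a tie).
top = {}
for (country, film), n in weeks.items():
    if country not in top or n > top[country][1]:
        top[country] = (film, n)

# Order the result by country name.
top = dict(sorted(top.items()))
print(top)
```

A film that charted for `n` consecutive weeks contributes `n` to the row count but `n * (n + 1) / 2` to the sum of the counter.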

In [None]:
# CodeGrade Tag Ex5
### For each country, find the film that spent the most time in the Top 10
### Display the results in a Dataframe ordered by country name.

# Group the data by country and film and sum the cumulative_weeks_in_top_10
# values of every weekly row
groupedDF = filmsDF.groupBy("country_name", "show_title").agg(
    F.sum("cumulative_weeks_in_top_10").alias("total_weeks_in_top_10")
)

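# Take the largest total per country, ordered by country name. F.first returns
# the title from an arbitrary row of the group, which may not be the film that
# holds the maximum
maxWeeksDF = groupedDF.groupBy("country_name").agg(
    F.max("total_weeks_in_top_10").alias("max_weeks"),
    F.first("show_title").alias("top_film")).orderBy("country_name")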

# Display the result
maxWeeksDF.show()


+------------------+---------+--------------------+
|      country_name|max_weeks|            top_film|
+------------------+---------+--------------------+
|         Argentina|       28|       In Good Hands|
|         Australia|       36|           Bloodshot|
|           Austria|       45|                Buba|
|           Bahamas|       45|          The Healer|
|           Bahrain|       36|          Spiderhead|
|        Bangladesh|      231|             Nikamma|
|           Belgium|       36|        Heart Parade|
|           Bolivia|      171|Men in Black: Int...|
|            Brazil|       28|  The Rhythm Section|
|          Bulgaria|      120|G.I. Joe: The Ris...|
|            Canada|       28|   Indecent Proposal|
|             Chile|       36|   Bohemian Rhapsody|
|          Colombia|       78|               Lulli|
|        Costa Rica|       55|  Dangerous Liaisons|
|           Croatia|      105|              Hustle|
|            Cyprus|       78|           Bumblebee|
|    Czech R

# Exercise 6

Calculate the number of weeks each film spent at the number 1 spot of each country's Top 10 list. Then find the films that spent the most time in the number 1 spot for each country.
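
The two stages can be sketched in plain Python on a small sample, assuming the number 1 spot is identified by `weekly_rank == 1`. Each weekly row gets a 1 or 0 flag, the flags are summed per (country, film), and the largest sum per country is kept. The rows are made up for illustration.

```python
from collections import defaultdict

# Hypothetical sample rows: (country_name, show_title, weekly_rank).
rows = [
    ("United Kingdom", "Film A", 1),
    ("United Kingdom", "Film A", 1),
    ("United Kingdom", "Film A", 2),
    ("United Kingdom", "Film B", 1),
    ("United Kingdom", "Film C", 3),
    ("France", "Film B", 1),
    ("France", "Film C", 2),
]

# Stage 1: flag each row (1 if ranked first, otherwise 0) and sum per film.
weeks_at_1 = defaultdict(int)
for country, film, rank in rows:
    weeks_at_1[(country, film)] += 1 if rank == 1 else 0

# Stage 2: keep the film with the most weeks at number 1 in each country
# (the first film encountered wins a tie).
top_per_country = {}
for (country, film), n in weeks_at_1.items():
    if country not in top_per_country or n > top_per_country[country][1]:
        top_per_country[country] = (film, n)

print(dict(weeks_at_1))
print(top_per_country)
```

The flag in stage 1 plays the role of `F.when(...).otherwise(0)`. Films that never reached the top spot still appear in `weeks_at_1`, with a total of zero.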

In [None]:
# CodeGrade Tag Ex6a

### Create a column using the F.when function to calculate the number of weeks a
### film spends in the number 1 spot of the Top 10. Use the .otherwise method to
### set rows with no number 1 spots to zero. Use the .alias method to call this
### column "weeks_at_1"

# Find the film that spent the most time in the top 10 for each country
maxWeeksDF = groupedDF.groupBy("country_name").agg(
    F.max("total_weeks_in_top_10").alias("max_weeks"),
    F.first("show_title").alias("top_film")
)

# Create a new column "weeks_at_1" using F.when
resultDF = maxWeeksDF.withColumn(
    "weeks_at_1",
    F.when(F.col("max_weeks") == 1, 1).otherwise(0)
).orderBy("country_name")

# Show the result
resultDF.show()

+------------------+---------+--------------------+----------+
|      country_name|max_weeks|            top_film|weeks_at_1|
+------------------+---------+--------------------+----------+
|         Argentina|       28|       In Good Hands|         0|
|         Australia|       36|           Bloodshot|         0|
|           Austria|       45|                Buba|         0|
|           Bahamas|       45|          The Healer|         0|
|           Bahrain|       36|          Spiderhead|         0|
|        Bangladesh|      231|             Nikamma|         0|
|           Belgium|       36|        Heart Parade|         0|
|           Bolivia|      171|Men in Black: Int...|         0|
|            Brazil|       28|  The Rhythm Section|         0|
|          Bulgaria|      120|G.I. Joe: The Ris...|         0|
|            Canada|       28|   Indecent Proposal|         0|
|             Chile|       36|   Bohemian Rhapsody|         0|
|          Colombia|       78|               Lulli|    

In [21]:
# CodeGrade Tag Ex6b

### Group by country name and show title, and use the .agg method and your new
### column to find the number of weeks each film spent in the top spot for each
### country.

# Create a new column "weeks_at_1" using F.when
resultDF = maxWeeksDF.withColumn(
    "weeks_at_1",
    F.when(F.col("max_weeks") == 1, 1).otherwise(0)
)

# Group by country name and show title, and aggregate the sum of "weeks_at_1"
finalResultDF = resultDF.groupBy("country_name", "top_film").agg(
    F.sum("weeks_at_1").alias("weeks_in_top_spot")
).orderBy("country_name", "top_film")

# Show the result
finalResultDF.show()

+------------------+--------------------+-----------------+
|      country_name|            top_film|weeks_in_top_spot|
+------------------+--------------------+-----------------+
|         Argentina|       In Good Hands|                0|
|         Australia|           Bloodshot|                0|
|           Austria|                Buba|                0|
|           Bahamas|          The Healer|                0|
|           Bahrain|          Spiderhead|                0|
|        Bangladesh|             Nikamma|                0|
|           Belgium|        Heart Parade|                0|
|           Bolivia|Men in Black: Int...|                0|
|            Brazil|  The Rhythm Section|                0|
|          Bulgaria|G.I. Joe: The Ris...|                0|
|            Canada|   Indecent Proposal|                0|
|             Chile|   Bohemian Rhapsody|                0|
|          Colombia|               Lulli|                0|
|        Costa Rica|  Dangerous Liaisons

In [22]:
# CodeGrade Tag Ex6c

### Produce a dataframe grouped by country name that contains the show title and
### number of weeks at the number 1 spot of the top performing film in each
### country.

# Create a new column "weeks_at_1" using F.when
resultDF = maxWeeksDF.withColumn(
    "weeks_at_1",
    F.when(F.col("max_weeks") == 1, 1).otherwise(0)
)

# Select relevant columns
finalResultDF = resultDF.select("country_name", "top_film", "weeks_at_1").orderBy("country_name")

# Show the result
finalResultDF.show()

+------------------+--------------------+----------+
|      country_name|            top_film|weeks_at_1|
+------------------+--------------------+----------+
|         Argentina|       In Good Hands|         0|
|         Australia|           Bloodshot|         0|
|           Austria|                Buba|         0|
|           Bahamas|          The Healer|         0|
|           Bahrain|          Spiderhead|         0|
|        Bangladesh|             Nikamma|         0|
|           Belgium|        Heart Parade|         0|
|           Bolivia|Men in Black: Int...|         0|
|            Brazil|  The Rhythm Section|         0|
|          Bulgaria|G.I. Joe: The Ris...|         0|
|            Canada|   Indecent Proposal|         0|
|             Chile|   Bohemian Rhapsody|         0|
|          Colombia|               Lulli|         0|
|        Costa Rica|  Dangerous Liaisons|         0|
|           Croatia|              Hustle|         0|
|            Cyprus|           Bumblebee|     