#  Data Handling and Processing with PySpark


In this assignment, you will analyse the popularity of films and TV shows on the streaming platform Netflix. Using your knowledge of PySpark DataFrames and Spark SQL, you will produce a number of "downstream" data products to analyse trends in global streaming habits.

Download the dataset from this [Kaggle](https://www.kaggle.com/dhruvildave/netflix-top-10-tv-shows-and-films) page. A copy of the `all_weeks_countries.csv` file is also available on the Canvas page for this assignment.

Your task is to load in the data and produce a number of "downstream" data products and plots as described below.

The PySpark installation and setup is provided below for convenience.

IMPORTANT: DO NOT EDIT OR REMOVE THE COMMENT TAGS IN THE CODE CELLS

In [None]:



!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:

# Mount Google Drive and unpack Spark
from google.colab import drive
drive.mount('/content/drive')
!tar xzf /content/drive/MyDrive/spark-3.3.0-bin-hadoop3.tgz

Mounted at /content/drive


In [None]:

# Set up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

In [None]:

# Install findspark, which helps python locate the pyspark module files
!pip install -q findspark
import findspark
findspark.init()

In [None]:
# Finally, we initialise a "SparkSession", which handles the computations
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
# Load the all_weeks_countries.csv into your Colab Notebook as a DataFrame.
netflixcsvpath = '/content/all-weeks-countries.csv'

# Data is loaded with header: True and an inferred schema
netflixDF = (spark
           .read
           .option('header', 'True')
           .option('inferSchema', 'True')
           .csv(netflixcsvpath)
          )

In [None]:
# pyspark.sql.functions contains the column functions (F.when, F.max, ...) you
# will need
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Data Preparation


1.   Create two separate DataFrames for Films and TV.
2.   For the Films data, drop the column containing the season names.
3.   For the TV data, replace any null values in the season name column with the show name.
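
The three steps above can be sketched in plain Python on a handful of rows, which is a convenient way to check the intended result before running the Spark version. The rows and titles below are invented for illustration and are not taken from the dataset. Step 3 is a "first non-null value" rule, the same semantics that Spark's `F.coalesce` provides.

```python
# Toy rows standing in for the CSV; all values are invented for illustration.
rows = [
    {"category": "Films", "show_title": "Film A", "season_title": None},
    {"category": "TV", "show_title": "Show B", "season_title": "Show B: Season 1"},
    {"category": "TV", "show_title": "Show C", "season_title": None},
]

# Step 1: split the rows by category.
films = [r for r in rows if r["category"] == "Films"]
tv = [r for r in rows if r["category"] == "TV"]

# Step 2: drop the season column from the films rows.
films = [{k: v for k, v in r.items() if k != "season_title"} for r in films]

# Step 3: fall back to the show title where the season title is missing.
tv = [
    {**r, "season_title": r["season_title"] if r["season_title"] is not None else r["show_title"]}
    for r in tv
]

print(films)
print(tv)
```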

In [None]:



# Display the table and its schema
netflixDF.show()

netflixDF.printSchema()





+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|        season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          1|      Look Both Ways|                null|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          2|           Day Shift|                null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          3|Bank Robbers: The...|                null|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          4|   The Next 365 Days|                null|                         1|
|   Argentina|          AR|2022-08

In [None]:


### Separate the data into two DataFrames for Films and TV
### Call the dataframes tvDF and filmsDF

# Separate the data into TV shows and films DataFrames
tvDF = netflixDF.filter(col('category') == 'TV')
filmsDF = netflixDF.filter(col('category') == 'Films')

# Display the first few rows of each DataFrame
print("TV Shows DataFrame:")
tvDF.show()

print("Films DataFrame:")
filmsDF.show()

TV Shows DataFrame:
+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|        season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          1| Pasión de Gavilanes|Pasión de Gavilan...|                         5|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          2|        Another Self|Another Self: Sea...|                         4|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          3| Pasión de Gavilanes|Pasión de Gavilan...|                        58|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          4|            Manifest|  Manifest: Season 1|                         6|
|   Argentina|

In [None]:


### Drop the 'season_title' column from the Films DataFrame, display the table

filmsDF = filmsDF.drop('season_title')
filmsDF.show()

+------------+------------+-------------------+--------+-----------+--------------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          1|      Look Both Ways|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          2|           Day Shift|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          3|Bank Robbers: The...|                         2|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          4|   The Next 365 Days|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          5|The Angry Birds M...|                         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|        

In [None]:


### Use the F.isnull function to create a column showing where there are null
### values in the 'season_title' column. Replace the null values with the
### corresponding value from the 'show_title' column, then replace the
### 'season_title' column in the tvDF DataFrame.

tvDF = tvDF.withColumn('season_title_null',F.isnull('season_title'))

# Replace null values in 'season_title' with corresponding values from 'show_title'
tvDF = tvDF.withColumn('season_title', F.when(col('season_title_null'), col('show_title')).otherwise(col('season_title')))

# Drop the 'season_title_null' column
tvDF = tvDF.drop('season_title_null')

# Display the updated TV Shows DataFrame
print("Updated TV Shows DataFrame:")
tvDF.show()


Updated TV Shows DataFrame:
+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|        season_title|cumulative_weeks_in_top_10|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------+--------------------------+
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          1| Pasión de Gavilanes|Pasión de Gavilan...|                         5|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          2|        Another Self|Another Self: Sea...|                         4|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          3| Pasión de Gavilanes|Pasión de Gavilan...|                        58|
|   Argentina|          AR|2022-08-21 00:00:00|      TV|          4|            Manifest|  Manifest: Season 1|                         6|
|   Ar

# Part 2

Making use of the "groupBy" and "where" methods, find the number of weeks the show "Stranger Things" was in the Top 10 for the United Kingdom across all seasons. Store your result in a variable named "STWeeks"
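
Because the count is taken across all seasons, a week in which two seasons both chart should only be counted once. A minimal plain-Python sketch of that idea follows, using invented rows rather than the real data. Collecting the weeks into a set plays the role that a distinct count plays in the Spark query.

```python
# Invented (country_name, show_title, season_title, week) rows for illustration.
rows = [
    ("United Kingdom", "Stranger Things", "Stranger Things 4", "2022-07-03"),
    ("United Kingdom", "Stranger Things", "Stranger Things 3", "2022-07-03"),
    ("United Kingdom", "Stranger Things", "Stranger Things 4", "2022-07-10"),
    ("Canada", "Stranger Things", "Stranger Things 4", "2022-07-10"),
    ("United Kingdom", "Other Show", "Other Show: Season 1", "2022-07-10"),
]

# Filter to the show and country, then keep each week only once.
weeks = {
    week
    for country, show, season, week in rows
    if country == "United Kingdom" and show == "Stranger Things"
}
st_weeks = len(weeks)

print(st_weeks)
```

Counting rows instead of distinct weeks would give 3 here, because two seasons share the first week.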


In [None]:

### Use the "where" method to create a new dataframe containing the data for
### the show Stranger Things in the United Kingdom. Call this dataframe STDF.

STDF = (
    tvDF
    .where((F.col('show_title') == 'Stranger Things') & (F.col('country_name') == 'United Kingdom'))
    .groupBy('show_title')
    .agg(F.countDistinct('week').alias('total_weeks_in_top10'))
)

STWeeks = STDF.first()['total_weeks_in_top10']

print("Total Number of Weeks Stranger Things Spent in the Top 10 in the UK:", STWeeks)




Total Number of Weeks Stranger Things Spent in the Top 10 in the UK: 13


# Part 3

Produce a dataframe containing only the Top 25 TV seasons in the UK, based on the number of weeks they spent in the Top 10.
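
One way to read this task is "aggregate per season, sort descending, keep the first N". The plain-Python sketch below shows that shape on invented rows. It assumes `cumulative_weeks_in_top_10` is a running counter, so its maximum per season is the total number of weeks. `TOP_N` is a hypothetical name, set to 2 here so the toy data shows the cut-off.

```python
from collections import defaultdict

# Invented (season_title, cumulative_weeks_in_top_10) pairs for one country.
rows = [
    ("Show A: Season 1", 1), ("Show A: Season 1", 2), ("Show A: Season 1", 3),
    ("Show B: Season 2", 1), ("Show B: Season 2", 2),
    ("Show C: Season 1", 1),
]

# The running counter's maximum is the total number of weeks for that season.
weeks_per_season = defaultdict(int)
for season, cumulative_weeks in rows:
    weeks_per_season[season] = max(weeks_per_season[season], cumulative_weeks)

# Sort by total weeks, longest first, and keep the first TOP_N seasons.
TOP_N = 2
top = sorted(weeks_per_season.items(), key=lambda kv: kv[1], reverse=True)[:TOP_N]

print(top)
```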

In [None]:

### Produce a dataframe containing the top 25 seasons by number of weeks in the
### top 10 of the United Kingdom, sorted by number of weeks. Store the dataframe
### in a variable called Top25

# Keep only the rows for the United Kingdom
tvDF_UK = tvDF.where(F.col('country_name') == 'United Kingdom')

# Calculating the number of weeks in the top 10 for each TV season in the UK
tvSeasonsWeeksUK = (
    tvDF_UK
    .groupBy('country_name', 'show_title', 'season_title')
    .agg(F.max('cumulative_weeks_in_top_10').alias('weeks_in_top10'))
)

# Sort by the total weeks in the Top 10 and keep the top 25 seasons.
# PySpark DataFrames have no sortBy method; orderBy (an alias of sort) is used instead.
Top25 = (
    tvSeasonsWeeksUK
    .orderBy(F.desc('weeks_in_top10'))
    .limit(25)
    .select('country_name', 'season_title',
            F.col('weeks_in_top10').alias('max_weeks_in_top10'))
    .orderBy(F.desc('max_weeks_in_top10'))  # keep the final result sorted
)

# Display the result
Top25.show(25,truncate=False)






+--------------+------------------------------+------------------+
|country_name  |season_title                  |max_weeks_in_top10|
+--------------+------------------------------+------------------+
|United Kingdom|Stranger Things 4             |13                |
|United Kingdom|Stranger Things 3             |12                |
|United Kingdom|Ozark: Season 4               |12                |
|United Kingdom|Stranger Things 2             |11                |
|United Kingdom|Better Call Saul: Season 6    |10                |
|United Kingdom|Squid Game: Season 1          |10                |
|United Kingdom|Stranger Things               |10                |
|United Kingdom|PAW Patrol: Season 6          |9                 |
|United Kingdom|Maid: Limited Series          |9                 |
|United Kingdom|Bridgerton: Season 2          |9                 |
|United Kingdom|Inventing Anna: Limited Series|8                 |
|United Kingdom|PIECES OF HER: Season 1       |7              

# Part 4

For the show "Young Sheldon", find the country where each season spent the most time in the Top 10.
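
A detail worth deciding up front is what to do when two countries tie for a season. In Spark, `F.row_number` keeps exactly one row per partition, while `F.rank` keeps every tied row. The plain-Python sketch below, on invented rows, takes the keep-all-ties reading so the difference is visible. It illustrates the logic only and is not the notebook's Spark code.

```python
from collections import defaultdict

# Invented (season_title, country_name, weeks) rows, already aggregated
# to one row per season and country.
rows = [
    ("Season 1", "Canada", 6),
    ("Season 1", "United Kingdom", 4),
    ("Season 2", "United Kingdom", 6),
    ("Season 2", "Ireland", 6),
    ("Season 2", "Canada", 2),
]

# First pass: the best number of weeks reached in each season.
best_weeks = defaultdict(int)
for season, country, weeks in rows:
    best_weeks[season] = max(best_weeks[season], weeks)

# Second pass: keep every country that matches the season's best.
top_countries = defaultdict(list)
for season, country, weeks in rows:
    if weeks == best_weeks[season]:
        top_countries[season].append(country)

print(dict(top_countries))
```

With a keep-one rule, "Season 2" would report only one of the two tied countries, and which one is not guaranteed.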

In [None]:

### For each season of the show "Young Sheldon" find the countries where it spent
### the most time in the Top 10

# Filter the data for the show "Young Sheldon"
youngSheldonDF = tvDF.where(F.col('show_title') == 'Young Sheldon')

# Calculate the maximum cumulative weeks in the Top 10 for each season and country
ysAggregatedDF = (
    youngSheldonDF
    .groupBy('season_title', 'country_name')
    .agg(F.max('cumulative_weeks_in_top_10').alias('max_cumulative_weeks_in_top_10'))
)

# Window partitioned by season and ordered by weeks in the Top 10, longest first.
# Ranking inside the window makes a separate sort of the whole DataFrame unnecessary.
windowSpecYS = Window.partitionBy('season_title').orderBy(F.desc('max_cumulative_weeks_in_top_10'))

# Keep the top-ranked country for each season. row_number keeps exactly one
# row per season, so a tie between countries is broken arbitrarily.
resultYSDF = (
    ysAggregatedDF
    .withColumn('rank', F.row_number().over(windowSpecYS))
    .filter(F.col('rank') == 1)
    .drop('rank')
)

# Display the result
resultYSDF.show(truncate=False)



+-------------------------+--------------+------------------------------+
|season_title             |country_name  |max_cumulative_weeks_in_top_10|
+-------------------------+--------------+------------------------------+
|Young Sheldon: : Season 1|Canada        |6                             |
|Young Sheldon: : Season 2|United Kingdom|6                             |
|Young Sheldon: : Season 3|United Kingdom|4                             |
|Young Sheldon: Season 4  |New Zealand   |1                             |
+-------------------------+--------------+------------------------------+



# Part 5

For each country, find the film that spent the most time in the Top 10.

In [None]:

### For each country, find the film that spent the most time in the Top 10
### Display the results in a Dataframe ordered by country name.


# Grouping by country_name and show_title
countryShowGroup = filmsDF.groupBy('country_name', 'show_title')

# Calculating the maximum cumulative weeks in the top 10 for each country and show_title
cumulativeWeeksDF = countryShowGroup.agg(F.max('cumulative_weeks_in_top_10').alias('max_cumulative_weeks_in_top_10'))

# Sorting the DataFrame by the maximum cumulative weeks in the top 10
sortedCumulativeWeeksDF = cumulativeWeeksDF.sort('max_cumulative_weeks_in_top_10', ascending=True)


countryWindowSpec = Window.partitionBy('country_name').orderBy(F.desc('max_cumulative_weeks_in_top_10'))

# Adding a row number to each row within the window
rankedFilmsDF = (
    sortedCumulativeWeeksDF
    .withColumn('rank', F.row_number().over(countryWindowSpec))
    .filter(F.col('rank') == 1)
    .select('country_name', 'show_title', 'max_cumulative_weeks_in_top_10')
    .orderBy('country_name', 'show_title')
)

# Display the result
print("Film that spent the most time in the Top 10 for each country:")
rankedFilmsDF.show(truncate=False)





Film that spent the most time in the Top 10 for each country:
+------------------+-------------------------------------+------------------------------+
|country_name      |show_title                           |max_cumulative_weeks_in_top_10|
+------------------+-------------------------------------+------------------------------+
|Argentina         |Sonic the Hedgehog                   |7                             |
|Australia         |Back to the Outback                  |8                             |
|Austria           |Harry Potter and the Sorcerer's Stone|9                             |
|Bahamas           |A Madea Homecoming                   |9                             |
|Bahrain           |Red Notice                           |8                             |
|Bangladesh        |Sooryavanshi                         |21                            |
|Belgium           |The Adam Project                     |8                             |
|Bolivia           |Shrek             

# Part 6

Calculate the number of weeks each film spent at the number 1 spot of each country's Top 10 list. Then find the films that spent the most time in the number 1 spot for each country.
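
The task has two stages: count the weeks each film spent at number 1 in each country, then pick the film with the largest count per country. A minimal plain-Python sketch of both stages on invented rows is shown here. Adding 1 for a rank of 1 and 0 otherwise mirrors summing a 0/1 flag column.

```python
from collections import Counter

# Invented (country_name, show_title, weekly_rank) rows, one per film per week.
rows = [
    ("France", "Film A", 1), ("France", "Film A", 1), ("France", "Film B", 1),
    ("France", "Film B", 2), ("Spain", "Film B", 1), ("Spain", "Film A", 3),
]

# Stage 1: weeks at number 1 per (country, film), via a 0/1 flag.
weeks_at_1 = Counter()
for country, film, rank in rows:
    weeks_at_1[(country, film)] += 1 if rank == 1 else 0

# Stage 2: for each country keep the film with the largest count.
top_film = {}
for (country, film), weeks in weeks_at_1.items():
    if country not in top_film or weeks > top_film[country][1]:
        top_film[country] = (film, weeks)

print(top_film)
```

Taking an arbitrary first title per country in stage 2, instead of the maximum, would not answer the question, since it ignores the counts from stage 1.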

In [None]:


### Create a column using the F.when function to calculate the number of weeks a
### film spends in the number 1 spot of the Top 10. Use the .otherwise method to
### set rows with no number 1 spots to zero. Use the .alias method to call this
### column "weeks_at_1"

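# Flag each row: 1 if the film was at number 1 that week, otherwise 0.
# Summing this flag later gives the number of weeks at number 1.
filmsDF = filmsDF.withColumn('weeks_at_1',
                             F.when(F.col('weekly_rank') == 1, 1)
                             .otherwise(0)
                             .alias("weeks_at_1")
                           )

filmsDF.show()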


+------------+------------+-------------------+--------+-----------+--------------------+--------------------------+----------+
|country_name|country_iso2|               week|category|weekly_rank|          show_title|cumulative_weeks_in_top_10|weeks_at_1|
+------------+------------+-------------------+--------+-----------+--------------------+--------------------------+----------+
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          1|      Look Both Ways|                         1|         1|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          2|           Day Shift|                         2|         0|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          3|Bank Robbers: The...|                         2|         0|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          4|   The Next 365 Days|                         1|         0|
|   Argentina|          AR|2022-08-21 00:00:00|   Films|          5|The Angry Birds M...|               

In [None]:


### Group by country name and show title, and use the .agg method and your new
### column to find the number of weeks each film spent in the top spot for each
### country.
# Group by show title and country name
filmsTopSpotDF = (
    filmsDF
    .groupBy('country_name', 'show_title')
    .agg(F.sum('weeks_at_1').alias('total_weeks_at_1'))
    .orderBy(F.desc('total_weeks_at_1'))
)

filmsTopSpotDF.show()




+------------+-------------------+----------------+
|country_name|         show_title|total_weeks_at_1|
+------------+-------------------+----------------+
|      Russia|      Don't Look Up|               6|
|   Hong Kong|      Don't Look Up|               6|
|      Taiwan|      Don't Look Up|               5|
|      Taiwan|Gangubai Kathiawadi|               5|
|     Vietnam|         Red Notice|               5|
|      Israel|The Tinder Swindler|               4|
|     Estonia|      Don't Look Up|               4|
|  Bangladesh|    Haseen Dillruba|               4|
|     Ukraine|The Tinder Swindler|               4|
|     Vietnam|      Don't Look Up|               4|
|     Croatia|The Tinder Swindler|               4|
|    Bulgaria|         Red Notice|               4|
|       India|    Haseen Dillruba|               4|
|      Jordan|         Red Notice|               4|
|      Russia|The Tinder Swindler|               4|
|    Slovenia|      Don't Look Up|               4|
|    Pakista

In [None]:


### Produce a dataframe grouped by country name that contains the show title and
### number of weeks at the number 1 spot of the top performing film in each
### country.

# Keep, for each country, the film with the most weeks at number 1
# (row_number keeps one film per country, so ties are broken arbitrarily)
topSpotWindow = Window.partitionBy('country_name').orderBy(F.desc('total_weeks_at_1'))
topFilmsDF = (filmsTopSpotDF
              .withColumn('rank', F.row_number().over(topSpotWindow))
              .filter(F.col('rank') == 1)
              .drop('rank')
              .orderBy('country_name'))

topFilmsDF.show()

