In [1]:
# Importing the necessary modules
import findspark
findspark.init()

# Initialize a SparkSession
from pyspark.sql import SparkSession

# Creating SparkSession
spark = SparkSession.builder.appName('TP').getOrCreate()

# Calling the session variable object
spark

In [88]:
netflix_url = "https://raw.githubusercontent.com/AmandaClinnie/DS625-TeamProject/main/netflix_titles.csv"
disney_url = "https://raw.githubusercontent.com/AmandaClinnie/DS625-TeamProject/main/disney_plus_titles.csv"

from pyspark import SparkFiles
spark.sparkContext.addFile(netflix_url)
spark.sparkContext.addFile(disney_url)

netflix_df = spark.read.csv("file:///"+SparkFiles.get("netflix_titles.csv"), header=True, inferSchema= True)
disney_df = spark.read.csv("file:///"+SparkFiles.get("disney_plus_titles.csv"), header=True, inferSchema= True)

In [76]:
netflix_df.printSchema()
disney_df.printSchema()

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [77]:
netflix_df.show(3)
disney_df.show(3)

+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|       director|                cast|      country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|Kirsten Johnson|                null|United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|           null|Ama Qamata, Khosi...| South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglands|Julien Leclercq|Sami Bouajila, Tr...|         null|Septem

In [78]:
netflix_df.createOrReplaceTempView("Netflix")
disney_df.createOrReplaceTempView("Disney")

In [79]:
# remove (do not select) show_id and date_added columns
# convert columns with multiple values to arrays
netflix_clean = spark.sql("SELECT type, \
                  title, \
                  SPLIT(director, ',') AS director, \
                  SPLIT(cast, ',') AS cast, \
                  SPLIT(country, ',') AS country, \
                  release_year, \
                  rating, \
                  duration, \
                  SPLIT(listed_in, ',') AS listed_in, \
                  description \
                  FROM Netflix")
netflix_clean.createOrReplaceTempView("Netflix1")

disney_clean = spark.sql("SELECT type, \
                  title, \
                  SPLIT(director, ',') AS director, \
                  SPLIT(cast, ',') AS cast, \
                  SPLIT(country, ',') AS country, \
                  release_year, \
                  rating, \
                  duration, \
                  SPLIT(listed_in, ',') AS listed_in, \
                  description \
                  FROM Disney")
disney_clean.createOrReplaceTempView("Disney1")

In [80]:
# test duration and listed_in columns to see if they vary between platforms
overlap = spark.sql("SELECT Netflix1.title, Netflix1.duration, Disney1.duration,\
                     Netflix1.listed_in, Disney1.listed_in FROM Netflix1 \
                     INNER JOIN Disney1 ON Netflix1.title = Disney1.title")
overlap.show(5, truncate=False)
# there is variation - worth keeping each dataset separate

+----------------+---------+---------+--------------------------------------------------------+-----------------------------------------------------+
|title           |duration |duration |listed_in                                               |listed_in                                            |
+----------------+---------+---------+--------------------------------------------------------+-----------------------------------------------------+
|PJ Masks        |3 Seasons|5 Seasons|[Kids' TV]                                              |[Action-Adventure,  Animation,  Kids]                |
|Once Upon a Time|1 Season |7 Seasons|[International TV Shows,  Romantic TV Shows,  TV Dramas]|[Action-Adventure,  Fantasy,  Soap Opera / Melodrama]|
|Gigantosaurus   |1 Season |2 Seasons|[Kids' TV]                                              |[Action-Adventure,  Animation,  Kids]                |
|Becoming        |89 min   |1 Season |[Documentaries]                                         |[Anth

In [81]:
netflix_nulls = spark.sql("SELECT COUNT (*) AS type_nulls FROM Netflix1 WHERE type IS NULL")
netflix_nulls.show()

netflix_nulls = spark.sql("SELECT COUNT (*) AS title_nulls FROM Netflix1 WHERE title IS NULL")
netflix_nulls.show()

netflix_nulls = spark.sql("SELECT COUNT (*) AS director_nulls FROM Netflix1 WHERE director IS NULL")
netflix_nulls.show()

netflix_nulls = spark.sql("SELECT COUNT (*) AS cast_nulls FROM Netflix1 WHERE cast IS NULL")
netflix_nulls.show()

netflix_nulls = spark.sql("SELECT COUNT (*) AS country_nulls FROM Netflix1 WHERE country IS NULL")
netflix_nulls.show()

netflix_nulls = spark.sql("SELECT COUNT (*) AS release_year_nulls FROM Netflix1 WHERE release_year IS NULL")
netflix_nulls.show()

netflix_nulls = spark.sql("SELECT COUNT (*) AS rating_nulls FROM Netflix1 WHERE rating IS NULL")
netflix_nulls.show()

netflix_nulls = spark.sql("SELECT COUNT (*) AS duration_nulls FROM Netflix1 WHERE duration IS NULL")
netflix_nulls.show()

netflix_nulls = spark.sql("SELECT COUNT (*) AS listed_in_nulls FROM Netflix1 WHERE listed_in IS NULL")
netflix_nulls.show()

netflix_nulls = spark.sql("SELECT COUNT (*) AS desctiption_nulls FROM Netflix1 WHERE description IS NULL")
netflix_nulls.show()

+----------+
|type_nulls|
+----------+
|         1|
+----------+

+-----------+
|title_nulls|
+-----------+
|          2|
+-----------+

+--------------+
|director_nulls|
+--------------+
|          2636|
+--------------+

+----------+
|cast_nulls|
+----------+
|       826|
+----------+

+-------------+
|country_nulls|
+-------------+
|          832|
+-------------+

+------------------+
|release_year_nulls|
+------------------+
|                 2|
+------------------+

+------------+
|rating_nulls|
+------------+
|           6|
+------------+

+--------------+
|duration_nulls|
+--------------+
|             5|
+--------------+

+---------------+
|listed_in_nulls|
+---------------+
|              3|
+---------------+

+-----------------+
|desctiption_nulls|
+-----------------+
|                3|
+-----------------+



In [82]:
# remove rows for columns having <10 null values
# director, cast & country columns not as important to be NOT NULL/
#  can be dealt with later
netflix_clean_nulls = spark.sql("SELECT type, \
                  title, \
                  director, \
                  cast, \
                  country, \
                  release_year, \
                  rating, \
                  duration, \
                  listed_in, \
                  description \
                  FROM Netflix \
                  WHERE type IS NOT NULL AND \
                  title IS NOT NULL AND \
                  release_year IS NOT NULL AND \
                  rating IS NOT NULL AND \
                  duration IS NOT NULL AND \
                  listed_in IS NOT NULL AND \
                  description IS NOT NULL")
netflix_clean_nulls.createOrReplaceTempView("Netflix2")

In [83]:
disney_nulls = spark.sql("SELECT COUNT (*) AS type_nulls FROM Disney1 WHERE type IS NULL")
disney_nulls.show()

disney_nulls = spark.sql("SELECT COUNT (*) AS title_nulls FROM Disney1 WHERE title IS NULL")
disney_nulls.show()

disney_nulls = spark.sql("SELECT COUNT (*) AS director_nulls FROM Disney1 WHERE director IS NULL")
disney_nulls.show()

disney_nulls = spark.sql("SELECT COUNT (*) AS cast_nulls FROM Disney1 WHERE cast IS NULL")
disney_nulls.show()

disney_nulls = spark.sql("SELECT COUNT (*) AS country_nulls FROM Disney1 WHERE country IS NULL")
disney_nulls.show()

disney_nulls = spark.sql("SELECT COUNT (*) AS release_year_nulls FROM Disney1 WHERE release_year IS NULL")
disney_nulls.show()

disney_nulls = spark.sql("SELECT COUNT (*) AS rating_nulls FROM Disney1 WHERE rating IS NULL")
disney_nulls.show()

disney_nulls = spark.sql("SELECT COUNT (*) AS duration_nulls FROM Disney1 WHERE duration IS NULL")
disney_nulls.show()

disney_nulls = spark.sql("SELECT COUNT (*) AS listed_in_nulls FROM Disney1 WHERE listed_in IS NULL")
disney_nulls.show()

disney_nulls = spark.sql("SELECT COUNT (*) AS desctiption_nulls FROM Disney1 WHERE description IS NULL")
disney_nulls.show()

+----------+
|type_nulls|
+----------+
|         0|
+----------+

+-----------+
|title_nulls|
+-----------+
|          0|
+-----------+

+--------------+
|director_nulls|
+--------------+
|           473|
+--------------+

+----------+
|cast_nulls|
+----------+
|       190|
+----------+

+-------------+
|country_nulls|
+-------------+
|          218|
+-------------+

+------------------+
|release_year_nulls|
+------------------+
|                 0|
+------------------+

+------------+
|rating_nulls|
+------------+
|           3|
+------------+

+--------------+
|duration_nulls|
+--------------+
|             0|
+--------------+

+---------------+
|listed_in_nulls|
+---------------+
|              1|
+---------------+

+-----------------+
|desctiption_nulls|
+-----------------+
|                0|
+-----------------+



In [84]:
# remove rows for columns having <10 null values
# director, cast & country columns not as important to be NOT NULL/
#  can be dealt with later
disney_clean_nulls = spark.sql("SELECT type, \
                  title, \
                  director, \
                  cast, \
                  country, \
                  release_year, \
                  rating, \
                  duration, \
                  listed_in, \
                  description \
                  FROM Netflix \
                  WHERE rating IS NOT NULL AND \
                  listed_in IS NOT NULL")
disney_clean_nulls.createOrReplaceTempView("Disney2")

In [85]:
# NETFLIX table: Netflix2
# DISNEY table: Disney2

In [89]:
# table counts
spark.sql("SELECT COUNT (*) FROM Netflix2").show()
spark.sql("SELECT COUNT (*) FROM Disney2").show()

+--------+
|count(1)|
+--------+
|    8799|
+--------+

+--------+
|count(1)|
+--------+
|    8802|
+--------+

