In [1]:
!wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.0.jre8/mssql-jdbc-9.4.0.jre8.jar

--2023-02-26 02:53:17--  https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.0.jre8/mssql-jdbc-9.4.0.jre8.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1334599 (1.3M) [application/java-archive]
Saving to: ‘mssql-jdbc-9.4.0.jre8.jar’


2023-02-26 02:53:17 (23.4 MB/s) - ‘mssql-jdbc-9.4.0.jre8.jar’ saved [1334599/1334599]



In [28]:
# Installing pyspark e as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark
# Importing SparkSession
from pyspark.sql import SparkSession
# Creating a Spark Session
spark = SparkSession.builder.appName("myApp").config("spark.jars", "mssql-jdbc-9.4.0.jre8.jar").getOrCreate()
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# Checking Spark Session Information
spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [99]:
# Importing Spark functions from library
from pyspark.sql.functions import to_date, col, regexp_extract, regexp_replace, when
from pyspark.sql.types import *
import re

In [100]:
# Downloadig from http to local
!wget --quiet --show-progress -O netflix.zip "https://storage.googleapis.com/kaggle-data-sets/434238/2654038/compressed/netflix_titles.csv.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20230223%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20230223T223500Z&X-Goog-Expires=259200&X-Goog-SignedHeaders=host&X-Goog-Signature=3e0f8a87eab1f539319a4f7da5e373720365fdda63b10fc4d4a478b5c844dcd7500901f8867688f8d567ebb6df2f0e2f36d561ab314a690f090cf002b9114faa294c13e4a495f9ccb9ff2f3dddf7acfb40acd435d9aa15f227c58592c7ea71639c082c88b9760078c53ffb4759ca665a39b8e708efeecea3e9c7eda5439a5d89d720fb1eaff1f056b23aff29ae14d3029eacafcdf775bae58f2946ffeac5483151b7fa9e5555d291a3f541d837e39bc436cd6112693fec8082b12d898500762b7329468f068d1c97ed6e4ecc48bfa8504c464ac13b54da2054b9490675f415bd84518b43a246c6e2b9196aa50039a62236dcf8dfe98100f9790a631910912cba"
!unzip netflix.zip


Archive:  netflix.zip
replace netflix_titles.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: netflix_titles.csv      


In [101]:
# The database credentials are kept outside the notebook, in a local JSON file
import json

# Parse config.json into the `config` dict
with open('config.json', 'r') as config_file:
    config = json.load(config_file)

In [102]:
# Read the Netflix CSV into a Spark DataFrame: the first line is the header and the column
# types are inferred from the data.
# NOTE(review): later outputs show values shifted into the wrong columns (e.g. a director name
# in 'type', a sentence fragment in 'show_id'). Presumably quoted fields containing "" or line
# breaks are being split by the default CSV options - consider escape='"' and multiLine=True,
# and confirm against the raw file before relying on the manual repairs further down.
netflix_df01 = (
    spark.read
    .options(inferSchema=True, header=True)
    .csv("./netflix_titles.csv")
)

# Print the inferred type of every column
netflix_df01.printSchema()

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [103]:
# Preview the first 5 rows (show() truncates long cell values in the printed table)
netflix_df01.show(5)

+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|       director|                cast|      country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|Kirsten Johnson|                null|United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|           null|Ama Qamata, Khosi...| South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglands|Julien Leclercq|Sami Bouajila, Tr...|         null|Septem

#### Checking the number of rows in the table

In [104]:
# Total number of rows read from the CSV
netflix_df01.count()

8809

#### Checking data types

In [105]:
# List of (column name, data type) pairs for the DataFrame
netflix_df01.dtypes

[('show_id', 'string'),
 ('type', 'string'),
 ('title', 'string'),
 ('director', 'string'),
 ('cast', 'string'),
 ('country', 'string'),
 ('date_added', 'string'),
 ('release_year', 'string'),
 ('rating', 'string'),
 ('duration', 'string'),
 ('listed_in', 'string'),
 ('description', 'string')]

## Cleaning Columns Individually

##### Using the distinct function to view the unique values of column 'type'

In [106]:
# Show every distinct value present in 'type'; unexpected values hint at misaligned rows
netflix_df01.select('type').distinct().show()

+-------------+
|         type|
+-------------+
|         null|
|      TV Show|
|        Movie|
|William Wyler|
+-------------+



Checking if column "release_year" contains only numbers

In [107]:
# Regular expression matching values made up of digits only
regex_pattern_numbers = r"^\d+$"

# Keep the rows whose "release_year" is digits only
# (regexp_extract returns an empty string when the pattern does not match)
numbers_only = netflix_df01.filter(regexp_extract(col("release_year"), regex_pattern_numbers, 0) != "")

# Count once and reuse the value: every count() call launches a separate Spark job
numbers_only_count = numbers_only.count()

if numbers_only_count == 0:
    print("Column 'release_year' does not have rows with only numbers.")
else:
    print(f"Column 'release_year' has {numbers_only_count} rows with numbers only")

Column 'release_year' has 8787 rows with numbers only


In [108]:
# Register the current DataFrame as a temporary view so it can be queried with SQL
netflix_df01.createOrReplaceTempView("netflix_df01")
# Count the rows whose "release_year" is NULL or an empty string
sql_query = """
SELECT COUNT(*)
FROM netflix_df01
WHERE `release_year` IS NULL OR `release_year` = ''
"""
result = spark.sql(sql_query)
result.show()

+--------+
|count(1)|
+--------+
|       2|
+--------+



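The two checks above account for 8787 numeric rows and 2 null/blank rows out of 8809, so the remaining rows of "release_year" hold other non-numeric text, presumably from the misaligned rows that are repaired or removed further down. Null values are replaced by "non_informed" at the end of the notebook; note that "release_year" is cast to integer before that step, and a text fill value only applies to string columns.
Now we will check the column "title".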

In [109]:
# Regular expression matching values made up of digits only
regex_pattern_numbers = r"^\d+$"

# Keep the rows whose "title" is digits only
# (regexp_extract returns an empty string when the pattern does not match)
numbers_only = netflix_df01.filter(regexp_extract(col("title"), regex_pattern_numbers, 0) != "")

# Count once and reuse the value: every count() call launches a separate Spark job
numbers_only_count = numbers_only.count()

if numbers_only_count == 0:
    print("Column 'title' has no rows with numbers only.")
else:
    print(f"The 'title' column has {numbers_only_count} lines with only numbers:")

The 'title' column has 13 lines with only numbers:


In [110]:
# Replace titles made up of digits only with "non_informed" (regex_pattern_numbers is anchored,
# so only values that are entirely numeric are rewritten).
# NOTE(review): a digits-only title may be a legitimate one (e.g. a film named after a year) -
# confirm these rows are really invalid before overwriting them.
netflix_df01 = netflix_df01.withColumn("title", regexp_replace(col("title"), regex_pattern_numbers, "non_informed"))


In [111]:
# Re-register the view so SQL sees the updated DataFrame, then count the titles that were
# replaced by "non_informed" (expected to equal the number of digits-only titles found above)
netflix_df01.createOrReplaceTempView("netflix_df01")
spark.sql("select title from netflix_df01 where title='non_informed'").count()

13

Checking the 'duration' column data we can verify the presence of invalid data

In [112]:
# Keep the rows whose "duration" is digits only; a valid duration carries a unit
# ("90 min", "2 Seasons"), so a bare number points to a row whose values are shifted
filtered_rows = netflix_df01.filter(regexp_extract(col("duration"), regex_pattern_numbers, 0) != "")

# Count once and reuse the value: every count() call launches a separate Spark job
filtered_rows_count = filtered_rows.count()

if filtered_rows_count == 0:
    print("Column 'duration' has no rows with only numbers.")
else:
    print(f"The 'duration' column has {filtered_rows_count} lines with only numbers")
    filtered_rows.show(truncate=False)

The 'duration' column has 1 lines with only numbers
+-------+-----+-------------------+----------------+-------------------------------------------------------------------------------------------------------+------------------+----------------+-------------+----------------+--------+---------+-----------+
|show_id|type |title              |director        |cast                                                                                                   |country           |date_added      |release_year |rating          |duration|listed_in|description|
+-------+-----+-------------------+----------------+-------------------------------------------------------------------------------------------------------+------------------+----------------+-------------+----------------+--------+---------+-----------+
|s1769  |Movie|The Next Karate Kid|Christopher Cain|"Pat Morita, Hilary Swank, Michael Ironside, Constance Towers, Chris Conrad, Arsenio ""Sonny"" Trinidad| Michael Cavalieri| Walton 

In [113]:
# Keep the rows whose "description" is digits only; a description is free text, so a bare
# number points to a row whose values are shifted
filtered_rows = netflix_df01.filter(regexp_extract(col("description"), regex_pattern_numbers, 0) != "")

# Count once and reuse the value: every count() call launches a separate Spark job
filtered_rows_count = filtered_rows.count()

if filtered_rows_count == 0:
    print("Column 'description' has no rows with numbers only.")
else:
    print(f"Column 'description' has {filtered_rows_count} lines with only numbers")
    filtered_rows.show(truncate=False)

Column 'description' has 3 lines with only numbers
+-------+-----+--------------------+------------------+------------------------------------------------------------------------------------------------+----------------+-------------------+---------------+----------------+-------------+----------------+-----------+
|show_id|type |title               |director          |cast                                                                                            |country         |date_added         |release_year   |rating          |duration     |listed_in       |description|
+-------+-----+--------------------+------------------+------------------------------------------------------------------------------------------------+----------------+-------------------+---------------+----------------+-------------+----------------+-----------+
|s5892  |Movie|Beasts of No Nation |Cary Joji Fukunaga|"Idris Elba, Abraham Attah, Kurt Egyiawan, Jude Akuwudike, Emmanuel ""King Kong"" Nii Adom Quaye

In [114]:
# Remove the misaligned rows found above, identified by their show_id
rows_to_discard = ["s1769", "s5892", "s6210", "s6840"]
is_misaligned = col("show_id").isin(rows_to_discard)
netflix_df01 = netflix_df01.filter(~is_misaligned)

In [115]:
# Column names for the hand-built rows, in the same order as the main DataFrame
# (union matches columns by position)
replacement_columns = [
    "show_id", "type", "title", "director", "cast", "country",
    "date_added", "release_year", "rating", "duration", "listed_in", "description",
]

# One single-row DataFrame per title removed in the previous step.
# The nicknames in the cast lists carry no quotation marks: that is the value the original
# literals produced (adjacent Python string literals are concatenated).
karate_kid_row = spark.createDataFrame(
    [(
        "s1769", "Movie", "The Next Karate Kid", "Christopher Cain",
        "Pat Morita, Hilary Swank, Michael Ironside, Constance Towers, Chris Conrad, Arsenio Sonny Trinidad, Michael Cavalieri",
        "", "2020-11-01", "1994", "PG", "107 min", "", "",
    )],
    replacement_columns,
)

beasts_row = spark.createDataFrame(
    [(
        "s5892", "Movie", "Beasts of No Nation", "Cary Joji Fukunaga",
        "Idris Elba, Abraham Attah, Kurt Egyiawan, Jude Akuwudike, Emmanuel King Kong Nii Adom Quaye, Ama K. Abebrese, Richard Pepple",
        "United States", "2015-10-16", "2015", "", "", "", "",
    )],
    replacement_columns,
)

backfire_row = spark.createDataFrame(
    [(
        "s6210", "Movie", "Backfire", "Dave Patten",
        "Black Deniro, Byron Squally Vinson, Dominic Costa, Jowharah Jones",
        "United States", "2019-04-05", "2019", "", "", "", "",
    )],
    replacement_columns,
)

greek_row = spark.createDataFrame(
    [(
        "s6840", "Movie", "Get Him to the Greek", "Nicholas Stoller",
        "Jonah Hill, Russell Brand, Elisabeth Moss, Rose Byrne, Colm Meaney, Sean P. Diddy Combs, Aziz Ansari, Kristen Schaal",
        "United States", "2020-01-16", "2010", "", "", "", "",
    )],
    replacement_columns,
)

# Append the rebuilt rows to the main DataFrame, one after the other
for replacement_row in (karate_kid_row, beasts_row, backfire_row, greek_row):
    netflix_df01 = netflix_df01.union(replacement_row)

In [116]:
# Re-register the view so SQL sees the updated DataFrame, then look up 'Backfire': exactly one
# row is expected - the rebuilt one - now that the misaligned original has been removed
netflix_df01.createOrReplaceTempView("netflix_df01")
sql_query = """
SELECT *
FROM netflix_df01
WHERE `title` = 'Backfire'
"""
result = spark.sql(sql_query)
result.show()

+-------+-----+--------+-----------+--------------------+-------------+----------+------------+------+--------+---------+-----------+
|show_id| type|   title|   director|                cast|      country|date_added|release_year|rating|duration|listed_in|description|
+-------+-----+--------+-----------+--------------------+-------------+----------+------------+------+--------+---------+-----------+
|  s6210|Movie|Backfire|Dave Patten|Black Deniro, Byr...|United States|2019-04-05|        2019|      |        |         |           |
+-------+-----+--------+-----------+--------------------+-------------+----------+------------+------+--------+---------+-----------+



Checking if there are more rows with incorrect show_id

In [117]:
# Select the rows whose "show_id" does not start with the letter "s"; valid ids seen so far
# have the form s<number>, so anything else is a fragment of a broken row
rows_to_discard2 = netflix_df01.filter(~col("show_id").startswith("s"))
rows_to_discard2.show(truncate=False)


+--------------------+-------------+-----+-------------+--------------+-------+----------+------------+-----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----------+
|show_id             |type         |title|director     |cast          |country|date_added|release_year|rating                       |duration                                                                                                                                          |listed_in|description|
+--------------------+-------------+-----+-------------+--------------+-------+----------+------------+-----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----------+
| and probably will."|null         |null |null         |null          |null   |null      |n

In [118]:
# Delete the two rows found above, matched by the exact text that ended up in "show_id".
# Note: rows_to_discard2 is rebound here from a DataFrame to a plain list of strings.
rows_to_discard2 = [' and probably will."', 'Flying Fortress"']
netflix_df01 = netflix_df01.filter(~col("show_id").isin(rows_to_discard2))

Creating a properly organized row for the title "Flying Fortress" and assigning it an id. Because the movie had no id of its own, the code generates a new one that follows the ids already present in the DataFrame, so the row is placed after the existing titles even when the DataFrame receives updates.

In [119]:
from pyspark.sql.functions import lit, max as spark_max

# Number of rows left after the clean-up steps above
rows_nbr_updated = netflix_df01.count()

# Build the new id from the highest existing numeric suffix instead of from the row count:
# count + 1 collides with an existing id whenever the ids are not exactly s1..s<count>
# (for example after rows have been dropped without being replaced).
highest_id = netflix_df01.select(
    spark_max(regexp_extract(col("show_id"), r"^s(\d+)$", 1).cast("int"))
).first()[0]
# highest_id is None when no show_id has the s<digits> form; fall back to the row count then
last_id = highest_id if highest_id is not None else rows_nbr_updated
new_show_id = "s" + str(last_id + 1)

# Creating a new row with the correct values for each column
flying_fortress_row = spark.createDataFrame([(new_show_id, "", "Flying Fortress", "William Wyler", "", "United States", "2017-03-31", "1944", "", "", "Classic Movies, Documentaries", "This documentary centers on the crew of the B-17 Flying Fortress Memphis Belle as it prepares to execute a strategic bombing mission over Germany")],
                                ["show_id", "type", "title", "director", "cast", "country", "date_added", "release_year", "rating", "duration", "listed_in", "description"])

# Adding the new row to the original DataFrame
netflix_df01 = netflix_df01.union(flying_fortress_row)


In [120]:
# Re-register the view so SQL sees the updated DataFrame, then look up 'Flying Fortress': a
# single row with the newly generated id is expected, and the broken original should be gone
netflix_df01.createOrReplaceTempView("netflix_df01")
sql_query = """
SELECT *
FROM netflix_df01
WHERE `title` = 'Flying Fortress'
"""
result = spark.sql(sql_query)
result.show()

+-------+----+---------------+-------------+----+-------------+----------+------------+------+--------+--------------------+--------------------+
|show_id|type|          title|     director|cast|      country|date_added|release_year|rating|duration|           listed_in|         description|
+-------+----+---------------+-------------+----+-------------+----------+------------+------+--------+--------------------+--------------------+
|  s8808|    |Flying Fortress|William Wyler|    |United States|2017-03-31|        1944|      |        |Classic Movies, D...|This documentary ...|
+-------+----+---------------+-------------+----+-------------+----------+------------+------+--------+--------------------+--------------------+



Changing the data type of column "date_added" from String to Date and column "release_year" from String to Integer

In [121]:
# Imported here because coalesce is not among the functions imported at the top of the notebook
from pyspark.sql.functions import coalesce

# "date_added" holds two formats: the CSV uses e.g. "September 25, 2021", while the rows rebuilt
# by hand earlier in the notebook use ISO dates such as "2019-04-05". Parsing with a single
# pattern would turn the other format into null, so try both and keep the first that parses.
date_added_parsed = coalesce(
    to_date(col("date_added"), "MMMM dd, yyyy"),
    to_date(col("date_added"), "yyyy-MM-dd"),
)

# "release_year" becomes an integer; with Spark's default (non-ANSI) cast, values that are not
# valid integers become null
netflix_df01 = netflix_df01.withColumn("date_added", date_added_parsed) \
    .withColumn("release_year", netflix_df01["release_year"].cast(IntegerType()))

Cleaning up leftover bad data from column 'duration' and replacing it with non_informed

In [122]:
# A valid duration ends in "min", "Seasons" or "Season"
duration_value = col("duration")
has_valid_unit = (
    duration_value.like("%min")
    | duration_value.like("%Seasons")
    | duration_value.like("%Season")
)

# Keep valid durations as they are; everything else becomes "non_informed"
netflix_df01 = netflix_df01.withColumn(
    "duration",
    when(has_valid_unit, duration_value).otherwise("non_informed"),
)

In [123]:
# Collect the distinct "duration" values to the driver to eyeball the result of the clean-up
netflix_df01.select('duration').distinct().collect()

[Row(duration='100 min'),
 Row(duration='153 min'),
 Row(duration='71 min'),
 Row(duration='56 min'),
 Row(duration='13 min'),
 Row(duration='119 min'),
 Row(duration='33 min'),
 Row(duration='165 min'),
 Row(duration='10 Seasons'),
 Row(duration='12 min'),
 Row(duration='204 min'),
 Row(duration='142 min'),
 Row(duration='173 min'),
 Row(duration='27 min'),
 Row(duration='157 min'),
 Row(duration='30 min'),
 Row(duration='39 min'),
 Row(duration='8 Seasons'),
 Row(duration='82 min'),
 Row(duration='21 min'),
 Row(duration='138 min'),
 Row(duration='40 min'),
 Row(duration='133 min'),
 Row(duration='24 min'),
 Row(duration='312 min'),
 Row(duration='25 min'),
 Row(duration='11 Seasons'),
 Row(duration='167 min'),
 Row(duration='124 min'),
 Row(duration='4 Seasons'),
 Row(duration='80 min'),
 Row(duration='13 Seasons'),
 Row(duration='102 min'),
 Row(duration='113 min'),
 Row(duration='77 min'),
 Row(duration='67 min'),
 Row(duration='141 min'),
 Row(duration='168 min'),
 Row(duration='

Cleaning the "director" column

In [124]:
# Keep the rows whose "director" is digits only
# (regexp_extract returns an empty string when the pattern does not match)
numbers_only = netflix_df01.filter(regexp_extract(col("director"), regex_pattern_numbers, 0) != "")

# Count once and reuse the value: every count() call launches a separate Spark job
numbers_only_count = numbers_only.count()

if numbers_only_count == 0:
    print("Column 'director' has no rows with only numbers.")
else:
    print(f"The 'director' column has {numbers_only_count} lines with only numbers")

Column 'director' has no rows with only numbers.


Cleaning the "cast" column 

In [125]:
# Keep the rows whose "cast" is digits only
# (regexp_extract returns an empty string when the pattern does not match)
numbers_only = netflix_df01.filter(regexp_extract(col("cast"), regex_pattern_numbers, 0) != "")

# Count once and reuse the value: every count() call launches a separate Spark job
numbers_only_count = numbers_only.count()

if numbers_only_count == 0:
    print("Column 'cast' has no rows with numbers only.")
else:
    print(f"The 'cast' column has {numbers_only_count} lines with only numbers")

Column 'cast' has no rows with numbers only.


Cleaning the "country" column

In [126]:
# Keep the rows whose "country" is digits only
# (regexp_extract returns an empty string when the pattern does not match)
numbers_only = netflix_df01.filter(regexp_extract(col("country"), regex_pattern_numbers, 0) != "")

# Count once and reuse the value: every count() call launches a separate Spark job
numbers_only_count = numbers_only.count()

if numbers_only_count == 0:
    print("Column 'country' does not have rows with only numbers.")
else:
    print(f"Column 'country' has {numbers_only_count} lines with only numbers")
    numbers_only.show(truncate=False)

Column 'country' does not have rows with only numbers.


In [127]:
# Keep the rows whose "listed_in" is digits only
# (regexp_extract returns an empty string when the pattern does not match)
numbers_only = netflix_df01.filter(regexp_extract(col("listed_in"), regex_pattern_numbers, 0) != "")

# Count once and reuse the value: every count() call launches a separate Spark job
numbers_only_count = numbers_only.count()

if numbers_only_count == 0:
    print("Column 'listed_in' does not have rows with only numbers.")
else:
    print(f"The 'listed_in' column has {numbers_only_count} lines with only numbers")
    numbers_only.show()

Column 'listed_in' does not have rows with only numbers.


In [128]:
# Keep the rows whose "description" is digits only; after the repairs above none are expected
# (regexp_extract returns an empty string when the pattern does not match)
numbers_only = netflix_df01.filter(regexp_extract(col("description"), regex_pattern_numbers, 0) != "")

# Count once and reuse the value: every count() call launches a separate Spark job
numbers_only_count = numbers_only.count()

if numbers_only_count == 0:
    print("The 'description' column does not have rows with only numbers.")
else:
    print(f"The 'description' column has {numbers_only_count} lines with only numbers")
    numbers_only.show()

The 'description' column does not have rows with only numbers.


### Replacing null values of all columns with "non_informed"

In [129]:
# Replace null values with the text "non_informed".
# NOTE(review): a string fill value is applied to string columns only; "date_added" (date) and
# "release_year" (integer) were converted earlier, so their nulls presumably remain - confirm
# this is acceptable for the target table.
netflix_df01 = netflix_df01.na.fill(value="non_informed")

### Creating the Azure SQL connection function and sending the data to the table

In [130]:
# Azure SQL Server target: host, database, port and destination table
jdbcHostname  = 'my-server-netflix.database.windows.net'
jdbcDatabase  = 'Netflix-db-3'
jdbcPort      = 1433
tableName     = 'DW_Esther_Andrade.netflix_table'

# JDBC URL: server address followed by the ';'-separated connection settings
jdbcUrl = ';'.join([
    f'jdbc:sqlserver://{jdbcHostname}:{jdbcPort}',
    f'database={jdbcDatabase}',
    'encrypt=true',
    'trustServerCertificate=false',
    'hostNameInCertificate=*.database.windows.net',
    'loginTimeout=30',
])

# Credentials come from the config dict loaded from config.json, never from literals here
username      = config['username']
password      = config['password']

# Options handed to the JDBC writer: login plus the SQL Server driver class
properties = {
    'user': username,
    'password': password,
    'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
}

# Write the cleaned DataFrame to Azure SQL; mode='overwrite' replaces an existing table
netflix_df01.write.jdbc(url=jdbcUrl, table=tableName, mode='overwrite', properties=properties)


Checking google colab IP to authorize in database firewall rule

In [131]:
# Print the public IP address of this runtime as reported by ipecho.net; this is the address
# to allow in the database firewall rule (needed before the JDBC write can connect)
!curl ipecho.net/plain

34.73.251.109