<p align="center">
<img src="https://github.com/datacamp/python-live-training-template/blob/master/assets/datacamp.svg?raw=True" alt = "DataCamp icon" width="50%">
</p>
<br><br>

## **Cleaning Data with PySpark**

Welcome to this hands-on training where we will investigate cleaning a dataset using Python and Apache Spark! During this training, we will cover:

* Efficiently loading data into a Spark DataFrame
* Handling errant rows / columns from the dataset, including comments, missing data, combined or misinterpreted columns, etc.
* Using Python UDFs to run advanced transformations on data


## **The Dataset**

The dataset used in this webinar is a set of gzip-compressed CSV files named `netflix_titles_dirty_*.csv.gz`. They contain information about the movies and television shows available on Netflix. These are the *dirty* versions of the dataset - we will cover the individual problems as we work through the notebook.

Given that this is a data cleaning webinar, let's look at our intended result. The cleaned dataset will contain the following information:

- `show_id`: A unique identifier for the show
- `type`: The type of content, `Movie` or `TV Show`
- `title`: The title of the content
- `director`: The director (or directors)
- `cast`: The cast
- `country`: Country (or countries) where the content is available
- `date_added`: Date added to Netflix
- `release_year`: Year of content release
- `rating`: Content rating
- `duration`: The duration
- `listed_in`: The genres the content is listed in
- `description`: A description of the content
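As a quick orientation, the plain-Python sketch below (no Spark involved) parses one raw line, copied from the file preview further down, with the standard `csv` module and pairs each value with the field names above. `COLUMNS` and `parse_row` are illustrative names chosen for this example.

```python
import csv
import io

# Target field names, in file order, taken from the list above
COLUMNS = [
    "show_id", "type", "title", "director", "cast", "country",
    "date_added", "release_year", "rating", "duration",
    "listed_in", "description",
]

# One raw line from netflix_titles_dirty_03.csv.gz (note the empty director field)
RAW_LINE = (
    '80179907,Movie,Bridget Christie: Stand Up for Her,,Bridget Christie,'
    'United Kingdom,"March 31, 2017",2016,TV-MA,51 min,Stand-Up Comedy,'
    '"Performing stand-up for a packed house in London\'s Hoxton Hall, '
    'comedian Bridget Christie dives into the politics of gender, sex and equality."'
)

def parse_row(line):
    """Split one CSV line (honoring quoted commas) and map it onto COLUMNS."""
    fields = next(csv.reader(io.StringIO(line)))
    if len(fields) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(fields)}")
    return dict(zip(COLUMNS, fields))

record = parse_row(RAW_LINE)
print(record["title"], "|", record["date_added"], "|", record["duration"])
```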



## **Setting up a PySpark session**

Before we can start processing our data, we need to configure a PySpark session for Google Colab. Note that these steps are specific to running Spark and Python in Colab and are likely not required in other environments.

In [0]:
# Run this code as is to install Spark in Colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases such as 2.4.5 are served from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
# Run this code to setup the environment
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
# Finally, setup our Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## **Getting started**

Before doing anything else, let's copy our data files locally. Run the following cell to download the *dirty* files to `/tmp`.

In [5]:
# Copy our dataset locally

!wget -O /tmp/netflix_titles_dirty_01.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_01.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_02.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_02.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_03.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_03.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_04.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_04.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_05.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_05.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_06.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_06.csv.gz?raw=True'
!wget -O /tmp/netflix_titles_dirty_07.csv.gz 'https://github.com/datacamp/data-cleaning-with-pyspark-live-training/blob/master/data/netflix_titles_dirty_07.csv.gz?raw=True'
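The seven `wget` commands differ only in the two-digit file number. As an alternative, the same downloads could be generated in a loop with Python's standard library. This is a sketch: `download_targets` and `download_all` are hypothetical helpers, and the URLs and `/tmp` paths are the ones used above. Nothing is fetched until `download_all()` is called.

```python
import urllib.request

# Same GitHub URLs and local paths as the wget commands, parameterized by file number
BASE_URL = ("https://github.com/datacamp/data-cleaning-with-pyspark-live-training"
            "/blob/master/data/netflix_titles_dirty_{:02d}.csv.gz?raw=True")
DEST = "/tmp/netflix_titles_dirty_{:02d}.csv.gz"

def download_targets(count=7):
    """Return (url, local_path) pairs for files 01..count."""
    return [(BASE_URL.format(i), DEST.format(i)) for i in range(1, count + 1)]

def download_all(count=7):
    """Fetch every file; urllib follows GitHub's 302 redirects, as wget does."""
    for url, path in download_targets(count):
        urllib.request.urlretrieve(url, path)

# Usage (performs network I/O): download_all()
```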




## Now, let's verify that we have all 7 files we expect

In [6]:
!ls /tmp/netflix_titles*

/tmp/netflix_titles_dirty_01.csv.gz  /tmp/netflix_titles_dirty_05.csv.gz
/tmp/netflix_titles_dirty_02.csv.gz  /tmp/netflix_titles_dirty_06.csv.gz
/tmp/netflix_titles_dirty_03.csv.gz  /tmp/netflix_titles_dirty_07.csv.gz
/tmp/netflix_titles_dirty_04.csv.gz
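If you'd rather check this programmatically than by eye, a small sketch using `glob` can list any expected file that is missing. It assumes the naming scheme above (`netflix_titles_dirty_01` through `_07`); `missing_files` is a hypothetical helper.

```python
import glob
import os

def missing_files(directory="/tmp", count=7):
    """Return expected netflix_titles_dirty_NN.csv.gz names that are not in directory."""
    expected = {f"netflix_titles_dirty_{i:02d}.csv.gz" for i in range(1, count + 1)}
    pattern = os.path.join(directory, "netflix_titles_dirty_*.csv.gz")
    found = {os.path.basename(path) for path in glob.glob(pattern)}
    return sorted(expected - found)

# An empty list means all seven files are in place
print(missing_files())
```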


## And then, we'll take a look at the first 20 rows of one of the files

In [7]:
!gunzip -c /tmp/netflix_titles_dirty_03.csv.gz | head -20

80142103,Movie,Bottom of the World,Richard Sears,"Jena Malone, Douglas Smith, Ted Levine, Tamara Duarte, Kelly Pendygraft, Mark Sivertsen, Jon McLaren","Canada, United States","March 31, 2017",2017,TV-MA,84 min,"Dramas, Independent Movies, Thrillers","En route to a fresh start in Los Angeles, young couple Alex and Scarlett stop over in a sleepy Southwestern town that loosens their grip on reality."
80179907,Movie,Bridget Christie: Stand Up for Her,,Bridget Christie,United Kingdom,"March 31, 2017",2016,TV-MA,51 min,Stand-Up Comedy,"Performing stand-up for a packed house in London's Hoxton Hall, comedian Bridget Christie dives into the politics of gender, sex and equality."
80152842,Movie,FirstBorn,Nirpal Bhogal,"Antonia Thomas, Luke Norris, Thea Petrie, Eileen Davies, Jonathan Hyde",United Kingdom,"March 31, 2017",2016,TV-MA,90 min,"Horror Movies, International Movies",A young couple fights supernatural foes in an attempt to save their daughter from the dark and mysterious forces that f

# Loading our initial DataFrame

Let's see what Spark does with our data and whether it can parse it properly. To do this, we'll first load the content into a DataFrame using the `spark.read.csv()` method, passing in two arguments: the path to the file(s) and `header=False`. The wildcard in the path loads all seven files at once, and Spark decompresses the `.gz` files automatically. Our files do not have a header row; `header` already defaults to `False`, but setting it explicitly makes clear that the first line of each file is data and must not be interpreted as column names.

In [0]:
titles_df = spark.read.csv('/tmp/netflix_titles_dirty*.csv.gz', header=False)
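To see why the header setting matters, consider an analogy in plain Python: `csv.DictReader` treats the first line as column names, much as `header=True` does in Spark. The sample below uses the first three fields of two rows from the preview above.

```python
import csv
import io

# First three fields of two data rows from the preview (the files have no header line)
SAMPLE = "80179907,Movie,Bridget Christie: Stand Up for Her\n80152842,Movie,FirstBorn\n"

# Treating line one as a header (analogous to header=True) swallows a real record
with_header = csv.DictReader(io.StringIO(SAMPLE))
rows_with_header = list(with_header)
print(with_header.fieldnames)   # the first record's values became "column names"
print(len(rows_with_header))    # only one data row survives

# Reading without a header (analogous to header=False) keeps both records
rows_without_header = list(csv.reader(io.StringIO(SAMPLE)))
print(len(rows_without_header))
```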

## Initial analysis

Let's look at the first 100 rows using the DataFrame's `.show()` method, passing in the number of rows to display and setting the `truncate` option to `False` so we can see the full content of each column.

In [10]:
titles_df.show(100, truncate=False)

+-----------------------------------------------+-------------+---------------------------------------------------+---------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+--------------+------------+-----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0                                            |_c1          |_c2                      

If we start by looking at the first column of data, we see that most of the entries are numeric IDs, but scrolling through the data reveals a few stray text entries. Let's make a note of this and browse through the rest of the data, looking for anything else out of the ordinary.

We can also use the `.printSchema()` method to print the schema Spark assigned to the data. Notice that we have 12 columns (as expected from the format description above), but they have no meaningful names (`_c0` through `_c11`), every column is typed as a string (the CSV reader does not infer types unless `inferSchema=True` is set), and each field is nullable.

In [11]:
titles_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



## Bypassing the CSV interpreter

Our data looks OK on an initial pass, but even in our small sample we can see a few malformed rows. We know the first column should be an integer value, but some entries do not meet this requirement. Let's run a quick filter on the DataFrame to see how many rows have a valid integer in the first column.

In [12]:
# Import the Pyspark SQL helper functions
from pyspark.sql import functions as F

# Determine how many rows have a column that converts properly to an integer value
titles_df.filter(F.col("_c0").cast("int").isNotNull()).count()


6170
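Conceptually, `.cast("int")` produces null for any value that cannot be parsed as an integer, and `.isNotNull()` keeps the rest. The sketch below mimics that idea in plain Python with `int()` and `None`. It is a simplification: Python's `int()` and Spark's cast differ in edge cases (for example, `int()` accepts underscores such as `"1_000"`). `to_int_or_none` is a hypothetical helper; the IDs and `Flying Fortress"` come from this notebook's output, while `"Movie"`, the comment string, and `None` are illustrative.

```python
def to_int_or_none(value):
    """Return int(value), or None when it isn't an integer string (like a null from cast)."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

first_column = ["80142103", "80179907", 'Flying Fortress"', "Movie", "# example comment", None]

# Same idea as filter(F.col("_c0").cast("int").isNotNull()) and its isNull() counterpart
valid = [v for v in first_column if to_int_or_none(v) is not None]
invalid = [v for v in first_column if to_int_or_none(v) is None]
print(len(valid), len(invalid))
```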

In [13]:
# Look at rows that don't convert properly
titles_df.filter(F.col("_c0").cast("int").isNull()).show()

+--------------------+--------------------+--------------------+-----------------+---------------+---------------+---------------+-------------+--------------------+--------------------+--------------+--------------------+
|                 _c0|                 _c1|                 _c2|              _c3|            _c4|            _c5|            _c6|          _c7|                 _c8|                 _c9|          _c10|                _c11|
+--------------------+--------------------+--------------------+-----------------+---------------+---------------+---------------+-------------+--------------------+--------------------+--------------+--------------------+
|    Flying Fortress"|       William Wyler|                null|    United States| March 31, 2017|           1944|          TV-PG|       40 min|Classic Movies, D...|This documentary ...|          null|                null|
|# As attention ru...|                null|                null|             null|           null|          

In [14]:
# Look at the full count of rows
titles_df.count()

6238

Comparing the two counts, 68 of the 6,238 rows (6,238 - 6,170) fail the integer check. Looking at that content, we notice several types of problems:

- Comment rows: These begin with a `#` character in the first column, and all other columns are null
- Missing first column: A few rows have `TV Show` or `Movie` in the first column, a value that belongs in the second column.
- Odd columns: A few rows have values that look like actor names in most columns, rather than any of the other information we would expect.
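As a rough illustration of how these three patterns could be told apart, here is a hypothetical plain-Python heuristic that labels a row (a list of column values) by looking only at its first column. The rules are assumptions drawn from the list above, not a complete check; anything that doesn't match is lumped into `other` for manual inspection.

```python
import re
from collections import Counter

def classify_row(fields):
    """Label a parsed row using the problem patterns listed above (hypothetical heuristic)."""
    first = fields[0] or ""  # treat a missing (None) first value as an empty string
    if first.startswith("#"):
        return "comment"
    if first in ("Movie", "TV Show"):
        return "missing_first_column"
    if re.fullmatch(r"[0-9]+", first):  # plain numeric show_id
        return "ok"
    return "other"

sample_rows = [
    ["80142103", "Movie", "Bottom of the World"],
    ["# a hypothetical comment", None, None],
    ["Movie", "A hypothetical title", "Some Director"],
    ['Flying Fortress"', "William Wyler", None],
]
print(Counter(classify_row(row) for row in sample_rows))
```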

We could fairly easily remove rows that match these patterns, but we can't be sure they are the only problems present. This is a common issue when parsing large amounts of data, whether in native Python, in Spark, or even with command-line tools.

What we need to do is bypass most of the CSV parser's intelligence, but still load the content into a DataFrame. One way to do this is to modify an option on the CSV loader.

# CSV loading

Our initial import relies on the defaults of the CSV reader. These assume a conventional comma-separated file, with `,` between fields and a standard line terminator (i.e., `\r\n`, `\r`, or `\n`). While this often works well, it doesn't always support every data cleaning process you'd like, especially if you want to save the errant data for later examination.

One way to trick the CSV loader is to specify a custom separator that we know does not appear in our dataset. The option for this is called `sep`, and in Spark 2.x it takes a single character to use as the column separator. The separator cannot be an empty string, so depending on your data, you may need to find a character that is not used. For our purposes, let's use a curly brace, `{`, which is most likely not present in our data. Each line then loads as a single string column, `_c0`.

In [0]:
# Load the files into a DataFrame with a single column

titles_single_df = spark.read.csv('/tmp/netflix_titles_dirty*.csv.gz', sep='{')

In [16]:
titles_single_df.count()

6238

In [17]:
titles_single_df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0                                                                                                                                                                                                                                                                                                                                                                                                                                                                  

In [18]:
titles_single_df.printSchema()

root
 |-- _c0: string (nullable = true)
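The effect of choosing an unused separator can be reproduced with Python's `csv` module as an analogy: with the default comma delimiter, quoted fields are honored and a line splits into 12 fields; with `{` as the delimiter, the same line comes back as one field. One caveat for the `csv` module: the quote character stays active, so a line that *begins* with `"` would still be parsed as a quoted field. The line below is the first row of the file preview earlier in the notebook.

```python
import csv
import io

# A raw line from the preview: several quoted fields contain commas
LINE = ('80142103,Movie,Bottom of the World,Richard Sears,'
        '"Jena Malone, Douglas Smith, Ted Levine, Tamara Duarte, Kelly Pendygraft, '
        'Mark Sivertsen, Jon McLaren","Canada, United States","March 31, 2017",2017,'
        'TV-MA,84 min,"Dramas, Independent Movies, Thrillers",'
        '"En route to a fresh start in Los Angeles, young couple Alex and Scarlett '
        'stop over in a sleepy Southwestern town that loosens their grip on reality."')

# Default comma delimiter: the line is split into the 12 expected fields
comma_fields = next(csv.reader(io.StringIO(LINE)))

# A delimiter that never occurs ('{'): the whole line comes back as a single field
brace_fields = next(csv.reader(io.StringIO(LINE), delimiter="{"))

print(len(comma_fields), len(brace_fields))
```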



# Q&A?

# Cleaning up our data

We know from our earlier analysis that we have comment rows (i.e., rows that begin with a `#`). While the Spark CSV reader provides an option to handle these automatically, let's consider what it would take to remove comment rows ourselves.

We need to:

- Determine if the column / line starts with a `#`
- If so, filter these out to a new DataFrame

There are many ways to accomplish this in Spark, but let's use a conceptually straightforward option, the column method `.startswith()`.

In [19]:
titles_single_new_df = titles_single_df.filter(F.col('_c0').startswith('#'))
titles_single_new_df.show()

+--------------------+
|                 _c0|
+--------------------+
|# As attention ru...|
|# Heat) across co...|
|# Multi-story hot...|
|# Birds with Oxfo...|
|# Related species...|
|# Duwamish Waterw...|
|# Elements εὐρύς ...|
|# 1910, the slave...|
|# A surprising Gr...|
|# The prairies 19...|
|# Celebration. In...|
|# Itinerant trade...|
|# Exact mechanism...|
|# York, reflectin...|
|# Just a example,...|
|# By analyzing th...|
|# (with component...|
|# Warm ocean, rec...|
|# Computers proce...|
|# Now owned Freud...|
+--------------------+
only showing top 20 rows



In [20]:
titles_single_new_df.count()

47

We've determined that we have 47 rows that begin with a comment character. We can now easily filter these from our DataFrame as we do below.

*Note*: We're doing things in a more roundabout way than strictly necessary in order to illustrate the options. The Spark CSV reader has a `comment` option: setting it to `#` skips every line that begins with that character (the option is disabled by default). However, it only supports a single character; consider what you would do if you needed multi-character markers (e.g., `//` or `/*` from C-style syntax). Filtering manually also lets us keep the comment rows in their own DataFrame for later examination, and our method works in any of the 2.x Spark releases.
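For the multi-character case mentioned in the note, Python's `str.startswith` accepts a tuple of prefixes, as sketched below. The prefix set is hypothetical; in PySpark, an analogous filter could combine several `F.col('_c0').startswith(...)` conditions with `|`.

```python
# Hypothetical set of comment markers, including multi-character ones
COMMENT_PREFIXES = ("#", "//", "/*")

def is_comment(line, prefixes=COMMENT_PREFIXES):
    """True when the line begins with any of the given markers (str.startswith takes a tuple)."""
    return line.startswith(prefixes)

lines = ["# hash comment", "// C-style line comment", "/* block comment */", "80152842,Movie,FirstBorn"]
kept = [line for line in lines if not is_comment(line)]
print(kept)
```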

In [0]:
# Filter out comments

titles_single_df = titles_single_df.filter(~ F.col('_c0').startswith('#'))

In [22]:
titles_single_df.count()

6191
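The two filters used in this section are complements, which is why 47 + 6,191 = 6,238 rows. The plain-Python sketch below mirrors that partition (`split_comments` is a hypothetical helper, and the sample lines are shortened or made up). One Spark-specific caveat: if `_c0` were null for some row, `startswith` would return null and both `filter()` calls would drop that row, so the counts would no longer add up.

```python
def split_comments(lines, prefix="#"):
    """Partition raw lines into (comment_lines, data_lines), like filter(startswith) / filter(~startswith)."""
    comments = [line for line in lines if line.startswith(prefix)]
    data = [line for line in lines if not line.startswith(prefix)]
    return comments, data

raw_lines = [
    "80142103,Movie,Bottom of the World",
    "# a hypothetical comment line",
    "80179907,Movie,Bridget Christie: Stand Up for Her",
]
comments, data = split_comments(raw_lines)

# Every (non-null) line lands in exactly one bucket, so the counts add up to the total
print(len(comments), len(data), len(raw_lines))
```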

# Checking column counts

Our next step for cleaning this dataset in Pyspark involves determining how many columns we should have 