# Exercise 2: Data Cleaning

Your name: ______________

We already saw in the last exercise that datasets oftentimes contain "dirty" data. This can be problematic for subsequent analysis: for example, outliers might skew results, or inconsistencies might be carried over into downstream results.

In this assignment, we will do the following:
 - We will again work with the New York Taxi Dataset (NYTD). If you still have a local copy, great! If not, download it again following the instructions from the last exercise.
 - You will use [deequ](https://github.com/awslabs/deequ), the tool we discussed in a video on Canvas, to come up with and implement unit tests for the NYTD dataset. Also refer to the discussion on Canvas for ideas for new checks!
 - As we have already seen, the whole dataset will not pass the unit tests because it contains a lot of dirty data. However, it is also unrealistic that a once-a-year batch process uploads a whole year's worth of data into your pipeline. Instead, we want to implement something more realistic: we simulate smaller batches and check them with deequ one by one.

We strongly advise you to use Google Colab for this exercise because setting up a correct environment is not trivial. There are a couple of setup instructions in the notebook. If you execute them in Google Colab, everything should just work ;-)


We will use the Python package for deequ:
```
pip install pyspark
pip install pydeequ
```


In [None]:
# Install required packages
!pip install pyspark
!pip install pydeequ

In [None]:
# Install Java 11. This is safe to run in Google Colab. On your own machine, be careful: it installs a system package and changes the default `java` binary.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-11-openjdk-amd64/jre/bin/java

In [None]:
!java --version # This should be Java 11

In [None]:
!pyspark --version # Should be Spark 3.5.x and Scala 2.12.x

In [None]:
# Download New York Taxi trip data set (again) and convert it to CSV

import pandas as pd
df = pd.read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-08.parquet')
# index=False keeps pandas' row index from being written as an extra, unnamed column
df.to_csv('2021_Yellow_Taxi_Trip_Data.csv', index=False)

In [None]:
import os
os.environ["SPARK_VERSION"] = "3.5"

In [None]:
from pyspark.sql import SparkSession, Row
import pydeequ

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config("spark.sql.debug.maxToStringFields", 1000)
    .getOrCreate())

spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Read the CSV file into a dataframe with spark
df = spark.read.option("header", True).csv("2021_Yellow_Taxi_Trip_Data.csv")
df.printSchema()

## Task 1: Strings to Types

Oh! As you can see above, all columns are interpreted as strings. That makes the data awkward to work with.

Task:
1. Take a sample of roughly 10000 rows from the dataframe containing the NYTD dataset
2. Use deequ to run a profiling job over the sample
3. Extract the inferred type information from the report using Python.
4. Automatically adjust the data types in the dataframe to reflect more fitting types
5. Print the resulting schema

Note: Outside of this exercise, you can also use Spark's `inferSchema` option when reading a CSV, but it requires an extra pass over the data, which takes additional time.
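Steps 3 and 4 mostly come down to translating deequ's inferred type names into Spark SQL types and casting each column. The sketch below shows only that translation, in plain Python so it can be checked without Spark. It assumes the profiling results have already been collected into a dict from column name to deequ type name (we believe pydeequ's `ColumnProfilerRunner` reports this per column as the profile's `dataType`, with values such as `Integral`, `Fractional`, `Boolean` and `String`; please check the pydeequ docs). The names `DEEQU_TO_SPARK`, `sample_fraction` and `cast_expressions` are hypothetical, and the example input is illustrative, not real profiling output.

```python
# Hypothetical helpers for Task 1; only the type-translation logic is shown.

# Assumed mapping from deequ's inferred type names to Spark SQL type names.
# Text columns and columns deequ cannot classify ("Unknown") stay strings.
DEEQU_TO_SPARK = {
    "Integral": "bigint",
    "Fractional": "double",
    "Boolean": "boolean",
    "String": "string",
    "Unknown": "string",
}


def sample_fraction(target_rows: int, total_rows: int) -> float:
    """Fraction to pass to DataFrame.sample() to get roughly target_rows rows."""
    if total_rows <= 0:
        return 0.0
    return min(1.0, target_rows / total_rows)


def cast_expressions(inferred: dict) -> list:
    """Build one SQL cast expression per column, suitable for DataFrame.selectExpr()."""
    exprs = []
    for column, deequ_type in inferred.items():
        # Fall back to string for any type name not in the mapping.
        spark_type = DEEQU_TO_SPARK.get(deequ_type, "string")
        # Backticks guard against column names with spaces or special characters.
        exprs.append(f"CAST(`{column}` AS {spark_type}) AS `{column}`")
    return exprs


# Illustrative input standing in for the profiler's per-column types.
inferred_types = {
    "VendorID": "Integral",
    "trip_distance": "Fractional",
    "store_and_fwd_flag": "String",
}
for expr in cast_expressions(inferred_types):
    print(expr)
print(sample_fraction(10_000, 2_000_000))
```

In the notebook, this could be applied as `df.sample(fraction=sample_fraction(10_000, df.count()), seed=42)` for step 1 and `df.selectExpr(*cast_expressions(inferred_types))` for step 4. As far as we know, deequ does not infer timestamp types, so the pickup and dropoff datetime columns would stay strings here and need a separate conversion (e.g. with `pyspark.sql.functions.to_timestamp`).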

In [None]:
# Answer:
# <your code here>

## Task 2: Batch-process Data

When setting up a data pipeline, you rarely have all the data available from the start. Instead, data usually arrives over time, oftentimes partitioned into batches. So let's simulate this.
We will split the dataset into many small files and look at them one by one with deequ.
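Spark writes each batch as its own Parquet part file inside a single output directory, so processing the batches one by one starts with listing those files. This is a minimal sketch, assuming Spark's usual `part-*.parquet` naming. The helper `list_batch_files` is hypothetical, and the demo only creates empty placeholder files.

```python
import glob
import os
import tempfile


def list_batch_files(directory: str) -> list:
    """Return the Parquet part files Spark wrote into `directory`, sorted by name.

    Assumes Spark's usual naming scheme (part-00000-....parquet). Files such as
    _SUCCESS or hidden .crc checksums do not match the pattern and are skipped.
    """
    pattern = os.path.join(directory, "part-*.parquet")
    return sorted(glob.glob(pattern))


# Quick self-check with a throwaway directory of empty placeholder files.
with tempfile.TemporaryDirectory() as demo_dir:
    for name in ["part-00001-x.snappy.parquet", "part-00000-x.snappy.parquet", "_SUCCESS"]:
        open(os.path.join(demo_dir, name), "w").close()
    demo_files = [os.path.basename(p) for p in list_batch_files(demo_dir)]
print(demo_files)
```

Each real path could then be loaded with `spark.read.parquet(path)` and verified on its own.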

Your task:
1. Implement at least 10 simple checks with deequ's `VerificationSuite`. Make sure some files still pass the check! You might want to draw inspiration from the discussion on Canvas.
2. Iterate over all batches and run the check on each of them. Count how many of them pass the check and output that number!
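For step 2, the counting logic is independent of the checks themselves. The sketch below takes the per-batch check as a callable, so it can be exercised without Spark. `count_passing` and `run_checks` are hypothetical names. In the notebook, `run_checks` would load one batch, run `VerificationSuite(spark).onData(batch).addCheck(check).run()` and return the overall status. We assume pydeequ exposes that as the result's `status` string (`Success`, `Warning` or `Error`), but please verify this in the pydeequ docs.

```python
from typing import Callable, Iterable


def count_passing(batch_paths: Iterable[str], run_checks: Callable[[str], str]) -> int:
    """Count batches whose verification status is "Success".

    run_checks is a hypothetical callable: given a batch path, it runs the deequ
    checks on that batch and returns the overall status as a string.
    """
    passed = 0
    for path in batch_paths:
        if run_checks(path) == "Success":
            passed += 1
    return passed


# Stand-in for the real check: pretend these are the statuses deequ reported.
fake_status = {
    "batch_0": "Success",
    "batch_1": "Error",
    "batch_2": "Success",
    "batch_3": "Warning",
}
n = count_passing(fake_status, fake_status.get)
print(f"{n} of {len(fake_status)} batches passed")
```

Counting only `Success` treats warnings as failures. Whether that is the right choice depends on the `CheckLevel` you assign to your checks.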

In [None]:
# Splits the given dataframe into many individual files (batches)

import tempfile
import os

tmppath = os.path.join(tempfile.gettempdir(), 'nytd_partitions')

df.write.option("maxRecordsPerFile", 1000) \
        .mode("overwrite") \
        .parquet(tmppath)

In [None]:
# Answer:
# <your code here>

In [None]:
# Shutdown Spark session
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()

## Feedback (voluntary)

How did you like this exercise? What could be improved?

Answer:

...

Further, I feel like:
 - [ ] the exercise was too easy
 - [ ] the exercise was too hard
 - [ ] the exercise was just right
 - [x] no answer
