<h1>Part 03 - Data Management</h1>

<h2>Exercise 01 - Data Quality Test with Great Expectations</h2>


<font size="3">
The goal of this section is to get better acquainted with data management tools, in particular Great Expectations, which lets you assess data quality and trigger alerts in your day-to-day projects.

We will keep using the TLC trip record data and dive deeper into these datasets to catch data quality issues and encode our own set of rules and triggers.
</font>

## 1 - Getting familiar with the data

### 1-1. Downloading our data

To start with, let's download our data. We will use larger datasets describing "for-hire vehicle" trips in NYC (Uber, Lyft...) for January and February 2022. These datasets have been lightly modified for the purpose of this exercise. Let's download them and save them in our `data` folder.

In [2]:
import os

import gdown

data_folder = "data"

# Create the data folder if it does not exist yet
if not os.path.exists(data_folder):
    os.makedirs(data_folder)
    print(f"New directory {data_folder} created!")

# Download the January and February 2022 trip files from Google Drive
gdown.download(
    "https://drive.google.com/uc?id=1xQ8heQzUkKehOUPYvrHIqQ_pDJNCH9tT",
    os.path.join(data_folder, "taxi-trips-2022-01.parquet"),
    quiet=False,
)
gdown.download(
    "https://drive.google.com/uc?id=11kOFkDJIXSW2Hu0o2o-PWBhTJi0msYfH",
    os.path.join(data_folder, "taxi-trips-2022-02.parquet"),
    quiet=False,
)

Downloading...
From: https://drive.google.com/uc?id=1xQ8heQzUkKehOUPYvrHIqQ_pDJNCH9tT
To: /app/tp2/data/taxi-trips-2022-01.parquet


'data/taxi-trips-2022-02.parquet'

### 1-2. Loading our data

Our data is composed of several columns, the most interesting ones being:
- `hvfhs_license_num`: the Taxi & Limousine Commission (TLC) license number of the company operating the trip. Possible values are HV0002 (Juno), HV0003 (Uber), HV0004 (Via), HV0005 (Lyft).
- `request_datetime`, `on_scene_datetime`, `pickup_datetime`, `dropoff_datetime`: timestamps of the ride request, of the driver's arrival on scene, of the passenger pick-up and of the drop-off.
- `PULocationID`, `DOLocationID`: TLC taxi zones where the trip began & ended. Those are `int` values.
- `trip_miles`, `trip_time`: total miles of the passenger trip and total trip time in seconds.
- `base_passenger_fare`: base fare excluding tolls (`tolls`), tips (`tips`), taxes (`sales_tax`) and fees (`airport_fee`, `congestion_surcharge`, `bcf`).
- `driver_pay`: total driver pay (excluding tolls, tips, commissions, taxes...).
- `shared_match_flag`: did the passenger share the vehicle with another passenger who booked separately? (Y/N)

Let's load it and print the first rows.

In [3]:
import pandas as pd

data = pd.read_parquet("data/taxi-trips-2022-01.parquet")
data.head()

| | hvfhs_license_num | dispatching_base_num | originating_base_num | request_datetime | on_scene_datetime | pickup_datetime | dropoff_datetime | PULocationID | DOLocationID | trip_miles | ... | sales_tax | congestion_surcharge | airport_fee | tips | driver_pay | shared_request_flag | shared_match_flag | access_a_ride_flag | wav_request_flag | wav_match_flag |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 13103716 | HV0005 | B03406 | | 2022-01-28 14:37:32 | NaT | 2022-01-28 14:43:10 | 2022-01-28 14:50:13 | 236 | 263 | 0.58 | ... | 0.65 | 2.75 | 0.0 | 0.0 | 5.47 | N | N | N | N | Y |
| 2948771 | HV0003 | B03404 | B03404 | 2022-01-07 23:13:02 | 2022-01-07 23:15:30 | 2022-01-07 23:17:30 | 2022-01-07 23:53:36 | 261 | 146 | 9.79 | ... | 3.11 | 2.75 | 0.0 | 0.0 | 28.93 | N | N | | N | N |
| 10761352 | HV0005 | B03406 | | 2022-01-23 16:41:26 | NaT | 2022-01-23 16:44:31 | 2022-01-23 17:04:13 | 164 | 138 | 9.437 | ... | 3.25 | 2.75 | 2.5 | 0.0 | 20.38 | N | N | N | N | N |
| 12840678 | HV0003 | B03404 | B03404 | 2022-01-28 01:02:23 | 2022-01-28 01:03:45 | 2022-01-28 01:05:46 | 2022-01-28 01:11:07 | 36 | 37 | 0.76 | ... | 0.71 | 0.0 | 0.0 | 0.0 | 5.48 | N | N | | N | N |
| 7438686 | HV000 | B03404 | B03404 | 2022-01-17 06:43:01 | 2022-01-17 06:44:18 | 2022-01-17 06:44:43 | 2022-01-17 07:13:53 | 228 | 42 | 15.72 | ... | 4.08 | 2.75 | 0.0 | 0.0 | 31.98 | N | N | | N | N |


### 1-3. Exploring our data

Let's assume our goal is to build an application that predicts the fare of a trip from its pick-up & drop-off locations. We will eventually use the following columns:
- `base_passenger_fare`: our target variable
- `hvfhs_license_num`: fare might depend on operating company (HV0003 or HV0005)
- `request_datetime`, `on_scene_datetime`, `pickup_datetime`, `dropoff_datetime`: fare might depend on congestion & time of pickup
- `PULocationID`, `DOLocationID`: fare will depend on pick up and drop off location
- `trip_miles`, `trip_time`: these fields might be useful to normalize training data

Let's first explore the quality of these key fields. What do you notice? Is the data quality sufficient?

In [11]:
data.describe()

| | PULocationID | DOLocationID | trip_miles | trip_time | base_passenger_fare | tolls | bcf | sales_tax | congestion_surcharge | airport_fee | tips | driver_pay |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 3687898.0 | 3687898.0 | 3687898.0 | 3687898.0 | 3687898.0 | 3687898.0 | 3687898.0 | 3687898.0 | 3687898.0 | 3687898.0 | 3687898.0 | 3687898.0 |
| mean | 138.0291 | 141.3996 | 5.115186 | 1020.185 | 19.62433 | 0.9007726 | 0.6222032 | 1.708277 | 1.079377 | 0.1471579 | 0.8055923 | 15.64649 |
| std | 75.77711 | 78.07552 | 89.06475 | 686.3401 | 15.5527 | 3.419244 | 0.5281585 | 1.312195 | 1.351224 | 0.5921211 | 2.450585 | 12.29859 |
| min | 2.0 | 1.0 | 0.0 | 1.0 | -263.69 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | -36.41 | -15.44 |
| 25% | 74.0 | 75.0 | 1.52 | 549.0 | 9.85 | 0.0 | 0.3 | 0.85 | 0.0 | 0.0 | 0.0 | 7.68 |
| 50% | 139.0 | 141.0 | 2.81 | 851.0 | 15.41 | 0.0 | 0.47 | 1.34 | 0.0 | 0.0 | 0.0 | 12.03 |
| 75% | 211.0 | 216.0 | 5.72 | 1304.0 | 24.0 | 0.0 | 0.74 | 2.11 | 2.75 | 0.0 | 0.0 | 19.42 |
| max | 265.0 | 265.0 | 110490.0 | 101446.0 | 3078.21 | 456.26 | 92.57 | 107.06 | 11.0 | 6.4 | 203.16 | 1693.02 |


In [8]:
def missing_values(df, keep_zeros=True):
    data_count = df.shape[0] * df.shape[1]
    missing = missing_df = df.isna().sum()

    if not keep_zeros:
        missing_df = missing_df[missing_df > 0]

    missing_df = missing_df.sort_values(ascending=False).apply(lambda m: f"{m} ({round((m * 100) / df.shape[0], 2)}%)")

    print((
        f"Missing values: {round((missing.sum() / data_count) * 100, 2)}%\n"
        f"{missing_df}"
    ))


missing_values(df=data, keep_zeros=True)

Missing values: 2.21%
originating_base_num    980125 (26.58%)
on_scene_datetime       980056 (26.57%)
hvfhs_license_num              0 (0.0%)
bcf                            0 (0.0%)
wav_request_flag               0 (0.0%)
access_a_ride_flag             0 (0.0%)
shared_match_flag              0 (0.0%)
shared_request_flag            0 (0.0%)
driver_pay                     0 (0.0%)
tips                           0 (0.0%)
airport_fee                    0 (0.0%)
congestion_surcharge           0 (0.0%)
sales_tax                      0 (0.0%)
tolls                          0 (0.0%)
dispatching_base_num           0 (0.0%)
base_passenger_fare            0 (0.0%)
trip_time                      0 (0.0%)
trip_miles                     0 (0.0%)
DOLocationID                   0 (0.0%)
PULocationID                   0 (0.0%)
dropoff_datetime               0 (0.0%)
pickup_datetime                0 (0.0%)
request_datetime               0 (0.0%)
wav_match_flag                 0 (0.0%)
dtype: object


In [10]:
def duplicated_values(df):
    # Count fully duplicated rows and express them as a share of the number of rows
    duplicated = df.duplicated().sum()

    print(f"Duplicated values: {duplicated} ({round((duplicated / df.shape[0]) * 100, 2)}%)")


duplicated_values(df=data)

Duplicated values: 0 (0.0%)


In [6]:
data["hvfhs_license_num"].unique()

array(['HV0005', 'HV0003', 'HV000'], dtype=object)

In [5]:
data["on_scene_datetime"].unique()

array([                          'NaT', '2022-01-07T23:15:30.000000000',
       '2022-01-28T01:03:45.000000000', ...,
       '2022-01-05T21:42:35.000000000', '2022-01-31T16:17:41.000000000',
       '2022-01-04T00:56:08.000000000'], dtype='datetime64[ns]')

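**Conclusion:** data quality is not sufficient as is.
- Missing values: `originating_base_num` and `on_scene_datetime` are missing for about 26.6% of the trips.
- Invalid values: `base_passenger_fare`, `tips` and `driver_pay` contain negative amounts, `hvfhs_license_num` contains an unknown `HV000` code, and `trip_miles` / `trip_time` reach extreme maxima (110,490 miles, 101,446 seconds).
- Preprocessing is therefore required before using this data.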

## 2 - Installing Great Expectations

Great Expectations allows us to:
- define data quality rules in a language-agnostic format (as config files)
- run these data quality checks & rules on various types of data sources
- trigger actions & alerting whenever a rule breaks
- generate data quality reports easily from our set of rules

Your environment should already include Great Expectations as a Python library. Otherwise, follow the installation guide: https://docs.greatexpectations.io/docs/guides/setup/installation/local

In [12]:
%pip list | grep great-expectations

great-expectations            0.15.50
Note: you may need to restart the kernel to use updated packages.
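If you prefer to check the installation from Python rather than with the `%pip` magic (for example in a plain script), the standard library's `importlib.metadata` (Python 3.8+) can report the installed version. A small sketch; `installed_version` is a hypothetical helper name:

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None when it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


gx_version = installed_version("great-expectations")
if gx_version is None:
    print("great-expectations is not installed: see the installation guide linked above")
else:
    print(f"great-expectations {gx_version}")
```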


## 3 - Getting to know Great Expectations

### 3-1. Connecting to our data

As we will see, Great Expectations relies heavily on configuration files (`.yml`, `.json`). This keeps us language- and datasource-agnostic, and lets us document our rules and checks as configuration instead of hard-coding them.

The main entry point, and the best practice for managing 'rules', is a dedicated project folder where all our configuration lives: `great_expectations/` in this project (recent Great Expectations versions name it `gx/`).

Before implementing checks and triggers, we first need to tell Great Expectations how to connect to our dataset. This is usually best done in the main configuration file, `great_expectations/great_expectations.yml`.

In [13]:
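import yaml
from pprint import pprint

# Load the main project configuration.
# yaml.safe_load raises yaml.YAMLError on a malformed file, instead of printing
# the error and then failing later on an undefined `ge_config`.
with open("great_expectations/great_expectations.yml", "r") as stream:
    ge_config = yaml.safe_load(stream)

pprint(ge_config["datasources"], indent=0)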

{'taxi_trips': {'class_name': 'Datasource',
              'data_connectors': {'parquet_data_connector': {'assets': {'taxi_trips_2022': {'group_names': ['month'],
                                                                                        'pattern': 'taxi-trips-2022-(.*)\\.parquet'}},
                                                           'base_directory': '../data',
                                                           'batch_spec_passthrough': {'reader_method': 'read_parquet',
                                                                                     'reader_options': {}},
                                                           'class_name': 'ConfiguredAssetFilesystemDataConnector',
                                                           'module_name': 'great_expectations.datasource.data_connector'}},
              'execution_engine': {'class_name': 'PandasExecutionEngine',
                                  'module_name': 'great_expectations.execut

We have already done part of the work: we told Great Expectations where to find our dataset and how to read it (with pandas and its `read_parquet` function).
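To see how `pattern` and `group_names` work together: the regular expression is matched against the file names found in `base_directory`, and each capture group becomes a named batch identifier (here `month`). The snippet reproduces that mapping with Python's `re` module as an illustration only. It uses `re.fullmatch`, which is our choice and not necessarily how the data connector matches internally, and `notes.txt` is a made-up file name.

```python
import re

# Values copied from the datasource configuration printed above
pattern = re.compile(r"taxi-trips-2022-(.*)\.parquet")
group_names = ["month"]

file_names = ["taxi-trips-2022-01.parquet", "taxi-trips-2022-02.parquet", "notes.txt"]

batch_identifiers = {}
for name in file_names:
    match = pattern.fullmatch(name)
    if match:  # files that do not match the pattern are not part of the data asset
        batch_identifiers[name] = dict(zip(group_names, match.groups()))

print(batch_identifiers)
# {'taxi-trips-2022-01.parquet': {'month': '01'}, 'taxi-trips-2022-02.parquet': {'month': '02'}}
```

Each month therefore becomes a separate batch of the `taxi_trips_2022` asset, which is what lets a checkpoint validate one month at a time.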

### 3-2. Writing a first expectation
Data quality rules (or "expectations") can also be written as configuration files; they are stored in the `great_expectations/expectations/` folder.
We have already written one, expecting the base fare not to be negative (i.e. the minimum of `base_passenger_fare` to be at least 0).

In [14]:
import json

# Load the expectation suite into its own variable, so the `data` DataFrame is not overwritten
with open("great_expectations/expectations/taxi-trips-expectations.json", "r") as f:
    suite = json.load(f)

# Pretty print the suite
print(json.dumps(suite, indent=4))

{
    "data_asset_type": null,
    "expectation_suite_name": "taxi-trips-expectations",
    "expectations": [
        {
            "expectation_type": "expect_column_min_to_be_between",
            "kwargs": {
                "column": "base_passenger_fare",
                "min_value": 0
            },
            "meta": {
                "notes": {
                    "format": "markdown",
                    "content": "Target variable should not be negative as drivers should be paid a positive amount."
                }
            }
        }
    ],
    "ge_cloud_id": null,
    "meta": {
        "great_expectations_version": "0.15.46"
    }
}


You will define your own expectations later on; feel free to [explore the documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_based_on_domain_knowledge_without_inspecting_data_directly) to understand how expectations are defined in JSON.

### 3-3. Checking our data
Now that we can connect to our data and have defined a set of data quality rules, how do we apply these rules to our datasources? As you might expect, Great Expectations also uses configuration files to run data checks: checkpoints, stored in the `great_expectations/checkpoints/` folder, which bind a datasource (more precisely a data asset) to an expectation suite.

In [15]:
# Load the checkpoint definition (yaml.safe_load raises yaml.YAMLError on a malformed file)
with open("great_expectations/checkpoints/taxi-trips-checkpoint.yml", "r") as stream:
    chkp_config = yaml.safe_load(stream)

pprint(chkp_config)

{'class_name': 'SimpleCheckpoint',
 'config_version': 1.0,
 'name': 'taxi-trips-checkpoint',
 'run_name_template': '%Y%m%d-%H%M%S-my-run-name-template',
 'validations': [{'batch_request': {'data_asset_name': 'taxi_trips_2022',
                                    'data_connector_name': 'parquet_data_connector',
                                    'data_connector_query': {'batch_filter_parameters': {'month': '02'}},
                                    'datasource_name': 'taxi_trips'},
                  'expectation_suite_name': 'taxi-trips-expectations'}]}


Again, take some time to [follow the documentation](https://docs.greatexpectations.io/docs/guides/validation/checkpoints/how_to_create_a_new_checkpoint/) to understand the content of this file.
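The `batch_filter_parameters` entry is what selects February here: each file matched by the data connector gets identifiers extracted from its name (`month` in our datasource config), and only the batches whose identifiers match the filter are validated. Below is a simplified pure-Python model of that selection. It is not the Great Expectations implementation, and `filter_batches` is a hypothetical helper.

```python
from typing import Dict, List

# Identifiers the data connector would extract from each file name (group name: "month")
available_batches: Dict[str, Dict[str, str]] = {
    "taxi-trips-2022-01.parquet": {"month": "01"},
    "taxi-trips-2022-02.parquet": {"month": "02"},
}


def filter_batches(batches: Dict[str, Dict[str, str]],
                   batch_filter_parameters: Dict[str, str]) -> List[str]:
    """Keep the files whose identifiers match every key/value pair of the filter."""
    return [
        name
        for name, identifiers in batches.items()
        if all(identifiers.get(key) == value for key, value in batch_filter_parameters.items())
    ]


# Same filter as in taxi-trips-checkpoint.yml
print(filter_batches(available_batches, {"month": "02"}))  # ['taxi-trips-2022-02.parquet']
```

Consistently with this, the `batch_spec` of the checkpoint run points to `taxi-trips-2022-02.parquet`; setting the filter to `'01'` would validate January instead.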

Before running our checkpoint, let's introduce the Data Context (`great_expectations.data_context`): this object reads your project configuration and keeps track of all the datasources, checkpoints and expectation suites you have defined, so you can manipulate them from your code.

In [16]:
import great_expectations as gx
import great_expectations.jupyter_ux  # enables Great Expectations logging in the notebook

# Load the project configuration found in the `great_expectations/` folder
# (`gx.get_context()` can be used instead)
context = gx.data_context.DataContext()
print(context.list_expectation_suite_names())
print([datasource["name"] for datasource in context.list_datasources()])
print(context.list_checkpoints())



2023-03-09T14:38:23+0000 - INFO - Great Expectations logging enabled at 20 level by JupyterUX module.
2023-03-09T14:38:23+0000 - INFO - FileDataContext loading zep config
2023-03-09T14:38:23+0000 - INFO - GxConfig.parse_yaml() failed with errors - [{'loc': ('xdatasources',), 'msg': 'field required', 'type': 'value_error.missing'}]
2023-03-09T14:38:23+0000 - INFO - GxConfig.parse_yaml() returning empty `xdatasources`
2023-03-09T14:38:23+0000 - INFO - Loading 'datasources' ->
{}
2023-03-09T14:38:23+0000 - INFO - Loaded 'datasources' ->
{}
2023-03-09T14:38:23+0000 - INFO - Profiler store is not configured; omitting it from active stores

In [17]:
context.list_checkpoints()

['taxi-trips-checkpoint']

You can now run the checkpoint:

In [18]:
context.run_checkpoint(checkpoint_name="taxi-trips-checkpoint")

2023-03-09T14:38:37+0000 - INFO - 	1 expectation(s) included in expectation_suite.



{
  "run_id": {
    "run_name": "20230309-143831-my-run-name-template",
    "run_time": "2023-03-09T14:38:31.384177+00:00"
  },
  "run_results": {
    "ValidationResultIdentifier::taxi-trips-expectations/20230309-143831-my-run-name-template/20230309T143831.384177Z/da953f972937f5d7391b694c2b8cfefa": {
      "validation_result": {
        "statistics": {
          "evaluated_expectations": 1,
          "successful_expectations": 0,
          "unsuccessful_expectations": 1,
          "success_percent": 0.0
        },
        "meta": {
          "great_expectations_version": "0.15.50",
          "expectation_suite_name": "taxi-trips-expectations",
          "run_id": {
            "run_name": "20230309-143831-my-run-name-template",
            "run_time": "2023-03-09T14:38:31.384177+00:00"
          },
          "batch_spec": {
            "path": "/app/tp2/great_expectations/../data/taxi-trips-2022-02.parquet",
            "reader_method": "read_parquet",
            "reader_options": {}


Great Expectations can also render your validation results as HTML reports (Data Docs) and open them in your browser:

In [19]:
context.open_data_docs()

## 4 - More expectations & more data!

### 4-1. More expectations
Now use what you have learnt to create 2 or 3 more expectations for your data. You can look for ideas here: https://greatexpectations.io/expectations/

TODO: create your expectations in the `great_expectations/expectations/taxi-trips-expectations.json` file, and once that is done, run the code below to make sure they work.
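If you would rather not edit the JSON by hand, a small script can append expectations to the suite file. The sketch below is one way to do it with the standard library only. `add_expectation` is a hypothetical helper; the two expectations are examples drawn from the issues found in section 1 (the unknown `HV000` license number, and the 1 to 265 zone ID range observed in `data.describe()`). Both expectation types, `expect_column_values_to_be_in_set` and `expect_column_values_to_be_between`, come from the gallery linked above.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional

SUITE_PATH = Path("great_expectations/expectations/taxi-trips-expectations.json")


def add_expectation(suite_path: Path, expectation_type: str,
                    kwargs: Dict[str, Any], note: Optional[str] = None) -> None:
    """Append one expectation to a suite JSON file, skipping exact duplicates."""
    suite = json.loads(suite_path.read_text())
    entry: Dict[str, Any] = {"expectation_type": expectation_type, "kwargs": kwargs, "meta": {}}
    if note:
        entry["meta"]["notes"] = {"format": "markdown", "content": note}
    already_there = any(
        e["expectation_type"] == expectation_type and e["kwargs"] == kwargs
        for e in suite["expectations"]
    )
    if not already_there:
        suite["expectations"].append(entry)
        suite_path.write_text(json.dumps(suite, indent=4))


if SUITE_PATH.exists():
    # Only the license numbers documented for this dataset are valid
    add_expectation(
        SUITE_PATH,
        "expect_column_values_to_be_in_set",
        {"column": "hvfhs_license_num", "value_set": ["HV0002", "HV0003", "HV0004", "HV0005"]},
        note="Catches malformed license numbers such as `HV000`.",
    )
    # Pick-up zone IDs observed in January stay within 1..265
    add_expectation(
        SUITE_PATH,
        "expect_column_values_to_be_between",
        {"column": "PULocationID", "min_value": 1, "max_value": 265},
    )
```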


In [20]:
import json

with open("great_expectations/expectations/taxi-trips-expectations.json") as f:
    expectation = json.load(f)

pprint(expectation["expectations"])

[{'expectation_type': 'expect_column_min_to_be_between',
  'kwargs': {'column': 'base_passenger_fare', 'min_value': 0},
  'meta': {'notes': {'content': 'Target variable should not be negative as '
                                'drivers should be paid a positive amount.',
                     'format': 'markdown'}}},
 {'expectation_type': 'expect_column_values_to_be_between',
  'kwargs': {'column': 'driver_pay', 'min_value': 0},
  'meta': {'notes': {'content': 'Target variable should not be negative as '
                                'drivers should be paid a positive amount.',
                     'format': 'markdown'}}}]
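The two expectations above look alike but do not test the same thing: `expect_column_min_to_be_between` checks a single aggregate (the column minimum), whereas `expect_column_values_to_be_between` tests every row and reports how many values are unexpected, which also lets you tolerate a fraction of failures through the `mostly` argument. The plain-Python sketch below only approximates these semantics to illustrate the difference (it is not the Great Expectations implementation and ignores details such as null handling). The fares are toy values, except -263.69, the minimum reported by `data.describe()`.

```python
from typing import Dict, Optional, Sequence


def min_between(values: Sequence[float], min_value: Optional[float] = None,
                max_value: Optional[float] = None) -> bool:
    """Aggregate check, in the spirit of expect_column_min_to_be_between."""
    col_min = min(values)
    above = min_value is None or col_min >= min_value
    below = max_value is None or col_min <= max_value
    return above and below


def values_between(values: Sequence[float], min_value: Optional[float] = None,
                   max_value: Optional[float] = None, mostly: float = 1.0) -> Dict[str, object]:
    """Row-wise check, in the spirit of expect_column_values_to_be_between."""
    unexpected = [
        v for v in values
        if (min_value is not None and v < min_value) or (max_value is not None and v > max_value)
    ]
    unexpected_share = len(unexpected) / len(values)
    return {
        "success": (1 - unexpected_share) >= mostly,
        "unexpected_count": len(unexpected),
        "unexpected_percent": 100 * unexpected_share,
    }


fares = [12.5, 9.85, -263.69, 24.0]

print(min_between(fares, min_value=0))
# False
print(values_between(fares, min_value=0))
# {'success': False, 'unexpected_count': 1, 'unexpected_percent': 25.0}
print(values_between(fares, min_value=0, mostly=0.7))
# {'success': True, 'unexpected_count': 1, 'unexpected_percent': 25.0}
```

The row-wise variant is usually more informative for cleaning, since it tells you how many rows would have to be filtered out.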


### 4-2. Running our new expectations
Update your checkpoint file & run the expectations you have just created. 

In [21]:
context.run_checkpoint(checkpoint_name="taxi-trips-checkpoint")
context.open_data_docs()

2023-03-09T14:51:40+0000 - INFO - 	2 expectation(s) included in expectation_suite.



## 5 - Wrapping up

In this short tutorial, you have seen how to configure a simple Great Expectations project and run a few data quality rules. The main takeaway is that GE lets you create expectations and run them entirely through configuration, abstracting away the connection to the data sources.

Other exercises you could work on:
- Connecting to a remote datasource (S3, BigQuery...)
- Writing your own custom expectation (not available in the gallery)
- Using Great Expectations actions to avoid deploying when data quality is not as expected
- Automating and scheduling data quality tests with Prefect (or any other orchestration tool)