# Data Validation for Chicago Taxi Trips Using Python and TensorFlow Data Validation
### David Lowe
### June 7, 2021

SUMMARY: This project constructs a data validation workflow using TensorFlow Data Validation (TFDV) and documents the end-to-end steps using a template. The Chicago Taxi Trips dataset presents a binary classification problem, where we attempt to predict one of two possible outcomes.

INTRODUCTION: The City of Chicago collects taxi trip data in its role as a regulatory agency. This example notebook illustrates how we can use TensorFlow Data Validation (TFDV) to investigate and visualize datasets. The data validation process includes examining descriptive statistics, inferring a schema, checking for and fixing anomalies, and detecting drift and skew in the dataset.

Additional Notes: I adapted this workflow from the TensorFlow Data Validation tutorial on TensorFlow.org (https://www.tensorflow.org/tfx/tutorials/data_validation/tfdv_basic). I also plan to build a TFDV script for validating future datasets and building machine learning models.

CONCLUSION: In this iteration, the data validation workflow helped to validate the features and structures of the training, validation, and test datasets. The workflow also generated statistics over different slices of the data, which can help track model and anomaly metrics.

Dataset Used: Chicago Taxi Trips Dataset, with modifications by TensorFlow.org

Dataset ML Model: Binary classification with numerical and categorical attributes

Dataset Reference: https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/chicago_data.zip

Data validation for a machine learning project can generally be broken down into the following tasks:

1. Prepare Environment
2. Generate and Visualize Training Data Statistics
3. Check Anomalies in Validation Dataset
4. Check Anomalies in Test Dataset
5. Check for Data Drift and Skew
6. Display Stats for Data Slices
7. Finalize the Schema

## Task 1 - Prepare Environment

### 1.a) Load libraries and modules

In [1]:
# Set the random seed number for reproducible results
RNG_SEED = 8

In [2]:
# Import packages
import os
import pandas as pd
from datetime import datetime
from sklearn.model_selection import train_test_split

import tensorflow as tf
import tempfile, urllib, zipfile
import tensorflow_data_validation as tfdv
from tensorflow.python.lib.io import file_io
from tensorflow_data_validation.utils import slicing_util
from tensorflow_metadata.proto.v0.statistics_pb2 import DatasetFeatureStatisticsList, DatasetFeatureStatistics
from tensorflow_metadata.proto.v0 import schema_pb2

### 1.b) Set up the controlling parameters and functions

In [3]:
# Begin the timer for the script processing
start_time_script = datetime.now()

# Set the percentage sizes for splitting the dataset
TEST_SET_RATIO = 0.5
VAL_SET_RATIO = 0.2

# Set TF's logger to only display errors to avoid internal warnings being shown
tf.get_logger().setLevel('ERROR')
print('TFDV version: {}'.format(tfdv.version.__version__))

TFDV version: 1.0.0


### 1.c) Load dataset

In [4]:
# Read the training CSV data into a dataframe (pandas marks missing values as NaN)
dataset_path = 'https://dainesanalytics.com/datasets/tensorflow-chicago-taxi-trips/train_data.csv'
df_train = pd.read_csv(dataset_path, header=0)

# Take a peek at the dataframe after import
print(df_train.head())

   pickup_community_area   fare  trip_start_month  trip_start_hour  \
0                     22  12.85                 3               11   
1                     22   5.45                 8               21   
2                     33   0.00                 5               10   
3                     33  11.05                 3               15   
4                     33  11.05                 5               15   

   trip_start_day  trip_start_timestamp  pickup_latitude  pickup_longitude  \
0               7            1393673400        41.920452        -87.679955   
1               7            1439675100        41.920452        -87.679955   
2               4            1432118700        41.849247        -87.624135   
3               1            1427037300        41.849247        -87.624135   
4               6            1401464700        41.849247        -87.624135   

   dropoff_latitude  dropoff_longitude  trip_miles  pickup_census_tract  \
0         41.877406         -87.621

In [5]:
df_train.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 18 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   pickup_community_area   10000 non-null  int64  
 1   fare                    10000 non-null  float64
 2   trip_start_month        10000 non-null  int64  
 3   trip_start_hour         10000 non-null  int64  
 4   trip_start_day          10000 non-null  int64  
 5   trip_start_timestamp    10000 non-null  int64  
 6   pickup_latitude         10000 non-null  float64
 7   pickup_longitude        10000 non-null  float64
 8   dropoff_latitude        9693 non-null   float64
 9   dropoff_longitude       9693 non-null   float64
 10  trip_miles              10000 non-null  float64
 11  pickup_census_tract     0 non-null      float64
 12  dropoff_census_tract    7148 non-null   float64
 13  payment_type            10000 non-null  object 
 14  company                 6558 non-null  

In [6]:
# Read the validation (eval) CSV data into a dataframe (pandas marks missing values as NaN)
dataset_path = 'https://dainesanalytics.com/datasets/tensorflow-chicago-taxi-trips/eval_data.csv'
df_val = pd.read_csv(dataset_path, header=0)

# Take a peek at the dataframe after import
print(df_val.head())

   pickup_community_area   fare  trip_start_month  trip_start_hour  \
0                     60  27.05                10                2   
1                     10   5.85                10                1   
2                     14  16.65                 5                7   
3                     13  16.45                11               12   
4                     16  32.05                12                1   

   trip_start_day  trip_start_timestamp  pickup_latitude  pickup_longitude  \
0               3            1380593700        41.836150        -87.648788   
1               2            1382319000        41.985015        -87.804532   
2               5            1369897200        41.968069        -87.721559   
3               3            1446554700        41.983636        -87.723583   
4               1            1417916700        41.953582        -87.723452   

   dropoff_latitude  dropoff_longitude  trip_miles  pickup_census_tract  \
0               NaN                

In [7]:
df_val.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 18 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   pickup_community_area   5000 non-null   int64  
 1   fare                    5000 non-null   float64
 2   trip_start_month        5000 non-null   int64  
 3   trip_start_hour         5000 non-null   int64  
 4   trip_start_day          5000 non-null   int64  
 5   trip_start_timestamp    5000 non-null   int64  
 6   pickup_latitude         5000 non-null   float64
 7   pickup_longitude        5000 non-null   float64
 8   dropoff_latitude        4827 non-null   float64
 9   dropoff_longitude       4827 non-null   float64
 10  trip_miles              5000 non-null   float64
 11  pickup_census_tract     0 non-null      float64
 12  dropoff_census_tract    3613 non-null   float64
 13  payment_type            5000 non-null   object 
 14  company                 3302 non-null   

In [8]:
# Read the test (serving) CSV data into a dataframe (pandas marks missing values as NaN)
dataset_path = 'https://dainesanalytics.com/datasets/tensorflow-chicago-taxi-trips/serving_data.csv'
df_test = pd.read_csv(dataset_path, header=0)

# Take a peek at the dataframe after import
print(df_test.head())

   pickup_community_area   fare  trip_start_month  trip_start_hour  \
0                      8   6.45                 9               17   
1                      8   8.05                10               20   
2                     32   7.65                 3               20   
3                      8  36.05                 6               17   
4                     32   4.45                 4               12   

   trip_start_day  trip_start_timestamp  pickup_latitude  pickup_longitude  \
0               4            1441213200        41.892073        -87.628874   
1               7            1414269000        41.899156        -87.626211   
2               2            1395087300        41.880994        -87.632746   
3               7            1370713500        41.890922        -87.618868   
4               3            1398775500        41.880994        -87.632746   

   dropoff_latitude  dropoff_longitude  trip_miles  pickup_census_tract  \
0         41.880994         -87.632

In [9]:
df_test.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 17 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   pickup_community_area   100 non-null    int64  
 1   fare                    100 non-null    float64
 2   trip_start_month        100 non-null    int64  
 3   trip_start_hour         100 non-null    int64  
 4   trip_start_day          100 non-null    int64  
 5   trip_start_timestamp    100 non-null    int64  
 6   pickup_latitude         100 non-null    float64
 7   pickup_longitude        100 non-null    float64
 8   dropoff_latitude        98 non-null     float64
 9   dropoff_longitude       98 non-null     float64
 10  trip_miles              100 non-null    float64
 11  pickup_census_tract     0 non-null      float64
 12  dropoff_census_tract    74 non-null     float64
 13  payment_type            100 non-null    object 
 14  company                 66 non-null     obj

### 1.d) Splitting Data into Sets

In [10]:
# The training, validation, and test datasets were loaded as separate files above,
# so no further splitting is required (the split below is kept for reference only)
# df_train, df_val = train_test_split(df_dataset_import, test_size=VAL_SET_RATIO, random_state=RNG_SEED)

print("Training dataset has {} records and {} columns.".format(df_train.shape[0], df_train.shape[1]))
print("Validation dataset has {} records and {} columns.".format(df_val.shape[0], df_val.shape[1]))
print("Test dataset has {} records and {} columns".format(df_test.shape[0], df_test.shape[1]))

Training dataset has 10000 records and 18 columns.
Validation dataset has 5000 records and 18 columns.
Test dataset has 100 records and 17 columns
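
The test set has one column fewer than the other two sets. A quick way to see which column differs is to compare the column names as sets. The sketch below hard-codes the column names from this notebook (the 17 retained features plus `company`) and assumes, based on the anomaly reported in Task 4, that `tips` is the column absent from the test data. The `compare_columns` helper is hypothetical; with the dataframes loaded, `df_train.columns` and `df_test.columns` could be passed in instead.

```python
# Column names taken from the notebook output; the test columns are an
# assumption (training columns minus the 'tips' label).
train_columns = [
    'pickup_community_area', 'fare', 'trip_start_month', 'trip_start_hour',
    'trip_start_day', 'trip_start_timestamp', 'pickup_latitude',
    'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude', 'trip_miles',
    'pickup_census_tract', 'dropoff_census_tract', 'payment_type', 'company',
    'trip_seconds', 'dropoff_community_area', 'tips',
]
test_columns = [col for col in train_columns if col != 'tips']


def compare_columns(reference, candidate):
    """Return (missing, extra) column names of candidate relative to reference."""
    missing = sorted(set(reference) - set(candidate))
    extra = sorted(set(candidate) - set(reference))
    return missing, extra


missing, extra = compare_columns(train_columns, test_columns)
print(f"Missing from test: {missing}")
print(f"Unexpected in test: {extra}")
```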


## Task 2 - Generate and Visualize Training Data Statistics

### 2.a) Removing Irrelevant Features

In [11]:
# Define features to remove
features_to_remove = {'company'}

# Collect the features to allowlist while computing the statistics
features_to_keep = [col for col in df_train.columns if (col not in features_to_remove)]

# Instantiate a StatsOptions object and set its feature_allowlist property
options_train = tfdv.StatsOptions(feature_allowlist=features_to_keep)

# Review the features to generate the statistics
print(options_train.feature_allowlist)

['pickup_community_area', 'fare', 'trip_start_month', 'trip_start_hour', 'trip_start_day', 'trip_start_timestamp', 'pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude', 'trip_miles', 'pickup_census_tract', 'dropoff_census_tract', 'payment_type', 'trip_seconds', 'dropoff_community_area', 'tips']


### 2.b) Generate Statistics for Training Data

In [12]:
train_stats = tfdv.generate_statistics_from_dataframe(df_train, stats_options=options_train)

# get the number of features used to compute statistics
print(f"Number of features used: {len(train_stats.datasets[0].features)}")

# check the number of examples used
print(f"Number of examples used: {train_stats.datasets[0].num_examples}")

# check the column names of the first and last feature
print(f"First feature: {train_stats.datasets[0].features[0].path.step[0]}")
print(f"Last feature: {train_stats.datasets[0].features[-1].path.step[0]}")

Number of features used: 17
Number of examples used: 10000
First feature: pickup_community_area
Last feature: tips
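
To make the per-feature statistics more concrete, the following sketch computes a few of the same kinds of summary values (count, missing, mean, min, max, zeros) for a single numeric column in plain Python. It is only an illustration, not how TFDV computes its statistics. The `summarize_numeric` helper is hypothetical, and the sample values are the five `fare` entries shown in the `df_train.head()` output above.

```python
import math


def summarize_numeric(values):
    """Summarize a numeric column, treating None and NaN as missing."""
    present = [v for v in values
               if v is not None and not (isinstance(v, float) and math.isnan(v))]
    return {
        'count': len(present),
        'missing': len(values) - len(present),
        'mean': sum(present) / len(present) if present else None,
        'min': min(present) if present else None,
        'max': max(present) if present else None,
        'zeros': sum(1 for v in present if v == 0),
    }


# The first five fare values from the training data preview
fares = [12.85, 5.45, 0.00, 11.05, 11.05]
fare_summary = summarize_numeric(fares)
print(fare_summary)
```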


### 2.c) Visualize Training Statistics

In [13]:
tfdv.visualize_statistics(train_stats)

### 2.d) Infer Training Schema

In [14]:
# Infer the data schema by using the training statistics previously generated
dataset_schema = tfdv.infer_schema(train_stats)

# Display the data schema
tfdv.display_schema(dataset_schema)

Feature name              Type   Presence  Valency  Domain
'pickup_community_area'   INT    required           -
'fare'                    FLOAT  required           -
'trip_start_month'        INT    required           -
'trip_start_hour'         INT    required           -
'trip_start_day'          INT    required           -
'trip_start_timestamp'    INT    required           -
'pickup_latitude'         FLOAT  required           -
'pickup_longitude'        FLOAT  required           -
'dropoff_latitude'        FLOAT  optional  single   -
'dropoff_longitude'       FLOAT  optional  single   -



Domain          Values
'payment_type'  'Cash', 'Credit Card', 'Dispute', 'No Charge', 'Pcard', 'Unknown'
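
The Presence column can be related back to the non-null counts reported by `df_train.info()`. The sketch below rests on a simplified assumption about that relationship: a feature counts as `required` only when it has a value in every example. TFDV's actual inference logic involves more than this, and `infer_presence` is a hypothetical helper. The counts are taken from the training data summary above.

```python
def infer_presence(non_null_count, num_examples):
    """Label a feature 'required' if it is populated in every example."""
    return 'required' if non_null_count == num_examples else 'optional'


NUM_EXAMPLES = 10000

# Non-null counts from df_train.info() for a few of the features
non_null_counts = {
    'pickup_community_area': 10000,
    'fare': 10000,
    'dropoff_latitude': 9693,
    'dropoff_longitude': 9693,
}

presence = {name: infer_presence(count, NUM_EXAMPLES)
            for name, count in non_null_counts.items()}
for name, label in presence.items():
    print(f"{name}: {label}")
```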


## Task 3 - Check Anomalies in Validation Dataset

### 3.a) Generate Statistics for Validation Data

In [15]:
val_stats = tfdv.generate_statistics_from_dataframe(df_val, stats_options=options_train)

# get the number of features used to compute statistics
print(f"Number of features used: {len(val_stats.datasets[0].features)}")

# check the number of examples used
print(f"Number of examples used: {val_stats.datasets[0].num_examples}")

# check the column names of the first and last feature
print(f"First feature: {val_stats.datasets[0].features[0].path.step[0]}")
print(f"Last feature: {val_stats.datasets[0].features[-1].path.step[0]}")

Number of features used: 17
Number of examples used: 5000
First feature: pickup_community_area
Last feature: tips


### 3.b) Compare Validation with Training Statistics

In [16]:
tfdv.visualize_statistics(lhs_statistics=val_stats, rhs_statistics=train_stats,
                          lhs_name='VAL_DATASET', rhs_name='TRAIN_DATASET')

### 3.c) Detect Anomalies

In [17]:
val_anomalies = tfdv.validate_statistics(statistics=val_stats, schema=dataset_schema)
tfdv.display_anomalies(val_anomalies)


Feature name    Anomaly short description  Anomaly long description
'payment_type'  Unexpected string values   Examples contain values missing from the schema: Prcard (<1%).
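
Conceptually, this anomaly means that the validation data contains a `payment_type` value outside the domain inferred from the training data. The sketch below mimics that check in plain Python. It is not TFDV's implementation, the `find_unexpected_values` helper is hypothetical, and the sample values are made up apart from the domain itself and the `Prcard` value reported above.

```python
from collections import Counter


def find_unexpected_values(values, domain):
    """Return {value: fraction of examples} for values outside the domain."""
    counts = Counter(values)
    total = len(values)
    return {value: count / total
            for value, count in counts.items() if value not in domain}


# Domain as inferred from the training data in the schema
payment_type_domain = ['Cash', 'Credit Card', 'Dispute', 'No Charge', 'Pcard', 'Unknown']

# Made-up sample of validation values containing one out-of-domain entry
sample_values = ['Cash'] * 120 + ['Credit Card'] * 79 + ['Prcard']

unexpected = find_unexpected_values(sample_values, payment_type_domain)
print(unexpected)

# Extending the domain, as a schema fix would, clears the anomaly
payment_type_domain.append('Prcard')
remaining = find_unexpected_values(sample_values, payment_type_domain)
print(remaining)
```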


### 3.d) Fix Validation Data Anomalies in Schema

In [18]:
# Add new value to the domain of feature payment_type.
payment_type_domain = tfdv.get_domain(dataset_schema, 'payment_type')
payment_type_domain.value.append('Prcard')

# Validate eval stats after updating the schema 
updated_anomalies = tfdv.validate_statistics(val_stats, dataset_schema)
tfdv.display_anomalies(updated_anomalies)


## Task 4 - Check Anomalies in Test Dataset

### 4.a) Generate Statistics for Test Data

In [19]:
# Define new statistics options for the serving (test) data with tfdv.StatsOptions, passing in the previously inferred schema
options_test = tfdv.StatsOptions(schema=dataset_schema, infer_type_from_schema=True, feature_allowlist=features_to_keep)

In [20]:
# Generate serving dataset statistics
test_stats = tfdv.generate_statistics_from_dataframe(df_test, stats_options=options_test)

# get the number of features used to compute statistics
print(f"Number of features used: {len(test_stats.datasets[0].features)}")

# check the number of examples used
print(f"Number of examples used: {test_stats.datasets[0].num_examples}")

# check the column names of the first and last feature
print(f"First feature: {test_stats.datasets[0].features[0].path.step[0]}")
print(f"Last feature: {test_stats.datasets[0].features[-1].path.step[0]}")

Number of features used: 16
Number of examples used: 100
First feature: pickup_community_area
Last feature: dropoff_community_area


### 4.b) Compare Test with Training Statistics

In [21]:
test_anomalies = tfdv.validate_statistics(statistics=test_stats, schema=dataset_schema)
tfdv.display_anomalies(test_anomalies)

Feature name  Anomaly short description  Anomaly long description
'tips'        Column dropped             Column is completely missing


### 4.c) Fix Test Data Anomalies in Schema

In [22]:
# Not applicable in this iteration of the data validation exercise

### 4.d) Organize Schema based on Environment

In [23]:
# By default, all features are in both the TRAINING and TESTING environments.
dataset_schema.default_environment.append('TRAINING')
dataset_schema.default_environment.append('TESTING')

In [24]:
# Specify that the label feature 'tips' is not in the TESTING environment.
tfdv.get_feature(dataset_schema, 'tips').not_in_environment.append('TESTING')

# Re-calculate and re-display anomalies with the new environment parameters
test_anomalies = tfdv.validate_statistics(statistics=test_stats, schema=dataset_schema, environment='TESTING')
tfdv.display_anomalies(test_anomalies)
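
The effect of schema environments can be pictured as filtering the list of expected features per environment. The sketch below models this with plain dictionaries rather than the schema protocol buffer. The data structure and the `expected_features` and `missing_columns` helpers are hypothetical, and only three features are listed for brevity.

```python
def expected_features(schema, environment):
    """List features expected in an environment, honoring exclusions."""
    return [name for name, spec in schema.items()
            if environment not in spec.get('not_in_environment', [])]


def missing_columns(schema, environment, columns):
    """Return expected features that are absent from the given columns."""
    return [name for name in expected_features(schema, environment)
            if name not in columns]


# Toy schema: 'tips' is excluded from the TESTING environment
toy_schema = {
    'fare': {},
    'payment_type': {},
    'tips': {'not_in_environment': ['TESTING']},
}

# Columns available in the (toy) test data, which lacks the label
toy_test_columns = ['fare', 'payment_type']

missing_in_training_env = missing_columns(toy_schema, 'TRAINING', toy_test_columns)
missing_in_testing_env = missing_columns(toy_schema, 'TESTING', toy_test_columns)
print(missing_in_training_env)
print(missing_in_testing_env)
```

Validated against the `TRAINING` expectations, the missing label is flagged; validated against `TESTING`, it is not, which mirrors why the `tips` anomaly no longer applies once the environment is specified.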


## Task 5 - Check for Data Drift and Skew

In [25]:
# Add a skew comparator for the 'payment_type' feature (flag an L-infinity distance above 0.01).
payment_type = tfdv.get_feature(dataset_schema, 'payment_type')
payment_type.skew_comparator.infinity_norm.threshold = 0.01

skew_anomalies = tfdv.validate_statistics(train_stats, dataset_schema,
                                          previous_statistics=val_stats,
                                          serving_statistics=test_stats)

tfdv.display_anomalies(skew_anomalies)

Feature name    Anomaly short description                          Anomaly long description
'payment_type'  High Linfty distance between training and serving  The Linfty distance between training and serving is 0.0225 (up to six significant digits), above the threshold 0.01. The feature value with maximum difference is: Credit Card
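
The L-infinity distance used by the skew comparator is the largest absolute difference between the two datasets' normalized value frequencies for a categorical feature. The sketch below computes it for two made-up sets of `payment_type` counts. The counts and the `l_infinity_distance` helper are hypothetical and are not meant to reproduce the 0.0225 reported above.

```python
def l_infinity_distance(counts_a, counts_b):
    """Largest absolute gap between two normalized frequency distributions."""
    total_a = sum(counts_a.values())
    total_b = sum(counts_b.values())
    gaps = {}
    for value in set(counts_a) | set(counts_b):
        freq_a = counts_a.get(value, 0) / total_a
        freq_b = counts_b.get(value, 0) / total_b
        gaps[value] = abs(freq_a - freq_b)
    worst = max(gaps, key=gaps.get)
    return gaps[worst], worst


# Hypothetical value counts for illustration only
training_counts = {'Cash': 600, 'Credit Card': 380, 'No Charge': 20}
serving_counts = {'Cash': 58, 'Credit Card': 41, 'No Charge': 1}

THRESHOLD = 0.01
distance, worst_value = l_infinity_distance(training_counts, serving_counts)
print(f"{distance:.4f} on '{worst_value}', anomaly: {distance > THRESHOLD}")
```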


## Task 6 - Display Stats for Data Slices

In [26]:
def split_datasets(dataset_list):
    '''
    split datasets.

            Parameters:
                    dataset_list: List of datasets to split

            Returns:
                    datasets: sliced data
    '''
    datasets = []
    for dataset in dataset_list.datasets:
        proto_list = DatasetFeatureStatisticsList()
        proto_list.datasets.extend([dataset])
        datasets.append(proto_list)
    return datasets

In [27]:
def display_stats_at_index(index, datasets):
    '''
    display statistics at the specified data index

            Parameters:
                    index : index to show the anomalies
                    datasets: split data

            Returns:
                    display of generated sliced data statistics at the specified index
    '''
    if index < len(datasets):
        print(datasets[index].datasets[0].name)
        tfdv.visualize_statistics(datasets[index])

In [28]:
def sliced_stats_for_slice_fn(slice_fn, approved_cols, dataframe, schema):
    '''
    generate statistics for the sliced data.

            Parameters:
                    slice_fn : slicing definition
                    approved_cols: list of features to pass to the statistics options
                    dataframe: pandas dataframe to slice
                    schema: the schema

            Returns:
                    slice_info_datasets: statistics for the sliced dataset
    '''
    # Set the StatsOptions
    slice_stats_options = tfdv.StatsOptions(schema=schema,
                                            slice_functions=[slice_fn],
                                            infer_type_from_schema=True,
                                            feature_allowlist=approved_cols)
    
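    # Convert the dataframe to CSV since `slice_functions` works only with `tfdv.generate_statistics_from_csv`
    # (index=False keeps the dataframe index from being written as an extra unnamed column)
    CSV_PATH = 'slice_sample.csv'
    dataframe.to_csv(CSV_PATH, index=False)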
    
    # Calculate statistics for the sliced dataset
    sliced_stats = tfdv.generate_statistics_from_csv(CSV_PATH, stats_options=slice_stats_options)
    
    # Split the dataset using the previously defined split_datasets function
    slice_info_datasets = split_datasets(sliced_stats)
    
    return slice_info_datasets

In [29]:
# Generate slice function for one of the features
slice_fn = slicing_util.get_feature_value_slicer(features={'payment_type': None})

# Generate stats for the sliced dataset
slice_datasets = sliced_stats_for_slice_fn(slice_fn, features_to_keep, dataframe=df_train, schema=dataset_schema)

# Print name of slices for reference
print('Statistics generated for:\n')
print('\n'.join([sliced.datasets[0].name for sliced in slice_datasets]))

# Display at index 1 as an example
display_stats_at_index(1, slice_datasets) 





Statistics generated for:

All Examples
payment_type_Cash
payment_type_Credit Card
payment_type_No Charge
payment_type_Unknown
payment_type_Dispute
payment_type_Pcard
payment_type_Cash
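
Slicing by feature value amounts to grouping the examples by `payment_type` and computing statistics within each group, alongside the overall dataset. The sketch below shows the idea in plain Python with made-up rows. It does not use `slicing_util`, the `slice_by_feature` helper is hypothetical, and the slice names simply mirror the naming seen in the output above.

```python
from collections import defaultdict


def slice_by_feature(rows, feature):
    """Group rows into an 'All Examples' slice plus one slice per feature value."""
    slices = defaultdict(list)
    for row in rows:
        slices['All Examples'].append(row)
        slices[f"{feature}_{row[feature]}"].append(row)
    return dict(slices)


# Made-up rows for illustration only
rows = [
    {'payment_type': 'Cash', 'fare': 5.0},
    {'payment_type': 'Cash', 'fare': 7.0},
    {'payment_type': 'Credit Card', 'fare': 12.0},
]

slices = slice_by_feature(rows, 'payment_type')

# Compute one simple statistic (mean fare) within each slice
mean_fare = {name: sum(r['fare'] for r in members) / len(members)
             for name, members in slices.items()}
for name, value in mean_fare.items():
    print(f"{name}: {len(slices[name])} rows, mean fare {value:.2f}")
```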


## Task 7 - Finalize the Schema

In [30]:
# Create output directory
OUTPUT_DIR = "output"
file_io.recursive_create_dir(OUTPUT_DIR)

# Use TensorFlow text output format pbtxt to store the schema
schema_file = os.path.join(OUTPUT_DIR, 'schema.pbtxt')

# The write_schema_text function expects the schema and the output path as parameters
tfdv.write_schema_text(dataset_schema, schema_file) 

In [31]:
print('Total time for the script:', (datetime.now() - start_time_script))

Total time for the script: 0:00:09.833700
