# Lesson Overview 

1. Review of TF Data Validation (TFDV)

2. Dataset Review, compute & visualize statistics

3.  statistics

4. Infer Schema

5. Train vs Evaluation set data errors

6. Anomalies & Skew

7. Freeze Schema



## Data Validation

TensorFlow Data Validation (TFDV) is a library for exploring and validating machine learning data. It is designed to be highly scalable and to work well with TensorFlow and TensorFlow Extended (TFX).

TF Data Validation:

+ Compute summary statistics for train/test/validation data in a scalable way. While `scikit-learn` is limited to datasets which fit into RAM, this is not a concern for TFDV.
+ Includes a viewer for data distributions and statistics (integration with [Facets](https://pair-code.github.io/facets/))
+ Automatic schema inference 
+ Schema generation includes description of expectations about data (like required values, ranges, and vocabularies)
+ A schema viewer to help you inspect the schema
+ Anomaly detection to identify anomalies (missing features, out-of-range values, or wrong feature types)
+ An anomalies viewer to see which features have anomalies
+ TFDV can help validate new data for inference to ensure no bad features are processed
+ TFDV can help check that new data seen during inference falls within the part of the decision surface your model was trained on
+ TFDV can help validate data after it's been transformed by TF Transform to ensure nothing unexpected has occurred to the data

## Dataset

The dataset we will be using throughout this session is the New York Yellow Cab dataset, available via [BigQuery public datasets](https://console.cloud.google.com/marketplace/details/city-of-new-york/nyc-tlc-trips?filter=solution-type:dataset&filter=category:encyclopedic).

Here is an example of how to extract data:
```sql
SELECT vendor_id,
       EXTRACT(MONTH FROM pickup_datetime) AS pickup_month,
       EXTRACT(HOUR FROM pickup_datetime) AS pickup_hour,
       EXTRACT(DAYOFWEEK FROM pickup_datetime) AS pickup_day_of_week, 
       EXTRACT(MONTH FROM dropoff_datetime) AS dropoff_month,
       EXTRACT(HOUR FROM dropoff_datetime) AS dropoff_hour,
       EXTRACT(DAYOFWEEK FROM dropoff_datetime) AS dropoff_day_of_week,
       passenger_count,
       store_and_fwd_flag, 
       trip_distance,
       fare_amount,
       tip_amount,
       payment_type,
       trip_type
FROM `bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2018`  

```
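For intuition, the `EXTRACT` expressions above can be mirrored in plain Python. The sketch below is an illustration only and is not part of the lesson's pipeline; the helper name `pickup_features` is hypothetical. It relies on BigQuery's `DAYOFWEEK` returning values from 1 (Sunday) to 7 (Saturday), whereas Python's `isoweekday()` runs from 1 (Monday) to 7 (Sunday).

```python
from datetime import datetime


def pickup_features(pickup_datetime):
    """Mirror EXTRACT(MONTH/HOUR/DAYOFWEEK ...) for a single timestamp.

    Hypothetical helper for illustration only.
    """
    return {
        'pickup_month': pickup_datetime.month,
        'pickup_hour': pickup_datetime.hour,
        # isoweekday(): Monday=1 ... Sunday=7
        # BigQuery DAYOFWEEK: Sunday=1 ... Saturday=7
        'pickup_day_of_week': pickup_datetime.isoweekday() % 7 + 1,
    }


# 2018-01-01 was a Monday, so the day-of-week value is 2.
features = pickup_features(datetime(2018, 1, 1, 0, 30))
print(features)
```

Keeping this convention in mind helps when reading the `pickup_day_of_week` and `dropoff_day_of_week` columns later in the lesson.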

The columns in the dataset are:

![title](../assets/schema.png)

## Third party packages already installed!

Third party dependencies are listed in `requirements.txt` and have already been installed.

In [1]:
# check if you are using Python 2
import sys
assert sys.version_info.major == 2, 'Oops, not running Python 2'

## Load necessary packages

In [2]:
import warnings
warnings.filterwarnings("ignore")

import apache_beam as beam  
import os, sys
import shutil
import tensorflow as tf
import tensorflow_data_validation as tfdv
from google.protobuf import text_format 
from tensorflow.python.lib.io import file_io
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.coders import example_proto_coder
from tensorflow_transform.saved import saved_transform_io
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import schema_utils

print('TFDV version: {}'.format(tfdv.version.__version__))
print('TF version: {}'.format(tf.VERSION))


TFDV version: 0.11.0
TF version: 1.12.0


## Define Lesson-wide Parameters

In [3]:
BASE_DIR = os.getcwd()
DATA_DIR = os.path.join(BASE_DIR, '../data')
OUTPUT_DIR = os.path.join(BASE_DIR)

# base dir containing train and eval data
TRAIN_DATA_DIR = os.path.join(DATA_DIR, 'train')
EVAL_DATA_DIR = os.path.join(DATA_DIR, 'eval')
SERVING_DATA_DIR = os.path.join(DATA_DIR, 'serving')

TRAIN_DATA = os.path.join(TRAIN_DATA_DIR, 'train.csv')
EVAL_DATA = os.path.join(EVAL_DATA_DIR, 'eval.csv')
SERVING_DATA = os.path.join(SERVING_DATA_DIR, 'serving.csv')

TF_OUTPUT_BASE_DIR = os.path.join(OUTPUT_DIR, 'tf')

## Remove output from previous runs

In [4]:
shutil.rmtree(TF_OUTPUT_BASE_DIR, ignore_errors=True)

## Preview dataset

In [5]:
! head -n 5 ../data/train/train.csv

vendor_id,pickup_month,pickup_hour,pickup_day_of_week,dropoff_month,dropoff_hour,dropoff_day_of_week,passenger_count,trip_distance,fare_amount,tip_amount,payment_type,trip_type
2,1,0,2,1,1,2,1,9.78,35,0,2,1
2,1,0,2,1,0,2,2,1.34,26,0,1,1
2,1,1,2,1,2,2,1,10.34,34.5,7.16,1,1
2,1,1,2,1,2,2,1,9.79,30.5,0,2,1


In [6]:
import pandas as pd 
data_train = pd.read_csv(os.path.join(TRAIN_DATA_DIR, 'train.csv'))
data_train.head(5)                                                  

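| | vendor_id | pickup_month | pickup_hour | pickup_day_of_week | dropoff_month | dropoff_hour | dropoff_day_of_week | passenger_count | trip_distance | fare_amount | tip_amount | payment_type | trip_type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2 | 1 | 0 | 2 | 1 | 1 | 2 | 1 | 9.78 | 35.0 | 0.0 | 2 | 1 |
| 1 | 2 | 1 | 0 | 2 | 1 | 0 | 2 | 2 | 1.34 | 26.0 | 0.0 | 1 | 1 |
| 2 | 2 | 1 | 1 | 2 | 1 | 2 | 2 | 1 | 10.34 | 34.5 | 7.16 | 1 | 1 |
| 3 | 2 | 1 | 1 | 2 | 1 | 2 | 2 | 1 | 9.79 | 30.5 | 0.0 | 2 | 1 |
| 4 | 2 | 1 | 2 | 2 | 1 | 3 | 2 | 1 | 10.99 | 35.0 | 9.08 | 1 | 1 |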


In [7]:
data_train.describe()

| | vendor_id | pickup_month | pickup_hour | pickup_day_of_week | dropoff_month | dropoff_hour | dropoff_day_of_week | passenger_count | trip_distance | fare_amount | tip_amount | payment_type | trip_type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 7999.0 | 7999.0 | 7999.0 | 7999.0 | 7999.0 | 7999.0 | 7999.0 | 7999.0 | 7999.0 | 7999.0 | 7999.0 | 7999.0 | 7999.0 |
| mean | 1.846856 | 1.715214 | 12.830479 | 4.178522 | 1.715464 | 13.044131 | 4.166521 | 1.373922 | 9.350869 | 33.173018 | 2.395086 | 1.356045 | 1.054882 |
| std | 0.360149 | 0.715741 | 5.777055 | 1.949893 | 0.716015 | 5.900859 | 1.955372 | 1.043119 | 4.955842 | 27.704877 | 3.815409 | 0.581879 | 0.227764 |
| min | 1.0 | 1.0 | 0.0 | 1.0 | 1.0 | 0.0 | 1.0 | 0.0 | 0.0 | -100.0 | -0.8 | 1.0 | 1.0 |
| 25% | 2.0 | 1.0 | 9.0 | 2.0 | 1.0 | 9.0 | 2.0 | 1.0 | 6.91 | 26.5 | 0.0 | 1.0 | 1.0 |
| 50% | 2.0 | 2.0 | 13.0 | 4.0 | 2.0 | 13.0 | 4.0 | 1.0 | 8.65 | 30.5 | 0.0 | 1.0 | 1.0 |
| 75% | 2.0 | 2.0 | 17.0 | 6.0 | 2.0 | 18.0 | 6.0 | 1.0 | 11.25 | 38.0 | 5.15 | 2.0 | 1.0 |
| max | 2.0 | 3.0 | 23.0 | 7.0 | 3.0 | 23.0 | 7.0 | 9.0 | 101.87 | 2126.0 | 63.0 | 5.0 | 2.0 |


## Compute statistics

TFDV can help you compute descriptive statistics, which provide an overview of the data in terms of the features that are present and the shapes of their distributions.

We'll be using `tfdv.generate_statistics_from_csv` to compute statistics for our training data.

In [8]:
train_stats = tfdv.generate_statistics_from_csv(data_location = TRAIN_DATA)


TFDV is able to scale to datasets which don't fit in RAM since it uses [Apache Beam's](https://beam.apache.org/releases/pydoc/2.9.0/) data-parallel processing framework to scale the computation of statistics. The API also exposes a Beam PTransform for statistics generation.
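Statistics such as counts, sums, minima and maxima can be computed per shard and then merged, which is why a data-parallel framework is a good fit for this job. The following is a minimal pure-Python sketch of that idea. It is not TFDV's or Beam's actual implementation, and the names `summarize_shard` and `merge_summaries` are made up for illustration.

```python
def summarize_shard(values):
    """Compute a partial summary for one non-empty shard of a numeric column."""
    return {
        'count': len(values),
        'sum': sum(values),
        'min': min(values),
        'max': max(values),
    }


def merge_summaries(left, right):
    """Combine two partial summaries without revisiting the raw data."""
    return {
        'count': left['count'] + right['count'],
        'sum': left['sum'] + right['sum'],
        'min': min(left['min'], right['min']),
        'max': max(left['max'], right['max']),
    }


# Two shards of trip_distance values that could live on different workers.
shard_a = [9.78, 1.34, 10.34]
shard_b = [9.79, 10.99]

merged = merge_summaries(summarize_shard(shard_a), summarize_shard(shard_b))
mean = merged['sum'] / float(merged['count'])
print(merged['count'], merged['min'], merged['max'], round(mean, 3))
```

Because each partial summary is small and merging is associative, no single machine ever needs to hold the whole dataset in memory.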

## Visualize statistics

`tfdv.visualize_statistics` uses [Facets](https://pair-code.github.io/facets/) to create a visualization of our training data.

• If you have numeric features and categorical features, they will be visualized separately. Each chart displays the distribution of one feature.

• Features with missing or zero values display a percentage in red to indicate that there may be issues with examples in those features. The percentage is the percentage of examples that have missing or zero values for that feature. For example, `tip_amount` has a value of zero for 63% of the rows.

• Try clicking "expand" above the charts to change the display

• Try hovering over bars in the charts to display bucket ranges and counts

• Try switching between the log and linear scales

• Try selecting "quantiles" from the "Chart to show" menu, and hover over the markers to show the quantile percentages
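The red percentage can also be reproduced by hand. Below is a small sketch using only the standard library and the five rows previewed earlier, so the resulting fraction applies to this tiny sample only and not to the full training set. `zero_or_missing_fraction` is a hypothetical helper, not a TFDV function.

```python
import csv

# The five training rows shown earlier in this lesson (a subset of columns).
SAMPLE_CSV = """trip_distance,fare_amount,tip_amount
9.78,35,0
1.34,26,0
10.34,34.5,7.16
9.79,30.5,0
10.99,35,9.08
"""


def zero_or_missing_fraction(rows, column):
    """Fraction of rows whose value for `column` is empty or equal to zero."""
    flagged = 0
    for row in rows:
        raw = row.get(column)
        if raw is None or raw.strip() == '' or float(raw) == 0.0:
            flagged += 1
    return flagged / float(len(rows))


rows = list(csv.DictReader(SAMPLE_CSV.splitlines()))
tip_zero_fraction = zero_or_missing_fraction(rows, 'tip_amount')
print('tip_amount zero/missing: {:.0%}'.format(tip_zero_fraction))
```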

In [9]:
tfdv.visualize_statistics(train_stats)

## Infer Schema

For machine learning projects with structured data, we must understand the semantic meaning of each column, its provenance, and the type and range of its values. We can use `tfdv.infer_schema` to create a schema for our data. Writing a schema by hand can be a lengthy and error-prone task, especially for datasets with a large number of features.

It's really important to ensure the schema has been correctly generated as this will be used by the machine learning pipeline both during model training & inference. The schema also serves as documentation for the data, which can be useful for other data scientists, business analysts and/or developers on a project. Let's use `tfdv.display_schema` to display the inferred schema so that we can review it.

In [10]:
# infer schema from training data
schema = tfdv.infer_schema(statistics=train_stats, infer_feature_shape=False)
tfdv.display_schema(schema=schema)

| Feature name | Type | Presence | Valency | Domain |
|---|---|---|---|---|
| 'trip_distance' | FLOAT | required | single | - |
| 'pickup_day_of_week' | INT | required | single | - |
| 'vendor_id' | INT | required | single | - |
| 'tip_amount' | FLOAT | required | single | - |
| 'dropoff_hour' | INT | required | single | - |
| 'dropoff_month' | INT | required | single | - |
| 'pickup_hour' | INT | required | single | - |
| 'pickup_month' | INT | required | single | - |
| 'fare_amount' | FLOAT | required | single | - |
| 'passenger_count' | INT | required | single | - |
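
To make the `Type` and `Presence` columns in the schema output above concrete, here is a deliberately simplified sketch of how such properties could be derived from raw CSV values. It is not how `tfdv.infer_schema` is implemented. The function names are hypothetical, the `INT`/`FLOAT`/`BYTES` labels are used only to echo the schema display, and the `store_and_fwd_flag` values are invented.

```python
def infer_type(values):
    """Return 'INT', 'FLOAT' or 'BYTES' for a list of non-empty strings."""
    def all_parse(cast):
        try:
            for value in values:
                cast(value)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return 'INT'
    if all_parse(float):
        return 'FLOAT'
    return 'BYTES'


def infer_column(raw_values):
    """Summarize one column: its type and whether every row has a value."""
    present = [v for v in raw_values if v != '']
    return {
        'type': infer_type(present),
        'presence': 'required' if len(present) == len(raw_values) else 'optional',
    }


columns = {
    'vendor_id': ['2', '2', '2', '2'],
    'fare_amount': ['35', '26', '34.5', '30.5'],
    'store_and_fwd_flag': ['N', '', 'Y', 'N'],  # hypothetical values
}
inferred = {name: infer_column(values) for name, values in columns.items()}
for name in sorted(inferred):
    print(name, inferred[name])
```

An inferred schema only reflects the data it was inferred from, which is why reviewing it by hand remains important.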


## Train vs Evaluation Data Validation

For supervised machine learning with structured data, it's critical that we...

+ Ensure the distribution (range of values) of the training data matches that of the evaluation set. Otherwise, it's likely that what the model learns using the training data wouldn't generalize to new data during inference.

+ Ensure the train, test and validation sets, as well as new data seen during inference, all conform to the same schema

+ Ensure that we reduce the training-serving skew. This is the difference between performance during training and performance during serving. This skew can be caused by:

  + A discrepancy between how you handle data in the training and serving pipelines.

  + A change in the data between when you train and when you serve.

  + A feedback loop between your model and your algorithm.
  
TFDV can help us with a majority of these scenarios.

In [11]:
# compute stats over evaluation dataset
eval_stats = tfdv.generate_statistics_from_csv(data_location = EVAL_DATA)

In [12]:
# compare stats of train vs eval data
tfdv.visualize_statistics(lhs_statistics=eval_stats, rhs_statistics=train_stats,
                          lhs_name='EVAL_DATASET', rhs_name='TRAIN_DATASET')

A few things to keep in mind:

• Notice that each feature now includes statistics for both the training and evaluation datasets.

• Notice that the charts now have both the training and evaluation datasets overlaid, making it easy to compare them.

• Notice that the charts now include a percentages view, which can be combined with log or the default linear scales.

• The distribution of `trip_distance` differs between the training and evaluation sets. Is this an issue? What problems could it cause?

## Check for Train vs Evaluation set Anomalies

There is one important question to ask before we continue. Does our evaluation dataset match the schema from our training dataset? You will need to be careful with categorical features as there may be values present in the training data which aren't in evaluation set, or vice versa.

Let's think about the following scenarios...

1) What would happen if you tried to evaluate using data with categorical feature values that were not in our training dataset? 

2) What about numeric features that are outside the ranges in our training dataset?
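
Both scenarios can be made concrete with a small check. This is a plain-Python sketch with made-up sample values. It only illustrates the kind of mismatch involved and says nothing about how TFDV implements its checks; the helper names are hypothetical.

```python
def unseen_categories(train_values, eval_values):
    """Categorical values that occur in eval but never occurred in training."""
    return sorted(set(eval_values) - set(train_values))


def out_of_range(train_values, eval_values):
    """Numeric eval values outside the [min, max] range seen in training."""
    low, high = min(train_values), max(train_values)
    return [v for v in eval_values if v < low or v > high]


# Made-up samples for illustration.
train_payment_type = ['1', '2', '1', '3']
eval_payment_type = ['1', '2', '5']
train_trip_distance = [0.0, 9.78, 101.87]
eval_trip_distance = [1.34, 150.0, -2.0]

new_categories = unseen_categories(train_payment_type, eval_payment_type)
extreme_values = out_of_range(train_trip_distance, eval_trip_distance)
print(new_categories)
print(extreme_values)
```

In the first case the model has typically learned nothing about the unseen category; in the second it has to extrapolate beyond anything it saw during training.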

In [13]:
# check evaluation data for anomalies by validating it against the schema inferred from the training data
anomalies = tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)

## Fix Data Anomalies in the Schema

There are various reasons why data anomalies exist. Often, there is an issue in the data collection or in the pipeline which feeds data downstream, so you'll want to investigate and fix any underlying data issues in upstream processes before you continue.

Another common anomaly occurs when the evaluation set contains a categorical value that wasn't in the training set, and so isn't in the schema's domain for that feature. If the new value is legitimate, you can add it to the domain with:

`tfdv.get_domain(schema, feature_name).value.append('new_unique_value')`.

While we can't fix all the anomalies, we should fix the issues we are not comfortable accepting.

In [14]:
# update the schema based on the observed anomalies.
vendor_id = tfdv.get_feature(schema, 'vendor_id')
# we want feature vendor_id to be populated in at least 50% of the examples
vendor_id.presence.min_fraction = 0.5

# validate eval stats after updating the schema 
updated_anomalies = tfdv.validate_statistics(eval_stats, schema)
tfdv.display_anomalies(updated_anomalies)

We are confident that the training and evaluation data are now consistent!

## Schema Environments

For this training session, we will need to create a `serving` dataset. Typically, all datasets in a pipeline should use the same schema; however, there are some notable exceptions. For instance, in supervised learning we need to include labels in our dataset, but when we serve the model for inference the labels will not be included. For this reason, we need to make a slight schema variation.

We can use `Environments` to maintain slightly different schema definitions for each use case (train, model validation, inference). Specifically, we can use `in_environment` and `not_in_environment` to indicate which features in the schema are, or are not, expected in a given set of environments.

For example, in our dataset the `fare_amount` feature is included as the label for training, but it's missing in the serving data. Without an environment specified, it will show up as an anomaly.

In [15]:
serving_stats = tfdv.generate_statistics_from_csv(data_location = SERVING_DATA)
serving_anomalies = tfdv.validate_statistics(serving_stats, schema)

tfdv.display_anomalies(serving_anomalies)

| Feature name | Anomaly short description | Anomaly long description |
|---|---|---|
| 'fare_amount' | Column dropped | Column is completely missing |



Now we just have the `fare_amount` feature (which is our label) showing up as an anomaly ('Column dropped'). Of course we don't expect to have labels in our serving data, so let's tell TFDV to ignore that.

In [16]:
# by default, all features are in the TRAINING, EVAL and SERVING environments
schema.default_environment.append('TRAINING')
schema.default_environment.append('EVAL')
schema.default_environment.append('SERVING')

# indicate that 'fare_amount' feature is not in SERVING environment.
tfdv.get_feature(schema, 'fare_amount').not_in_environment.append('SERVING')

serving_anomalies_with_env = tfdv.validate_statistics(
    serving_stats, schema, environment='SERVING')

tfdv.display_anomalies(serving_anomalies_with_env)
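
The effect of `default_environment` and `not_in_environment` can be pictured with plain dictionaries. The sketch below reflects one reading of the environment semantics: a feature is expected in an environment if that environment is listed by default or via `in_environment`, unless `not_in_environment` excludes it. It is not TFDV code, `expected_features` is a hypothetical helper, and only three of the lesson's features are listed for brevity.

```python
DEFAULT_ENVIRONMENTS = ['TRAINING', 'EVAL', 'SERVING']

# Per-feature overrides, mirroring the schema edit made above.
FEATURES = {
    'trip_distance': {},
    'tip_amount': {},
    'fare_amount': {'not_in_environment': ['SERVING']},
}


def expected_features(features, default_environments, environment):
    """Names of the features a dataset from `environment` should contain."""
    expected = []
    for name, spec in sorted(features.items()):
        if environment in spec.get('not_in_environment', []):
            continue  # explicitly excluded, e.g. the label at serving time
        if (environment in default_environments
                or environment in spec.get('in_environment', [])):
            expected.append(name)
    return expected


serving_expected = expected_features(FEATURES, DEFAULT_ENVIRONMENTS, 'SERVING')
training_expected = expected_features(FEATURES, DEFAULT_ENVIRONMENTS, 'TRAINING')
print(serving_expected)
print(training_expected)
```

With `fare_amount` excluded from `SERVING`, its absence from the serving data is no longer a violation of the schema.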

## Check for Skew

In addition to checking whether a dataset conforms to the expectations set in the schema, TFDV also provides functionalities to detect drift and skew. TFDV performs this check by comparing the statistics of the different datasets based on the drift/skew comparators specified in the schema.

TFDV can detect three different kinds of skew in your data - schema skew, feature skew, and distribution skew.

**1) Schema Skew** 

We saw that the schema for training and serving is expected to differ slightly; specifically, the label feature is present in the training data but not in the serving data. This should be specified through the environment field in the schema.

**2) Feature Skew** 

Feature skew occurs when the feature values that a model trains on are different from the feature values that it sees at serving time. For example, this can happen when there is a trend such as inflation in the price of fares. 

**3) Distribution Skew** 

Distribution skew occurs when the distribution of the training dataset is significantly different from the distribution of the serving dataset. One of the key causes for distribution skew is using different code or different data sources to generate the training dataset. 

Read up on `skew_comparator.infinity_norm.threshold` & `drift_comparator.infinity_norm.threshold` to see examples of how to set a threshold for categorical features.
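
For categorical features, the `infinity_norm` comparators are based on the L-infinity distance between two normalized value distributions, that is, the largest absolute difference in relative frequency for any single value. A minimal pure-Python sketch of that distance follows. The counts and the threshold are invented, and `linf_distance` is a hypothetical helper rather than a TFDV function.

```python
def normalize(counts):
    """Turn raw value counts into relative frequencies."""
    total = float(sum(counts.values()))
    return {value: count / total for value, count in counts.items()}


def linf_distance(counts_a, counts_b):
    """Largest absolute difference in relative frequency over all values."""
    freq_a, freq_b = normalize(counts_a), normalize(counts_b)
    values = set(freq_a) | set(freq_b)
    return max(abs(freq_a.get(v, 0.0) - freq_b.get(v, 0.0)) for v in values)


# Invented payment_type counts for a training and a serving dataset.
train_payment_type = {'1': 60, '2': 40}
serving_payment_type = {'1': 50, '2': 45, '3': 5}

distance = linf_distance(train_payment_type, serving_payment_type)
threshold = 0.01  # example value only; choose one that suits your data
print('distance={:.2f}, exceeds threshold: {}'.format(distance, distance > threshold))
```

A lower threshold flags smaller shifts in the value distribution, at the cost of more frequent alerts.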

## Freeze Schema

We want to persist our schema so that it can be used by other team members as well as the rest of the TensorFlow Transform & Serving pipeline. 

In [17]:
from tensorflow.python.lib.io import file_io
from google.protobuf import text_format

file_io.recursive_create_dir(OUTPUT_DIR)
schema_file = os.path.join(OUTPUT_DIR, 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file)

!cat {schema_file}

feature {
  name: "trip_distance"
  value_count {
    min: 1
    max: 1
  }
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "pickup_day_of_week"
  value_count {
    min: 1
    max: 1
  }
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "vendor_id"
  value_count {
    min: 1
    max: 1
  }
  type: INT
  presence {
    min_fraction: 0.5
    min_count: 1
  }
}
feature {
  name: "tip_amount"
  value_count {
    min: 1
    max: 1
  }
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "dropoff_hour"
  value_count {
    min: 1
    max: 1
  }
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "dropoff_month"
  value_count {
    min: 1
    max: 1
  }
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "pickup_hour"
  value_count {
