# Data validation

This notebook briefly shows some techniques for analysing the artifacts produced by the data-related pipeline components (ExampleGen, StatisticsGen, SchemaGen, ExampleValidator and Transform). All artifacts are fetched from the ML Metadata (MLMD) store.

In [1]:
# Import required libs
import glob
import os
import pprint
import pandas as pd

import tensorflow as tf
import tensorflow_data_validation as tfdv
from tensorflow_data_validation.utils.anomalies_util import load_anomalies_binary
from tfx.orchestration import metadata
from tfx.types import standard_artifacts, standard_component_specs
from tfx.orchestration.experimental.interactive import visualizations, standard_visualizations

from pipeline.configs import PIPELINE_NAME

from utils.mlmd_helpers import get_latest_artifacts, visualize_artifacts_nb
from utils.tfx_helpers import get_records

# Notebook display configuration: show up to 100 rows/columns before truncating
pd.set_option("display.max_rows", 100, "display.max_columns", 100)

# Register TFX's standard artifact visualizations and a pretty-printer for records
standard_visualizations.register_standard_visualizations()
pp = pprint.PrettyPrinter()

# Record library versions so the outputs below can be related to them
print(f'TF version: {tf.version.VERSION}')
print(f'TFDV version: {tfdv.version.__version__}')

TF version: 2.7.0
TFDV version: 1.5.0


## Metadata artifacts

To investigate what the pipeline generated, we first need to fetch the relevant artifacts from the metadata store.

We start by fetching the artifacts (if generated) from `ExampleGen`, `StatisticsGen`, `SchemaGen`, `ExampleValidator`, and `Transform`.

In [2]:
# Read artifact information from the metadata store.

# Metadata store path (resolved relative to the parent of the current working directory)
METADATA_PATH = os.path.abspath(os.path.join(os.getcwd(), '..', 'outputs', 'tfx_metadata',
                                             PIPELINE_NAME, 'metadata.db'))

# Data path (raw CSV previewed further down)
DATA_PATH = os.path.abspath(os.path.join(os.getcwd(), 'data/chicago_taxi_trips/data.csv'))

# Metadata store config
metadata_connection_config = metadata.sqlite_metadata_connection_config(METADATA_PATH)

# Forget artifacts loaded by a previous run of this cell. Later cells detect an
# unavailable component through NameError, so without this a stale value left in
# the kernel would be silently reused after the pipeline changes.
for _artifacts_name in ('example_artifacts', 'stats_artifacts', 'schema_artifacts',
                        'anomalies_artifacts', 'transform_artifacts'):
    globals().pop(_artifacts_name, None)

# NOTE(review): assumes get_latest_artifacts raises AttributeError when a
# component has not produced artifacts yet - confirm in utils.mlmd_helpers.
with metadata.Metadata(metadata_connection_config) as store:
    # Load ExampleGen artifacts (generated before transform)
    try:
        example_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'CsvExampleGen')
    except AttributeError:
        print('CsvExampleGen not available')

    # Load StatisticsGen artifacts
    try:
        stats_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'StatisticsGen')
    except AttributeError:
        print('StatisticsGen not available')

    # Load SchemaGen artifacts
    try:
        schema_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'SchemaGen')
    except AttributeError:
        print('SchemaGen not available')

    # Load ExampleValidator artifacts
    try:
        anomalies_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'ExampleValidator')
    except AttributeError:
        print('ExampleValidator not available')

    # Load Transform artifacts
    try:
        transform_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'Transform')
    except AttributeError:
        print('Transform not available')

Transform not available


Next, we extract the URIs of the artifacts.

In [3]:
# Extract artifacts URI paths and execution IDs
try:
    example_path = os.path.abspath(os.path.join('..', example_artifacts['examples'][-1].uri))
    example_id = example_artifacts['examples'][-1].id
    train_uri = os.path.join(example_path, 'Split-train')
    print(f'Training data example URI: {train_uri}')
    print(f'Training data execution id: {example_id}')
except NameError:
    print('Examples not defined')

print('-' * 10)

try:
    stats_path = os.path.abspath(os.path.join('..', stats_artifacts['statistics'][-1].uri))
    stats_id = stats_artifacts['statistics'][-1].id
    train_stats_file = os.path.join(stats_path, 'Split-train', 'FeatureStats.pb')
    eval_stats_file = os.path.join(stats_path, 'Split-eval', 'FeatureStats.pb')
    print(f'Train stats file: {train_stats_file}, \nEval stats file: {eval_stats_file}')
    print(f'Statistics execution id: {stats_id}')
except NameError:
    print('Statistics not defined')

print('-' * 10)

try:
    schema_path = os.path.abspath(os.path.join('..', schema_artifacts['schema'][-1].uri))
    schema_id = schema_artifacts['schema'][-1].id
    schema_file = os.path.join(schema_path, 'schema.pbtxt')
    print(f'Generated schema file: {schema_file}')
    print(f'Schema execution id: {schema_id}')
except NameError:
    print('Schema not defined')

print('-' * 10)

try:
    anomalies_path = os.path.abspath(os.path.join('..', anomalies_artifacts['anomalies'][-1].uri))
    anomalies_id = anomalies_artifacts['anomalies'][-1].id
    anomalies_file = os.path.join(anomalies_path, 'Split-eval', 'SchemaDiff.pb')
    print(f'Generated anomalies file: {anomalies_file}')
    print(f'Anomalies execution id: {anomalies_id}')
except NameError:
    print('Anomalies not defined')

print('-' * 10)

try:
    tf_examples_path = os.path.abspath(os.path.join('..', transform_artifacts['transformed_examples'][-1].uri))
    tf_examples_id = transform_artifacts['transformed_examples'][-1].id
    tf_examples_uri = os.path.join(tf_examples_path, 'Split-train')
    
    tf_stats_path = os.path.abspath(os.path.join('..', transform_artifacts['post_transform_stats'][-1].uri))
    tf_stats_id = transform_artifacts['post_transform_stats'][-1].id
    tf_stats_file = os.path.join(tf_stats_path, 'FeatureStats.pb')

    tf_anom_path = os.path.abspath(os.path.join('..', transform_artifacts['post_transform_anomalies'][-1].uri))
    tf_anom_id = transform_artifacts['post_transform_anomalies'][-1].id
    tf_anom_file = os.path.join(tf_anom_path, 'SchemaDiff.pb')

    print(f'Transformed training data example URI: {tf_examples_uri}')
    print(f'Transformed training data execution id: {tf_examples_id}')
    print(f'Generated post-transform stats file: {tf_stats_file}')
    print(f'Transform stats execution id: {tf_stats_id}')
    print(f'Generated post-transform anomalies file: {tf_anom_file}')
    print(f'Transform anomalies execution id: {tf_anom_id}')
    
except NameError:
    print('Transform not defined')


Training data example URI: /Users/viktor.eriksson2/Documents/github/tfx-pipeline/outputs/tfx_pipeline_output/taxi_pipeline/CsvExampleGen/examples/1/Split-train
Training data execution id: 1
----------
Train stats file: /Users/viktor.eriksson2/Documents/github/tfx-pipeline/outputs/tfx_pipeline_output/taxi_pipeline/StatisticsGen/statistics/2/Split-train/FeatureStats.pb, 
Eval stats file: /Users/viktor.eriksson2/Documents/github/tfx-pipeline/outputs/tfx_pipeline_output/taxi_pipeline/StatisticsGen/statistics/2/Split-eval/FeatureStats.pb
Statistics execution id: 2
----------
Generated schema file: /Users/viktor.eriksson2/Documents/github/tfx-pipeline/outputs/tfx_pipeline_output/taxi_pipeline/SchemaGen/schema/3/schema.pbtxt
Schema execution id: 3
----------
Generated anomalies file: /Users/viktor.eriksson2/Documents/github/tfx-pipeline/outputs/tfx_pipeline_output/taxi_pipeline/ExampleValidator/anomalies/4/Split-eval/SchemaDiff.pb
Anomalies execution id: 4
----------
Transform not defined


## Data validation

The next step is to visualize the data.  

We start by viewing the raw data.

In [4]:
# Preview the first few rows of the CSV file (header + 4 data rows).
# Pure Python instead of an unquoted `!head {DATA_PATH}`: works on any OS and
# with paths that contain spaces.
with open(DATA_PATH, encoding='utf-8', errors='replace') as csv_file:
    # zip stops after 5 lines without reading the rest of the file
    for _, line in zip(range(5), csv_file):
        print(line, end='')

pickup_community_area,fare,trip_start_month,trip_start_hour,trip_start_day,trip_start_timestamp,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,trip_miles,pickup_census_tract,dropoff_census_tract,payment_type,company,trip_seconds,dropoff_community_area,tips
,12.45,5,19,6,1400269500,,,,,0.0,,,Credit Card,Chicago Elite Cab Corp. (Chicago Carriag,0,,0.0
,0,3,19,5,1362683700,,,,,0,,,Unknown,Chicago Elite Cab Corp.,300,,0
60,27.05,10,2,3,1380593700,41.836150155,-87.648787952,,,12.6,,,Cash,Taxi Affiliation Services,1380,,0.0
10,5.85,10,1,2,1382319000,41.985015101,-87.804532006,,,0.0,,,Cash,Taxi Affiliation Services,180,,0.0


In [5]:
# Preview the first training record, read back through a TFRecordDataset
try:
    # os.listdir returns entries in arbitrary order; sort the shard paths so the
    # previewed "first" record is the same on every run.
    data_files = sorted(os.path.join(train_uri, name) for name in os.listdir(train_uri))

    # Create a `TFRecordDataset` over the GZIP-compressed split files
    dataset = tf.data.TFRecordDataset(data_files, compression_type="GZIP")

    # Get records from the dataset
    sample_records = get_records(dataset=dataset, num_records=1)

    # Print records
    pp.pprint(sample_records)
except NameError:
    # train_uri is only defined when ExampleGen artifacts were found above
    print('train_uri not defined')

[{'features': {'feature': {'company': {'bytesList': {'value': ['Q2hpY2FnbyBFbGl0ZSBDYWIgQ29ycC4gKENoaWNhZ28gQ2FycmlhZw==']}},
                           'dropoff_census_tract': {'int64List': {}},
                           'dropoff_community_area': {'int64List': {}},
                           'dropoff_latitude': {'floatList': {}},
                           'dropoff_longitude': {'floatList': {}},
                           'fare': {'floatList': {'value': [12.45]}},
                           'payment_type': {'bytesList': {'value': ['Q3JlZGl0IENhcmQ=']}},
                           'pickup_census_tract': {'int64List': {}},
                           'pickup_community_area': {'int64List': {}},
                           'pickup_latitude': {'floatList': {}},
                           'pickup_longitude': {'floatList': {}},
                           'tips': {'floatList': {'value': [0.0]}},
                           'trip_miles': {'floatList': {'value': [0.0]}},
                         

2022-02-13 19:11:54.087616: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [9]:
# It can also be nice to see the data in a pandas DataFrame.
# `display` (provided by the IPython kernel) renders frames as rich tables.
df = pd.read_csv(DATA_PATH)
display(df.head(10))
print(f'Shape of data: {df.shape}')

# Transposed summary statistics (by default only numeric columns are described)
display(df.describe().T)

# Missing rate per column = mean of the boolean null mask
display(df.isna().mean().rename('missing_rate').to_frame())

   pickup_community_area   fare  trip_start_month  trip_start_hour  \
0                    NaN  12.45                 5               19   
1                    NaN   0.00                 3               19   
2                   60.0  27.05                10                2   
3                   10.0   5.85                10                1   
4                   14.0  16.65                 5                7   
5                   13.0  16.45                11               12   
6                   16.0  32.05                12                1   
7                   30.0  38.45                10               10   
8                   11.0  14.65                 1                1   
9                   33.0   3.25                 5               17   

   trip_start_day  trip_start_timestamp  pickup_latitude  pickup_longitude  \
0               6            1400269500              NaN               NaN   
1               5            1362683700              NaN               Na

### TensorFlow Data Validation

We have located the statistics files of both the train and eval splits. They are now loaded, visualized and compared using the `tensorflow_data_validation` library.

In [8]:
# Compare the StatisticsGen output of both splits side by side:
# eval on the left-hand side, train on the right-hand side.
try:
    train_stats = tfdv.load_stats_binary(train_stats_file)
    eval_stats = tfdv.load_stats_binary(eval_stats_file)
    tfdv.visualize_statistics(
        lhs_statistics=eval_stats,
        rhs_statistics=train_stats,
        lhs_name='EVAL_DATASET',
        rhs_name='TRAIN_DATASET',
    )
except NameError:
    # The stats file paths stay undefined when no StatisticsGen artifacts exist
    print('train_stats/eval_stats not defined')

We review the schema created from the statistics.

> **Note**: the schema is inferred from the training split.

In [None]:
# Render the schema produced by SchemaGen (text-format protobuf)
try:
    schema = tfdv.load_schema_text(schema_file)
    tfdv.display_schema(
        schema=schema,
    )
except NameError:
    # schema_file stays undefined when no SchemaGen artifacts exist
    print('schema not defined')

Next, we review if there are any anomalies detected in the `eval` dataset. The anomalies are calculated based on the generated statistics and schema from the `train` dataset.

In [None]:
# Load the data validation result written by ExampleValidator for the eval split
try:
    anomalies = load_anomalies_binary(anomalies_file)
    tfdv.display_anomalies(anomalies=anomalies)
except NameError:
    # anomalies_file stays undefined when no ExampleValidator artifacts exist
    print('anomalies not defined')

Since we apply some transformations to the data before training a model, it is worth reviewing the transformed data too.

In [None]:
# Preview the first three transformed training records
try:
    # os.listdir returns entries in arbitrary order; sort the shard paths so the
    # previewed records are the same on every run.
    tf_data_files = sorted(os.path.join(tf_examples_uri, name)
                           for name in os.listdir(tf_examples_uri))

    # Create a `TFRecordDataset` over the GZIP-compressed split files
    tf_dataset = tf.data.TFRecordDataset(tf_data_files, compression_type="GZIP")

    # Get records from the dataset
    tf_sample_records = get_records(dataset=tf_dataset, num_records=3)

    # Print records
    pp.pprint(tf_sample_records)
except NameError:
    # tf_examples_uri is only defined when Transform artifacts were found above
    print('tf_examples_uri not defined')

Let's review the statistics visualization of the transformed data.

In [None]:
# Visualize the post-transform statistics written by Transform
try:
    tf_stats = tfdv.load_stats_binary(tf_stats_file)
    tfdv.visualize_statistics(lhs_statistics=tf_stats)
except NameError:
    # tf_stats_file stays undefined when no Transform artifacts exist
    print('post-transform stats not defined')

Lastly, we check whether Transform detected any anomalies in the transformed data.

In [None]:
# Load the post-transform data validation result written by Transform
try:
    tf_anomalies = load_anomalies_binary(tf_anom_file)
    tfdv.display_anomalies(anomalies=tf_anomalies)
except NameError:
    # tf_anom_file stays undefined when no Transform artifacts exist
    print('post-transform anomalies not defined')

# Wrap up

And we're done! You have now investigated the artifacts generated by the data related components of the pipeline.