# Requirements

In [None]:
# Install TensorFlow Data Validation (TFDV), pinned to version 1.3.0.
!pip install tensorflow-data-validation==1.3.0

In [None]:
import tensorflow_data_validation as tfdv
from google.cloud import bigquery

<h1>Define constants</h1>

Replace the bracketed placeholders below (e.g. `<project_id>`) with project-specific values.

In [None]:
# ---- BigQuery ---------------------------------------------------------------
# Google Cloud project that owns the dataset
PROJECT_ID = '<project_id>'
# BigQuery dataset containing the source and sample tables
DATASET_ID = '<dataset_id>'
# location of the dataset, e.g. "EU"
DATA_LOCATION = '<location>'
# table the sample is extracted from
SOURCE_TABLE = '<training_input_table_id>'
# sample table name: the source table name plus a suffix, e.g. "_sample"
SAMPLE_TABLE = f'{SOURCE_TABLE}<table_suffix>'

# fully qualified "project.dataset.table" names
SOURCE_TABLE_WITH_PROJECT_ID = f'{PROJECT_ID}.{DATASET_ID}.{SOURCE_TABLE}'
SAMPLE_TABLE_WITH_PROJECT_ID = f'{PROJECT_ID}.{DATASET_ID}.{SAMPLE_TABLE}'
# dataset-relative "dataset.table" name of the sample table
SAMPLE_TABLE_WO_PROJECT_ID = f'{DATASET_ID}.{SAMPLE_TABLE}'

# ---- Cloud Storage ----------------------------------------------------------
# folder for the sample CSV, statistics and schemas; keep the trailing slash,
# because file names are appended to it directly
GCS_BASE_DIR = 'gs://<bucket>/<schema_folder>/'
# file name of the exported sample table
GCS_TABLE = '<sample_table.csv>'
# full GCS path of the exported sample table
GCS_SAMPLE_TABLE = f'{GCS_BASE_DIR}{GCS_TABLE}'

# number of rows in the sample; choose it large enough that the sample
# represents the distributions of the numeric features adequately
SAMPLE_SIZE = 1000

# ---- Schema -----------------------------------------------------------------
# label column, which is absent from serving data
LABEL_COLUMN_NAME = '<label_column_name>'  # e.g. "total_fare"
# per-feature skew thresholds; each value is interpreted as
#   - an L-infinity norm threshold for categorical features
#   - a Jensen-Shannon divergence threshold for numeric features
SKEW_THRESHOLD = {
    "<feature_1>": 0.01,
    "<feature_2>": 0.01,
}

<h1> Create a sample of the preprocessed dataset </h1>

Extract a subset of rows from the source table and write it to a sample table in BigQuery.

In [None]:
!bq --location=$DATA_LOCATION query \
--use_legacy_sql=false \
--destination_table=$SAMPLE_TABLE_WITH_PROJECT_ID \
--replace=true \
'CREATE OR REPLACE TABLE {SAMPLE_TABLE_WITH_PROJECT_ID} AS (SELECT * FROM `{SOURCE_TABLE_WITH_PROJECT_ID}` LIMIT {SAMPLE_SIZE})'

<h1>Extract the sample to GCS</h1>

Export the sample table to a Cloud Storage bucket as a CSV file.

In [None]:
!bq --location=$DATA_LOCATION extract \
$SAMPLE_TABLE_WO_PROJECT_ID \
$GCS_SAMPLE_TABLE

<h1>Generate statistics from sample</h1>

Generate statistics from the CSV file and write them to `sample_stats.pb` in the same bucket folder.

In [None]:
# Compute TFDV statistics over the exported sample CSV; the statistics are
# also persisted next to it in GCS as sample_stats.pb.
sample_stats_path = f'{GCS_BASE_DIR}sample_stats.pb'
stats = tfdv.generate_statistics_from_csv(
    data_location=GCS_SAMPLE_TABLE,
    output_path=sample_stats_path,
)

# Infer schema from sample statistics

In [None]:
# Derive a baseline schema from the sample statistics.
schema = tfdv.infer_schema(statistics=stats)

The sample table should represent the numeric features adequately in terms of the data schema. Because of its small size, however, it contains only a small portion of the possible domain values of the categorical features. The full domains of the categorical features therefore need to be generated from the original (preprocessed) dataset: the domain values in the schema are cleared and then re-populated with the new values.

# Modify schema to include full string domain of categorical features

In [None]:
# Replace the values of every categorical feature's string domain (inferred
# from the small sample) with all distinct values in the source table.
# NOTE(review): each string domain's name is used as the BigQuery column name,
# so this assumes every domain is named after its feature; TODO confirm if the
# schema is ever edited by hand.

# One client for all features, bound to the notebook's project and data
# location rather than to ambient gcloud/ADC defaults.
bq_client = bigquery.Client(project=PROJECT_ID, location=DATA_LOCATION)

for f in schema.string_domain:
    # clear the existing (sample-derived) domain of f in a single operation
    del f.value[:]

    # query the full domain of f from the original data:
    # - backticks quote the column name so non-identifier names still parse
    # - CAST(... AS STRING) keeps every value a string, which the string
    #   domain's values must be, even if the BigQuery column has another type
    # - ORDER BY makes the written schema reproducible between runs
    QUERY = (
        f'SELECT DISTINCT CAST(`{f.name}` AS STRING) AS value '
        f'FROM `{SOURCE_TABLE_WITH_PROJECT_ID}` '
        f'WHERE `{f.name}` IS NOT NULL '
        'ORDER BY value'
    )
    rows = bq_client.query(QUERY).result()

    # append the full list of values to the cleared domain of f
    f.value.extend([row[0] for row in rows])

# Write schema to text

Write to `tfdv_schema_training.pbtxt` in the same bucket folder

In [None]:
# Persist the training schema as a text protobuf in the GCS folder. In the
# notebook's cell order, this happens before skew comparators and
# environments are added.
training_schema_path = f'{GCS_BASE_DIR}tfdv_schema_training.pbtxt'
tfdv.write_schema_text(schema=schema, output_path=training_schema_path)

<h1>Generate serving schema based on the training schema</h1>

Specify skew comparators for serving data validation

In [None]:
# Names of the categorical features, i.e. those backed by a string domain.
# Relies on each string domain being named after its feature (assumed;
# TODO confirm if the schema is edited by hand).
cat_feat = [domain.name for domain in schema.string_domain]

In [None]:
# Attach a skew comparator to every feature listed in SKEW_THRESHOLD, picking
# the distance metric by feature kind:
#   categorical -> L-infinity norm
#   numeric     -> Jensen-Shannon divergence
for feature_name, threshold in SKEW_THRESHOLD.items():
    comparator = tfdv.get_feature(schema, feature_name).skew_comparator
    if feature_name not in cat_feat:
        comparator.jensen_shannon_divergence.threshold = threshold
    else:
        comparator.infinity_norm.threshold = threshold

Specify pipeline environments

In [None]:
# Declare the TRAINING and SERVING environments. The membership checks make
# the cell idempotent: re-running it does not append duplicate entries to the
# schema's repeated fields.
for env in ('TRAINING', 'SERVING'):
    if env not in schema.default_environment:
        schema.default_environment.append(env)

# specify that the label column is not in the SERVING environment
label_feature = tfdv.get_feature(schema, LABEL_COLUMN_NAME)
if 'SERVING' not in label_feature.not_in_environment:
    label_feature.not_in_environment.append('SERVING')

Write the modified schema to `tfdv_schema_serving.pbtxt` in the same bucket folder

In [None]:
# Persist the serving schema (training schema plus skew comparators and
# environments) as a text protobuf in the same GCS folder.
serving_schema_path = f'{GCS_BASE_DIR}tfdv_schema_serving.pbtxt'
tfdv.write_schema_text(schema=schema, output_path=serving_schema_path)

<h1>Add schema to ../assets</h1>

Download `tfdv_schema_training.pbtxt` and `tfdv_schema_serving.pbtxt` to the `../assets` folder, keeping exactly these file names.