# Data validation

This notebook briefly shows some techniques for analysing artifacts from Generator components (e.g. ExampleGen, SchemaGen). All artifacts are fetched from the metadata store.

In [1]:
# Import required libs
import glob
import os
import pprint
import pandas as pd
import numpy as np

import tensorflow as tf
import tensorflow_data_validation as tfdv
from tensorflow_data_validation.utils.anomalies_util import load_anomalies_binary
from tfx.orchestration import metadata
from tfx.types import standard_artifacts, standard_component_specs
from tfx.orchestration.experimental.interactive import visualizations, standard_visualizations

from pipeline.configs import PIPELINE_NAME

from utils.mlmd_helpers import get_latest_artifacts, visualize_artifacts_nb
from utils.tfx_helpers import get_records

# Let pandas show up to 100 rows and 100 columns before truncating its output
for display_option in ("display.max_rows", "display.max_columns"):
    pd.set_option(display_option, 100)

# Register the TFX standard visualizations and create the pretty-printer used
# later in the notebook for TFRecord previews
standard_visualizations.register_standard_visualizations()
pp = pprint.PrettyPrinter()

# Report the library versions this notebook is running against
print(f'TF version: {tf.version.VERSION}',
      f'TFDV version: {tfdv.version.__version__}',
      sep='\n')

TF version: 2.7.0
TFDV version: 1.5.0


## Metadata artifacts

To investigate what the pipeline components have generated, we first need to fetch the desired artifacts.  

We start by fetching the artifacts (if generated) from `ExampleGen`, `StatisticsGen`, `SchemaGen`, `ExampleValidator`, and `Transform`.

In [2]:
# Locate the pipeline's metadata store and fetch the latest artifacts of each
# data-related component.

# SQLite metadata database, resolved relative to the parent of the working directory
METADATA_PATH = os.path.abspath(
    os.path.join(os.getcwd(), os.pardir, 'outputs', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
)

# Raw CSV data, resolved relative to the parent of the working directory
DATA_PATH = os.path.abspath(
    os.path.join(os.getcwd(), os.pardir, 'data', 'klarna', 'small_train', 'data.csv')
)

# Connection settings for the SQLite-backed metadata store
metadata_connection_config = metadata.sqlite_metadata_connection_config(METADATA_PATH)

# Each lookup is guarded separately: when a lookup fails, its variable is left
# unassigned, which the following cells detect through NameError.
# NOTE(review): AttributeError is presumably what get_latest_artifacts raises when
# a component has no recorded artifacts - confirm in utils.mlmd_helpers.
with metadata.Metadata(metadata_connection_config) as store:
    # ExampleGen output (examples as generated before Transform)
    try:
        example_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'CsvExampleGen')
    except AttributeError:
        print('CsvExampleGen not available')

    # StatisticsGen output
    try:
        stats_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'StatisticsGen')
    except AttributeError:
        print('StatisticsGen not available')

    # SchemaGen output
    try:
        schema_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'SchemaGen')
    except AttributeError:
        print('SchemaGen not available')

    # ExampleValidator output
    try:
        anomalies_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'ExampleValidator')
    except AttributeError:
        print('ExampleValidator not available')

    # Transform output
    try:
        transform_artifacts = get_latest_artifacts(store, PIPELINE_NAME, 'Transform')
    except AttributeError:
        print('Transform not available')

Next, we need the URIs of the artifacts.

In [3]:
# Extract artifact URI paths and artifact IDs
def artifact_location(artifacts, key):
    """Return the absolute path and id of the last artifact listed under ``key``.

    Args:
        artifacts: Mapping from an output key (e.g. ``'examples'``) to a list of
            artifact objects exposing ``uri`` and ``id`` attributes.
        key: Output key to look up.

    Returns:
        Tuple ``(path, artifact_id)``; ``path`` is the artifact URI joined onto
        ``'..'`` and made absolute.

    Raises:
        KeyError: If ``key`` is not present in ``artifacts``.
        IndexError: If the list stored under ``key`` is empty.
    """
    latest = artifacts[key][-1]
    return os.path.abspath(os.path.join('..', latest.uri)), latest.id


# All of these mean "the artifact is unavailable": the artifact variable was never
# assigned by the previous cell (NameError), the component did not produce the
# requested output (KeyError), or the output list is empty (IndexError).
ARTIFACT_UNAVAILABLE = (NameError, KeyError, IndexError)

# NOTE(review): the values printed as "execution id" are the artifacts' `.id`
# attributes; the labels are kept unchanged.

try:
    example_path, example_id = artifact_location(example_artifacts, 'examples')
    train_uri = os.path.join(example_path, 'Split-train')
    print(f'Training data example URI: {train_uri}')
    print(f'Training data execution id: {example_id}')
except ARTIFACT_UNAVAILABLE:
    print('Examples not defined')

print('-' * 10)

try:
    stats_path, stats_id = artifact_location(stats_artifacts, 'statistics')
    train_stats_file = os.path.join(stats_path, 'Split-train', 'FeatureStats.pb')
    eval_stats_file = os.path.join(stats_path, 'Split-eval', 'FeatureStats.pb')
    print(f'Train stats file: {train_stats_file}, \nEval stats file: {eval_stats_file}')
    print(f'Statistics execution id: {stats_id}')
except ARTIFACT_UNAVAILABLE:
    print('Statistics not defined')

print('-' * 10)

try:
    schema_path, schema_id = artifact_location(schema_artifacts, 'schema')
    schema_file = os.path.join(schema_path, 'schema.pbtxt')
    print(f'Generated schema file: {schema_file}')
    print(f'Schema execution id: {schema_id}')
except ARTIFACT_UNAVAILABLE:
    print('Schema not defined')

print('-' * 10)

try:
    anomalies_path, anomalies_id = artifact_location(anomalies_artifacts, 'anomalies')
    anomalies_file = os.path.join(anomalies_path, 'Split-eval', 'SchemaDiff.pb')
    print(f'Generated anomalies file: {anomalies_file}')
    print(f'Anomalies execution id: {anomalies_id}')
except ARTIFACT_UNAVAILABLE:
    print('Anomalies not defined')

print('-' * 10)

try:
    # Transformed examples are stored per split
    tf_examples_path, tf_examples_id = artifact_location(transform_artifacts, 'transformed_examples')
    tf_examples_uri = os.path.join(tf_examples_path, 'Split-train')

    # Post-transform statistics and anomalies are read directly from the artifact directory
    tf_stats_path, tf_stats_id = artifact_location(transform_artifacts, 'post_transform_stats')
    tf_stats_file = os.path.join(tf_stats_path, 'FeatureStats.pb')

    tf_anom_path, tf_anom_id = artifact_location(transform_artifacts, 'post_transform_anomalies')
    tf_anom_file = os.path.join(tf_anom_path, 'SchemaDiff.pb')

    print(f'Transformed training data example URI: {tf_examples_uri}')
    print(f'Transformed training data execution id: {tf_examples_id}')
    print(f'Generated post-transform stats file: {tf_stats_file}')
    print(f'Transform stats execution id: {tf_stats_id}')
    print(f'Generated post-transform anomalies file: {tf_anom_file}')
    print(f'Transform anomalies execution id: {tf_anom_id}')

except ARTIFACT_UNAVAILABLE:
    print('Transform not defined')


Training data example URI: /Users/viktor.eriksson2/Documents/stash/private/tfx-example-pipeline/outputs/tfx_pipeline_output/credit_pipeline/CsvExampleGen/examples/1/Split-train
Training data execution id: 1
----------
Train stats file: /Users/viktor.eriksson2/Documents/stash/private/tfx-example-pipeline/outputs/tfx_pipeline_output/credit_pipeline/StatisticsGen/statistics/2/Split-train/FeatureStats.pb, 
Eval stats file: /Users/viktor.eriksson2/Documents/stash/private/tfx-example-pipeline/outputs/tfx_pipeline_output/credit_pipeline/StatisticsGen/statistics/2/Split-eval/FeatureStats.pb
Statistics execution id: 2
----------
Generated schema file: /Users/viktor.eriksson2/Documents/stash/private/tfx-example-pipeline/outputs/tfx_pipeline_output/credit_pipeline/SchemaGen/schema/3/schema.pbtxt
Schema execution id: 3
----------
Generated anomalies file: /Users/viktor.eriksson2/Documents/stash/private/tfx-example-pipeline/outputs/tfx_pipeline_output/credit_pipeline/ExampleValidator/anomalies/4/Sp

## Data investigation

The next step is to visualize the data.  

We start by viewing the raw data.

In [4]:
# Preview the first few rows of the CSV file
!head -n 6 {DATA_PATH}

uuid,default,account_amount_added_12_24m,account_days_in_dc_12_24m,account_days_in_rem_12_24m,account_days_in_term_12_24m,account_incoming_debt_vs_paid_0_24m,account_status,account_worst_status_0_3m,account_worst_status_12_24m,account_worst_status_3_6m,account_worst_status_6_12m,age,avg_payment_span_0_12m,avg_payment_span_0_3m,merchant_category,merchant_group,has_paid,max_paid_inv_0_12m,max_paid_inv_0_24m,name_in_email,num_active_div_by_paid_inv_0_12m,num_active_inv,num_arch_dc_0_12m,num_arch_dc_12_24m,num_arch_ok_0_12m,num_arch_ok_12_24m,num_arch_rem_0_12m,num_arch_written_off_0_12m,num_arch_written_off_12_24m,num_unpaid_bills,status_last_archived_0_24m,status_2nd_last_archived_0_24m,status_3rd_last_archived_0_24m,status_max_archived_0_6_months,status_max_archived_0_12_months,status_max_archived_0_24_months,recovery_debt,sum_capital_paid_account_0_12m,sum_capital_paid_account_12_24m,sum_paid_inv_0_12m,time_hours,worst_status_active_inv
19689e3e-b3a1-4339-987b-ac1b76d4aee2,0,0,0.0,0.0,

In [5]:
# Show the first record of the train split as stored in the TFRecord files
try:
    # Every entry in the split directory is treated as a data file
    split_entries = os.listdir(train_uri)
    data_files = [os.path.join(train_uri, entry) for entry in split_entries]

    # The files are read as GZIP-compressed TFRecords
    dataset = tf.data.TFRecordDataset(data_files, compression_type="GZIP")

    # Pull a single record through the project helper and pretty-print it
    sample_records = get_records(dataset=dataset, num_records=1)
    pp.pprint(sample_records)
except NameError:
    # train_uri is only assigned when the ExampleGen artifacts were resolved
    print('train_uri not defined')

[{'features': {'feature': {'account_amount_added_12_24m': {'int64List': {'value': ['0']}},
                           'account_days_in_dc_12_24m': {'floatList': {'value': [0.0]}},
                           'account_days_in_rem_12_24m': {'floatList': {'value': [0.0]}},
                           'account_days_in_term_12_24m': {'floatList': {'value': [0.0]}},
                           'account_incoming_debt_vs_paid_0_24m': {'floatList': {'value': [0.2619351]}},
                           'account_status': {'floatList': {'value': [2.0]}},
                           'account_worst_status_0_3m': {'floatList': {'value': [2.0]}},
                           'account_worst_status_12_24m': {'floatList': {}},
                           'account_worst_status_3_6m': {'floatList': {}},
                           'account_worst_status_6_12m': {'floatList': {}},
                           'age': {'int64List': {'value': ['19']}},
                           'avg_payment_span_0_12m': {'floatList': {'va

2022-02-25 08:51:42.827149: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [6]:
# Load the raw CSV into pandas for a quick tabular overview
df = pd.read_csv(DATA_PATH)

# First rows, shape and per-column summary, separated by divider lines
print(
    df.head(10),
    '-' * 30,
    f'Shape of data: {df.shape}',
    '-' * 30,
    df.describe(include='all').T,
    '-' * 30,
    sep='\n',
)

# Fraction of missing values per column
print('Missing rate:\n', df.isna().sum() / len(df))

                                   uuid  default  account_amount_added_12_24m  \
0  19689e3e-b3a1-4339-987b-ac1b76d4aee2        0                            0   
1  62deb776-0be1-4dc0-b6d5-d552de541cb6        0                        72461   
2  ec7e9b79-d06b-4f86-b32e-f2d1af7e5318        0                            0   
3  642bf413-4376-477f-b13a-10e44d4e807c        0                            0   
4  99a4212c-57dd-4de9-a761-91d5149f0ba6        0                            0   
5  096f3e3c-2dc8-4fff-b916-9004ab84b12a        0                        16549   
6  fac9f624-412c-48ba-8e42-52dcdfb1bd29        0                            0   
7  bcfa471d-a175-4cdd-8586-0122a95e53ca        0                            0   
8  47ef05df-7456-47f4-a2e2-0eacc34065cf        0                            0   
9  d7ece46b-9065-4005-a4cb-862cd84d659a        0                            0   

   account_days_in_dc_12_24m  account_days_in_rem_12_24m  \
0                        0.0                    

## Tensorflow Data Validation

We have located the statistics files of both our train and eval sets. These will now be loaded, visualized and compared using the `tensorflow_data_validation` library.

In [7]:
# Compare the StatisticsGen output of the eval split (left) with the train split (right)
try:
    train_stats = tfdv.load_stats_binary(train_stats_file)
    eval_stats = tfdv.load_stats_binary(eval_stats_file)
    tfdv.visualize_statistics(
        eval_stats,
        train_stats,
        lhs_name='EVAL_DATASET',
        rhs_name='TRAIN_DATASET',
    )
except NameError:
    # The stats file paths are only assigned when the StatisticsGen artifacts were resolved
    print('train_stats/eval_stats not defined')

## Tensorflow Schema

We review the schema created from the statistics.

> **Note**: the schema is based on the training split.

In [8]:
# Load the schema text file generated by SchemaGen and render it as tables
try:
    schema = tfdv.load_schema_text(schema_file)
    tfdv.display_schema(schema=schema)
except NameError:
    # schema_file is only assigned when the SchemaGen artifacts were resolved
    print('schema not defined')

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'has_paid',STRING,required,,'has_paid'
'merchant_category',STRING,required,,'merchant_category'
'merchant_group',STRING,required,,'merchant_group'
'name_in_email',STRING,required,,'name_in_email'
'uuid',BYTES,required,,-
'account_amount_added_12_24m',INT,required,,-
'account_days_in_dc_12_24m',FLOAT,required,,-
'account_days_in_rem_12_24m',FLOAT,required,,-
'account_days_in_term_12_24m',FLOAT,required,,-
'account_incoming_debt_vs_paid_0_24m',FLOAT,required,,-


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'has_paid',"'False', 'True'"
'merchant_category',"'Adult Shoes & Clothing', 'Automotive Parts & Accessories', 'Bags & Wallets', 'Body & Hair Care', 'Books & Magazines', 'Car electronics', 'Children Clothes & Nurturing products', 'Children toys', 'Cleaning & Sanitary', 'Collectibles', 'Concept stores & Miscellaneous', 'Cosmetics', 'Costumes & Party supplies', 'Dating services', 'Decoration & Art', 'Dietary supplements', 'Digital services', 'Diversified Health & Beauty products', 'Diversified Home & Garden products', 'Diversified Jewelry & Accessories', 'Diversified children products', 'Diversified electronics', 'Diversified entertainment', 'Diversified erotic material', 'Education', 'Electronic equipment & Related accessories', 'Erotic Clothing & Accessories', 'Event tickets', 'Food & Beverage', 'Fragrances', 'Furniture', 'General Shoes & Clothing', 'Hobby articles', 'Household electronics (whitegoods/appliances)', 'Jewelry & Watches', 'Kitchenware', 'Music & Movies', 'Musical Instruments & Equipment', 'Non', 'Office machines & Related accessories (excl. computers)', 'Personal care & Body improvement', 'Pet supplies', 'Pharmaceutical products', 'Plants & Flowers', 'Prescription optics', 'Prints & Photos', 'Safety products', 'Sex toys', 'Sports gear & Outdoor', 'Tobacco', 'Tools & Home improvement', 'Travel services', 'Underwear', 'Video Games & Related accessories', 'Wheels & Tires', 'Wine, Beer & Liquor', 'Youthful Shoes & Clothing'"
'merchant_group',"'Automotive Products', 'Children Products', 'Clothing & Shoes', 'Electronics', 'Entertainment', 'Erotic Materials', 'Food & Beverage', 'Health & Beauty', 'Home & Garden', 'Intangible products', 'Jewelry & Accessories', 'Leisure, Sport & Hobby'"
'name_in_email',"'F', 'F+L', 'F1+L', 'Initials', 'L', 'L1+F', 'Nick', 'no_match'"


## Tensorflow Example Validation

Next, we review if there are any anomalies detected in the `eval` dataset. The anomalies are calculated based on the generated statistics and schema from the `train` dataset.

In [None]:
# Load the data validation result from ExampleValidator (eval split) and display it
try:
    anomalies = load_anomalies_binary(anomalies_file)
    tfdv.display_anomalies(anomalies)
except NameError:
    # anomalies_file is only assigned when the ExampleValidator artifacts were resolved
    print('anomalies not defined')

## Tensorflow Transform

Since we apply some transformations to the data before training a model it can be of interest to review that data too.

In [None]:
# Show the first record of the transformed train split
try:
    # Every entry in the transformed split directory is treated as a data file
    tf_split_entries = os.listdir(tf_examples_uri)
    tf_data_files = [os.path.join(tf_examples_uri, entry) for entry in tf_split_entries]

    # The files are read as GZIP-compressed TFRecords
    tf_dataset = tf.data.TFRecordDataset(tf_data_files, compression_type="GZIP")

    # Pull a single record through the project helper and pretty-print it
    tf_sample_records = get_records(dataset=tf_dataset, num_records=1)
    pp.pprint(tf_sample_records)
except NameError:
    # tf_examples_uri is only assigned when the Transform artifacts were resolved
    print('tf_examples_uri not defined')

Let's review the statistics visualization of the transformed data.

In [None]:
# Load the post-transform statistics generated by Transform and visualize them
try:
    tf_stats = tfdv.load_stats_binary(tf_stats_file)
    tfdv.visualize_statistics(tf_stats)
except NameError:
    # tf_stats_file is only assigned when the Transform artifacts were resolved
    print('post-transform stats not defined')

Lastly, we check if there are any anomalies detected in the transformed eval dataset.

In [None]:
# Load the post-transform data validation result from Transform and display it
try:
    tf_anomalies = load_anomalies_binary(tf_anom_file)
    tfdv.display_anomalies(tf_anomalies)
except NameError:
    # tf_anom_file is only assigned when the Transform artifacts were resolved
    print('post-transform anomalies not defined')

## Optional - Working on imbalanced data

Let's review some approaches before training our model on heavily imbalanced data.

In [None]:
# Import model and hparams
from models.keras_model_baseline.model import _make_model, _get_hyperparameters
from models import features

In [None]:
# Fetch the default hyperparameters and print the ones of interest
hparams = _get_hyperparameters()
for label, hparam_key in (
    ("Learning rate", 'learning_rate'),
    ("Hidden layer units", 'units1'),
    ("Drop out layer", 'drop_out1'),
):
    print(f"{label}: {hparams.get(hparam_key)}")

In [None]:
# Build the baseline model from the default hyperparameters fetched above
model = _make_model(hparams)

In [None]:
# Print the summary of the model built with the default hyperparameters
model.summary()

In [None]:
# List the model's layers (displayed as the cell output)
model.layers

In [None]:
# Inspect the model's weights before any output bias is supplied,
# for comparison with the bias-initialised model created further down
model.get_weights()

In [None]:
# Class imbalance read off the data validation statistics of the training set.
# NOTE(review): 0.9858 is presumably the share of the majority class
# (default == 0) - confirm against the statistics above.
majority_share = 0.9858

# Log of the minority-to-majority ratio; passed to the model as output bias further down
initial_bias = np.log((1 - majority_share) / majority_share)
initial_bias

In [None]:
# Check the class weights
# Count the examples per class with vectorised reductions instead of iterating
# the Series element by element. int() keeps plain Python integers, so a class
# with no examples still raises ZeroDivisionError below rather than silently
# producing an infinite weight.
pos = int((df['default'] == 1).sum())
neg = int((df['default'] == 0).sum())
total = df.shape[0]

# Scale each class by total / 2 so that both classes carry the same summed weight
class_weights = {0: (1 / neg) * (total / 2.0), 1: (1 / pos) * (total / 2.0)}
d = class_weights  # alias kept for backward compatibility with the original name

print(f'Class weights: \n{class_weights}')

In [None]:
# Build a second model with the same hyperparameters, passing the computed
# initial_bias as output_bias so that the model gives much more reasonable
# initial guesses on the imbalanced data
init_bias_model = _make_model(hparams=hparams, output_bias=initial_bias)

In [None]:
# Review the weights of the bias-initialised model (compare with the weights shown earlier)
init_bias_model.get_weights()

### Checkpoint the initial weights
To make the various training runs more comparable, keep this initial model's weights in a checkpoint file, and load them into each model before training:

In [None]:
# Checkpoint location for the initial weights, under config/initial_weights in
# the parent of the working directory
initial_weights_path = os.path.abspath(
    os.path.join(os.getcwd(), os.pardir, 'config', 'initial_weights', 'initial_weights')
)

# Persist the bias-initialised model's weights in TensorFlow checkpoint format
init_bias_model.save_weights(initial_weights_path, save_format='tf')

# Wrap up

And we're done! You have now investigated the artifacts generated by the data related components of the pipeline.