# SchemaGen Test run

## Set up

TFX requires apache-airflow and docker SDK.

In [1]:
!pip install 'apache-airflow[gcp]' docker tfx





You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In this notebook, we use TFX version 0.13.0

In [2]:
import tfx
tfx.version.__version__

'0.13.0'

TFX requires TensorFlow >= 1.13.1

In [3]:
import tensorflow as tf
tf.enable_eager_execution()
tf.__version__

'1.13.1'

TFX supports Python 3.5 from version 0.13.0

In [4]:
import sys
sys.version

'3.5.2 (default, Nov 12 2018, 13:43:14) \n[GCC 5.4.0 20160609]'

## Download sample data

In [5]:
%%bash
# This enables you to run this notebook twice.
# There should not be train/eval files at ~/taxi/data, since TFX can handle only single file with version 0.13.0
if [ -e ~/taxi/data ]; then
    rm -rf ~/taxi/data
fi

# download taxi data
mkdir -p ~/taxi/data/simple
mkdir -p ~/taxi/serving_model/taxi_simple
wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv -O ~/taxi/data/simple/data.csv

# download 
wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py -O ~/taxi/taxi_utils.py

--2019-06-13 08:51:50--  https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.108.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1922668 (1.8M) [text/plain]
Saving to: ‘/root/taxi/data/simple/data.csv’

     0K .......... .......... .......... .......... ..........  2% 3.02M 1s
    50K .......... .......... .......... .......... ..........  5% 10.4M 0s
   100K .......... .......... .......... .......... ..........  7% 6.14M 0s
   150K .......... .......... .......... .......... .......... 10% 8.94M 0s
   200K .......... .......... .......... .......... .......... 13% 6.61M 0s
   250K .......... .......... .......... .......... .......... 15% 8.74M 0s
   300K .......... .......... .......... .......... .......... 18% 14.5M 0s
   350K ........

## Import

In [6]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import datetime
import logging
import os
from google.protobuf import json_format

from tfx.components.base.base_component import ComponentOutputs
from tfx.components.evaluator.component import Evaluator
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
from tfx.orchestration.pipeline import Pipeline
from tfx.orchestration.tfx_runner import TfxRunner
from tfx.proto import evaluator_pb2
from tfx.proto import example_gen_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.utils.dsl_utils import csv_input
from tfx.utils.channel import Channel
from tfx.utils import types

  'Running the Apache Beam SDK on Python 3 is not yet fully supported. '


## configs

In [7]:
# This example assumes that the taxi data is stored in ~/taxi/data and the
# taxi utility function is in ~/taxi.  Feel free to customize this as needed.
_taxi_root = os.path.join(os.environ['HOME'], 'taxi')
_data_root = os.path.join(_taxi_root, 'data/simple')
# Python module file to inject customized logic into the TFX components. The
# Transform and Trainer both require user-defined functions to run successfully.
_taxi_module_file = os.path.join(_taxi_root, 'taxi_utils.py')
# Path which can be listened to by the model server.  Pusher will output the
# trained model here.
_serving_model_dir = os.path.join(_taxi_root, 'serving_model/taxi_simple')

# Directory and data locations.  This example assumes all of the chicago taxi
# example code and metadata library is relative to $HOME, but you can store
# these files anywhere on your local filesystem.
_tfx_root = os.path.join(os.environ['HOME'], 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines')
_metadata_db_root = os.path.join(_tfx_root, 'metadata')
_log_root = os.path.join(_tfx_root, 'logs')

# Airflow-specific configs; these will be passed directly to airflow
_airflow_config = {
    'schedule_interval': None,
    'start_date': datetime.datetime(2019, 1, 1),
}

# Logging overrides
logger_overrides = {'log_root': _log_root, 'log_level': logging.INFO}

## Create ExampleGen

In [8]:
"""Implements the chicago taxi pipeline with TFX."""
examples = csv_input(_data_root)

# Brings data into the pipeline or otherwise joins/converts training data.
train_config = example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=2)
eval_config = example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        train_config,
        eval_config
    ]))

# Create outputs
train_examples = types.TfxType(type_name='ExamplesPath', split='train')
train_examples.uri = os.path.join(_data_root, 'train/')

eval_examples = types.TfxType(type_name='ExamplesPath', split='eval')
eval_examples.uri = os.path.join(_data_root, 'eval/')

output_dict = {'examples': Channel(
    type_name='ExamplesPath',
    static_artifact_collection=[train_examples, eval_examples])}

example_outputs = ComponentOutputs(output_dict)

example_gen = CsvExampleGen(
    input_base=examples, # A Channel of 'ExternalPath' type, it contains path of data source.
    output_config=output_config,  # An example_gen_pb2.Output instance, it contains train-eval split ratio.
    outputs=example_outputs # dict from name to output channel, it will be stored example_gen.outputs
)

## Create StatisticsGen

In [9]:
# Create outputs
train_statistics = types.TfxType(type_name='ExampleStatisticsPath', split='train')
train_statistics.uri = os.path.join(_data_root, 'train/stats/')

eval_statistics = types.TfxType(type_name='ExampleStatisticsPath', split='eval')
eval_statistics.uri = os.path.join(_data_root, 'eval/stats/')

output_dict = {'output': Channel(
    type_name='ExampleStatisticsPath',
    static_artifact_collection=[train_statistics, eval_statistics])}

statistics_outputs = ComponentOutputs(output_dict)

statistics_gen = StatisticsGen(
    input_data=example_gen.outputs.examples, # A Channel of 'ExamplesPath' type, it is equal to example_outputs
    name='Statistics Generator', # Optional, name should be unique if you are going to use multiple StatisticsGen in same pipeline.
    outputs=statistics_outputs # dict from name to output channel, it will be stored statistics_gen.outputs
)

## Create SchemaGen

In [10]:
# Create outputs
train_schema_path = types.TfxType(type_name='SchemaPath', split='train')
train_schema_path.uri = os.path.join(_data_root, 'train/schema/')

schema_outputs = ComponentOutputs({
    'output':Channel(
        type_name='SchemaPath',
        # SchemaGen.executor can handle just one SchemaPath.
        # Two or more SchemaPaths will cause ValueError
        # such as "ValueError: expected list length of one but got 2".
        static_artifact_collection=[train_schema_path] 
    )
})

schema_gen = SchemaGen(
    stats=statistics_gen.outputs.output, # A Channel of 'ExampleStatisticsPath' type, it is equal to statistics_outputs
    name='Schema Generator',  # Optional, name should be unique if you are going to use multiple StatisticsGen in same pipeline.
    outputs=schema_outputs # dict from name to output channel, it will be stored schema_gen.outputs
)

## Create pipeline

In [11]:
pipeline = Pipeline(
    pipeline_name="TFX Pipeline",
    pipeline_root=_pipeline_root,
    components=[example_gen, statistics_gen, schema_gen]
)

## Execute

In [12]:
class DirectRunner(TfxRunner):
    """Tfx runner on local"""
    
    def __init__(self, config=None):
        self._config = config or {}
    
    def run(self, pipeline):
        for component in pipeline.components:
            self._execute_component(component)
            
        return pipeline
            
    def _execute_component(self, component):
        input_dict = {key:value.get() for key, value in component.input_dict.items()}
        output_dict = {key: value.get() for key, value in component.outputs.get_all().items()}
        exec_properties = component.exec_properties
        executor = component.executor()
        executor.Do(input_dict, output_dict, exec_properties)

In [13]:
pipeline = DirectRunner().run(pipeline)

INFO:tensorflow:Starting Executor execution.
[2019-06-13 08:51:55,399] {base_executor.py:72} INFO - Starting Executor execution.
INFO:tensorflow:Inputs for Executor is: {"input-base": [{"artifact": {"uri": "/root/taxi/data/simple", "properties": {"type_name": {"stringValue": "ExternalPath"}, "split": {"stringValue": ""}}}, "artifact_type": {"name": "ExternalPath", "properties": {"state": "STRING", "span": "INT", "name": "STRING", "type_name": "STRING", "split": "STRING"}}}]}
[2019-06-13 08:51:55,409] {base_executor.py:74} INFO - Inputs for Executor is: {"input-base": [{"artifact": {"uri": "/root/taxi/data/simple", "properties": {"type_name": {"stringValue": "ExternalPath"}, "split": {"stringValue": ""}}}, "artifact_type": {"name": "ExternalPath", "properties": {"state": "STRING", "span": "INT", "name": "STRING", "type_name": "STRING", "split": "STRING"}}}]}
INFO:tensorflow:Outputs for Executor is: {"examples": [{"artifact": {"uri": "/root/taxi/data/simple/train/", "properties": {"type_

[2019-06-13 08:52:03,510] {fn_api_runner.py:437} INFO - Running (ref_PCollection_PCollection_24/Read)+((ref_AppliedPTransform_OutputSplittrain/Write/WriteImpl/PreFinalize_50)+(ref_PCollection_PCollection_33/Write))
[2019-06-13 08:52:03,527] {fn_api_runner.py:437} INFO - Running (ref_PCollection_PCollection_24/Read)+(ref_AppliedPTransform_OutputSplittrain/Write/WriteImpl/FinalizeWrite_51)
[2019-06-13 08:52:03,538] {filebasedsink.py:290} INFO - Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
[2019-06-13 08:52:03,644] {filebasedsink.py:327} INFO - Renamed 1 shards in 0.10 seconds.
[2019-06-13 08:52:03,667] {fn_api_runner.py:437} INFO - Running ((ref_AppliedPTransform_OutputSpliteval/Write/WriteImpl/DoOnce/Read_66)+((ref_AppliedPTransform_OutputSpliteval/Write/WriteImpl/InitializeWrite_67)+(ref_PCollection_PCollection_43/Write)))+(ref_PCollection_PCollection_42/Write)
[2019-06-13 08:52:03,684] {fn_api_runner.py:437} INFO - Running ((ShuffleSplite

[2019-06-13 08:52:12,181] {fn_api_runner.py:437} INFO - Running (((GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopKUniques_CountSlicedFeatureNameValueTuple/CombinePerKey(CountCombineFn)/Group/Read)+((GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopKUniques_CountSlicedFeatureNameValueTuple/CombinePerKey(CountCombineFn)/Merge)+(((GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopKUniques_CountSlicedFeatureNameValueTuple/CombinePerKey(CountCombineFn)/ExtractOutputs)+(((ref_AppliedPTransform_GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopKUniques_ModifyKeyToSlicedFeatureName_43)+(GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopK_GetTopK/CombinePerKey(TopCombineFn)/Precombine))+(ref_AppliedPTransform_GenerateStatistics.eval/RunSta

[2019-06-13 08:52:18,567] {fn_api_runner.py:437} INFO - Running ((((GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopKUniques_CountSlicedFeatureNameValueTuple/CombinePerKey(CountCombineFn)/Group/Read)+(GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopKUniques_CountSlicedFeatureNameValueTuple/CombinePerKey(CountCombineFn)/Merge))+((((GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopKUniques_CountSlicedFeatureNameValueTuple/CombinePerKey(CountCombineFn)/ExtractOutputs)+(ref_AppliedPTransform_GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopKUniques_ModifyKeyToSlicedFeatureName_146))+((GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopK_GetTopK/CombinePerKey(TopCombineFn)/Precombine)+(GenerateStatistics.train/RunStatsGenerators/G

[2019-06-13 08:52:19,691] {fn_api_runner.py:437} INFO - Running ((ref_AppliedPTransform_GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/ToList/ToList/DoOnce/Read_87)+((ref_AppliedPTransform_GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/ToList/ToList/InjectDefault_88)+(ref_AppliedPTransform_GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/MakeDatasetFeatureStatisticsListProto_89)))+((ref_AppliedPTransform_WriteStatsOutput.eval/Write/WriteImpl/Map(<lambda at iobase.py:984>)_96)+((ref_AppliedPTransform_WriteStatsOutput.eval/Write/WriteImpl/WindowInto(WindowIntoFn)_97)+(WriteStatsOutput.eval/Write/WriteImpl/GroupByKey/Write)))
[2019-06-13 08:52:19,726] {fn_api_runner.py:437} INFO - Running (((ref_AppliedPTransform_WriteStatsOutput.eval/Write/WriteImpl/DoOnce/Read_94)+(ref_AppliedPTransform_WriteStatsOutput.eval/Write/WriteImpl/InitializeWrite_95))+(ref_PCollection_PCollection_55/Write))+(ref_PCollection_PCollection_56

## Check Result

In [14]:
!ls -Rlhs /root/taxi/data/simple/

/root/taxi/data/simple/:
total 1.9M
1.9M -rw-r--r-- 1 root root 1.9M Jun 13 08:51 data.csv
4.0K drwxr-xr-x 3 root root 4.0K Jun 13 08:52 eval
4.0K drwxr-xr-x 4 root root 4.0K Jun 13 08:52 train

/root/taxi/data/simple/eval:
total 208K
204K -rw-r--r-- 1 root root 201K Jun 13 08:52 data_tfrecord-00000-of-00001.gz
4.0K drwxr-xr-x 2 root root 4.0K Jun 13 08:52 stats

/root/taxi/data/simple/eval/stats:
total 20K
20K -rw-r--r-- 1 root root 17K Jun 13 08:52 stats_tfrecord

/root/taxi/data/simple/train:
total 416K
408K -rw-r--r-- 1 root root 406K Jun 13 08:52 data_tfrecord-00000-of-00001.gz
4.0K drwxr-xr-x 2 root root 4.0K Jun 13 08:52 schema
4.0K drwxr-xr-x 2 root root 4.0K Jun 13 08:52 stats

/root/taxi/data/simple/train/schema:
total 8.0K
8.0K -rw-r--r-- 1 root root 4.5K Jun 13 08:52 schema.pbtxt

/root/taxi/data/simple/train/stats:
total 20K
20K -rw-r--r-- 1 root root 18K Jun 13 08:52 stats_tfrecord


In [26]:
!cat /root/taxi/data/simple/train/schema/schema.pbtxt

feature {
  name: "trip_start_hour"
  value_count {
    min: 1
    max: 1
  }
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "payment_type"
  value_count {
    min: 1
    max: 1
  }
  type: BYTES
  domain: "payment_type"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "trip_seconds"
  value_count {
    min: 1
    max: 1
  }
  type: FLOAT
  presence {
    min_count: 1
  }
}
feature {
  name: "pickup_longitude"
  value_count {
    min: 1
    max: 1
  }
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "pickup_census_tract"
  type: BYTES
  presence {
    min_count: 0
  }
}
feature {
  name: "trip_miles"
  value_count {
    min: 1
    max: 1
  }
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "trip_start_day"
  value_count {
    min: 1
    max: 1
  }
  type: INT
  pres

`schema.pbtxt` is only simple text file, we are going to;

1. get the path of `schema.pbtxt`from `schema_gen`
2. parse `schema.pbtxt` into schema (protobuf)
3. visualize schema with [tfdv](https://www.tensorflow.org/tfx/data_validation/get_started)


In [24]:
# 1. get the path of `schema.pbtxt`
def get_schema_directory(schema_gen):
    output_dict = {key: value.get() for key, value in schema_gen.outputs.get_all().items()}
    input_dict = {key:value.get() for key, value in schema_gen.input_dict.items()}
    split_to_instance = {x.split: x for x in input_dict['stats']}
    directory = types.get_split_uri(output_dict['output'], 'train')
    return directory

schema_directory = get_schema_directory(schema_gen)

In [43]:
# 2. parse schema.pbtxt

from pathlib import Path
from google.protobuf.text_format import Parse
from google.protobuf.message import Message
from tensorflow_metadata.proto.v0 import schema_pb2

def parse_schema_proto_string(schema_directory):
    path = Path(schema_directory)
    for filepath in path.glob('*'):
        with open(str(filepath), 'r') as file: # since we are using python 3.5, not 3.6+
            schema_proto_string = file.read()
            schema = schema_pb2.Schema()
            text_format.Parse(schema_proto_string, schema)
            return schema

schema = parse_schema_proto_string(schema_directory)

In [44]:
# 3. visualize schema with tfdv

import tensorflow_data_validation as tfdv
tfdv.display_schema(schema)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'trip_start_hour',INT,required,single,-
'payment_type',STRING,required,single,'payment_type'
'trip_seconds',FLOAT,optional,single,-
'pickup_longitude',FLOAT,required,single,-
'pickup_census_tract',BYTES,optional,,-
'trip_miles',FLOAT,required,single,-
'trip_start_day',INT,required,single,-
'trip_start_timestamp',INT,required,single,-
'trip_start_month',INT,required,single,-
'tips',FLOAT,required,single,-


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'payment_type',"'Cash', 'Credit Card', 'Dispute', 'No Charge', 'Pcard', 'Prcard', 'Unknown'"
'company',"'0118 - 42111 Godfrey S.Awir', '0694 - 59280 Chinesco Trans Inc', '2092 - 61288 Sbeih company', '2192 - 73487 Zeymane Corp', '2733 - 74600 Benny Jona', '2823 - 73307 Seung Lee', '3011 - 66308 JBL Cab Inc.', '3094 - 24059 G.L.B. Cab Co', '3152 - 97284 Crystal Abernathy', '3201 - C&D Cab Co Inc', '3201 - CID Cab Co Inc', '3253 - 91138 Gaither Cab Co.', '3319 - CD Cab Co', '3385 - 23210 Eman Cab', '3385 - Eman Cab', '3623 - 72222 Arrington Enterprises', '3897 - Ilie Malec', '4053 - 40193 Adwar H. Nikola', '4053 - Adwar H. Nikola', '4197 - 41842 Royal Star', '4197 - Royal Star', '4615 - 83503 Tyrone Henderson', '4623 - Jay Kim', '5006 - 39261 Salifu Bawa', '5006 - Salifu Bawa', '5074 - 54002 Ahzmi Inc', '5074 - Ahzmi Inc', '5129 - 87128', '5129 - 98755 Mengisti Taxi', '5129 - Mengisti Taxi', '5724 - KYVI Cab Inc', '585 - 88805 Valley Cab Co', '5864 - 73614 Thomas Owusu', '5864 - Thomas Owusu', '5874 - 73628 Sergey Cab Corp.', '5997 - 65283 AW Services Inc.', '6488 - 83287 Zuha Taxi', '6742 - 83735 Tasha ride inc', 'Blue Ribbon Taxi Association Inc.', 'C & D Cab Co Inc', 'Chicago Elite Cab Corp.', 'Chicago Elite Cab Corp. (Chicago Carriag', 'Chicago Medallion Leasing INC', 'Chicago Medallion Management', 'Choice Taxi Association', 'Dispatch Taxi Affiliation', 'KOAM Taxi Association', 'Northwest Management LLC', 'Taxi Affiliation Services', 'Top Cab Affiliation'"
