# Trainer Test Run

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/AseiSugiyama/TFXTestRun/blob/master/notebooks/TrainerTestRun.ipynb)

## Set up

TFX requires apache-airflow and the Docker SDK.

In [None]:
!pip install 'apache-airflow[gcp]' docker tfx

In this notebook, we use TFX version 0.13.0.

In [None]:
import tfx
tfx.version.__version__

'0.13.0'

TFX requires TensorFlow >= 1.13.1

In [None]:
import tensorflow as tf
tf.enable_eager_execution()
tf.__version__

'1.13.1'

TFX supports Python 3.5 starting with version 0.13.0.

In [None]:
import sys
sys.version

'3.5.2 (default, Nov 12 2018, 13:43:14) \n[GCC 5.4.0 20160609]'
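The TensorFlow requirement above is a minimum version. Comparing version strings as plain text gets cases like `'1.9.0'` vs `'1.13.1'` wrong, so a minimal sketch compares them as tuples of integers instead. It assumes plain dotted numeric versions like the ones printed above; suffixes such as `rc0` are not handled and would raise `ValueError`.

```python
def parse_version(version):
    """Turn a dotted version string such as '1.13.1' into a tuple of ints."""
    return tuple(int(part) for part in version.split('.'))


def meets_minimum(version, minimum):
    """Return True if `version` is at least `minimum`, compared numerically."""
    return parse_version(version) >= parse_version(minimum)


# In the notebook you could pass tf.__version__ as the first argument.
print(meets_minimum('1.13.1', '1.13.1'))  # True
print(meets_minimum('1.9.0', '1.13.1'))   # False, although '1.9.0' > '1.13.1' as strings
```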

## Download sample data

In [None]:
%%bash
# Remove outputs of previous runs so that this notebook can be run more than once.
# No train/eval files may be left under ~/taxi/data, since TFX 0.13.0 can handle only a single input file.
if [ -e ~/taxi/data ]; then
    rm -rf ~/taxi/data
fi

# download taxi data
mkdir -p ~/taxi/data/simple
mkdir -p ~/taxi/serving_model/taxi_simple
wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv -O ~/taxi/data/simple/data.csv

# download the taxi utility module (user-defined code for Transform and Trainer)
wget https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/taxi_utils.py -O ~/taxi/taxi_utils.py

--2019-06-14 05:46:29--  https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.108.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1922668 (1.8M) [text/plain]
Saving to: ‘/root/taxi/data/simple/data.csv’


## Import

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import datetime
import logging
import os
from google.protobuf import json_format

from tfx.components.base.base_component import ComponentOutputs
from tfx.components.evaluator.component import Evaluator
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
from tfx.orchestration.pipeline import Pipeline
from tfx.orchestration.tfx_runner import TfxRunner
from tfx.proto import evaluator_pb2
from tfx.proto import example_gen_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.utils.dsl_utils import csv_input
from tfx.utils.channel import Channel
from tfx.utils import types

  'Running the Apache Beam SDK on Python 3 is not yet fully supported. '


## Configs

In [None]:
# This example assumes that the taxi data is stored in ~/taxi/data and the
# taxi utility function is in ~/taxi.  Feel free to customize this as needed.
_taxi_root = os.path.join(os.environ['HOME'], 'taxi')
_data_root = os.path.join(_taxi_root, 'data/simple')
# Python module file to inject customized logic into the TFX components. The
# Transform and Trainer both require user-defined functions to run successfully.
_taxi_module_file = os.path.join(_taxi_root, 'taxi_utils.py')

# Path which can be listened to by the model server.  Pusher will output the
# trained model here.
_serving_model_dir = os.path.join(_taxi_root, 'serving_model/taxi_simple')

# Directory and data locations.  This example assumes all of the Chicago taxi
# example code and metadata library is relative to $HOME, but you can store
# these files anywhere on your local filesystem.
_tfx_root = os.path.join(os.environ['HOME'], 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines')
_metadata_db_root = os.path.join(_tfx_root, 'metadata')
_log_root = os.path.join(_tfx_root, 'logs')

# Airflow-specific configs; these will be passed directly to airflow
_airflow_config = {
    'schedule_interval': None,
    'start_date': datetime.datetime(2019, 1, 1),
}

# Logging overrides
logger_overrides = {'log_root': _log_root, 'log_level': logging.INFO}

## Create ExampleGen

In [None]:
"""Implements the chicago taxi pipeline with TFX."""
examples = csv_input(_data_root)

# ExampleGen brings data into the pipeline (or otherwise joins/converts training data).
# The hash buckets split the examples 2:1 into 'train' and 'eval'.
train_config = example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=2)
eval_config = example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        train_config,
        eval_config
    ]))

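# Create the output artifacts by hand so that their URIs are known up front;
# DirectRunner (defined below) runs executors directly and does not assign them.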
train_examples = types.TfxType(type_name='ExamplesPath', split='train')
train_examples.uri = os.path.join(_data_root, 'csv_example_gen/train/')

eval_examples = types.TfxType(type_name='ExamplesPath', split='eval')
eval_examples.uri = os.path.join(_data_root, 'csv_example_gen/eval/')

example_outputs = ComponentOutputs({
    'examples': Channel(
        type_name='ExamplesPath',
        static_artifact_collection=[train_examples, eval_examples]
    ),
    'training_examples': Channel(
        type_name='ExamplesPath',
        static_artifact_collection=[train_examples]
    ),
    'eval_examples': Channel(
        type_name='ExamplesPath',
        static_artifact_collection=[eval_examples]
    ),    
})

example_gen = CsvExampleGen(
    input_base=examples,  # Channel of 'ExternalPath' type holding the path of the data source.
    output_config=output_config,  # example_gen_pb2.Output instance specifying the train/eval split ratio.
    outputs=example_outputs  # Mapping from output name to Channel; exposed as example_gen.outputs
)
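With `hash_buckets=2` for train and `hash_buckets=1` for eval, roughly 2/3 of the records go to train and 1/3 to eval, and a given record always lands in the same split. The pure-Python sketch below illustrates that idea. It hashes the raw record with MD5, which is an assumption for illustration only, not necessarily the fingerprint function CsvExampleGen uses internally; the `trip_%d` records are made up.

```python
import hashlib

# Split names and bucket counts mirroring train_config / eval_config above.
SPLITS = [('train', 2), ('eval', 1)]
TOTAL_BUCKETS = sum(count for _, count in SPLITS)


def assign_split(record):
    """Deterministically assign a record (bytes) to a split by hash bucket."""
    # Hash the record and reduce it to a bucket index in [0, TOTAL_BUCKETS).
    bucket = int(hashlib.md5(record).hexdigest(), 16) % TOTAL_BUCKETS
    # Buckets 0-1 belong to 'train', bucket 2 belongs to 'eval'.
    upper = 0
    for name, count in SPLITS:
        upper += count
        if bucket < upper:
            return name
    raise AssertionError('unreachable')


records = [('trip_%d' % i).encode('utf-8') for i in range(3000)]
counts = {'train': 0, 'eval': 0}
for record in records:
    counts[assign_split(record)] += 1
print(counts)  # expect roughly a 2:1 train/eval ratio
```

Because the split depends only on the record's content, rerunning ExampleGen on the same data reproduces the same train/eval assignment.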

## Create StatisticsGen

In [None]:
# Create outputs
train_statistics = types.TfxType(type_name='ExampleStatisticsPath', split='train')
train_statistics.uri = os.path.join(_data_root, 'statistics_gen/train/')

eval_statistics = types.TfxType(type_name='ExampleStatisticsPath', split='eval')
eval_statistics.uri = os.path.join(_data_root, 'statistics_gen/eval/')

statistics_outputs = ComponentOutputs({
    'output': Channel(
        type_name='ExampleStatisticsPath',
        static_artifact_collection=[train_statistics, eval_statistics]
    )
})

statistics_gen = StatisticsGen(
    input_data=example_gen.outputs.examples,  # Channel of 'ExamplesPath' artifacts, i.e. the 'examples' channel of example_outputs
    name='Statistics Generator',  # Optional; must be unique if the pipeline uses more than one StatisticsGen.
    outputs=statistics_outputs  # Mapping from output name to Channel; exposed as statistics_gen.outputs
)

## Create SchemaGen

In [None]:
# Create outputs
train_schema_path = types.TfxType(type_name='SchemaPath', split='train')
train_schema_path.uri = os.path.join(_data_root, 'schema_gen/')

# NOTE: SchemaGen.executor can handle JUST ONE SchemaPath.
# Two or more SchemaPaths will cause ValueError
# such as "ValueError: expected list length of one but got 2".
schema_outputs = ComponentOutputs({
    'output': Channel(
        type_name='SchemaPath',
        static_artifact_collection=[train_schema_path]
    )
})

infer_schema = SchemaGen(
    stats=statistics_gen.outputs.output,  # Channel of 'ExampleStatisticsPath' artifacts, i.e. the 'output' channel of statistics_outputs
    name='Schema Generator',  # Optional; must be unique if the pipeline uses more than one SchemaGen.
    outputs=schema_outputs  # Mapping from output name to Channel; exposed as infer_schema.outputs
)
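The NOTE above says the SchemaGen executor accepts exactly one `SchemaPath` artifact. The check behind that error amounts to something like the following. This is a hypothetical re-implementation (`require_single_artifact` is our own name) that reproduces the quoted error message; it is not the TFX source.

```python
def require_single_artifact(artifact_list):
    """Return the only element of artifact_list, or raise ValueError.

    Hypothetical helper mirroring the error quoted in the NOTE above.
    """
    if len(artifact_list) != 1:
        raise ValueError('expected list length of one but got {}'.format(
            len(artifact_list)))
    return artifact_list[0]


# One artifact: accepted.
print(require_single_artifact(['schema_gen/']))
# Two artifacts (e.g. one schema per split) reproduce the error.
try:
    require_single_artifact(['schema_gen/train/', 'schema_gen/eval/'])
except ValueError as error:
    print(error)  # expected list length of one but got 2
```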

## Create Transform

In [None]:
train_examples = types.TfxType(type_name='ExamplesPath', split='train')
train_examples.uri = os.path.join(_data_root,
                                  'transform/transformed_examples/train/')
eval_examples = types.TfxType(type_name='ExamplesPath', split='eval')
eval_examples.uri = os.path.join(_data_root,
                                 'transform/transformed_examples/eval/')
transform_output = types.TfxType(type_name='TransformPath')
transform_output.uri = os.path.join(_data_root,
                                    'transform/transform_output/')

transform_outputs = ComponentOutputs({
    # transform_output: output of tf.Transform, which includes an exported
    # TensorFlow graph suitable for both training and serving.
    'transform_output': Channel(
        type_name='TransformPath',
        static_artifact_collection=[transform_output]
    ),
    # transformed_examples: materialized transformed examples, which include
    # both the 'train' and 'eval' splits.
    'transformed_examples': Channel(
        type_name='ExamplesPath',
        static_artifact_collection=[train_examples, eval_examples]
    )
})

transform = Transform(
    input_data=example_gen.outputs.examples,
    schema=infer_schema.outputs.output,
    module_file=_taxi_module_file,
    outputs=transform_outputs
)
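Transform runs the user code in `taxi_utils.py` in two phases, which show up in the execution log below: an analyze pass computes full-dataset statistics (`AnalyzeDataset/...` stages such as `scale_to_z_score_2/mean_and_var`, `bucketize/quantiles` and vocabularies), and a transform pass then applies them to each example (`TransformDataset`). The pure-Python sketch below illustrates that two-phase split for z-score scaling only; it is a conceptual illustration, not how tf.Transform is implemented, and the fare values are made up.

```python
import math


def analyze_mean_and_var(values):
    """Analyze phase: one full pass over the data to compute mean and variance."""
    count = len(values)
    mean = sum(values) / float(count)
    var = sum((v - mean) ** 2 for v in values) / float(count)
    return mean, var


def transform_z_score(value, mean, var):
    """Transform phase: apply the precomputed statistics to a single value."""
    std = math.sqrt(var)
    # Only center the value if the feature is constant (std == 0).
    return (value - mean) / std if std > 0 else value - mean


fares = [5.0, 7.5, 10.0, 12.5, 15.0]  # hypothetical feature values
mean, var = analyze_mean_and_var(fares)
scaled = [transform_z_score(f, mean, var) for f in fares]
print(mean, var)  # 10.0 12.5
print(scaled)
```

Because the statistics are computed once and then frozen, the same scaling can be baked into the exported graph and reused at serving time.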

## Create Trainer

In [None]:
model_exports = types.TfxType(type_name='ModelExportPath')
model_exports.uri = os.path.join(_data_root, 'trainer/current/')

trainer_outputs = ComponentOutputs({
    'output':Channel(
        type_name='ModelExportPath',
        static_artifact_collection=[model_exports]
    )
})

trainer = Trainer(
    module_file=_taxi_module_file,
    transformed_examples=transform.outputs.transformed_examples,
    schema=infer_schema.outputs.output,
    transform_output=transform.outputs.transform_output,
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000),
    outputs=trainer_outputs
)

## Create pipeline

In [None]:
pipeline = Pipeline(
    pipeline_name="TFX Pipeline",
    pipeline_root=_pipeline_root,
    components=[example_gen, statistics_gen, infer_schema, transform, trainer]
)

## Execute

In [None]:
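class DirectRunner(TfxRunner):
    """A minimal TFX runner that executes each component in-process.

    Components run one after another in the order of pipeline.components.
    No metadata store or driver is involved, so every output artifact must
    already have its URI set (see the ComponentOutputs created above).
    """

    def __init__(self, config=None):
        self._config = config or {}

    def run(self, pipeline):
        for component in pipeline.components:
            self._execute_component(component)
        return pipeline

    def _execute_component(self, component):
        # Resolve each input/output Channel to its list of artifacts.
        input_dict = {key: value.get() for key, value in component.input_dict.items()}
        output_dict = {key: value.get() for key, value in component.outputs.get_all().items()}
        exec_properties = component.exec_properties
        # Instantiate the component's executor class and run it directly.
        executor = component.executor()
        executor.Do(input_dict, output_dict, exec_properties)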
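`DirectRunner` executes components in the order `pipeline.components` yields them, so every component's inputs must already have been produced by an earlier one. The list passed to `Pipeline` above is already in dependency order. If you assemble components in a different order, a dependency sort such as Kahn's algorithm recovers a valid one. The sketch below works on a hypothetical mapping from component name to the names whose outputs it consumes, not on TFX objects.

```python
from collections import deque


def topological_order(dependencies):
    """Order names so that each one comes after everything it depends on.

    `dependencies` maps a component name to the names whose outputs it
    consumes (a hypothetical, simplified stand-in for TFX channels).
    """
    # Count unmet dependencies and record the reverse edges.
    remaining = {name: len(deps) for name, deps in dependencies.items()}
    consumers = {name: [] for name in dependencies}
    for name, deps in dependencies.items():
        for dep in deps:
            consumers[dep].append(name)
    # Start from components without inputs (e.g. ExampleGen).
    ready = deque(sorted(name for name, n in remaining.items() if n == 0))
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for consumer in consumers[name]:
            remaining[consumer] -= 1
            if remaining[consumer] == 0:
                ready.append(consumer)
    if len(order) != len(dependencies):
        raise ValueError('dependency cycle detected')
    return order


pipeline_deps = {
    'Trainer': ['Transform', 'SchemaGen'],
    'Transform': ['ExampleGen', 'SchemaGen'],
    'SchemaGen': ['StatisticsGen'],
    'StatisticsGen': ['ExampleGen'],
    'ExampleGen': [],
}
print(topological_order(pipeline_deps))
# ['ExampleGen', 'StatisticsGen', 'SchemaGen', 'Transform', 'Trainer']
```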

In [None]:
pipeline = DirectRunner().run(pipeline)

INFO:tensorflow:Starting Executor execution.
[2019-06-14 06:46:15,380] {base_executor.py:72} INFO - Starting Executor execution.
INFO:tensorflow:Inputs for Executor is: {"input-base": [{"artifact": {"uri": "/root/taxi/data/simple", "properties": {"split": {"stringValue": ""}, "type_name": {"stringValue": "ExternalPath"}}}, "artifact_type": {"name": "ExternalPath", "properties": {"span": "INT", "state": "STRING", "split": "STRING", "name": "STRING", "type_name": "STRING"}}}]}
[2019-06-14 06:46:15,388] {base_executor.py:74} INFO - Inputs for Executor is: {"input-base": [{"artifact": {"uri": "/root/taxi/data/simple", "properties": {"split": {"stringValue": ""}, "type_name": {"stringValue": "ExternalPath"}}}, "artifact_type": {"name": "ExternalPath", "properties": {"span": "INT", "state": "STRING", "split": "STRING", "name": "STRING", "type_name": "STRING"}}}]}
INFO:tensorflow:Outputs for Executor is: {"examples": [{"artifact": {"uri": "/root/taxi/data/simple/csv_example_gen/train/", "prop

[2019-06-14 06:46:23,352] {fn_api_runner.py:437} INFO - Running (ShuffleSplittrain/ReshufflePerKey/GroupByKey/Read)+(((ref_AppliedPTransform_ShuffleSplittrain/ReshufflePerKey/FlatMap(restore_timestamps)_34)+(ref_AppliedPTransform_ShuffleSplittrain/RemoveRandomKeys_35))+(((ref_AppliedPTransform_OutputSplittrain/Write/WriteImpl/WriteBundles_42)+((ref_AppliedPTransform_OutputSplittrain/Write/WriteImpl/Pair_43)+(ref_AppliedPTransform_OutputSplittrain/Write/WriteImpl/WindowInto(WindowIntoFn)_44)))+(OutputSplittrain/Write/WriteImpl/GroupByKey/Write)))
[2019-06-14 06:46:23,949] {fn_api_runner.py:437} INFO - Running ((OutputSplittrain/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_OutputSplittrain/Write/WriteImpl/Extract_49))+(ref_PCollection_PCollection_32/Write)
[2019-06-14 06:46:23,963] {fn_api_runner.py:437} INFO - Running (ref_PCollection_PCollection_24/Read)+((ref_AppliedPTransform_OutputSplittrain/Write/WriteImpl/PreFinalize_50)+(ref_PCollection_PCollection_33/Write))

[2019-06-14 06:46:29,679] {fn_api_runner.py:437} INFO - Running (((((ref_AppliedPTransform_ReadData.train/Read_106)+(ref_AppliedPTransform_DecodeData.train/ParseTFExamples_108))+(ref_AppliedPTransform_GenerateStatistics.train/RunStatsGenerators/KeyWithVoid_111))+(((ref_AppliedPTransform_GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/ParDo(SplitHotCold)/ParDo(SplitHotCold)_115)+(ref_AppliedPTransform_GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/WindowIntoDiscarding_116))+((GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/CombinePerKey(PreCombineFn)/Precombine)+(GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/CombinePerKey(PreCombineFn)/Group/Write))))+(((ref_AppliedPTransform_GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/TopKUniques_ConvertInputToFeatureV

[2019-06-14 06:46:40,184] {fn_api_runner.py:437} INFO - Running (GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/Flatten/Read)+((GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/CombinePerKey(PostCombineFn)/Precombine)+(GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/CombinePerKey(PostCombineFn)/Group/Write))
[2019-06-14 06:46:40,393] {fn_api_runner.py:437} INFO - Running ((((GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/CombinePerKey(PostCombineFn)/Group/Read)+(GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/CombinePerKey(PostCombineFn)/Merge))+(GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/BasicStatsGenerator/CombinePerKey(PostCombineFn)/ExtractOutputs))+(GenerateStatistics.eval/RunStatsGenerators/GenerateSlicedStatisticsImpl/FlattenFeatureS

[2019-06-14 06:46:42,315] {fn_api_runner.py:437} INFO - Running (((GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/Uniques_CountPerFeatureName/CombinePerKey(CountCombineFn)/Group/Read)+(GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/Uniques_CountPerFeatureName/CombinePerKey(CountCombineFn)/Merge))+(GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/Uniques_CountPerFeatureName/CombinePerKey(CountCombineFn)/ExtractOutputs))+((ref_AppliedPTransform_GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/Uniques_ConvertToSingleFeatureStats_166)+(GenerateStatistics.train/RunStatsGenerators/GenerateSlicedStatisticsImpl/TopKUniquesStatsGenerator/FlattenTopKUniquesResults/Write/1))
[2019-06-14 06:46:42,334] {fn_api_runner.py:437} INFO - Running ((GenerateStatistics.train/RunStatsGenerators/GenerateSlice

[2019-06-14 06:46:42,907] {base_executor.py:74} INFO - Inputs for Executor is: {"input_data": [{"artifact": {"uri": "/root/taxi/data/simple/csv_example_gen/train/", "properties": {"split": {"stringValue": "train"}, "type_name": {"stringValue": "ExamplesPath"}}}, "artifact_type": {"name": "ExamplesPath", "properties": {"span": "INT", "type_name": "STRING", "split": "STRING", "name": "STRING", "state": "STRING"}}}, {"artifact": {"uri": "/root/taxi/data/simple/csv_example_gen/eval/", "properties": {"split": {"stringValue": "eval"}, "type_name": {"stringValue": "ExamplesPath"}}}, "artifact_type": {"name": "ExamplesPath", "properties": {"span": "INT", "type_name": "STRING", "split": "STRING", "name": "STRING", "state": "STRING"}}}], "schema": [{"artifact": {"uri": "/root/taxi/data/simple/schema_gen/", "properties": {"split": {"stringValue": "train"}, "type_name": {"stringValue": "SchemaPath"}}}, "artifact_type": {"name": "SchemaPath", "properties": {"span": "INT", "type_name": "STRING", "sp

[2019-06-14 06:46:44,402] {builder_impl.py:449} INFO - No assets to write.
INFO:tensorflow:SavedModel written to: /root/taxi/data/simple/transform/transform_output/.temp_path/tftransform_tmp/c0516ab5ec7a4f689bf6384c4eff47b6/saved_model.pb
[2019-06-14 06:46:44,451] {builder_impl.py:414} INFO - SavedModel written to: /root/taxi/data/simple/transform/transform_output/.temp_path/tftransform_tmp/c0516ab5ec7a4f689bf6384c4eff47b6/saved_model.pb
INFO:tensorflow:Assets added to graph.
[2019-06-14 06:46:47,387] {builder_impl.py:654} INFO - Assets added to graph.
INFO:tensorflow:No assets to write.
[2019-06-14 06:46:47,389] {builder_impl.py:449} INFO - No assets to write.
INFO:tensorflow:SavedModel written to: /root/taxi/data/simple/transform/transform_output/.temp_path/tftransform_tmp/89afd2f5775e44329f1ebb4c700e2dea/saved_model.pb
[2019-06-14 06:46:47,430] {builder_impl.py:414} INFO - SavedModel written to: /root/taxi/data/simple/transform/transform_output/.temp_path/tftransform_tmp/89afd2f5775

INFO:tensorflow:Saver not created because there are no variables in the graph to restore
[2019-06-14 06:46:54,862] {saver.py:1483} INFO - Saver not created because there are no variables in the graph to restore
[2019-06-14 06:46:55,945] {fn_api_runner.py:437} INFO - Running ((AnalyzeDataset/VocabularyAccumulate[compute_and_apply_vocabulary/vocabulary]/CountPerString/CombinePerKey(CountCombineFn)/Group/Read)+(((AnalyzeDataset/VocabularyAccumulate[compute_and_apply_vocabulary/vocabulary]/CountPerString/CombinePerKey(CountCombineFn)/Merge)+(AnalyzeDataset/VocabularyAccumulate[compute_and_apply_vocabulary/vocabulary]/CountPerString/CombinePerKey(CountCombineFn)/ExtractOutputs))+(ref_AppliedPTransform_AnalyzeDataset/VocabularyAccumulate[compute_and_apply_vocabulary/vocabulary]/FilterProblematicStrings_112)))+((AnalyzeDataset/VocabularyMerge[compute_and_apply_vocabulary/vocabulary]/CountPerString/Precombine)+(AnalyzeDataset/VocabularyMerge[compute_and_apply_vocabulary/vocabulary]/CountPerStr

[2019-06-14 06:46:56,282] {fn_api_runner.py:437} INFO - Running (AnalyzeDataset/VocabularyOrderAndFilter[compute_and_apply_vocabulary_1/vocabulary]/ApplyFrequencyThresholdAndTopK/Top(1000)/Flatten/Read)+(AnalyzeDataset/VocabularyOrderAndFilter[compute_and_apply_vocabulary_1/vocabulary]/ApplyFrequencyThresholdAndTopK/Top(1000)/GroupByKey/Write)
[2019-06-14 06:46:56,297] {fn_api_runner.py:437} INFO - Running ((AnalyzeDataset/VocabularyOrderAndFilter[compute_and_apply_vocabulary_1/vocabulary]/ApplyFrequencyThresholdAndTopK/Top(1000)/GroupByKey/Read)+(ref_AppliedPTransform_AnalyzeDataset/VocabularyOrderAndFilter[compute_and_apply_vocabulary_1/vocabulary]/ApplyFrequencyThresholdAndTopK/Top(1000)/ParDo(_MergeTopPerBundle)_191))+((ref_AppliedPTransform_AnalyzeDataset/VocabularyOrderAndFilter[compute_and_apply_vocabulary_1/vocabulary]/ApplyFrequencyThresholdAndTopK/FlattenList_192)+(ref_PCollection_PCollection_119/Write))
[2019-06-14 06:46:56,327] {fn_api_runner.py:437} INFO - Running (((ref_A

[2019-06-14 06:46:57,679] {fn_api_runner.py:437} INFO - Running ((((ref_AppliedPTransform_AnalyzeDataset/CacheableCombineMerge[bucketize/quantiles]/MergeCombinesGlobally/DoOnce/Read_243)+((ref_AppliedPTransform_AnalyzeDataset/CacheableCombineMerge[bucketize/quantiles]/MergeCombinesGlobally/InjectDefault_244)+(ref_AppliedPTransform_AnalyzeDataset/CacheableCombineMerge[bucketize/quantiles]/ExtractOutputs/FlatMap(extract_outputs)_246)))+(ref_AppliedPTransform_AnalyzeDataset/CreateTensorBinding[bucketize/quantiles/Placeholder]_247))+(AnalyzeDataset/CreateSavedModel/Flatten/Transcode/2))+(AnalyzeDataset/CreateSavedModel/Flatten/Write/2)
[2019-06-14 06:46:57,711] {fn_api_runner.py:437} INFO - Running (AnalyzeDataset/CacheableCombineAccumulate[scale_to_z_score_2/mean_and_var]/InitialCombineGlobally/CombinePerKey/Group/Read)+((((AnalyzeDataset/CacheableCombineAccumulate[scale_to_z_score_2/mean_and_var]/InitialCombineGlobally/CombinePerKey/Merge)+(AnalyzeDataset/CacheableCombineAccumulate[scale

[2019-06-14 06:46:58,979] {fn_api_runner.py:437} INFO - Running ((AnalyzeDataset/CacheableCombineMerge[bucketize_3/quantiles]/MergeCombinesGlobally/CombinePerKey/Group/Read)+(AnalyzeDataset/CacheableCombineMerge[bucketize_3/quantiles]/MergeCombinesGlobally/CombinePerKey/Merge))+((AnalyzeDataset/CacheableCombineMerge[bucketize_3/quantiles]/MergeCombinesGlobally/CombinePerKey/ExtractOutputs)+((ref_AppliedPTransform_AnalyzeDataset/CacheableCombineMerge[bucketize_3/quantiles]/MergeCombinesGlobally/UnKey_337)+(ref_PCollection_PCollection_209/Write)))
[2019-06-14 06:46:59,158] {fn_api_runner.py:437} INFO - Running ((ref_AppliedPTransform_AnalyzeDataset/CacheableCombineMerge[bucketize_3/quantiles]/MergeCombinesGlobally/DoOnce/Read_339)+(ref_AppliedPTransform_AnalyzeDataset/CacheableCombineMerge[bucketize_3/quantiles]/MergeCombinesGlobally/InjectDefault_340))+(((ref_AppliedPTransform_AnalyzeDataset/CacheableCombineMerge[bucketize_3/quantiles]/ExtractOutputs/FlatMap(extract_outputs)_342)+((ref_

value: "\n\013\n\tConst_2:0\022/vocab_compute_and_apply_vocabulary_1_vocabulary"

value: "\n\013\n\tConst_3:0\022-vocab_compute_and_apply_vocabulary_vocabulary"

INFO:tensorflow:Saver not created because there are no variables in the graph to restore
[2019-06-14 06:47:04,567] {saver.py:1483} INFO - Saver not created because there are no variables in the graph to restore
[2019-06-14 06:47:07,021] {fn_api_runner.py:437} INFO - Running (ref_AppliedPTransform_TransformDataset[1]/PrepareToClearSharedKeepAlives/Read_396)+(ref_AppliedPTransform_TransformDataset[1]/WaitAndClearSharedKeepAlives_397)
[2019-06-14 06:47:07,045] {fn_api_runner.py:437} INFO - Running ((ref_PCollection_PCollection_250/Read)+(ref_AppliedPTransform_Materialize[0]/Write/Write/WriteImpl/PreFinalize_415))+(ref_PCollection_PCollection_259/Write)

[2019-06-14 06:47:07,548] {estimator.py:201} INFO - Using config: {'_keep_checkpoint_every_n_hours': 10000, '_keep_checkpoint_max': 1, '_service': None, '_log_step_count_steps': 100, '_device_fn': None, '_train_distribute': None, '_master': '', '_tf_random_seed': None, '_num_ps_replicas': 0, '_task_id': 0, '_save_checkpoints_secs': None, '_eval_distribute': None, '_is_chief': True, '_num_worker_replicas': 1, '_experimental_distribute': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f56386ffe80>, '_model_dir': '/root/taxi/data/simple/trainer/current/serving_model_dir', '_global_id_in_cluster': 0, '_save_checkpoints_steps': 999, '_protocol': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_evaluation_master': '', '_task_type': 'worker', '_save_summary_steps': 100}
INFO:tensorflow:Training model.
[2019-06-14 06:47:07,552] {executor.py:141} INFO - Training model.

INFO:tensorflow:Done calling model_fn.
[2019-06-14 06:47:20,277] {estimator.py:1113} INFO - Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2019-06-14T06:47:20Z
[2019-06-14 06:47:20,310] {evaluation.py:257} INFO - Starting evaluation at 2019-06-14T06:47:20Z
INFO:tensorflow:Graph was finalized.
[2019-06-14 06:47:20,520] {monitored_session.py:222} INFO - Graph was finalized.
Instructions for updating:
Use standard file APIs to check for files with this prefix.
INFO:tensorflow:Restoring parameters from /root/taxi/data/simple/trainer/current/serving_model_dir/model.ckpt-999
[2019-06-14 06:47:20,528] {saver.py:1270} INFO - Restoring parameters from /root/taxi/data/simple/trainer/current/serving_model_dir/model.ckpt-999
INFO:tensorflow:Running local_init_op.
[2019-06-14 06:47:20,640] {session_manager.py:491} INFO - Running local_init_op.
INFO:tensorflow:Done running local_init_op.

[2019-06-14 06:47:43,644] {basic_session_run_hooks.py:680} INFO - global_step/sec: 259.081
INFO:tensorflow:loss = 16.93203, step = 2101 (0.388 sec)
[2019-06-14 06:47:43,651] {basic_session_run_hooks.py:247} INFO - loss = 16.93203, step = 2101 (0.388 sec)
INFO:tensorflow:global_step/sec: 267.33
[2019-06-14 06:47:44,018] {basic_session_run_hooks.py:680} INFO - global_step/sec: 267.33
INFO:tensorflow:loss = 13.191904, step = 2201 (0.371 sec)
[2019-06-14 06:47:44,022] {basic_session_run_hooks.py:247} INFO - loss = 13.191904, step = 2201 (0.371 sec)
INFO:tensorflow:global_step/sec: 350.815
[2019-06-14 06:47:44,303] {basic_session_run_hooks.py:680} INFO - global_step/sec: 350.815
INFO:tensorflow:loss = 20.152155, step = 2301 (0.288 sec)
[2019-06-14 06:47:44,310] {basic_session_run_hooks.py:247} INFO - loss = 20.152155, step = 2301 (0.288 sec)
INFO:tensorflow:global_step/sec: 301.039
[2019-06-14 06:47:44,635] {basic_session_run_hooks.py:680} INFO - global_step/sec: 301.039

[2019-06-14 06:47:51,996] {basic_session_run_hooks.py:247} INFO - loss = 15.120426, step = 4501 (0.275 sec)
INFO:tensorflow:global_step/sec: 414.979
[2019-06-14 06:47:52,231] {basic_session_run_hooks.py:680} INFO - global_step/sec: 414.979
INFO:tensorflow:loss = 15.069384, step = 4601 (0.242 sec)
[2019-06-14 06:47:52,239] {basic_session_run_hooks.py:247} INFO - loss = 15.069384, step = 4601 (0.242 sec)
INFO:tensorflow:global_step/sec: 393.449
[2019-06-14 06:47:52,486] {basic_session_run_hooks.py:680} INFO - global_step/sec: 393.449
INFO:tensorflow:loss = 17.666027, step = 4701 (0.253 sec)
[2019-06-14 06:47:52,492] {basic_session_run_hooks.py:247} INFO - loss = 17.666027, step = 4701 (0.253 sec)
INFO:tensorflow:global_step/sec: 378.466
[2019-06-14 06:47:52,750] {basic_session_run_hooks.py:680} INFO - global_step/sec: 378.466
INFO:tensorflow:loss = 14.762946, step = 4801 (0.264 sec)
[2019-06-14 06:47:52,756] {basic_session_run_hooks.py:247} INFO - loss = 14.762946, step = 4801 (0.264 sec

INFO:tensorflow:Saving checkpoints for 6993 into /root/taxi/data/simple/trainer/current/serving_model_dir/model.ckpt.
[2019-06-14 06:48:00,434] {basic_session_run_hooks.py:594} INFO - Saving checkpoints for 6993 into /root/taxi/data/simple/trainer/current/serving_model_dir/model.ckpt.
INFO:tensorflow:Skip the current checkpoint eval due to throttle secs (600 secs).
[2019-06-14 06:48:00,743] {training.py:525} INFO - Skip the current checkpoint eval due to throttle secs (600 secs).
INFO:tensorflow:global_step/sec: 145.701
[2019-06-14 06:48:00,792] {basic_session_run_hooks.py:680} INFO - global_step/sec: 145.701
INFO:tensorflow:loss = 11.043283, step = 7001 (0.684 sec)
[2019-06-14 06:48:00,798] {basic_session_run_hooks.py:247} INFO - loss = 11.043283, step = 7001 (0.684 sec)
INFO:tensorflow:global_step/sec: 241.005
[2019-06-14 06:48:01,207] {basic_session_run_hooks.py:680} INFO - global_step/sec: 241.005
INFO:tensorflow:loss = 16.511175, step = 7101 (0.413 sec)

INFO:tensorflow:global_step/sec: 250.393
[2019-06-14 06:48:09,879] {basic_session_run_hooks.py:680} INFO - global_step/sec: 250.393
INFO:tensorflow:loss = 13.946808, step = 9301 (0.396 sec)
[2019-06-14 06:48:09,884] {basic_session_run_hooks.py:247} INFO - loss = 13.946808, step = 9301 (0.396 sec)
INFO:tensorflow:global_step/sec: 314.214
[2019-06-14 06:48:10,197] {basic_session_run_hooks.py:680} INFO - global_step/sec: 314.214
INFO:tensorflow:loss = 14.231247, step = 9401 (0.319 sec)
[2019-06-14 06:48:10,202] {basic_session_run_hooks.py:247} INFO - loss = 14.231247, step = 9401 (0.319 sec)
INFO:tensorflow:global_step/sec: 105.02
[2019-06-14 06:48:11,150] {basic_session_run_hooks.py:680} INFO - global_step/sec: 105.02
INFO:tensorflow:loss = 13.067789, step = 9501 (0.951 sec)
[2019-06-14 06:48:11,154] {basic_session_run_hooks.py:247} INFO - loss = 13.067789, step = 9501 (0.951 sec)
INFO:tensorflow:global_step/sec: 309.122

[2019-06-14 06:48:35,441] {estimator.py:1111} INFO - Calling model_fn.
INFO:tensorflow:Done calling model_fn.
[2019-06-14 06:48:36,740] {estimator.py:1113} INFO - Done calling model_fn.
INFO:tensorflow:Signatures INCLUDED in export for Predict: ['predict']
[2019-06-14 06:48:36,744] {export.py:587} INFO - Signatures INCLUDED in export for Predict: ['predict']
INFO:tensorflow:Signatures INCLUDED in export for Classify: ['serving_default', 'classification']
[2019-06-14 06:48:36,748] {export.py:587} INFO - Signatures INCLUDED in export for Classify: ['serving_default', 'classification']
INFO:tensorflow:Signatures INCLUDED in export for Regress: ['regression']
[2019-06-14 06:48:36,750] {export.py:587} INFO - Signatures INCLUDED in export for Regress: ['regression']
INFO:tensorflow:Signatures INCLUDED in export for Eval: None
[2019-06-14 06:48:36,752] {export.py:587} INFO - Signatures INCLUDED in export for Eval: None
INFO:tensorflow:Signatures INCLUDED in export for Train: None

## Check Result

In [None]:
!ls -Rlhs /root/taxi/data/simple/

/root/taxi/data/simple/:
total 1.9M
4.0K drwxr-xr-x 4 root root 4.0K Jun 14 06:46 csv_example_gen
1.9M -rw-r--r-- 1 root root 1.9M Jun 14 05:46 data.csv
4.0K drwxr-xr-x 2 root root 4.0K Jun 14 06:46 schema_gen
4.0K drwxr-xr-x 4 root root 4.0K Jun 14 06:46 statistics_gen
4.0K drwxr-xr-x 3 root root 4.0K Jun 14 06:47 trainer
4.0K drwxr-xr-x 4 root root 4.0K Jun 14 06:46 transform

/root/taxi/data/simple/csv_example_gen:
total 8.0K
4.0K drwxr-xr-x 2 root root 4.0K Jun 14 06:46 eval
4.0K drwxr-xr-x 2 root root 4.0K Jun 14 06:46 train

/root/taxi/data/simple/csv_example_gen/eval:
total 204K
204K -rw-r--r-- 1 root root 201K Jun 14 06:46 data_tfrecord-00000-of-00001.gz

/root/taxi/data/simple/csv_example_gen/train:
total 408K
408K -rw-r--r-- 1 root root 405K Jun 14 06:46 data_tfrecord-00000-of-00001.gz

/root/taxi/data/simple/schema_gen:
total 8.0K
8.0K -rw-r--r-- 1 root root 4.5K Jun 14 06:46 schema.pbtxt

/root/taxi/data/simple/statistics_gen:
total 8.0K

In [None]:
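export_dir = '/root/taxi/data/simple/trainer/current/serving_model_dir/export/chicago-taxi/1560494915'
with tf.Graph().as_default() as graph:
    with tf.Session(graph=graph) as sess:
        # Load the exported SavedModel (serving tag) into a fresh graph;
        # load() returns the MetaGraphDef, including its signature_def map.
        meta_graph_def = tf.saved_model.loader.load(
            sess, [tf.saved_model.tag_constants.SERVING], export_dir=export_dir)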

INFO:tensorflow:Restoring parameters from /root/taxi/data/simple/trainer/current/serving_model_dir/export/chicago-taxi/1560494915/variables/variables
[2019-06-14 10:08:35,809] {saver.py:1270} INFO - Restoring parameters from /root/taxi/data/simple/trainer/current/serving_model_dir/export/chicago-taxi/1560494915/variables/variables
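The export directory above hard-codes the timestamp `1560494915` from this particular run, and each training run writes a new numbered subdirectory under `export/chicago-taxi/`. A small helper can pick the newest one instead. This is a sketch assuming the version subdirectories are named with plain integers, as in this run; `latest_export_dir` is a hypothetical name.

```python
import os


def latest_export_dir(export_base):
    """Return the path of the numerically largest subdirectory of export_base."""
    versions = [
        name for name in os.listdir(export_base)
        if name.isdigit() and os.path.isdir(os.path.join(export_base, name))
    ]
    if not versions:
        raise ValueError('no numbered export found under %s' % export_base)
    # Compare as integers so that e.g. '1000' sorts after '999'.
    return os.path.join(export_base, max(versions, key=int))


# Usage in this notebook (base path taken from the cell above):
# export_dir = latest_export_dir(
#     '/root/taxi/data/simple/trainer/current/serving_model_dir/export/chicago-taxi')
```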


In [None]:
%load_ext tensorboard.notebook

In [None]:
%tensorboard --logdir /root/taxi/data/simple/trainer/current/serving_model_dir

The transformed examples are stored as TFRecord files, which are not human-readable. To inspect them, we are going to:

1. get the path of `transformed_examples/train` from `transform`
2. generate statistics from the TFRecord files with [tfdv](https://www.tensorflow.org/tfx/data_validation/get_started)
3. visualize the statistics with tfdv


In [None]:
# 1. get the path of `transformed_examples/train`
def get_transformed_examples(transform):
    artifacts = transform.outputs.transformed_examples.get()
    return types.get_split_uri(artifacts, 'train')

transformed_examples_dir = get_transformed_examples(transform)
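`types.get_split_uri` picks, from a list of artifacts, the URI of the one whose `split` property matches. Conceptually it behaves like the sketch below; `Artifact` here is a hypothetical namedtuple standing in for `types.TfxType`, `split_uri` is our own name, and the exact error behavior of the real helper may differ.

```python
from collections import namedtuple

# Hypothetical stand-in for types.TfxType with only the fields used here.
Artifact = namedtuple('Artifact', ['split', 'uri'])


def split_uri(artifacts, split):
    """Return the uri of the single artifact whose split matches `split`."""
    matches = [a.uri for a in artifacts if a.split == split]
    if len(matches) != 1:
        raise ValueError('expected exactly one artifact for split %r, got %d'
                         % (split, len(matches)))
    return matches[0]


transformed = [
    Artifact('train', '/root/taxi/data/simple/transform/transformed_examples/train/'),
    Artifact('eval', '/root/taxi/data/simple/transform/transformed_examples/eval/'),
]
print(split_uri(transformed, 'train'))
# /root/taxi/data/simple/transform/transformed_examples/train/
```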

In [None]:
# 2. generate statistics from tfrecord
from pathlib import Path
import tensorflow_data_validation as tfdv

def generate_stats(transformed_examples_dir):
    path = Path(transformed_examples_dir)
    # NOTE: only the first file matched by glob is used.
    for filepath in path.glob('*'):
        # Convert the Path to str: Python 3.5 lacks os.PathLike support (added in 3.6).
        return tfdv.generate_statistics_from_tfrecord(str(filepath))

stats = generate_stats(transformed_examples_dir)

In [None]:
# 3. visualize statistics with tfdv

tfdv.visualize_statistics(stats)