## TFX Pipelines

#### Generate Examples

In [None]:
# Temporary commands to unzip the zip file in Google Cloud Storage
# ! gsutil -m cp gs://text-analysis-323506/train_data/train_val.zip ./
# ! unzip train_val.zip
# ! gunzip *.csv.gz
# ! gsutil -m mv *.csv gs://text-analysis-323506/train_data/

In [None]:
# ! pip3 install tfx==1.4.0
# ! pip install pyparsing==2.4.2

In [47]:
import time
import os

import tfx

import absl
import os
import tempfile
import time

import tensorflow as tf
import tensorflow_data_validation as tfdv
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
import tensorflow as tf
from tfx.components.common_nodes.importer_node import ImporterNode

from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import example_gen_pb2
from tensorflow_metadata.proto.v0 import schema_pb2, statistics_pb2, anomalies_pb2
from tfx.components import StatisticsGen, CsvExampleGen, SchemaGen, ExampleValidator, Transform

from tfx.components import Trainer
from tfx.components import Transform
from tfx.components import Tuner
from tfx.dsl.components.base import executor_spec
from tfx.components.trainer import executor as trainer_executor

from tfx.proto import infra_validator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2

In [2]:
tf.__version__

'2.6.2'

In [3]:
tfx.__version__

'1.4.0'

In [4]:
DATA_ROOT = 'gs://text-analysis-323506/train_data/'
ARTIFACT_STORE = os.path.join(os.sep, 'home', 'jupyter', 'artifact-store')
SERVING_MODEL_DIR=os.path.join(os.sep, 'home', 'jupyter', 'serving_model')

In [5]:
PIPELINE_NAME = 'sentiment-analysis'
PIPELINE_ROOT = os.path.join('/home/jupyter/', PIPELINE_NAME, time.strftime("%Y%m%d_%H%M%S"))
os.makedirs(PIPELINE_ROOT, exist_ok=True)

In [6]:
context = InteractiveContext(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    metadata_connection_config=None)



### CSV Example Generator

In [7]:
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[        
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=4),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
    ]))

In [8]:
example_gen = CsvExampleGen(
    input_base=DATA_ROOT,
    output_config=output_config)

In [None]:
context.run(example_gen)

In [10]:
examples_uri = example_gen.outputs['examples'].get()[0].uri

In [11]:
tfrecord_filenames = [os.path.join(examples_uri, 'Split-train', name)
                      for name in os.listdir(os.path.join(examples_uri, 'Split-train'))]

In [12]:
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

2021-11-30 17:15:06.172454: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


In [13]:
for tfrecord in dataset.take(2):
    example = tf.train.Example()
    example.ParseFromString(tfrecord.numpy())
    for name, feature in example.features.feature.items():
        if feature.HasField('bytes_list'):
            value = feature.bytes_list.value
        if feature.HasField('float_list'):
            value = feature.float_list.value
        if feature.HasField('int64_list'):
            value = feature.int64_list.value
        print('{}: {}'.format(name, value))
    print('******')

labels: [1]
input: [b'myattorney home business looking account prose litigant state federal lawsuite maintain hisher files']
******
input: [b'great book liked book sarah plain tall authors name patricia maclachen characters names caleb sarah anna papa though book interestingit first started caleb siting fire asking questions mama singing songs anna explaining born mama died sarah answered papas letter came live sarah taught caleb swim anybody gets book really enjoy']
labels: [0]
******


2021-11-30 17:15:06.281215: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


#### Train and eval datasets have been created properly !

### Statistics Generator

In [14]:
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

In [None]:
context.run(statistics_gen)

In [None]:
context.show(statistics_gen.outputs['statistics'])

### Schema Generator

In [17]:
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)

In [None]:
context.run(schema_gen)

In [19]:
context.show(schema_gen.outputs['schema'])

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'input',BYTES,required,single,-
'labels',INT,required,single,-


In [20]:
schema_proto_path = '{}/{}'.format(schema_gen.outputs['schema'].get()[0].uri, 'schema.pbtxt')
schema = tfdv.load_schema_text(schema_proto_path)

In [21]:
tfdv.set_domain(schema, 'labels', schema_pb2.IntDomain(name='labels', min=0, max=1, is_categorical=True))



In [22]:
tfdv.display_schema(schema=schema)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'input',BYTES,required,single,-
'labels',INT,required,single,min: 0; max: 1


#### Write schema to new file

In [23]:
schema_dir = os.path.join(ARTIFACT_STORE, 'schema')
tf.io.gfile.makedirs(schema_dir)
schema_file = os.path.join(schema_dir, 'schema.pbtxt')

tfdv.write_schema_text(schema, schema_file)

!cat {schema_file}

feature {
  name: "input"
  value_count {
    min: 1
    max: 1
  }
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "labels"
  value_count {
    min: 1
    max: 1
  }
  type: INT
  int_domain {
    name: "labels"
    min: 0
    max: 1
    is_categorical: true
  }
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}


### Schema Importer

In [24]:
schema_importer = tfx.dsl.components.common.importer.Importer(
      source_uri=schema_dir,
      artifact_type=tfx.types.standard_artifacts.Schema).with_id(
          'schema_importer') 

In [None]:
context.run(schema_importer)

In [26]:
context.show(schema_importer.outputs['result'])

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'input',BYTES,required,single,-
'labels',INT,required,single,min: 0; max: 1


### ExampleValidator

In [27]:
example_validator = ExampleValidator(    
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_importer.outputs['result']).with_id(
          'example_validator') 

In [None]:
context.run(example_validator)

In [29]:
train_uri = example_validator.outputs['anomalies'].get()[0].uri
train_anomalies_filename = os.path.join(train_uri, "Split-train/SchemaDiff.pb")
!cat $train_anomalies_filename


M

input*0r	      �?
-
labels*0J
labels  (r	      �?8

In [30]:
context.show(example_validator.outputs['anomalies'])

### Transform

In [31]:
TRANSFORM_MODULE = 'preprocessing.py'

In [32]:
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_importer.outputs['result'],
    module_file=TRANSFORM_MODULE)

In [None]:
context.run(transform)

In [34]:
transform.outputs['transformed_examples'].get()[0].uri

'/home/jupyter/sentiment-analysis/20211130_171252/Transform/transformed_examples/6'

In [35]:
os.listdir(transform.outputs['transformed_examples'].get()[0].uri)

['Split-train', 'Split-eval']

In [36]:
transform_uri = transform.outputs['transformed_examples'].get()[0].uri
tfrecord_filenames = [os.path.join(transform_uri,  'Split-train', name)
                      for name in os.listdir(os.path.join(transform_uri, 'Split-train'))]

In [37]:
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
for tfrecord in dataset.take(2):
    example = tf.train.Example()
    example.ParseFromString(tfrecord.numpy())
    for name, feature in example.features.feature.items():
        if feature.HasField('bytes_list'):
            value = feature.bytes_list.value
        if feature.HasField('float_list'):
            value = feature.float_list.value
        if feature.HasField('int64_list'):
            value = feature.int64_list.value
    print('{}: {}'.format(name, value))
    print('******')

labels_xf: [1]
******
input_xf: [b'great book liked book sarah plain tall authors name patricia maclachen characters names caleb sarah anna papa though book interestingit first started caleb siting fire asking questions mama singing songs anna explaining born mama died sarah answered papas letter came live sarah taught caleb swim anybody gets book really enjoy']
******


### Trainer

In [38]:
TRAINER_MODULE_FILE = 'model.py'

In [48]:
trainer = Trainer(
    custom_executor_spec=executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
    module_file=TRAINER_MODULE_FILE,
    transformed_examples=transform.outputs['transformed_examples'],
    schema=schema_importer.outputs['result'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=trainer_pb2.TrainArgs(splits=['train'], num_steps=5000),
    eval_args=trainer_pb2.EvalArgs(splits=['eval'], num_steps=1000))



In [51]:
context.run(trainer)



Processing /home/jupyter/sentiment-analysis/20211130_171252/_wheels/tfx_user_code_Trainer-0.0+99359850b23ca73104bd2f3199a77368c1bb0849cbfb17faf56531520364258c-py3-none-any.whl
Installing collected packages: tfx-user-code-Trainer
Successfully installed tfx-user-code-Trainer-0.0+99359850b23ca73104bd2f3199a77368c1bb0849cbfb17faf56531520364258c




INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
  "The `lr` argument is deprecated, use `learning_rate` instead.")
2021-11-30 17:42:31.890021: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-11-30 17:42:31.890075: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-11-30 17:42:31.891185: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-11-30 17:42:31.965794: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-11-30 17:42:34.520936: W tensorflow/core/framework/op_kernel.cc:1669] OP_REQUIRES failed at cast_op.cc:121 : Unimplemented: Cast string to float is not supported


UnimplementedError:  Cast string to float is not supported
	 [[node model/Cast (defined at /threading.py:926) ]] [Op:__inference_train_function_8447]

Function call stack:
train_function
