# Intsalling Libraries

In [1]:
!pip install -U tfx
!pip install -U tensorflow-recommenders
!pip install -Uq tensorflow-datasets

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tfx
  Downloading tfx-1.12.0-py3-none-any.whl (2.7 MB)
[K     |████████████████████████████████| 2.7 MB 12.9 MB/s 
[?25hCollecting google-cloud-bigquery<3,>=2.26.0
  Downloading google_cloud_bigquery-2.34.4-py2.py3-none-any.whl (206 kB)
[K     |████████████████████████████████| 206 kB 71.2 MB/s 
[?25hCollecting google-cloud-aiplatform<1.18,>=1.6.2
  Downloading google_cloud_aiplatform-1.17.1-py2.py3-none-any.whl (2.3 MB)
[K     |████████████████████████████████| 2.3 MB 55.0 MB/s 
Collecting keras-tuner<2,>=1.0.4
  Downloading keras_tuner-1.1.3-py3-none-any.whl (135 kB)
[K     |████████████████████████████████| 135 kB 57.8 MB/s 
Collecting tensorflow-model-analysis<0.44.0,>=0.43.0
  Downloading tensorflow_model_analysis-0.43.0-py3-none-any.whl (1.9 MB)
[K     |████████████████████████████████| 1.9 MB 59.1 MB/s 
[?25hCollecting tensorflow-serving-api!=2.0.*,!=2.1.*,!=2.2.

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tensorflow-recommenders
  Downloading tensorflow_recommenders-0.7.2-py3-none-any.whl (89 kB)
[K     |████████████████████████████████| 89 kB 5.5 MB/s 
Installing collected packages: tensorflow-recommenders
Successfully installed tensorflow-recommenders-0.7.2
[K     |████████████████████████████████| 5.2 MB 11.3 MB/s 
[?25h

In [1]:
import os
import absl
import json
import pprint
import tempfile

from typing import Any, Dict, List, Text

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs
import tensorflow_model_analysis as tfma
import apache_beam as beam

from absl import logging

from tfx.components.example_gen.base_example_gen_executor import BaseExampleGenExecutor
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen import utils
from tfx.dsl.components.base import executor_spec

from tfx.types import artifact
from tfx.types import artifact_utils
from tfx.types import channel
from tfx.types import standard_artifacts
from tfx.types.standard_artifacts import Examples

from tfx.dsl.component.experimental.annotations import InputArtifact
from tfx.dsl.component.experimental.annotations import OutputArtifact
from tfx.dsl.component.experimental.annotations import Parameter
from tfx.dsl.component.experimental.decorators import component
from tfx.types.experimental.simple_artifacts import Dataset

from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

# Set up logging.
tf.get_logger().propagate = False
absl.logging.set_verbosity(absl.logging.INFO)
pp = pprint.PrettyPrinter()

print(f"TensorFlow version: {tf.__version__}")
print(f"TFX version: {tfx.__version__}")
print(f"TensorFlow Recommenders version: {tfrs.__version__}")

%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

TensorFlow version: 2.11.0
TFX version: 1.12.0
TensorFlow Recommenders version: v0.7.2


# Setting Up Files Destinations

In [2]:
context = InteractiveContext()



In [3]:
SCHEMA_PIPELINE_NAME = 'sample_schema'
PIPELINE_NAME = 'sample_pipeline'

# Output directory to store artifacts generated from the pipeline.
SCHEMA_PIPELINE_ROOT = os.path.join('pipelines', SCHEMA_PIPELINE_NAME)
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
SCHEMA_METADATA_PATH = os.path.join('metadata', SCHEMA_PIPELINE_NAME,
                                    'metadata.db')
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

In [4]:
import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
# 'https://raw.githubusercontent.com/omar-lotfy-17/Datasets/main/nurmerical_amazon_fashion.csv'
_data_url = 'https://github.com/omar-lotfy-17/Datasets/raw/main/cleaned_amazon_fashion.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)

('/tmp/tfx-datahzfd5hdo/data.csv', <http.client.HTTPMessage at 0x7f1d100682b0>)

In [5]:
!head {_data_filepath}

,overall,verified,reviewTime,reviewerID,asin,reviewerName,reviewText,summary,unixReviewTime,size,color
24,1.0,True,2018-03-12,A3QY3THQ42WSCQ,B000YFSR5G,Amazon Customer,Waaaay too BIG,One Star,1520812800, X-Large, Charcoal Heather
25,5.0,True,2018-02-08,AGZ5OOZVDO194,B000YFSR5G,Amazon Customer,Comfortable,Five Stars,1518048000, XX-Large, Deep Royal
27,5.0,True,2017-10-25,A232J1FHOQI5YN,B000YFSR5G,Louis Robbio,Good product for the price.  Used very day and no problem.,Five Stars,1508889600, X-Large, Navy
28,5.0,True,2017-10-14,A2ON4RYI9Z4SQ,B000YFSR5G,Fhteacher,Good fit even after washing in hot water to force shrinking. I have a 30 inch inseam and 34 waist and got a medium size. Note that these sweatpants are very thick so not good for warm climates. Good quality and great price.,Good fit,1507939200, Medium, Light Steel
29,1.0,True,2018-03-12,A3QY3THQ42WSCQ,B000YFSR4W,Amazon Customer,Waaaay too BIG,One Star,1520812800, X-Large, Charcoal Heather
30,5.0,True,2017-07-30,A232J1FHOQI5YN,B000

# Schema Pipeline

In [6]:
def _create_schema_pipeline(pipeline_name: str,
                            pipeline_root: str,
                            data_root: str,
                            metadata_path: str) -> tfx.dsl.Pipeline:

  example_gen = tfx.components.CsvExampleGen(input_base=data_root)
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
  
  components = [
      example_gen,
      statistics_gen,
      schema_gen,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

In [7]:
from tfx.v1 import orchestration
orchestration.LocalDagRunner().run(
  _create_schema_pipeline(
      pipeline_name=SCHEMA_PIPELINE_NAME,
      pipeline_root=SCHEMA_PIPELINE_ROOT,
      data_root=DATA_ROOT,
      metadata_path=SCHEMA_METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metada

INFO:absl:Processing input csv data /tmp/tfx-datahzfd5hdo/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/sample_schema/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:823196,xor_checksum:1672629637,sum_checksum:1672629637"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties 

In [8]:
from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration.portable.mlmd import execution_lib
from tfx.orchestration.experimental.interactive import visualizations
from tfx.orchestration.experimental.interactive import standard_visualizations

def get_latest_artifacts(metadata, pipeline_name, component_id):
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  print(context)
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_output_artifacts(metadata, latest_execution.id)
 

def visualize_artifacts(artifacts):
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)


standard_visualizations.register_standard_visualizations()

In [9]:
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    SCHEMA_METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME,
                                         'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           SCHEMA_PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

INFO:absl:MetadataStore with DB connection initialized


id: 5
type_id: 12
name: "sample_schema.StatisticsGen"
create_time_since_epoch: 1672629648094
last_update_time_since_epoch: 1672629648094

id: 7
type_id: 12
name: "sample_schema.SchemaGen"
create_time_since_epoch: 1672629661148
last_update_time_since_epoch: 1672629661148



In [10]:
visualize_artifacts(stats_artifacts)

In [11]:
visualize_artifacts(schema_artifacts)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'',INT,required,,-
'asin',STRING,required,,'asin'
'color',STRING,required,,'color'
'overall',FLOAT,required,,-
'reviewText',BYTES,required,,-
'reviewTime',BYTES,required,,-
'reviewerID',BYTES,required,,-
'reviewerName',BYTES,required,,-
'size',STRING,required,,'size'
'summary',BYTES,required,,-


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'asin',"'B000YFSR4W', 'B000YFSR5G', 'B0014F7B98', 'B001IKJOLW', 'B0058YEJ5K', 'B005AGO4LU', 'B0092UF54A', 'B009MA34NY', 'B010RRWKT4', 'B014IBJKNO'"
'color',"' Black', ' Black/Green Glow-blue Glow-white', ' Black/Hyper Violet/Total Crimson/White', ' Black/Red Bronze/White', ' Black/White-anthracite-stealth', ' Black/White/Anthracite/Stealth', ' Black/Wolf Grey/White/Pink', ' Blue Tint/Green Glow/Hasta/White', ' Bright Mango/Crimson/White/Blue', ' Charcoal Heather', ' Cool Grey / Volt - Wolf Grey - Pure Platinum', ' Cool Grey/Pure Platinum/White/Volt', ' Cool Grey/Rage Green/Hyper Turquoise', ' Cool Grey/Team Orange/White/Metallic Platinum', ' Cool Grey/Team Orange/White/Platinum', ' Cool Grey/Volt-pure Platinum', ' Cool Grey/Volt/Pure Platinum', ' Deep Royal', ' Energy/Mtlc Silver/Black', ' Green Glow/Seaweed - Hasta - White', ' Grey', ' Grey/Orange', ' Hyper Jade/Mtlc Silver/Hyper Turq', ' Light Steel', ' Navy', ' Ocean Fog/Blue Grey/Mango', ' Pink Blast/Stealth/Hyper Pink/White', ' Plure Platinum/Blue Glow/Wolf Grey', ' Pure Platinum/Blue Glow/Wolf Grey', ' Racer Blue/Obsidian/Blue Tint', ' White/Hyper Pink/Pure Platinum/Wolf Grey', ' White/Metallic Silver/Black', ' Wolf Grey/Black-pink Blast/White', ' Wolf Grey/Platinum/White/Crimson', ' black'"
'size',"' 10 B(M) US', ' 10 D - Wide', ' 10 D(M) US', ' 10 M US', ' 10 W US', ' 10.5 B(M) US', ' 10.5 D(M) US', ' 10.5 M US', ' 11 B(M) US', ' 11 D(M) US', ' 11 M US', ' 11.5 B(M) US', ' 12 B(M) US', ' 12 D(M) US', ' 12 M US', ' 13 D(M) US', ' 13 M US', ' 5 B(M) US', ' 5 M US', ' 6 B(M) US', ' 6 M US', ' 6.5 B(M) US', ' 6.5 M US', ' 7 B(M) US', ' 7 D - Wide', ' 7 M US', ' 7 W US', ' 7.5 B(M) US', ' 7.5 D - Wide', ' 7.5 D(M) US', ' 7.5 M US', ' 7.5 W US', ' 8 B(M) US', ' 8 D - Wide', ' 8 M US', ' 8 W US', ' 8.5 B(M) US', ' 8.5 D - Wide', ' 8.5 M US', ' 8.5 W US', ' 9 B(M) US', ' 9 D - Wide', ' 9 D(M) US', ' 9 M US', ' 9 W US', ' 9.5 B(M) US', ' 9.5 D Wide', ' 9.5 M US', ' 9.5 W US', ' Large', ' Medium', ' X-Large', ' XX-Large', ' XXX-Large'"
'verified',"'False', 'True'"


In [12]:
import shutil

_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'schema'

os.makedirs(SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)

shutil.copy(_generated_path, SCHEMA_PATH)

'schema/schema.pbtxt'

In [13]:
print(f'Schema at {SCHEMA_PATH}-----')
!cat {SCHEMA_PATH}/*

Schema at schema-----
feature {
  name: ""
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "asin"
  type: BYTES
  domain: "asin"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "color"
  type: BYTES
  domain: "color"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "overall"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "reviewText"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}
feature {
  name: "reviewTime"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "reviewerID"
  type: BYTES
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "reviewerN

# Transforming and Training Module

In [18]:
_module_file = 'module.py'

In [19]:
%%writefile {_module_file}

from typing import List, Text
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils
from keras.models import Model
from keras.layers import Input, Embedding, Flatten, Dense, Concatenate
from keras.layers import Dropout, BatchNormalization, Activation,  Dot, Add
from keras.regularizers import l2
from keras.optimizers import SGD, Adam
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = ['reviewerID', 'asin']
_LABEL_KEY = 'overall'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10
_NUM_OOV_BUCKETS = 1

def preprocessing_fn(inputs):
  outputs = {}

  for key in _FEATURE_KEYS:
    outputs[key] = tft.compute_and_apply_vocabulary(inputs[key],
                                                    num_oov_buckets=1)


  outputs[_LABEL_KEY] = inputs[_LABEL_KEY]

  return outputs


def _apply_preprocessing(raw_features, tft_layer):
  transformed_features = tft_layer(raw_features)
  if _LABEL_KEY in raw_features:
    transformed_label = transformed_features.pop(_LABEL_KEY)
    return transformed_features, transformed_label
  else:
    return transformed_features, None


def _get_serve_tf_examples_fn(model, tf_transform_output):
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def serve_tf_examples_fn(serialized_tf_examples):
    feature_spec = tf_transform_output.raw_feature_spec()
    required_feature_spec = {
        k: v for k, v in feature_spec.items() if k in _FEATURE_KEYS
    }
    parsed_features = tf.io.parse_example(serialized_tf_examples,
                                          required_feature_spec)


    transformed_features, _ = _apply_preprocessing(parsed_features,
                                                   model.tft_layer)
    
    return model(transformed_features)

  return serve_tf_examples_fn


def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:

  dataset = data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(batch_size=batch_size),
      schema=tf_transform_output.raw_metadata.schema)

  transform_layer = tf_transform_output.transform_features_layer()
  def apply_transform(raw_features):
    return _apply_preprocessing(raw_features, transform_layer)

  return dataset.map(apply_transform).repeat()


def _build_keras_model() -> tf.keras.Model:
  K = 10 # latent dimensionality
  mu = 4.2
  epochs = 30
  reg = 0. 
  N = 387
  M = 10
  feature_keys = [keras.layers.Input(shape=(1,), name=key)
      for key in _FEATURE_KEYS]


  # keras model
  u = Input(shape=(1,), name='reviewerID')
  m = Input(shape=(1,), name='asin')
  u_embedding = Embedding(N, K)(u) # (N, 1, K)
  m_embedding = Embedding(M, K)(m) # (N, 1, K)


  ##### main branch
  u_bias = Embedding(N, 1)(u) # (N, 1, 1)
  m_bias = Embedding(M, 1)(m) # (N, 1, 1)
  x = Dot(axes=2)([u_embedding, m_embedding]) # (N, 1, 1)
  x = Add()([x, u_bias, m_bias])
  x = Flatten()(x) # (N, 1)


  ##### side branch
  u_embedding = Flatten()(u_embedding) # (N, K)
  m_embedding = Flatten()(m_embedding) # (N, K)
  y = Concatenate()([u_embedding, m_embedding]) # (N, 2K)
  y = Dense(400)(y)
  y = Activation('elu')(y)
  # y = Dropout(0.5)(y)
  y = Dense(1)(y)


  ##### merge
  x = Add()([x, y])
  model = Model(inputs=[u, m], outputs=x)

  model.compile(
    loss='mse',
    # optimizer='adam',
    # optimizer=Adam(lr=0.01),
    optimizer=SGD(lr=0.08, momentum=0.9),
    metrics=['mse'],
  )
  model.summary(print_fn=logging.info)
  return model


def run_fn(fn_args: tfx.components.FnArgs):
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_EVAL_BATCH_SIZE)


  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps) 

  signatures = {
      'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Writing module.py


# Creating and Running The Pipeline 

In [22]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:

  example_gen = tfx.components.CsvExampleGen(input_base=data_root)


  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])


  schema_importer = tfx.dsl.Importer(
      source_uri=schema_path,
      artifact_type=tfx.types.standard_artifacts.Schema).with_id(
          'schema_importer')


  example_validator = tfx.components.ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_importer.outputs['result'])


  transform = tfx.components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_importer.outputs['result'],
      materialize=False,
      module_file=module_file)


  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      schema=schema_importer.outputs['result'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))
  

  # model_resolver = tfx.dsl.Resolver(
  #     strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
  #     model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
  #     model_blessing=tfx.dsl.Channel(
  #         type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
  #             'latest_blessed_model_resolver')
  
  # metrics_specs = tfma.metrics.specs_from_metrics(
  #     [tf.keras.metrics.MeanSquaredError(name='mse')])
  
  # eval_config = tfma.EvalConfig(
  #     model_specs=[tfma.ModelSpec(label_key='overall')],
  #     slicing_specs=[tfma.SlicingSpec(), 
  #                    tfma.SlicingSpec(feature_keys=['overall'])]
  #     )
  
  # evaluator = tfx.components.Evaluator(
  #     examples=example_gen.outputs['examples'],
  #     model=trainer.outputs['model'],
  #     baseline_model=model_resolver.outputs['model'],
  #     eval_config=eval_config)
  

  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      # model_blessing=evaluator.outputs['blessing'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      statistics_gen,
      schema_importer,
      example_validator,
      transform,
      # trainer,
      # model_resolver,
      # evaluator,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

In [23]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      schema_path=SCHEMA_PATH,
      module_file=_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generating ephemeral wheel package for '/content/module.py' (including modules: ['module']).
INFO:absl:User module package has hash fingerprint version 4cf9464a5f3dce4ad2ea12851d79fc06a1ef7ffe1bccc9cacb711db7621e8416.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmpo73ohcbv/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpawobeifq', '--dist-dir', '/tmp/tmpyx82afbc']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/sample_pipeline/_wheels/tfx_user_code_Transform-0.0+4cf9464a5f3dce4ad2ea12851d79fc06a1ef7ffe1bccc9cacb711db7621e8416-py3-none-any.whl'; target user module is 'module'.
INFO:absl:Full user module path is 'module@pipelines/sample_pipeline/_wheels/tfx_user_code_Transform-0.0+4cf9464a5f3dce4ad2ea12851d79fc06a1ef7ffe1bccc9cacb711db7621e8416-py3-none-any.whl'
INFO:absl:Using deployment config:
 executor

In [None]:
# metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
#     METADATA_PATH)

# with Metadata(metadata_connection_config) as metadata_handler:
#   # Find output artifacts from MLMD.
#   evaluator_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
#                                           'Evaluator')
#   eval_artifact = evaluator_output[standard_component_specs.EVALUATION_KEY][0]

In [None]:
# eval_result = tfma.load_eval_result(eval_artifact.uri)
# tfma.view.render_slicing_metrics(eval_result, slicing_column='overall')

In [24]:
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  ev_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
                                   'ExampleValidator')
  anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]

INFO:absl:MetadataStore with DB connection initialized


id: 11
type_id: 12
name: "sample_pipeline.ExampleValidator"
create_time_since_epoch: 1672629815686
last_update_time_since_epoch: 1672629815686



# Checking The Served Model

In [25]:
visualize_artifacts(anomalies_artifacts)

In [26]:
# List files in created model directory.
!find {SERVING_MODEL_DIR}

serving_model/sample_pipeline
serving_model/sample_pipeline/1672629929
serving_model/sample_pipeline/1672629929/variables
serving_model/sample_pipeline/1672629929/variables/variables.index
serving_model/sample_pipeline/1672629929/variables/variables.data-00000-of-00001
serving_model/sample_pipeline/1672629929/saved_model.pb
serving_model/sample_pipeline/1672629929/fingerprint.pb
serving_model/sample_pipeline/1672629929/assets
serving_model/sample_pipeline/1672629929/assets/vocab_compute_and_apply_vocabulary_1_vocabulary
serving_model/sample_pipeline/1672629929/assets/vocab_compute_and_apply_vocabulary_vocabulary
serving_model/sample_pipeline/1672629929/keras_metadata.pb
