# Create an interactive TFX pipeline

This notebook is the first of two notebooks that guide you through automating the [Real-time Item-to-item Recommendation with BigQuery ML Matrix Factorization and ScaNN](https://github.com/GoogleCloudPlatform/analytics-componentized-patterns/tree/master/retail/recommendation-system/bqml-scann) solution with a pipeline.

Use this notebook to create and run a [TFX](https://www.tensorflow.org/tfx) pipeline that performs the following steps:

1. Compute pointwise mutual information (PMI) on item co-occurrence data by using a [custom Python function](https://www.tensorflow.org/tfx/guide/custom_function_component) component.
2. Train a BigQuery ML matrix factorization model on the PMI data to learn item embeddings by using a custom Python function component.
3. Extract the embeddings from the model to a BigQuery table by using a custom Python function component.
4. Export the embeddings in [TFRecord](https://www.tensorflow.org/tutorials/load_data/tfrecord) format by using the standard [BigQueryExampleGen](https://www.tensorflow.org/tfx/api_docs/python/tfx/extensions/google_cloud_big_query/example_gen/component/BigQueryExampleGen) component.
5. Import the schema for the embeddings by using the standard [ImporterNode](https://www.tensorflow.org/tfx/api_docs/python/tfx/components/ImporterNode) component.
6. Validate the embeddings against the imported schema by using the standard [StatisticsGen](https://www.tensorflow.org/tfx/guide/statsgen) and [ExampleValidator](https://www.tensorflow.org/tfx/guide/exampleval) components. 
7. Create an embedding lookup SavedModel by using the standard [Trainer](https://www.tensorflow.org/tfx/api_docs/python/tfx/components/Trainer) component.
8. Push the embedding lookup model to a model registry directory by using the standard [Pusher](https://www.tensorflow.org/tfx/guide/pusher) component.
9. Build the ScaNN index by using the standard Trainer component.
10. Evaluate and validate the ScaNN index latency and recall by implementing a [TFX custom component](https://www.tensorflow.org/tfx/guide/custom_component).
11. Push the ScaNN index to a model registry directory by using the standard Pusher component.

The [tfx_pipeline](tfx_pipeline) directory contains the source code for the TFX pipeline implementation. 

Before starting this notebook, you must run the [00_prep_bq_procedures](00_prep_bq_procedures.ipynb) notebook to complete the solution prerequisites.

After completing this notebook, run the [tfx02_deploy_run](tfx02_deploy_run.ipynb) notebook to deploy the pipeline.

## Setup

Import the required libraries, configure the environment variables, and authenticate your GCP account.

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
!pip install -U -q tfx

### Import libraries

In [None]:
import os
import numpy as np
import tfx
import tensorflow as tf
import tensorflow_data_validation as tfdv
from tensorflow_transform.tf_metadata import schema_utils
import logging

logging.getLogger().setLevel(logging.INFO)

print("Tensorflow Version:", tf.__version__)
print("TFX Version:", tfx.__version__)

### Configure GCP environment settings

Update the following variables to reflect the values for your GCP environment:

+ `PROJECT_ID`: The ID of the Google Cloud project you are using to implement this solution.
+ `BUCKET`: The name of the Cloud Storage bucket you created to use with this solution. The `BUCKET` value should be just the bucket name, so `myBucket` rather than `gs://myBucket`.
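The bucket value is interpolated into `gs://` paths such as `ARTIFACT_STORE`, so a value that already includes the prefix produces a broken `gs://gs://...` path. A small guard like the following can catch that mistake early. It is a hypothetical helper, not part of the solution:

```python
def normalize_bucket_name(bucket: str) -> str:
    """Return a bare Cloud Storage bucket name, for example 'myBucket'.

    Hypothetical helper: strips an accidental 'gs://' prefix and any trailing
    slash so that f'gs://{BUCKET}/...' paths are well formed.
    """
    name = bucket.strip()
    if name.startswith("gs://"):
        name = name[len("gs://"):]
    name = name.rstrip("/")
    # A bucket name cannot be empty or contain an object path.
    if not name or "/" in name:
        raise ValueError(f"Expected a bare bucket name, got {bucket!r}")
    return name


print(normalize_bucket_name("gs://myBucket/"))
```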

In [None]:
PROJECT_ID = 'yourProject' # Change to your project.
BUCKET = 'yourBucket' # Change to the bucket you created.
BQ_DATASET_NAME = 'recommendations'
ARTIFACT_STORE = f'gs://{BUCKET}/tfx_artifact_store'
LOCAL_MLMD_SQLLITE = 'mlmd/mlmd.sqllite'
PIPELINE_NAME = 'tfx_bqml_scann'
EMBEDDING_LOOKUP_MODEL_NAME = 'embeddings_lookup'
SCANN_INDEX_MODEL_NAME = 'embeddings_scann'

PIPELINE_ROOT = os.path.join(ARTIFACT_STORE, f'{PIPELINE_NAME}_interactive')
MODEL_REGISTRY_DIR = os.path.join(ARTIFACT_STORE, 'model_registry_interactive')

!gcloud config set project $PROJECT_ID

### Authenticate your GCP account
This is required if you run the notebook in Colab. If you use an AI Platform notebook, you should already be authenticated.

In [None]:
try:
  from google.colab import auth
  auth.authenticate_user()
  print("Colab user is authenticated.")
except ImportError:
  # Not running in Colab; AI Platform notebooks are already authenticated.
  pass

## Instantiate the interactive context

Instantiate an [interactive context](https://www.tensorflow.org/tfx/api_docs/python/tfx/orchestration/experimental/interactive/interactive_context/InteractiveContext) so that you can execute the TFX pipeline components interactively in the notebook. The interactive context uses a local SQLite database, stored at the path specified by `LOCAL_MLMD_SQLLITE`, as its [ML Metadata (MLMD)](https://github.com/google/ml-metadata) store.
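Because the MLMD store in this notebook is an ordinary SQLite file, you can also inspect it with Python's standard `sqlite3` module. The helper below is a hypothetical sketch that lists the tables in such a file. It makes no assumption about MLMD's internal table names and is meant only for read-only inspection, not as a replacement for the `MetadataStore` API:

```python
import sqlite3
from contextlib import closing
from pathlib import Path


def list_sqlite_tables(db_path):
    """Return the sorted table names in the SQLite database file at db_path."""
    # Open read-only through a file: URI so inspection never creates or modifies the file.
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    with closing(sqlite3.connect(uri, uri=True)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    return [name for (name,) in rows]


# Usage after the interactive context has been created:
# print(list_sqlite_tables('mlmd/mlmd.sqllite'))
```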


In [None]:
CLEAN_ARTIFACTS = True
if CLEAN_ARTIFACTS:
  if tf.io.gfile.exists(PIPELINE_ROOT):
    print("Removing previous artifacts...")
    tf.io.gfile.rmtree(PIPELINE_ROOT)
  if tf.io.gfile.exists('mlmd'):
    print("Removing local mlmd SQLite...")
    tf.io.gfile.rmtree('mlmd')

if not tf.io.gfile.exists('mlmd'):
  print("Creating mlmd directory...")
  tf.io.gfile.mkdir('mlmd')
    
print(f'Pipeline artifacts directory: {PIPELINE_ROOT}')
print(f'Model registry directory: {MODEL_REGISTRY_DIR}')
print(f'Local metadata SQLite path: {LOCAL_MLMD_SQLLITE}')

In [None]:
import ml_metadata as mlmd
from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = LOCAL_MLMD_SQLLITE
connection_config.sqlite.connection_mode = (
  metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE)
mlmd_store = mlmd.metadata_store.MetadataStore(connection_config)

context = InteractiveContext(
  pipeline_name=PIPELINE_NAME,
  pipeline_root=PIPELINE_ROOT,
  metadata_connection_config=connection_config
)

## Execute the pipeline steps
The custom Python function components used in Steps 1 to 3 are implemented in the [tfx_pipeline/bq_components.py](tfx_pipeline/bq_components.py) module.

In [None]:
from tfx_pipeline import bq_components

### Step 1: Compute PMI

Run the `pmi_computer` step, which is an instance of the `compute_pmi` custom Python function component. This component executes the [sp_ComputePMI](sql_scripts/sp_ComputePMI.sql) stored procedure in BigQuery and returns the name of the resulting table as a custom property.
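PMI scores how much more often two items co-occur than they would if they were independent: `PMI(a, b) = log(P(a, b) / (P(a) * P(b)))`. The exact filtering and weighting that `sp_ComputePMI` applies in BigQuery (for example, how `min_item_frequency` and `max_group_size` are used) is defined in the SQL script, so the following is only a minimal in-memory sketch of the textbook formula, computed from pairwise co-occurrence counts:

```python
import math
from collections import Counter
from itertools import combinations


def compute_pmi(groups):
    """Compute PMI for every co-occurring item pair from a list of item groups.

    Each group is a collection of item IDs observed together (for example,
    the items in one basket). Returns {(a, b): pmi} with a < b.
    """
    pair_counts = Counter()
    for group in groups:
        # Count each unordered pair once per group.
        for a, b in combinations(sorted(set(group)), 2):
            pair_counts[(a, b)] += 1

    # Marginal counts derived from the pair counts, so the probabilities are consistent.
    item_counts = Counter()
    for (a, b), count in pair_counts.items():
        item_counts[a] += count
        item_counts[b] += count

    # Treat the co-occurrence matrix as symmetric: each unordered pair
    # contributes both (a, b) and (b, a), so the total mass is twice the pair count.
    n = 2 * sum(pair_counts.values())
    return {
        (a, b): math.log((count / n) / ((item_counts[a] / n) * (item_counts[b] / n)))
        for (a, b), count in pair_counts.items()
    }


pmi = compute_pmi([["a", "b"], ["a", "b"], ["a", "c"]])
print(pmi)
```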


In [None]:
pmi_computer = bq_components.compute_pmi(
  project_id=PROJECT_ID,
  bq_dataset=BQ_DATASET_NAME,
  min_item_frequency=15,
  max_group_size=100,
)

In [None]:
context.run(pmi_computer)

In [None]:
pmi_computer.outputs.item_cooc.get()[0].get_string_custom_property('bq_result_table')

### Step 2: Train the BigQuery ML matrix factorization model

Run the `bqml_trainer` step, which is an instance of the `train_item_matching_model` custom Python function component. This component executes the [sp_TrainItemMatchingModel](sql_scripts/sp_TrainItemMatchingModel.sql) stored procedure in BigQuery and returns the name of the resulting model as a custom property.
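Conceptually, the matrix factorization step learns a vector and a bias per item so that their combination approximates the PMI score of each item pair. BigQuery ML performs the actual optimization. The following is a hypothetical, dependency-free sketch of the same idea, using plain stochastic gradient descent on a symmetric simplification of the model, `pmi(a, b) ≈ dot(v_a, v_b) + bias_a + bias_b`. It is not how BigQuery ML trains the model, and the learning rate, epochs, and initialization are arbitrary choices for illustration:

```python
import random


def factorize_pmi(pmi, dimensions=4, epochs=500, learning_rate=0.05, seed=0):
    """Learn item vectors and biases so that dot(v_a, v_b) + b_a + b_b ≈ pmi[(a, b)].

    pmi: {(item_a, item_b): score}. Returns (vectors, biases) keyed by item ID.
    """
    rng = random.Random(seed)
    items = sorted({item for pair in pmi for item in pair})
    vectors = {i: [rng.gauss(0.0, 0.1) for _ in range(dimensions)] for i in items}
    biases = {i: 0.0 for i in items}

    pairs = list(pmi.items())
    for _ in range(epochs):
        rng.shuffle(pairs)
        for (a, b), target in pairs:
            va, vb = vectors[a], vectors[b]
            prediction = sum(x * y for x, y in zip(va, vb)) + biases[a] + biases[b]
            error = prediction - target
            # Gradient step on the squared error 0.5 * error**2.
            for k in range(dimensions):
                grad_a, grad_b = error * vb[k], error * va[k]
                va[k] -= learning_rate * grad_a
                vb[k] -= learning_rate * grad_b
            biases[a] -= learning_rate * error
            biases[b] -= learning_rate * error
    return vectors, biases


def squared_error(pmi, vectors, biases):
    """Mean squared reconstruction error over the observed pairs."""
    total = 0.0
    for (a, b), target in pmi.items():
        prediction = sum(x * y for x, y in zip(vectors[a], vectors[b])) + biases[a] + biases[b]
        total += (prediction - target) ** 2
    return total / len(pmi)
```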


In [None]:
bqml_trainer = bq_components.train_item_matching_model(
  project_id=PROJECT_ID,
  bq_dataset=BQ_DATASET_NAME,
  item_cooc=pmi_computer.outputs.item_cooc,
  dimensions=50,
)

In [None]:
context.run(bqml_trainer)

In [None]:
bqml_trainer.outputs.bq_model.get()[0].get_string_custom_property('bq_model_name')

### Step 3: Extract the trained embeddings

Run the `embeddings_extractor` step, which is an instance of the `extract_embeddings` custom Python function component. This component executes the [sp_ExractEmbeddings](sql_scripts/sp_ExractEmbeddings.sql) stored procedure in BigQuery and returns the name of the resulting table as a custom property.

In [None]:
embeddings_extractor = bq_components.extract_embeddings(
  project_id=PROJECT_ID,
  bq_dataset=BQ_DATASET_NAME,
  bq_model=bqml_trainer.outputs.bq_model,
)

In [None]:
context.run(embeddings_extractor)

In [None]:
embeddings_extractor.outputs.item_embeddings.get()[0].get_string_custom_property('bq_result_table')

### Step 4: Export the embeddings in TFRecord format

Run the `embeddings_exporter` step, which is an instance of the `BigQueryExampleGen` standard component. This component uses a SQL query to read the embedding records from BigQuery, writes them as TFRecord files by using an Apache Beam pipeline, and produces an [Examples](https://www.tensorflow.org/tfx/api_docs/python/tfx/types/standard_artifacts/Examples) artifact that references those files as an output. The Beam pipeline can run on the [DirectRunner or DataflowRunner](https://beam.apache.org/documentation/runners/capability-matrix/). In this interactive context, the query reads at most 1,000 embedding records, the output is configured as a single `train` split, and the Beam pipeline uses the DirectRunner.
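The `output_config` in the next cell defines a single `train` split with `hash_buckets=1`, so every exported record lands in that split. ExampleGen assigns records to splits by hashing each record into one of the configured buckets and mapping contiguous bucket ranges to splits. The following sketch illustrates that idea with `hashlib.md5` as a stand-in hash function; TFX's actual hash function and the record fields it hashes are implementation details not reproduced here:

```python
import hashlib


def assign_split(record_key: str, splits):
    """Map a record to a split name by hashing it into one of sum(buckets) buckets.

    splits: ordered list of (split_name, hash_buckets) pairs, for example
    [('train', 1)] or [('train', 4), ('eval', 1)].
    """
    total_buckets = sum(buckets for _, buckets in splits)
    digest = hashlib.md5(record_key.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % total_buckets
    # Each split owns a contiguous range of buckets, in the order given.
    upper = 0
    for name, buckets in splits:
        upper += buckets
        if bucket < upper:
            return name
    raise AssertionError("unreachable: bucket is always < total_buckets")
```

With `[('train', 1)]`, every record maps to `train`; with `[('train', 4), ('eval', 1)]`, roughly 80% of records map to `train`, and the assignment of a given record is deterministic across runs.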


In [None]:
from tfx.proto import example_gen_pb2
from tfx.extensions.google_cloud_big_query.example_gen.component import BigQueryExampleGen

query = f'''
  SELECT item_Id, embedding, bias
  FROM {BQ_DATASET_NAME}.item_embeddings
  LIMIT 1000
'''

output_config = example_gen_pb2.Output(
  split_config=example_gen_pb2.SplitConfig(splits=[
    example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=1)])
)

embeddings_exporter = BigQueryExampleGen(
  query=query,
  output_config=output_config
)

In [None]:
beam_pipeline_args = [
  '--runner=DirectRunner',
  f'--project={PROJECT_ID}',
  f'--temp_location=gs://{BUCKET}/bqml_scann/beam/temp',
]

context.run(embeddings_exporter, beam_pipeline_args=beam_pipeline_args)

### Step 5: Import the schema for the embeddings

Run the `schema_importer` step, which is an instance of the `ImporterNode` standard component. This component reads the [schema.pbtxt](tfx_pipeline/schema/schema.pbtxt) file from the solution's `schema` directory, and produces a `Schema` artifact as an output. The schema is used to validate the embedding files exported from BigQuery, and to parse the embedding records in the TFRecord files when they are read in the training components.


In [None]:
schema_importer = tfx.components.ImporterNode(
  source_uri='tfx_pipeline/schema',
  artifact_type=tfx.types.standard_artifacts.Schema,
  instance_name='SchemaImporter'
)

In [None]:
context.run(schema_importer)

In [None]:
context.show(schema_importer.outputs.result)

#### Read a sample embedding from the exported TFRecord files using the schema

In [None]:
schema_file = schema_importer.outputs.result.get()[0].uri + "/schema.pbtxt"
schema = tfdv.load_schema_text(schema_file)
feature_spec = schema_utils.schema_as_feature_spec(schema).feature_spec

In [None]:
data_uri = embeddings_exporter.outputs.examples.get()[0].uri + "/train/*"

def _gzip_reader_fn(filenames):
  # ExampleGen writes GZIP-compressed TFRecord files.
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')

dataset = tf.data.experimental.make_batched_features_dataset(
  data_uri,
  batch_size=1,
  num_epochs=1,
  features=feature_spec,
  reader=_gzip_reader_fn,
  shuffle=True
)

# Count the records by iterating over the dataset once.
counter = 0
for _ in dataset:
  counter += 1
print(f'Number of records: {counter}')
print('')

for batch in dataset.take(1):
  print(f'item: {batch["item_Id"].numpy()[0][0].decode()}')
  print(f'embedding vector: {batch["embedding"].numpy()[0]}')

### Step 6: Validate the embeddings against the imported schema

Run the `stats_generator` step, which is an instance of the `StatisticsGen` standard component. This component accepts the `Examples` artifact output by the `embeddings_exporter` step and computes descriptive statistics for these examples by using an Apache Beam pipeline. The component produces a `Statistics` artifact as an output.
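StatisticsGen computes per-feature statistics such as value counts, missing values, and numeric summaries. As a rough, stdlib-only illustration of that kind of summary (not the actual output format of the component), the following sketch summarizes embedding records represented as dictionaries. The record layout mirrors the `item_Id`, `embedding`, and `bias` columns queried in Step 4:

```python
import statistics


def summarize_records(records, features):
    """Return {feature: summary dict} for the given feature names.

    Counts present and missing values; for numeric values also reports
    min, max, and mean. List-valued features (such as an embedding) are
    summarized over all of their elements, and their lengths are recorded.
    """
    summary = {}
    for feature in features:
        values = [r[feature] for r in records if r.get(feature) is not None]
        entry = {"present": len(values), "missing": len(records) - len(values)}
        flat, lengths = [], []
        for value in values:
            if isinstance(value, (list, tuple)):
                lengths.append(len(value))
                flat.extend(value)
            else:
                flat.append(value)
        numeric = [v for v in flat if isinstance(v, (int, float))]
        if numeric:
            entry.update(min=min(numeric), max=max(numeric), mean=statistics.mean(numeric))
        if lengths:
            entry.update(min_length=min(lengths), max_length=max(lengths))
        summary[feature] = entry
    return summary
```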

In [None]:
stats_generator = tfx.components.StatisticsGen(
  examples=embeddings_exporter.outputs.examples,
)

context.run(stats_generator)

Run the `stats_validator` step, which is an instance of the `ExampleValidator` standard component. This component validates the output statistics against the schema. It accepts the `Statistics` artifact produced by the `stats_generator` step and the `Schema` artifact produced by the `schema_importer` step, and produces an `Anomalies` artifact as an output that describes any anomalies found.
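ExampleValidator compares the computed statistics with the expectations encoded in the schema, such as which features must be present, their types, and their shapes. The following is a simplified, hypothetical sketch of that kind of check applied directly to records. The `EXPECTED_SCHEMA` dictionary is an illustrative stand-in for `schema.pbtxt`, and the 50-element embedding length is assumed from the `dimensions=50` setting used in Step 2:

```python
# Hypothetical stand-in for schema.pbtxt: feature name -> (Python type, fixed length or None).
EXPECTED_SCHEMA = {
    "item_Id": (str, None),
    "embedding": (float, 50),
}


def find_anomalies(records, schema=EXPECTED_SCHEMA):
    """Return a list of human-readable anomaly descriptions (empty if none)."""
    anomalies = []
    for index, record in enumerate(records):
        for feature, (expected_type, length) in schema.items():
            value = record.get(feature)
            if value is None:
                anomalies.append(f"record {index}: missing feature '{feature}'")
                continue
            values = value if isinstance(value, (list, tuple)) else [value]
            if length is not None and len(values) != length:
                anomalies.append(
                    f"record {index}: '{feature}' has {len(values)} values, expected {length}")
            if not all(isinstance(v, expected_type) for v in values):
                anomalies.append(
                    f"record {index}: '{feature}' is not of type {expected_type.__name__}")
    return anomalies
```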

In [None]:
stats_validator = tfx.components.ExampleValidator(
  statistics=stats_generator.outputs.statistics,
  schema=schema_importer.outputs.result,
)

context.run(stats_validator)

In [None]:
context.show(stats_validator.outputs.anomalies)

### Step 7: Create an embedding lookup SavedModel

Run the `embedding_lookup_creator` step, which is an instance of the `Trainer` standard component. This component accepts the `Schema` artifact from the `schema_importer` step and the `Examples` artifact from the `embeddings_exporter` step as inputs, executes the [lookup_creator.py](tfx_pipeline/lookup_creator.py) module, and produces an embedding lookup `Model` artifact as an output.
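The lookup model maps item IDs to their embedding vectors. The test cell in Step 8 passes a space-separated pair of IDs and an unknown ID (`'abc123'`), which suggests that the model splits each input on spaces and tolerates IDs that are missing from its vocabulary; the exact behavior is defined in `lookup_creator.py`. The following pure-Python sketch assumes one plausible behavior, averaging the embeddings of the known IDs and returning a zero vector when none are known:

```python
def lookup_embeddings(inputs, table, dimensions):
    """Return one vector per input string.

    Assumed behavior (hypothetical): split each input on whitespace, average
    the embeddings of the IDs found in `table`, and return zeros otherwise.
    """
    outputs = []
    for text in inputs:
        known = [table[token] for token in text.split() if token in table]
        if not known:
            outputs.append([0.0] * dimensions)
            continue
        # Element-wise mean over the known item vectors.
        outputs.append([sum(column) / len(known) for column in zip(*known)])
    return outputs
```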


In [None]:
from tfx.components.base import executor_spec
from tfx.components.trainer import executor as trainer_executor

_module_file = 'tfx_pipeline/lookup_creator.py'

embedding_lookup_creator = tfx.components.Trainer(
  custom_executor_spec=executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
  module_file=_module_file,
  train_args={'splits': ['train'], 'num_steps': 0},
  eval_args={'splits': ['train'], 'num_steps': 0},
  schema=schema_importer.outputs.result,
  examples=embeddings_exporter.outputs.examples,
)

In [None]:
context.run(embedding_lookup_creator)

#### Validate the lookup model

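Use the [TFX InfraValidator](https://www.tensorflow.org/tfx/guide/infra_validator) to verify that the created model can be loaded and served by TensorFlow Serving before it is pushed. With the `local_docker` configuration below, the validation runs in a local Docker container, so Docker must be available in the notebook environment.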
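With the `ValidationSpec` in the next cell, InfraValidator attempts to load the model up to `num_tries` times, allowing `max_loading_time_seconds` per attempt, and records the outcome in its blessing artifact as a marker file (the `listdir` cell below shows it). The following sketch reproduces only that retry-and-bless control flow around an arbitrary `load_fn` callable. The helper and `load_fn` are hypothetical, the marker names mirror the `INFRA_BLESSED`/`INFRA_NOT_BLESSED` files written by the TFX versions we are aware of, and the real component also manages the serving container:

```python
import time


def validate_with_retries(load_fn, num_tries=3, max_loading_time_seconds=60.0):
    """Return 'INFRA_BLESSED' if load_fn succeeds within the time budget on any try.

    load_fn: hypothetical callable that loads the model and raises on failure.
    Elapsed time is measured after the call returns; unlike the real component,
    this sketch does not interrupt a slow load.
    """
    for _ in range(num_tries):
        start = time.monotonic()
        try:
            load_fn()
        except Exception:  # Any loading error counts as a failed attempt.
            continue
        if time.monotonic() - start <= max_loading_time_seconds:
            return "INFRA_BLESSED"
    return "INFRA_NOT_BLESSED"
```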

In [None]:
from tfx.proto import infra_validator_pb2

serving_config = infra_validator_pb2.ServingSpec(
  tensorflow_serving=infra_validator_pb2.TensorFlowServing(
      tags=['latest']),
  local_docker=infra_validator_pb2.LocalDockerConfig(),
)
  
validation_config = infra_validator_pb2.ValidationSpec(
  max_loading_time_seconds=60,
  num_tries=3,
)

infra_validator = tfx.components.InfraValidator(
  model=embedding_lookup_creator.outputs.model,
  serving_spec=serving_config,
  validation_spec=validation_config,
)

In [None]:
context.run(infra_validator)

In [None]:
tf.io.gfile.listdir(infra_validator.outputs.blessing.get()[0].uri)

### Step 8: Push the embedding lookup model to the model registry

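Run the `embedding_lookup_pusher` step, which is an instance of the `Pusher` standard component. This component accepts the embedding lookup `Model` artifact from the `embedding_lookup_creator` step and the `InfraBlessing` artifact from the `infra_validator` step. If the model is blessed, the component stores the SavedModel in the `EMBEDDING_LOOKUP_MODEL_NAME` subdirectory of the location specified by the `MODEL_REGISTRY_DIR` variable.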
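Pusher copies the model only when its blessing input is positive, and with a filesystem destination it writes each pushed model into a new versioned subdirectory of `base_directory` (the `pushed_destination` property read below points at that subdirectory). The following is a minimal local-filesystem sketch of that push-if-blessed pattern; the helper name and the timestamp-based version naming are assumptions for illustration:

```python
import os
import shutil
import time


def push_if_blessed(model_dir, base_directory, blessed, version=None):
    """Copy model_dir to base_directory/<version> if blessed; return the path or None.

    version defaults to the current Unix time in seconds (an assumed naming scheme).
    """
    if not blessed:
        return None  # Unblessed models are never pushed.
    version = version or str(int(time.time()))
    destination = os.path.join(base_directory, version)
    # copytree creates missing parent directories and fails if the destination
    # already exists, so an existing version is never overwritten.
    shutil.copytree(model_dir, destination)
    return destination
```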


In [None]:
embedding_lookup_pusher = tfx.components.Pusher(
  model=embedding_lookup_creator.outputs.model,
  infra_blessing=infra_validator.outputs.blessing,
  push_destination=tfx.proto.pusher_pb2.PushDestination(
    filesystem=tfx.proto.pusher_pb2.PushDestination.Filesystem(
      base_directory=os.path.join(MODEL_REGISTRY_DIR, EMBEDDING_LOOKUP_MODEL_NAME))
  )
)

In [None]:
context.run(embedding_lookup_pusher)

In [None]:
lookup_savedmodel_dir = embedding_lookup_pusher.outputs.pushed_model.get()[0].get_string_custom_property('pushed_destination')
!saved_model_cli show --dir {lookup_savedmodel_dir} --tag_set serve --signature_def serving_default

In [None]:
loaded_model = tf.saved_model.load(lookup_savedmodel_dir)
vocab = [token.strip() for token in tf.io.gfile.GFile(
  loaded_model.vocabulary_file.asset_path.numpy().decode(), 'r').readlines()]

In [None]:
input_items = [vocab[0], ' '.join([vocab[1], vocab[2]]), 'abc123']
print(input_items)
output = loaded_model(input_items)
print(f'Embeddings retrieved: {len(output)}')
for idx, embedding in enumerate(output):
  print(f'{input_items[idx]}: {embedding[:5]}')

### Step 9: Build the ScaNN index

Run the `scann_indexer` step, which is an instance of the `Trainer` standard component. This component accepts the `Schema` artifact from the `schema_importer` step and the `Examples` artifact from the `embeddings_exporter` step as inputs, executes the [scann_indexer.py](tfx_pipeline/scann_indexer.py) module, and produces the ScaNN index `Model` artifact as an output.
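ScaNN builds an index for approximate nearest-neighbor search, trading a small amount of accuracy for much lower query latency than an exhaustive scan. For reference, the exact result that such an index approximates can be computed by brute force. The following sketch scores every item by dot product with the query and keeps the top `k`; dot-product similarity is an assumption here, since the similarity measure is configured in `scann_indexer.py`:

```python
import heapq


def exact_top_k(query, embeddings, k):
    """Return the k item IDs with the highest dot product with query.

    embeddings: {item_id: vector}. Cost is O(num_items * dimensions) per query,
    which is the exhaustive scan that an approximate index such as ScaNN avoids.
    """
    scores = (
        (sum(q * x for q, x in zip(query, vector)), item_id)
        for item_id, vector in embeddings.items()
    )
    return [item_id for _, item_id in heapq.nlargest(k, scores)]
```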


In [None]:
from tfx.components.base import executor_spec
from tfx.components.trainer import executor as trainer_executor

_module_file = 'tfx_pipeline/scann_indexer.py'

scann_indexer = tfx.components.Trainer(
  custom_executor_spec=executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
  module_file=_module_file,
  train_args={'splits': ['train'], 'num_steps': 0},
  eval_args={'splits': ['train'], 'num_steps': 0},
  schema=schema_importer.outputs.result,
  examples=embeddings_exporter.outputs.examples
)

In [None]:
context.run(scann_indexer)

### Step 10: Evaluate and validate the ScaNN index

Run the `index_evaluator` step, which is an instance of the `IndexEvaluator` custom TFX component. This component accepts the `Examples` artifact from the `embeddings_exporter` step, the `Schema` artifact from the `schema_importer` step, and the ScaNN index `Model` artifact from the `scann_indexer` step. The IndexEvaluator component completes the following tasks:

+ Uses the schema to parse the embedding records. 
+ Evaluates the matching latency of the index.
+ Compares the recall of the produced matches with respect to the exact matches.
+ Validates the latency and recall against the `max_latency` and `min_recall` input parameters.

When it is finished, it produces a `ModelBlessing` artifact as output, which indicates whether the ScaNN index passed the validation criteria or not.
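The recall check can be expressed as the fraction of the exact top-`k` neighbors that the index also returns, averaged over queries, and the blessing decision combines that recall with the latency threshold. The following sketch assumes this definition of recall and compares `max_latency` against the mean per-query latency in seconds; both are assumptions, since the authoritative logic lives in `scann_evaluator.py`:

```python
import statistics


def recall_at_k(approx_results, exact_results):
    """Mean fraction of exact neighbors recovered, over parallel lists of result lists."""
    per_query = [
        len(set(approx) & set(exact)) / len(exact)
        for approx, exact in zip(approx_results, exact_results)
        if exact  # Skip queries with no exact neighbors.
    ]
    return statistics.mean(per_query)


def is_blessed(recall, latencies, min_recall=0.8, max_latency=0.01):
    """Bless the index only if recall and mean latency both meet their thresholds."""
    return recall >= min_recall and statistics.mean(latencies) <= max_latency
```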

The IndexEvaluator custom component is implemented in the [tfx_pipeline/scann_evaluator.py](tfx_pipeline/scann_evaluator.py) module.

In [None]:
from tfx_pipeline import scann_evaluator

index_evaluator = scann_evaluator.IndexEvaluator(
  examples=embeddings_exporter.outputs.examples,
  model=scann_indexer.outputs.model,
  schema=schema_importer.outputs.result,
  min_recall=0.8,
  max_latency=0.01,
)

In [None]:
context.run(index_evaluator)

### Step 11: Push the ScaNN index to the model registry

Run the `embedding_scann_pusher` step, which is an instance of the `Pusher` standard component. This component accepts the ScaNN index `Model` artifact from the `scann_indexer` step and the `ModelBlessing` artifact from the `index_evaluator` step. If the index is blessed, the component stores the SavedModel in the `SCANN_INDEX_MODEL_NAME` subdirectory of the location specified by the `MODEL_REGISTRY_DIR` variable.


In [None]:
embedding_scann_pusher = tfx.components.Pusher(
  model=scann_indexer.outputs.model,
  model_blessing=index_evaluator.outputs.blessing,
  push_destination=tfx.proto.pusher_pb2.PushDestination(
    filesystem=tfx.proto.pusher_pb2.PushDestination.Filesystem(
      base_directory=os.path.join(MODEL_REGISTRY_DIR, SCANN_INDEX_MODEL_NAME))
  )
)

In [None]:
context.run(embedding_scann_pusher)

In [None]:
from index_server.matching import ScaNNMatcher
scann_index_dir = embedding_scann_pusher.outputs.pushed_model.get()[0].get_string_custom_property('pushed_destination')
scann_matcher = ScaNNMatcher(scann_index_dir)

In [None]:
vector = np.random.rand(50)
scann_matcher.match(vector, 5)

## Check the local MLMD store 

In [None]:
mlmd_store.get_artifacts()

## View the model registry directory

In [None]:
!gsutil ls {MODEL_REGISTRY_DIR}

## License

Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

See the License for the specific language governing permissions and limitations under the License.

**This is not an official Google product but sample code provided for an educational purpose**