### MLOps on Kubeflow and Feature store 

- TODO:
- The bucket, pipeline, and featurestore are in different regions; recreate them in the same region, chosen based on featurestore availability.
- The pipeline fails if an import job is already in progress; add code to wait until the previous import is done.
- There were some errors with TensorFlow Probability on TensorFlow 2.7, so we had to downgrade to 2.5.
- Align on a single aiplatform SDK; currently aiplatform v1, aiplatform v1beta, and the KFP AI Platform client are all used.
- Find a good use case to demo featurestore serving.

In [216]:
# %unload_ext nb_black
# # magic cmd to autoformat code
# #!pip install nb_black

In [None]:
# ! pip3 install {USER_FLAG} google-cloud-aiplatform==1.1.1
# ! pip3 install {USER_FLAG} google-cloud-pipeline-components==0.1.3
# ! pip3 install {USER_FLAG} --upgrade kfp
# ! pip3 install {USER_FLAG} numpy==1.20.3
# ! pip3 install --upgrade tensorflow
# ! pip3 install tf-nightly

### Imports

In [219]:
import os
import sys

from google_cloud_pipeline_components import aiplatform as gcc_aip

from kfp.v2 import compiler, dsl
from kfp.v2.google.client import AIPlatformClient
from google.cloud.aiplatform_v1 import (
    FeaturestoreOnlineServingServiceClient,
    FeaturestoreServiceClient,
)
from google.cloud.aiplatform_v1.types import FeatureSelector, IdMatcher
from google.cloud.aiplatform_v1.types import entity_type as entity_type_pb2
from google.cloud.aiplatform_v1.types import feature as feature_pb2
from google.cloud.aiplatform_v1.types import featurestore as featurestore_pb2
from google.cloud.aiplatform_v1.types import (
    featurestore_online_service as featurestore_online_service_pb2,
)
from google.cloud.aiplatform_v1.types import (
    featurestore_service as featurestore_service_pb2,
)
from google.cloud.aiplatform_v1.types import io as io_pb2
from google.protobuf.duration_pb2 import Duration
from google.cloud import aiplatform
from src.trainer import trainer_component
from src.generator import generator_component
from src.ingester import ingester_component
from src import load_component
import importlib
from src import feature_store_helper


#### Envs

In [206]:
# Project-level settings.
PROJECT_ID = "mlop-cg-data-and-insights"
PROJECT_NO = "389886591986"
BUCKET_NAME = "gs://mlops-vertex-capgemini"
# Region used for the pipeline client, model upload, endpoint and Cloud Functions.
REGION = "europe-west1"

# Feature Store settings. The featurestore uses its own region (different from
# REGION); the regional API endpoint is derived from it so the two cannot drift.
FEATURE_STORE_REGION = "europe-west3"
API_ENDPOINT = f"{FEATURE_STORE_REGION}-aiplatform.googleapis.com"
INPUT_CSV_FILE = ""
FEATURESTORE_ID = "movie_prediction"
ENTITY_TYPE_ID = "movie_entity"
ENTITY_ID_FIELD = "user_id"

# BigQuery parameters
BIGQUERY_DATASET_ID = f"{PROJECT_ID}.movielens_dataset"
BIGQUERY_LOCATION = "EU"
BIGQUERY_TABLE_ID = f"{BIGQUERY_DATASET_ID}.training_dataset"
BIGQUERY_RAW_TABLE_ID = f"{BIGQUERY_DATASET_ID}.raw_dataset"
# "bq://" URI form of the raw table, passed as `bigquery_uri` when importing
# feature values.
BIGQUERY_INPUT_URI = f"bq://{BIGQUERY_RAW_TABLE_ID}"


# https://www.kaggle.com/prajitdatta/movielens-100k-dataset
# Dataset parameters
# Derived from BUCKET_NAME so that changing the bucket updates the dataset path too.
RAW_DATA_PATH = f"{BUCKET_NAME}/dataset/u.data"

# u.data -- The full u data set, 100000 ratings by 943 users on 1682 items.
# Each user has rated at least 20 movies. Users and items are
# numbered consecutively from 1. The data is randomly
# ordered. This is a tab separated list of
# user id | item id | rating | timestamp.
# The time stamps are unix seconds since 1/1/1970 UTC

# Pipeline parameters
PIPELINE_NAME = "movie-prediction"
ENABLE_CACHING = False
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline"
PIPELINE_SPEC_PATH = "metadata_pipeline.json"
OUTPUT_COMPONENT_SPEC = "output-component.yaml"
BIGQUERY_TMP_FILE = "tmp.json"
BIGQUERY_MAX_ROWS = 5
TFRECORD_FILE = f"{BUCKET_NAME}/trainer_input_path/tf"
LOGGER_PUBSUB_TOPIC = "logger-pubsub-topic"
LOGGER_CLOUD_FUNCTION = "logger-cloud-function"

# Trainer parameters (used in the custom job's worker pool spec).
TRAINING_ARTIFACTS_DIR = f"{BUCKET_NAME}/artifacts"
TRAINING_REPLICA_COUNT = "1"
TRAINING_MACHINE_TYPE = "n1-standard-4"
TRAINING_ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
TRAINING_ACCELERATOR_COUNT = "0"

# Deployer parameters
TRAINED_POLICY_DISPLAY_NAME = "movielens-trained-policy"
ENDPOINT_DISPLAY_NAME = "movielens-endpoint"
ENDPOINT_MACHINE_TYPE = "n1-standard-4"

# Prediction container parameters
PREDICTION_CONTAINER = "prediction-container"
PREDICTION_CONTAINER_DIR = "src/prediction_container"

### Create Bucket

In [225]:
# Bucket creation is kept commented out; uncomment it to create the bucket in REGION.
# ! gsutil mb -l $REGION $BUCKET_NAME
# List the bucket contents (long listing, all object versions).
! gsutil ls -al $BUCKET_NAME

   1979226  2021-12-14T10:28:10Z  gs://mlops-vertex-capgemini/u.data#1639477690746113  metageneration=1
   1979259  2021-12-14T11:20:28Z  gs://mlops-vertex-capgemini/uasdf.csv#1639480828395119  metageneration=1
                                 gs://mlops-vertex-capgemini/artifacts/
                                 gs://mlops-vertex-capgemini/dataset/
                                 gs://mlops-vertex-capgemini/europe-west1-projects/
                                 gs://mlops-vertex-capgemini/pipeline/
                                 gs://mlops-vertex-capgemini/trainer_input_path/
TOTAL: 2 objects, 3958485 bytes (3.78 MiB)


### Create Featurestore

In [209]:
# Create the featurestore in FEATURE_STORE_REGION, talking to the matching
# regional API endpoint.
feature_store_helper.create_featurestore(
    project=PROJECT_ID,
    featurestore_id=FEATURESTORE_ID,
    location=FEATURE_STORE_REGION,
    api_endpoint=API_ENDPOINT,
)

Long running operation: projects/389886591986/locations/europe-west3/featurestores/movie_prediction/operations/4115397255974879232
create_featurestore_response: name: "projects/389886591986/locations/europe-west3/featurestores/movie_prediction"



### List Featurestores

In [226]:
# List the featurestores in FEATURE_STORE_REGION to verify the creation step;
# the same listing can be reused later when cleaning up.
featurestore_list = feature_store_helper.list_featurestore(
    project=PROJECT_ID, location=FEATURE_STORE_REGION
)
print(featurestore_list)

Featurestores found: 1
['projects/389886591986/locations/europe-west3/featurestores/movie_prediction']


### Create Entity

In [211]:
# Create the entity type that the features below are attached to.
# NOTE(review): unlike the create_featurestore call, no location/api_endpoint
# is passed here — presumably the helper defaults to the featurestore region;
# confirm against feature_store_helper.
feature_store_helper.create_entity_type(project=PROJECT_ID,
                                        featurestore_id=FEATURESTORE_ID,
                                        entity_type_id=ENTITY_TYPE_ID,
                                        description="movies")

Long running operation: projects/389886591986/locations/europe-west3/featurestores/movie_prediction/entityTypes/movie_entity/operations/9006306451299237888
create_entity_type_response: name: "projects/389886591986/locations/europe-west3/featurestores/movie_prediction/entityTypes/movie_entity"



### Create Features

In [212]:
# Register one feature per column of the raw dataset. Every feature uses the
# STRING value type and the same entity type; only the feature id varies.
for feature_id in ("user_id", "item_id", "rating", "timestamp"):
    feature_store_helper.create_feature(
        project=PROJECT_ID,
        featurestore_id=FEATURESTORE_ID,
        entity_type_id=ENTITY_TYPE_ID,
        feature_id=feature_id,
        value_type=aiplatform.gapic.Feature.ValueType.STRING,
    )

Long running operation: projects/389886591986/locations/europe-west3/featurestores/movie_prediction/operations/620603945135374336
create_feature_response: name: "projects/389886591986/locations/europe-west3/featurestores/movie_prediction/entityTypes/movie_entity/features/user_id"

Long running operation: projects/389886591986/locations/europe-west3/featurestores/movie_prediction/operations/3669540892865200128
create_feature_response: name: "projects/389886591986/locations/europe-west3/featurestores/movie_prediction/entityTypes/movie_entity/features/item_id"

Long running operation: projects/389886591986/locations/europe-west3/featurestores/movie_prediction/operations/6425743864815943680
create_feature_response: name: "projects/389886591986/locations/europe-west3/featurestores/movie_prediction/entityTypes/movie_entity/features/rating"

Long running operation: projects/389886591986/locations/europe-west3/featurestores/movie_prediction/operations/2165338617323454464
create_feature_respons

### Cleanup and delete featurestore

In [None]:
# feature_store_helper.cleanup_featurestore(PROJECT_ID, FEATURESTORE_ID)

### Cloudbuild file 

In [None]:
# cloudbuild_yaml = """steps:
# - name: "gcr.io/kaniko-project/executor:latest"
#   args: ["--destination=gcr.io/{PROJECT_ID}/{PREDICTION_CONTAINER}:latest",
#          "--cache=true",
#          "--cache-ttl=99h"]
#   env: ["AIP_STORAGE_URI={ARTIFACTS_DIR}",
#         "PROJECT_ID={PROJECT_ID}",
#         "LOGGER_PUBSUB_TOPIC={LOGGER_PUBSUB_TOPIC}"]
# options:
#   machineType: "E2_HIGHCPU_8"
# """.format(
#     PROJECT_ID=PROJECT_ID,
#     PREDICTION_CONTAINER=PREDICTION_CONTAINER,
#     ARTIFACTS_DIR=TRAINING_ARTIFACTS_DIR,
#     LOGGER_PUBSUB_TOPIC=LOGGER_PUBSUB_TOPIC,
# )

# with open(f"{PREDICTION_CONTAINER_DIR}/cloudbuild.yaml", "w") as fp:
#     fp.write(cloudbuild_yaml)

In [None]:
# ! gcloud builds submit --config $PREDICTION_CONTAINER_DIR/cloudbuild.yaml $PREDICTION_CONTAINER_DIR

### Run code locally

In [None]:
# Reload the module to pick up source edits, then run the load step directly
# in the kernel instead of as a pipeline component.
# NOTE(review): this passes BIGQUERY_TABLE_ID as the target table, whereas the
# pipeline passes the raw table id to the same component — confirm which is intended.
importlib.reload(load_component)
load_component.load_raw_data_to_bigquery(PROJECT_ID,RAW_DATA_PATH,BIGQUERY_DATASET_ID, BIGQUERY_LOCATION, BIGQUERY_TABLE_ID)

In [None]:
# Reload the module and run the dataset generator directly in the kernel.
# The literals 8, 20, 20, 3 presumably map to batch_size, rank_k, num_actions
# and driver_steps (the pipeline defaults) — verify against the function signature.
# NOTE(review): the pipeline also passes a `feature_id` argument to this
# component; it is not supplied here — confirm it is optional.
importlib.reload(generator_component)
generator_component.generate_movielens_dataset_for_bigquery(PROJECT_ID,RAW_DATA_PATH,8,20,20,3, BIGQUERY_TMP_FILE, BIGQUERY_DATASET_ID, BIGQUERY_LOCATION,BIGQUERY_TABLE_ID)

In [None]:
# Reload the module and run the ingester directly in the kernel, writing to
# TFRECORD_FILE. The trailing 10000 presumably is bigquery_max_rows (it matches
# the pipeline default) — verify against the function signature.
importlib.reload(ingester_component)
ingester_component.ingest_bigquery_dataset_into_tfrecord(PROJECT_ID, BIGQUERY_TABLE_ID,TFRECORD_FILE, 10000)

In [None]:
# Manual sanity check of the TFRecord dataset: parse the first record as a
# tf.train.Example and print it.
# NOTE(review): "tf" is a path relative to the working directory, not the
# TFRECORD_FILE location in the bucket — presumably a local copy; confirm.
import tensorflow as tf 
raw_dataset = tf.data.TFRecordDataset("tf")

# take(1) yields at most one record, so this loop body runs at most once.
for raw_record in raw_dataset.take(1):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    print(example)

In [None]:
# Reload the trainer module to pick up source edits and display its
# `training_op` attribute (shown as the cell output; it is not called here).
importlib.reload(trainer_component)
trainer_component.training_op

In [None]:
# Import feature values from BigQuery into the featurestore entity type.
# The source is BIGQUERY_INPUT_URI, the "bq://" URI of the raw table defined
# in the Envs cell (the same value the pipeline passes as `bigquery_uri`), so
# the cell runs on a fresh kernel without relying on an undefined variable.
feature_store_helper.import_feature_values(project=PROJECT_ID,
                                    featurestore_id=FEATURESTORE_ID,
                                    entity_type_id=ENTITY_TYPE_ID,
                                    bigquery_uri=BIGQUERY_INPUT_URI,
                                    entity_id_field=ENTITY_ID_FIELD)

In [187]:
# Import the BigQuery -> featurestore module and reload it so that edits to
# its source file are picked up without restarting the kernel.
from src import bigquery_to_featurestore
importlib.reload(bigquery_to_featurestore)


<module 'src.bigquery_to_featurestore' from '/home/jupyter/pawan/mlops/notebook/pipeline/src/bigquery_to_featurestore.py'>

In [182]:
# Run the pipeline's import step directly in the kernel. The source is
# BIGQUERY_INPUT_URI, the "bq://" URI of the raw table defined in the Envs cell
# (the same value the pipeline passes as `bigquery_uri`), so the cell runs on a
# fresh kernel without relying on an undefined variable.
bigquery_to_featurestore.import_feature_values(project=PROJECT_ID,
                                    featurestore_id=FEATURESTORE_ID,
                                    entity_type_id=ENTITY_TYPE_ID,
                                    bigquery_uri=BIGQUERY_INPUT_URI,
                                    entity_id_field=ENTITY_ID_FIELD)

Long running operation: projects/389886591986/locations/europe-west3/featurestores/movie_prediction/entityTypes/movie_entity/operations/2241899810988752896
import_feature_values_response: imported_entity_count: 100000
imported_feature_value_count: 400000



'movie_prediction'

### Kubeflow Pipeline

In [221]:
from kfp.components import load_component_from_url
from kfp.components import create_component_from_func

from src.trainer import trainer_component
from src.generator import generator_component
from src.ingester import ingester_component
from src import load_component
from src import bigquery_to_featurestore


# Reload every local component module used by the pipeline below so the
# Jupyter kernel picks up edits to their source files. trainer_component is
# included because the pipeline builds its training step from it as well.
importlib.reload(load_component)
importlib.reload(generator_component)
importlib.reload(ingester_component)
importlib.reload(trainer_component)
importlib.reload(bigquery_to_featurestore)


# "Startup" pipeline: load raw data into BigQuery, import it into the
# featurestore, generate the training dataset, ingest it into TFRecords, train
# the policy, then upload the model, create an endpoint and deploy to it.
# Documentation is kept in comments (not a docstring) so the function object
# handed to the KFP compiler is unchanged.
@dsl.pipeline(pipeline_root=PIPELINE_ROOT, name=f"{PIPELINE_NAME}-startup")
def pipeline(
    # Pipeline configs
    project_id: str,
    raw_data_path: str,
    training_artifacts_dir: str,
    featurestore_id: str,
    entity_type_id: str,
    bigquery_uri: str,
    entity_id_field: str,
    # BigQuery configs
    bigquery_dataset_id: str,
    bigquery_location: str,
    bigquery_table_id: str,
    bigquery_raw_table_id: str,
    bigquery_max_rows: int = 10000,
    # TF-Agents RL configs
    batch_size: int = 8,
    rank_k: int = 20,
    num_actions: int = 20,
    driver_steps: int = 3,
    num_epochs: int = 5,
    tikhonov_weight: float = 0.01,
    agent_alpha: float = 10,
) -> None:
    
    # Load step: wrap the loader function as a component (its spec is also
    # written to a YAML file) and load the raw data into the raw BigQuery table.
    load_op = create_component_from_func(
    func=load_component.load_raw_data_to_bigquery,
    output_component_file="load-output-component.yaml",
    packages_to_install=[
      "google-cloud-bigquery==2.20.0",
    ],
  )
    load_task = load_op(
        project_id=project_id,
        raw_data_path=raw_data_path,
        bigquery_dataset_id=bigquery_dataset_id,
        bigquery_location=bigquery_location,
        bigquery_table_id=bigquery_raw_table_id,
    )
    
    # Preprocess step: import feature values from BigQuery into the featurestore.
    # NOTE(review): the aiplatform package is not version-pinned here, unlike
    # the other components' dependencies.
    preprocess_op = create_component_from_func(
    func=bigquery_to_featurestore.import_feature_values,
    output_component_file="preprocess-output-component.yaml",
    packages_to_install=[
      "google-cloud-aiplatform",
    ],
    )
    # Consuming the load task's output makes this step run after the load step.
    preprocess_task = preprocess_op(
        project=project_id,
        featurestore_id=featurestore_id,
        entity_type_id=entity_type_id,
        bigquery_uri=bigquery_uri,
        bigquery_table_id=load_task.outputs["bigquery_table_id"],
        entity_id_field=entity_id_field, 
    )
    
    # Generate step: build the training dataset in BigQuery on a TensorFlow 2.5 image.
    generate_op = create_component_from_func(
    func=generator_component.generate_movielens_dataset_for_bigquery,
    base_image="tensorflow/tensorflow:2.5.0",
    output_component_file="generate-output-component.yaml",
    packages_to_install=[
      "google-cloud-bigquery==2.20.0",
      "tensorflow==2.5.0",
      "Image",
      "tf-agents==0.8.0",
    ],
  )

    # Run the Generator component. Consuming the preprocess task's output makes
    # it run after the featurestore import.
    generate_task = generate_op(
        project_id=project_id,
        raw_data_path=raw_data_path,
        batch_size=batch_size,
        rank_k=rank_k,
        num_actions=num_actions,
        driver_steps=driver_steps,
        bigquery_tmp_file=BIGQUERY_TMP_FILE,
        bigquery_dataset_id=bigquery_dataset_id,
        bigquery_location=bigquery_location,
        bigquery_table_id=bigquery_table_id,
        feature_id=preprocess_task.outputs["featurestore_id"],
    )
    
    # Ingest step: export the generated BigQuery table into TFRecords.
    ingest_op = create_component_from_func(
    func=ingester_component.ingest_bigquery_dataset_into_tfrecord,
    base_image="tensorflow/tensorflow:2.5.0",
    output_component_file=f"ingest-{OUTPUT_COMPONENT_SPEC}",
    packages_to_install=[
      "google-cloud-bigquery==2.20.0",
      "tensorflow==2.5.0",
    ],
  )

    # Run the Ingester component on the table produced by the Generator.
    ingest_task = ingest_op(
        project_id=project_id,
        bigquery_table_id=generate_task.outputs["bigquery_table_id"],
        bigquery_max_rows=bigquery_max_rows,
        tfrecord_file=TFRECORD_FILE,
    )

    # Run the Trainer component and submit custom job to Vertex AI.
    train_op = create_component_from_func(
      func=trainer_component.training_op,
      output_component_file=f"trainer-{OUTPUT_COMPONENT_SPEC}",
      packages_to_install=[
          "tensorflow==2.5.0",
          "tf-agents==0.8.0",
      ])

    # The TFRecord path is hard-coded instead of taken from the ingest task's
    # output (alternative kept below); it has the same value as TFRECORD_FILE.
    # Ordering after ingestion is enforced by the explicit .after() call further down.
    train_task = train_op(
      training_artifacts_dir=training_artifacts_dir,
      # tfrecord_file=ingest_task.outputs["tfrecord_file"],
      tfrecord_file="gs://mlops-vertex-capgemini/trainer_input_path/tf",
      num_epochs=num_epochs,
      rank_k=rank_k,
      num_actions=num_actions,
      tikhonov_weight=tikhonov_weight,
      agent_alpha=agent_alpha)

    # Worker pool for the custom training job: reuse the trainer component's
    # container image with the machine settings from the config cell.
    worker_pool_specs = [
        {
            "containerSpec": {
                "imageUri": train_task.container.image,
            },
            "replicaCount": TRAINING_REPLICA_COUNT,
            "machineSpec": {
                "machineType": TRAINING_MACHINE_TYPE,
                "acceleratorType": TRAINING_ACCELERATOR_TYPE,
                "acceleratorCount": TRAINING_ACCELERATOR_COUNT,
            },
        },
    ]
    # Attach the custom job spec to the training task.
    train_task.custom_job_spec = {
        "displayName": train_task.name,
        "jobSpec": {
            "workerPoolSpecs": worker_pool_specs,
        },
    }

    # Run the Deployer components.
    # Upload the trained policy as a model.
    model_upload_op = gcc_aip.ModelUploadOp(
      project=project_id,
      location=REGION,
      display_name=TRAINED_POLICY_DISPLAY_NAME,
      artifact_uri=training_artifacts_dir,
      serving_container_image_uri=f"gcr.io/{PROJECT_ID}/{PREDICTION_CONTAINER}:latest",
    )
    # Training has to wait for ingestion because it does not consume the
    # ingest task's output directly.
    train_task.after(ingest_task)
    # Model uploading has to occur after training completes.
    model_upload_op.after(train_task)
    # Create a Vertex AI endpoint. (This operation can occur in parallel with
    # the Generator, Ingester, Trainer components.)
    endpoint_create_op = gcc_aip.EndpointCreateOp(
      project=project_id,
      location=REGION,
      display_name=ENDPOINT_DISPLAY_NAME)
    # Deploy the uploaded, trained policy to the created endpoint. (This operation
    # has to occur after both model uploading and endpoint creation complete.)
    model_deploy_op = gcc_aip.ModelDeployOp(
      project=project_id,
      endpoint=endpoint_create_op.outputs["endpoint"],
      model=model_upload_op.outputs["model"],
      # Hard-coded resource ids kept for reference:
      #endpoint="495264017615421440",
      #model="944845526120005632",
      deployed_model_display_name=TRAINED_POLICY_DISPLAY_NAME,
      machine_type=ENDPOINT_MACHINE_TYPE)


In [222]:
# Compile the authored pipeline into a job spec file at PIPELINE_SPEC_PATH.
compiler.Compiler().compile(pipeline_func=pipeline, package_path=PIPELINE_SPEC_PATH)

# Create a Vertex AI client for the project in REGION.
api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)

# Create a pipeline run job. Only the parameters without defaults are passed;
# the remaining pipeline parameters keep the defaults from the function signature.
response = api_client.create_run_from_job_spec(
    job_spec_path=PIPELINE_SPEC_PATH,
    parameter_values={
        # Pipeline configs
        "project_id": PROJECT_ID,
        "raw_data_path": RAW_DATA_PATH,
        "training_artifacts_dir": TRAINING_ARTIFACTS_DIR,
        "featurestore_id": FEATURESTORE_ID,
        "entity_type_id": ENTITY_TYPE_ID,
        "bigquery_uri": BIGQUERY_INPUT_URI,
        "entity_id_field": ENTITY_ID_FIELD,
        # BigQuery configs
        "bigquery_dataset_id": BIGQUERY_DATASET_ID,
        "bigquery_location": BIGQUERY_LOCATION,
        "bigquery_table_id": BIGQUERY_TABLE_ID,
        "bigquery_raw_table_id": BIGQUERY_RAW_TABLE_ID,
    },
    enable_caching=ENABLE_CACHING,
)



### Run locally for debugging

In [223]:
# Module-level stand-ins for the pipeline parameters, so the trainer code in
# the next cell can be run directly in the kernel. The numeric values match
# the defaults in the pipeline signature.
project_id=PROJECT_ID
raw_data_path= RAW_DATA_PATH
training_artifacts_dir= TRAINING_ARTIFACTS_DIR
# BigQuery configs
bigquery_dataset_id= BIGQUERY_DATASET_ID
bigquery_location= BIGQUERY_LOCATION
bigquery_table_id= BIGQUERY_TABLE_ID
# Same value as TFRECORD_FILE from the config cell.
tfrecord_file="gs://mlops-vertex-capgemini/trainer_input_path/tf"
bigquery_max_rows= 10000
# TF-Agents RL configs
batch_size=  8
rank_k= 20
num_actions= 20
driver_steps= 3
num_epochs= 5
tikhonov_weight: float = 0.01
agent_alpha: float = 10

In [None]:
# Progress marker for the local training run.
print("start")
# NOTE(review): bare string expression (reads like a module docstring pasted in
# from the trainer component); it has no effect when the cell runs.
"""The Trainer component for training a policy on TFRecord files."""
# Import for the function return value type.
from typing import NamedTuple  # pylint: disable=unused-import

from kfp import components


import collections
from typing import Dict, List, NamedTuple  # pylint: disable=redefined-outer-name,reimported

import tensorflow as tf

from tf_agents import agents
from tf_agents import policies
from tf_agents import trajectories
from tf_agents.bandits.agents import lin_ucb_agent
from tf_agents.policies import policy_saver
from tf_agents.specs import tensor_spec

import logging

# The non-per-arm version of the movie environment is used.
per_arm = False

# Parsing schema for the serialized examples: every field of a record is
# stored as a single scalar string feature.
feature_description = {
    field_name: tf.io.FixedLenFeature((), tf.string)
    for field_name in (
        "step_type",
        "observation",
        "action",
        "policy_info",
        "next_step_type",
        "reward",
        "discount",
    )
}

def _parse_record(raw_record: tf.Tensor) -> Dict[str, tf.Tensor]:
    """Decode one serialized `tf.train.Example` into its named fields.

    Args:
        raw_record: A serialized `tf.train.Example` proto.

    Returns:
        A dict keyed by the names in `feature_description`, whose values are
        string `tf.Tensor` objects holding the still-serialized field contents.
    """
    parsed_fields = tf.io.parse_single_example(
        serialized=raw_record,
        features=feature_description)
    return parsed_fields

def build_trajectory(
    parsed_record: Dict[str, tf.Tensor],
    policy_info: policies.utils.PolicyInfo) -> trajectories.Trajectory:
    """Assemble a `trajectories.Trajectory` from one parsed record.

    Args:
        parsed_record: A dict mapping field names to string `tf.Tensor`
            objects containing serialized tensors.
        policy_info: Policy information specification, passed through
            unchanged as the trajectory's `policy_info`.

    Returns:
        A `trajectories.Trajectory` whose fields (other than `policy_info`)
        are the de-serialized tensors of `parsed_record`, each with an extra
        axis inserted at position 1.
    """
    def _decode(field_name: str, dtype: tf.dtypes.DType) -> tf.Tensor:
        # De-serialize one field and insert a new axis at position 1.
        tensor = tf.io.parse_tensor(parsed_record[field_name], out_type=dtype)
        return tf.expand_dims(tensor, axis=1)

    return trajectories.Trajectory(
        step_type=_decode("step_type", tf.int32),
        observation=_decode("observation", tf.float32),
        action=_decode("action", tf.int32),
        policy_info=policy_info,
        next_step_type=_decode("next_step_type", tf.int32),
        reward=_decode("reward", tf.float32),
        discount=_decode("discount", tf.float32))

def train_policy_on_trajectory(
    agent: agents.TFAgent,
    tfrecord_file: str,
    num_epochs: int
) -> NamedTuple("TrainOutputs", [
    ("policy", policies.TFPolicy),
    ("train_loss", Dict[str, List[float]]),
]):
    """Train the agent's policy on the records stored in `tfrecord_file`.

    Each record is parsed with `_parse_record`, converted into a
    `trajectories.Trajectory` with `build_trajectory`, and fed to
    `agent.train`; the whole file is replayed once per epoch.

    Args:
        agent: A TF-Agents agent that carries the policy to train.
        tfrecord_file: Path to the TFRecord file containing the training dataset.
        num_epochs: Number of passes over the dataset.

    Returns:
        A `TrainOutputs` namedtuple of (the agent's policy, a dict mapping
        "epoch<i>" to the list of loss values from each training step of
        that epoch).
    """
    result_type = collections.namedtuple(
        "TrainOutputs",
        ["policy", "train_loss"])

    records = tf.data.TFRecordDataset([tfrecord_file]).map(_parse_record)

    # Epoch keys are 1-based; a key only appears once a loss is recorded for it.
    losses_by_epoch = collections.defaultdict(list)
    for epoch_number in range(1, num_epochs + 1):
        epoch_key = f"epoch{epoch_number}"
        for record in records:
            step_trajectory = build_trajectory(record, agent.policy.info_spec)
            step_loss, _ = agent.train(step_trajectory)
            losses_by_epoch[epoch_key].append(step_loss.numpy())

    return result_type(agent.policy, losses_by_epoch)

def execute_training_and_save_policy(
    training_artifacts_dir: str,
    tfrecord_file: str,
    num_epochs: int,
    rank_k: int,
    num_actions: int,
    tikhonov_weight: float,
    agent_alpha: float) -> None:
    """Build a LinUCB agent, train it on the TFRecord data and save its policy.

    Args:
        training_artifacts_dir: Path where the trained policy is saved.
        tfrecord_file: Path to the TFRecord file to train on.
        num_epochs: Number of training epochs.
        rank_k: Rank for matrix factorization in the movie environment; also
            the observation dimension.
        num_actions: Number of actions (movie items) to choose from.
        tikhonov_weight: LinUCB Tikhonov regularization weight of the Trainer.
        agent_alpha: LinUCB exploration parameter that multiplies the
            confidence intervals of the Trainer.
    """
    # Specs of the individual time-step fields.
    step_type_spec = tensor_spec.TensorSpec(
        shape=(), dtype=tf.int32, name="step_type")
    reward_spec = tensor_spec.TensorSpec(
        shape=(), dtype=tf.float32, name="reward")
    discount_spec = tensor_spec.BoundedTensorSpec(
        shape=(), dtype=tf.float32, name="discount", minimum=0., maximum=1.)
    observation_spec = tensor_spec.TensorSpec(
        shape=(rank_k,), dtype=tf.float32, name="observation")
    time_step_spec = trajectories.TimeStep(
        step_type=step_type_spec,
        reward=reward_spec,
        discount=discount_spec,
        observation=observation_spec)

    # Actions are integer indices in [0, num_actions - 1].
    highest_action = num_actions - 1
    action_spec = tensor_spec.BoundedTensorSpec(
        shape=(),
        dtype=tf.int32,
        name="action",
        minimum=0,
        maximum=highest_action)

    # Linear UCB bandit agent; `per_arm` comes from module scope.
    bandit_agent = lin_ucb_agent.LinearUCBAgent(
        time_step_spec=time_step_spec,
        action_spec=action_spec,
        tikhonov_weight=tikhonov_weight,
        alpha=agent_alpha,
        dtype=tf.float32,
        accepts_per_arm_features=per_arm)
    bandit_agent.initialize()
    logging.info(
        "TimeStep Spec (for each batch):\n%s\n", bandit_agent.time_step_spec)
    logging.info(
        "Action Spec (for each batch):\n%s\n", bandit_agent.action_spec)

    # Off-policy training on the recorded trajectories; the per-epoch losses
    # in the result are not used here.
    train_result = train_policy_on_trajectory(
        agent=bandit_agent,
        tfrecord_file=tfrecord_file,
        num_epochs=num_epochs)
    trained_policy = train_result.policy

    # Persist the trained policy.
    logging.info("saving policy")
    policy_saver.PolicySaver(trained_policy).save(training_artifacts_dir)

# Train with the module-level debugging parameters and save the resulting
# policy to training_artifacts_dir.
execute_training_and_save_policy(
    training_artifacts_dir=training_artifacts_dir,
    tfrecord_file=tfrecord_file,
    num_epochs=num_epochs,
    rank_k=rank_k,
    num_actions=num_actions,
    tikhonov_weight=tikhonov_weight,
    agent_alpha=agent_alpha)

# Wrap the artifact location in an "Outputs" namedtuple and print it.
outputs = collections.namedtuple(
    "Outputs",
    ["training_artifacts_dir"])

print(outputs(training_artifacts_dir))

In [None]:
import json
import os
from typing import Dict, List

import fastapi

from google.cloud import pubsub_v1

import tensorflow as tf
import tf_agents
from tf_agents import policies

In [None]:
# Load the SavedModel stored at the artifacts location.
# This literal has the same value as TRAINING_ARTIFACTS_DIR from the config cell.
AIP_STORAGE_URI="gs://mlops-vertex-capgemini/artifacts"
tf.saved_model.load(AIP_STORAGE_URI)

### Simulate re-training pipeline on streaming data using pubsub and cloud functions

In [None]:
# Simulator parameters: names of the Pub/Sub topic, Cloud Function and Cloud
# Scheduler job used by the simulator, plus the job's cron schedule and the
# message body it publishes.
SIMULATOR_PUBSUB_TOPIC = "simulator-pubsub-topic"
SIMULATOR_CLOUD_FUNCTION = "simulator-cloud-function"
SIMULATOR_SCHEDULER_JOB = "simulator-scheduler-job"
SIMULATOR_SCHEDULE = "*/5 * * * *"
SIMULATOR_SCHEDULER_MESSAGE = "simulator-message"

In [None]:
# Create the Pub/Sub topic used by the simulator's scheduler job and Cloud Function.
! gcloud pubsub topics create $SIMULATOR_PUBSUB_TOPIC


In [None]:
scheduler_job_args = " ".join(
    [
        SIMULATOR_SCHEDULER_JOB,
        f"--schedule='{SIMULATOR_SCHEDULE}'",
        f"--topic={SIMULATOR_PUBSUB_TOPIC}",
        f"--message-body={SIMULATOR_SCHEDULER_MESSAGE}",
    ]
)

! echo $scheduler_job_args

In [None]:
# Create the Cloud Scheduler job that publishes to the simulator topic on the
# configured schedule.
! gcloud scheduler jobs create pubsub $scheduler_job_args


In [None]:
# Capture the gcloud listing of endpoints in REGION whose display name matches
# ENDPOINT_DISPLAY_NAME; `endpoints` holds the output lines.
endpoints = ! gcloud beta ai endpoints list \
    --region=$REGION \
    --filter=display_name=$ENDPOINT_DISPLAY_NAME
print("\n".join(endpoints), "\n")

# NOTE(review): takes the first space-separated token of the third output
# line, so this presumably relies on two leading status/header lines — confirm
# against the actual gcloud output. Raises IndexError if fewer lines come back
# (e.g. when no endpoint matches).
ENDPOINT_ID = endpoints[2].split(" ")[0]
print(f"ENDPOINT_ID={ENDPOINT_ID}")

In [None]:
# Values exported to the Cloud Functions as environment variables; they match
# the batch_size / rank_k / num_actions defaults of the startup pipeline.
BATCH_SIZE=8
RANK_K=20
NUM_ACTIONS=20

In [None]:
ENV_VARS = ",".join(
    [
        f"PROJECT_ID={PROJECT_ID}",
        f"REGION={REGION}",
        f"ENDPOINT_ID={ENDPOINT_ID}",
        f"RAW_DATA_PATH={RAW_DATA_PATH}",
        f"BATCH_SIZE={BATCH_SIZE}",
        f"RANK_K={RANK_K}",
        f"NUM_ACTIONS={NUM_ACTIONS}",
    ]
)

! echo $ENV_VARS

In [None]:
# Deploy the simulator Cloud Function from src/simulator (entry point
# `simulate`), triggered by messages on the simulator Pub/Sub topic and
# configured through ENV_VARS.
! gcloud functions deploy $SIMULATOR_CLOUD_FUNCTION \
    --region=$REGION \
    --trigger-topic=$SIMULATOR_PUBSUB_TOPIC \
    --runtime=python37 \
    --memory=512MB \
    --timeout=200s \
    --source=src/simulator \
    --entry-point=simulate \
    --stage-bucket=$BUCKET_NAME \
    --update-env-vars=$ENV_VARS

In [None]:
from google.cloud import aiplatform

In [None]:
# Create the Pub/Sub topic that triggers the logger Cloud Function.
! gcloud pubsub topics create $LOGGER_PUBSUB_TOPIC


In [None]:
ENV_VARS = ",".join(
    [
        f"PROJECT_ID={PROJECT_ID}",
        f"RAW_DATA_PATH={RAW_DATA_PATH}",
        f"BATCH_SIZE={BATCH_SIZE}",
        f"RANK_K={RANK_K}",
        f"NUM_ACTIONS={NUM_ACTIONS}",
        f"BIGQUERY_TMP_FILE={BIGQUERY_TMP_FILE}",
        f"BIGQUERY_DATASET_ID={BIGQUERY_DATASET_ID}",
        f"BIGQUERY_LOCATION={BIGQUERY_LOCATION}",
        f"BIGQUERY_TABLE_ID={BIGQUERY_TABLE_ID}",
    ]
)

! echo $ENV_VARS

In [None]:
# Deploy the logger Cloud Function from src/logger (entry point
# `log_prediction_to_bigquery`), triggered by messages on the logger Pub/Sub
# topic and configured through ENV_VARS.
! gcloud functions deploy $LOGGER_CLOUD_FUNCTION \
    --region=$REGION \
    --trigger-topic=$LOGGER_PUBSUB_TOPIC \
    --runtime=python37 \
    --memory=512MB \
    --timeout=200s \
    --source=src/logger \
    --entry-point=log_prediction_to_bigquery \
    --stage-bucket=$BUCKET_NAME \
    --update-env-vars=$ENV_VARS

In [None]:
# Cron schedule on which the retraining pipeline is triggered.
TRIGGER_SCHEDULE = "*/30 * * * *"  # Schedule to trigger the pipeline. E.g. "*/30 * * * *" means every 30 mins.


In [None]:
# "Retraining" pipeline: ingest the BigQuery training table into TFRecords,
# train the policy as a Vertex AI custom job, then upload the model, create an
# endpoint and deploy to it. Model upload and endpoint creation are pinned to
# REGION, matching the startup pipeline and the region in which the notebook
# later looks up the endpoint.
# Documentation is kept in comments (not a docstring) so the function object
# handed to the KFP compiler carries no new docstring.
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name=f"{PIPELINE_NAME}-retraining")
def pipeline(
    # Pipeline configs
    project_id: str,
    training_artifacts_dir: str,

    # BigQuery configs
    bigquery_table_id: str,
    bigquery_max_rows: int = 10000,

    # TF-Agents RL configs
    rank_k: int = 20,
    num_actions: int = 20,
    num_epochs: int = 5,
    tikhonov_weight: float = 0.01,
    agent_alpha: float = 10) -> None:

    # Run the Ingester component.
    ingest_op = create_component_from_func(
        func=ingester_component.ingest_bigquery_dataset_into_tfrecord,
        output_component_file=f"ingester-{OUTPUT_COMPONENT_SPEC}",
        packages_to_install=[
            "google-cloud-bigquery==2.20.0",
            "tensorflow==2.5.0",
        ])
    ingest_task = ingest_op(
        project_id=project_id,
        bigquery_table_id=bigquery_table_id,
        bigquery_max_rows=bigquery_max_rows,
        tfrecord_file=TFRECORD_FILE)

    # Run the Trainer component and submit custom job to Vertex AI.
    train_op = create_component_from_func(
        func=trainer_component.training_op,
        output_component_file=f"trainer-{OUTPUT_COMPONENT_SPEC}",
        packages_to_install=[
            "tensorflow==2.5.0",
            "tf-agents==0.8.0",
        ])
    # Consuming the ingest task's output makes training run after ingestion.
    train_task = train_op(
        training_artifacts_dir=training_artifacts_dir,
        tfrecord_file=ingest_task.outputs["tfrecord_file"],
        num_epochs=num_epochs,
        rank_k=rank_k,
        num_actions=num_actions,
        tikhonov_weight=tikhonov_weight,
        agent_alpha=agent_alpha)

    # Worker pool for the custom training job: reuse the trainer component's
    # container image with the machine settings from the config cell.
    worker_pool_specs = [
        {
            "containerSpec": {
                "imageUri": train_task.container.image,
            },
            "replicaCount": TRAINING_REPLICA_COUNT,
            "machineSpec": {
                "machineType": TRAINING_MACHINE_TYPE,
                "acceleratorType": TRAINING_ACCELERATOR_TYPE,
                "acceleratorCount": TRAINING_ACCELERATOR_COUNT,
            },
        },
    ]
    train_task.custom_job_spec = {
        "displayName": train_task.name,
        "jobSpec": {
            "workerPoolSpecs": worker_pool_specs,
        }
    }

    # Run the Deployer components.
    # Upload the trained policy as a model, in the same region as the pipeline.
    model_upload_op = gcc_aip.ModelUploadOp(
        project=project_id,
        location=REGION,
        display_name=TRAINED_POLICY_DISPLAY_NAME,
        artifact_uri=training_artifacts_dir,
        serving_container_image_uri=f"gcr.io/{PROJECT_ID}/{PREDICTION_CONTAINER}:latest",
    )
    # Model uploading has to occur after training completes.
    model_upload_op.after(train_task)
    # Create a Vertex AI endpoint in REGION. (This operation can occur in
    # parallel with the Ingester and Trainer components.)
    endpoint_create_op = gcc_aip.EndpointCreateOp(
        project=project_id,
        location=REGION,
        display_name=ENDPOINT_DISPLAY_NAME)
    # Deploy the uploaded, trained policy to the created endpoint. (This operation
    # has to occur after both model uploading and endpoint creation complete.)
    model_deploy_op = gcc_aip.ModelDeployOp(
        project=project_id,
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name=TRAINED_POLICY_DISPLAY_NAME,
        machine_type=ENDPOINT_MACHINE_TYPE)

In [None]:
# Compile the authored pipeline into a job spec file at PIPELINE_SPEC_PATH
# (this overwrites the spec written for the startup pipeline, which uses the
# same path).
compiler.Compiler().compile(pipeline_func=pipeline,                                                     
                            package_path=PIPELINE_SPEC_PATH)

# Create a Vertex AI client for the project in REGION.
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION)

# Schedule a recurring pipeline run on TRIGGER_SCHEDULE. Only the parameters
# without defaults are passed.
response = api_client.create_schedule_from_job_spec(
    job_spec_path=PIPELINE_SPEC_PATH,
    schedule=TRIGGER_SCHEDULE,
    parameter_values={
        # Pipeline configs
        "project_id": PROJECT_ID,
        "training_artifacts_dir": TRAINING_ARTIFACTS_DIR,

        # BigQuery config
        "bigquery_table_id": BIGQUERY_TABLE_ID,
    })
# Display the "name" entry of the scheduling response as the cell output.
response["name"]

In [None]:
# # # Delete endpoint resource.
# ! gcloud ai endpoints delete $ENDPOINT_ID --quiet --region $REGION

# # # Delete Pub/Sub topics.
# ! gcloud pubsub topics delete $SIMULATOR_PUBSUB_TOPIC --quiet
# ! gcloud pubsub topics delete $LOGGER_PUBSUB_TOPIC --quiet

# # Delete Cloud Functions.
# ! gcloud functions delete $SIMULATOR_CLOUD_FUNCTION --quiet
# ! gcloud functions delete $LOGGER_CLOUD_FUNCTION --quiet

# # Delete Scheduler job.
# ! gcloud scheduler jobs delete $SIMULATOR_SCHEDULER_JOB --quiet

# Delete Cloud Storage objects that were created.
# ! gsutil -m rm -r $PIPELINE_ROOT
# ! gsutil -m rm -r $TRAINING_ARTIFACTS_DIR