# Global Settings and Imports

## Import packages

In [None]:
%reload_ext autoreload
%autoreload 2

In [None]:
import os
import sys

sys.path.insert(0, '..')

from datetime import datetime

import seq_rec.utils.custom_logging

In [None]:
# Enables variable interpolation from the notebook into a written file (used when writing the Cloud Function definition)
from IPython.core.magic import register_line_cell_magic

@register_line_cell_magic
def writetemplate(line, cell):
    """Write the cell body to a file after interpolating notebook globals.

    The cell text is treated as a ``str.format`` template: every ``{NAME}``
    placeholder is replaced by the notebook global of the same name.

    Args:
        line: Text following the magic name; used as the output file path.
        cell: The cell body to render.

    Raises:
        KeyError: If the template references a name that is not a global.
    """
    # Render first so that a failed interpolation does not leave the target
    # file truncated (opening in 'w' mode empties the file immediately).
    rendered = cell.format(**globals())
    with open(line, 'w', encoding='utf-8') as out_file:
        out_file.write(rendered)

In [None]:
import google.cloud.aiplatform as aip
from kfp.v2 import dsl
import kfp
from kfp.v2.dsl import Artifact, Dataset

In [None]:
import seq_rec.utils as utils

In [None]:
from seq_rec.op.alert import slack_noti_exit_op, slack_noti_op, record_job_status_op, record_last_checkpoint_date_op
from seq_rec.op.commons import copy_output_to_gcs_op
from seq_rec.op.ingest import detect_resume_training_op, tf_download_bq_table_op
from seq_rec.op.data_validation import generate_tfdv_schema_op, validate_tfdv_schema_op
from seq_rec.op.preprocess import preprocess_op
from seq_rec.op.model import model_op
from seq_rec.op.evaluate import evaluate_op
from seq_rec.op.deploy import deploy_model_to_gcp_endpoint_op, update_user_recent_txn_in_recommend_api_op

## Parameters

In [None]:
# Location of the Hydra config (relative to this notebook) and the country this notebook builds pipelines for
HYDRA_CONFIG_PATH = '../seq_rec/conf/'
COUNTRY_CODE = 'SG'
cfg = utils.load_cfg(HYDRA_CONFIG_PATH)

# Google Cloud
PROJECT_ID = cfg.env.gcp.project_id
REGION = cfg.env.pipeline.kubeflow.region
BUCKET_NAME = cfg.env.pipeline.kubeflow.bucket_name
BUCKET_URL = cfg.env.pipeline.kubeflow.bucket_url
BUCKET_DIR = cfg.env.pipeline.kubeflow.bucket_dir

SERVICE_ACCOUNT_EMAIL = cfg.env.gcp.service_account_email

# VERSION_NODOT is the version with dots stripped; it is used in the pipeline root path
VERSION = "0.2.0"
VERSION_NODOT = VERSION.replace(".", "")
PIPELINE_NAME = "seq_rec"
PIPELINE_ROOT_PREFIX = f"{BUCKET_URL}/pipeline_root/{PIPELINE_NAME}/{VERSION_NODOT}"

# File name shared by the record-job-status task and the exit-handler Slack notification
JOB_STATUS_FILE_NAME = "job_status.txt"

# Display name -> Slack user ID of the people mentioned in alert messages
PIPELINE_STAKEHOLDERS_SLACK_UIDS = {
    "Quy MLE": "<EXAMPLE_SLACK_UID>"  # TODO: Retrieve from secrets manager
}

# Model params
RANDOM_SEED = 13  # Set None for non-deterministic result

# Define Pipeline

## Full Retraining

In [None]:
def _slack_blocks(header_text, body_text):
    """Build the Slack message payload: a header, a divider and a mrkdwn section."""
    return [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": header_text,
                "emoji": True
            }
        },
        {
            "type": "divider"
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": body_text
            }
        }
    ]


def _on_t4_gpu(task):
    """Apply the shared resource profile (4 CPU, 16G memory, one NVIDIA T4) to a task."""
    return (
        task
        .set_cpu_limit("4")
        .set_memory_limit("16G")
        .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
        .set_gpu_limit("1")
    )


@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT_PREFIX,
    # A name for the pipeline. Use to determine the pipeline Context.
    name=f"{PIPELINE_NAME}{COUNTRY_CODE.lower()}fullretraining",
)
def pipeline_full_retraining(
        deployment_threshold: dict,
        resume_training: bool = False
    ):
    """ Define the pipeline components for full retraining

    Task graph, in creation order: detect resume state -> download data ->
    generate / save / import / validate the TFDV schema (Slack alert when the
    input is abnormal) -> preprocess train and test -> train -> evaluate.
    When evaluation returns ``dep_decision == "false"`` a Slack alert is sent;
    when it returns ``"true"`` the model is retrained on the full data set,
    copied to GCS, deployed, and the vocabularies and last checkpoint date are
    persisted. A Slack exit handler wraps everything except the
    record-job-status task.

    Args:
        deployment_threshold (dict): dictionary of metrics and thresholds to deploy
        resume_training (bool): whether to pick up the model state from previous train and continue training
    """
    run_id = dsl.PIPELINE_JOB_ID_PLACEHOLDER
    run_name = dsl.PIPELINE_JOB_NAME_PLACEHOLDER
    url = f"https://console.cloud.google.com/vertex-ai/locations/{REGION}/pipelines/runs/{run_name}?project={PROJECT_ID}"

    country_code = COUNTRY_CODE
    model_blob_name = f"{BUCKET_DIR}/{VERSION}/{country_code}/model/"
    checkpoint_blob_name = f"{BUCKET_DIR}/{VERSION}/{country_code}/checkpoint/"
    last_checkpoint_date_blob_name = os.path.join(checkpoint_blob_name, "last_checkpoint_date.txt")
    target_env = 'prod'

    # Please declare the SLACK_INCOMING_WEBHOOK as a var in .env file.
    # NOTE(review): this value (and the recommend API key below) is read while the
    # pipeline function runs, i.e. at compile time, and passed to the ops as a
    # plain argument - confirm that storing it in the compiled spec is acceptable.
    slack_webhook_url = os.environ.get('SLACK_INCOMING_WEBHOOK')

    # Message bodies shared by the notifications below
    run_summary_lines = [
        f"*Run name:* <{url}|{run_name}>",
        f"*Run ID:* {run_id}",
    ]
    exit_text = '\n'.join(run_summary_lines)
    text_stakeholders = ' '.join(
        f"<@{slack_uid}>" for slack_uid in PIPELINE_STAKEHOLDERS_SLACK_UIDS.values()
    )
    alert_text = '\n'.join(
        run_summary_lines
        + [f"*Stakeholders:* {text_stakeholders}. Where are you now?"]
    )

    slack_noti_exit_task = slack_noti_exit_op(
        webhook_url=slack_webhook_url,
        message=_slack_blocks("Kubeflow pipeline has completed!", exit_text),
        run_name=run_name,
        job_status_file_name=JOB_STATUS_FILE_NAME,
        bucket_url=BUCKET_URL,
        folder=BUCKET_DIR,
        pipeline_stakeholders_slack_uids=PIPELINE_STAKEHOLDERS_SLACK_UIDS
    ).set_display_name("Exit Handler Slack Noti")

    # Could not get the job status via pipeline utils so create a small hack here
    # The below task should be triggered after the final tasks have completed
    record_job_status_task = (
        record_job_status_op(
            run_name=run_name,
            job_status_file_name=JOB_STATUS_FILE_NAME,
            bucket_name=BUCKET_NAME,
            folder=BUCKET_DIR,
        )
        .set_display_name("Record Job Status")
    )
    # Since the input of this task does not change as we overwrite the same file, we need to disable caching for this task.
    record_job_status_task.set_caching_options(False)

    with dsl.ExitHandler(slack_noti_exit_task):
        detect_resume_training_task = (
            detect_resume_training_op(
                resume_training=resume_training,
                checkpoint_bucket=BUCKET_NAME,
                last_checkpoint_date_blob_name=last_checkpoint_date_blob_name
            )
            .set_display_name("Detect and Prepare to Resume Training")
            .set_caching_options(False)
        )

        bq_download_train_test_task = (
            _on_t4_gpu(
                tf_download_bq_table_op(
                    resume_training=detect_resume_training_task.outputs['resume_training'],
                    last_checkpoint_date=detect_resume_training_task.outputs['last_checkpoint_date'],
                    country_code=country_code
                )
            )
            .set_display_name("Download Train and Test Data for Evaluation")
            .set_caching_options(False)
        )

        gen_schema_task = (
            generate_tfdv_schema_op(
                x_train_file=bq_download_train_test_task.outputs['raw_train_ds_file']
            )
            .set_display_name("Generate Schema")
            .set_caching_options(False)
        )

        # overwrite_if_exists=False: an existing base schema in GCS is not replaced
        save_schema_if_not_exist_task = (
            copy_output_to_gcs_op(
                output_obj=gen_schema_task.outputs['X_train_schema_file'],
                source_bucket_name=BUCKET_NAME,
                destination_bucket_name=BUCKET_NAME,
                destination_blob_name=f"{BUCKET_DIR}/X_train_schema_file",
                overwrite_if_exists=False
            )
            .set_display_name("Save Schema For Future Run Validation if not Exists")
            .set_caching_options(False)
        )

        # Import only after the save step so the base schema exists on a first run
        gcs_import_schema_task = (
            kfp.dsl.importer(
                artifact_uri=f"gs://{BUCKET_NAME}/{BUCKET_DIR}/X_train_schema_file",
                artifact_class=Artifact,
                reimport=False,
            )
            .set_display_name("Import Base Schema")
            .after(save_schema_if_not_exist_task)
            .set_caching_options(False)
        )

        validate_schema_task = (
            validate_tfdv_schema_op(
                x_train_schema_base_file=gcs_import_schema_task.output,
                x_train_file=bq_download_train_test_task.outputs['raw_train_ds_file']
            )
            .set_cpu_limit("2")
            .set_memory_limit("8G")
            .set_display_name("Validate Schema, Drift and Skewness")
            .set_caching_options(False)
        )

        with dsl.Condition(
            validate_schema_task.outputs["is_abnormal"] == "true",
            name="Input Is Abnormal",
        ):
            # Notification only: the tasks below are not gated on this condition
            slack_noti_op(
                webhook_url=slack_webhook_url,
                message=_slack_blocks(
                    ":exclamation: Kubeflow pipeline has abnormal input!",
                    alert_text
                )
            ).set_display_name("Input Abnormal Slack Noti")

        preprocess_train_task = (
            _on_t4_gpu(
                preprocess_op(
                    bq_download_train_test_task.outputs['raw_train_ds_file'],
                    country_code=country_code
                )
            )
            .set_display_name(f"Preprocess Train Data for {country_code}")
            .set_caching_options(False)
        )

        preprocess_test_task = (
            _on_t4_gpu(
                preprocess_op(
                    bq_download_train_test_task.outputs['raw_test_ds_file'],
                    country_code=country_code
                )
            )
            .set_display_name(f"Preprocess Test Data for {country_code}")
            .set_caching_options(False)
        )

        gcs_import_checkpoint_task = (
            kfp.dsl.importer(
                artifact_uri=f"gs://{BUCKET_NAME}/{checkpoint_blob_name}",
                artifact_class=Artifact,
                reimport=False,
            )
            .set_display_name("Import Checkpoint")
            .set_caching_options(False)
        )

        model_task = (
            _on_t4_gpu(
                model_op(
                    train_prep_ds_file=preprocess_train_task.outputs['prep_ds_file'],
                    test_prep_ds_file=preprocess_test_task.outputs['prep_ds_file'],
                    merchant_vocab_file=preprocess_train_task.outputs['merchant_vocab_file'],
                    search_vocab_file=preprocess_train_task.outputs['search_vocab_file'],
                    checkpoint_dir=gcs_import_checkpoint_task.output,
                    resume_training=detect_resume_training_task.outputs['resume_training'],
                    country_code=country_code
                )
            )
            .set_display_name(f"Model {country_code}")
            .set_caching_options(False)
        )

        evaluate_task = (
            _on_t4_gpu(
                evaluate_op(
                    model_dir=model_task.outputs['model_output_dir'],
                    test_prep_ds_file=preprocess_test_task.outputs['prep_ds_file'],
                    merchant_vocab_file=preprocess_train_task.outputs['merchant_vocab_file'],
                    country_code=country_code,
                    k=10,
                    deployment_threshold=deployment_threshold,
                )
            )
            .set_display_name(f"Evaluate {country_code}")
            .set_caching_options(False)
        )

        with dsl.Condition(
            evaluate_task.outputs["dep_decision"] == "false",
            name="Evaluation Reports Degraded Accuracy! Deployment Cancelled!",
        ):
            slack_noti_op(
                webhook_url=slack_webhook_url,
                message=_slack_blocks(
                    ":exclamation: Evaluation reports degraded accuracy! Deployment cancelled!",
                    alert_text
                )
            ).set_display_name("Accuracy Degraded Slack Noti")

        with dsl.Condition(
            evaluate_task.outputs["dep_decision"] == "true",
            name="Deploy Decision",
        ):
            preprocess_full_task = (
                _on_t4_gpu(
                    preprocess_op(
                        bq_download_train_test_task.outputs['raw_full_ds_file'],
                        country_code=country_code
                    )
                )
                .set_display_name(f"Preprocess Full Training Data for {country_code}")
                .set_caching_options(False)
            )

            # NOTE(review): this task has the same display name as the first
            # training task ("Model <country>"); consider a distinct name.
            model_full_data_task = (
                _on_t4_gpu(
                    model_op(
                        train_prep_ds_file=preprocess_full_task.outputs['prep_ds_file'],
                        test_prep_ds_file=preprocess_test_task.outputs['prep_ds_file'],
                        # Get the vocabs from preprocess_train_task not preprocess_full_task so that we can resume training
                        merchant_vocab_file=preprocess_train_task.outputs['merchant_vocab_file'],
                        search_vocab_file=preprocess_train_task.outputs['search_vocab_file'],
                        checkpoint_dir=gcs_import_checkpoint_task.output,
                        resume_training=True,
                        country_code=country_code,
                        cfg_overrides=['tfrs_gru.epochs=5']  # Limit 5 epochs to prevent overfit
                    )
                )
                .set_display_name(f"Model {country_code}")
                .set_caching_options(False)
            )

            persist_model_task = (
                copy_output_to_gcs_op(
                    output_obj=model_full_data_task.outputs['model_output_dir'],
                    source_bucket_name=BUCKET_NAME,
                    destination_bucket_name=BUCKET_NAME,
                    destination_blob_name=model_blob_name,
                    overwrite_if_exists=True
                )
                .set_display_name("Save Model Artifacts to GCS")
                .set_caching_options(False)
            )

            # Deployment reads the model from the GCS location written by persist_model_task
            deploy_model_task = (
                deploy_model_to_gcp_endpoint_op(
                    model_bucket_name=BUCKET_NAME,
                    model_blob_name=model_blob_name,
                    model_name='seq_rec',
                    model_version=VERSION,
                    country_code=country_code,
                    endpoint_id="",
                    traffic_split={"0": 100},
                    undeploy_zero_traffic_models=True
                )
                .set_display_name("Deploy Model to GCP Endpoint")
                .after(persist_model_task)
                .set_caching_options(False)
            )

            persist_merchant_vocab_task = (
                copy_output_to_gcs_op(
                    output_obj=preprocess_train_task.outputs['merchant_vocab_file'],
                    source_bucket_name=BUCKET_NAME,
                    destination_bucket_name=BUCKET_NAME,
                    destination_blob_name=os.path.join(model_blob_name, "merchant_vocab"),
                    overwrite_if_exists=True
                )
                .set_display_name("Save Merchant Vocab to GCS")
                .after(deploy_model_task)
                .set_caching_options(False)
            )

            persist_search_vocab_task = (
                copy_output_to_gcs_op(
                    output_obj=preprocess_train_task.outputs['search_vocab_file'],
                    source_bucket_name=BUCKET_NAME,
                    destination_bucket_name=BUCKET_NAME,
                    destination_blob_name=os.path.join(model_blob_name, "search_vocab"),
                    overwrite_if_exists=True
                )
                .set_display_name("Save Search Vocab to GCS")
                .after(deploy_model_task)
                .set_caching_options(False)
            )

            update_user_recent_txn_in_recommend_api_task = (
                update_user_recent_txn_in_recommend_api_op(
                    api_key=os.environ.get(f'RECOMMEND_{target_env.upper()}_API_KEY'),
                    env=target_env
                )
                .after(deploy_model_task)
                .set_display_name(f"Update User Recent Txn in {target_env} recommend Endpoint")
                .set_caching_options(False)
            )

            # Created for its side effect in the graph; nothing depends on it
            (
                record_last_checkpoint_date_op(
                    checkpoint_bucket=BUCKET_NAME,
                    last_checkpoint_date_blob_name=last_checkpoint_date_blob_name
                )
                .set_display_name("Record Last Checkpoint Date")
                .after(update_user_recent_txn_in_recommend_api_task)
                .set_caching_options(False)
            )

        # NOTE(review): three of these upstream tasks live inside the
        # "Deploy Decision" condition - confirm how the status task behaves
        # when that branch is skipped.
        record_job_status_task.after(deploy_model_task)
        record_job_status_task.after(persist_merchant_vocab_task)
        record_job_status_task.after(persist_search_vocab_task)
        record_job_status_task.after(evaluate_task)


### Compile the pipeline

In [None]:
from kfp.v2 import compiler  # noqa: F811

# Names derived from the training mode; later cells reuse them for the
# job submission, the GCS upload and the Cloud Function template.
TRAINING_MODE = 'full_retraining'
PIPELINE_ROOT_FULL = os.path.join(PIPELINE_ROOT_PREFIX, COUNTRY_CODE, TRAINING_MODE)
COMPILED_PIPELINE_FILENAME = f"recsys_{PIPELINE_NAME}_{COUNTRY_CODE}_{TRAINING_MODE}_pipeline.json"
DISPLAY_NAME = f"recsys_{PIPELINE_NAME}_{COUNTRY_CODE}_{VERSION}_{TRAINING_MODE}_pipeline"

# Compile the pipeline function into a JSON spec in the working directory
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline_full_retraining,
    package_path=COMPILED_PIPELINE_FILENAME,
)

### Run the pipeline

In [None]:
# Build a one-off Vertex AI pipeline job from the locally compiled spec.
# The project is passed explicitly so the run lands in the configured project
# (the one used in the Slack run URL) instead of whatever default the local
# SDK / gcloud environment happens to resolve.
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=COMPILED_PIPELINE_FILENAME,
    pipeline_root=PIPELINE_ROOT_FULL,
    parameter_values={
        "deployment_threshold": {
            "hit_rate": 0.1
        },
        "resume_training": False
    },
    project=PROJECT_ID,
    location=REGION
)

In [None]:
# Submit the job to Vertex AI Pipelines, running as the configured service account
job.submit(service_account=SERVICE_ACCOUNT_EMAIL)

### Schedule

#### Upload pipeline job to GCS

In [None]:
# Expose the notebook settings to the %%bash cells below via environment variables
os.environ.update({
    'TRAINING_MODE': TRAINING_MODE,
    'COUNTRY_CODE': COUNTRY_CODE,
    'COMPILED_PIPELINE_FILENAME': COMPILED_PIPELINE_FILENAME,
    'PIPELINE_ROOT': PIPELINE_ROOT_FULL,
    'SERVICE_ACCOUNT_EMAIL': SERVICE_ACCOUNT_EMAIL,
    'REGION': REGION,
})

In [None]:
%%bash

# Fail the cell if the upload fails; without this the trailing echo hides a gsutil error
set -euo pipefail

# Copy the compiled pipeline spec next to the pipeline root so the Cloud Function can load it
gsutil cp "${COMPILED_PIPELINE_FILENAME}" "${PIPELINE_ROOT}/${COMPILED_PIPELINE_FILENAME}"

echo "${PIPELINE_ROOT}/${COMPILED_PIPELINE_FILENAME}"

#### Create Cloud Functions to submit job to Vertex AI Pipelines

In [None]:
# Show, one per line, the values that get interpolated into the Cloud Function template
print('\n'.join(map(str, (
    PROJECT_ID,
    REGION,
    PIPELINE_ROOT_FULL,
    SERVICE_ACCOUNT_EMAIL,
    DISPLAY_NAME,
))))

In [None]:
%%writetemplate main.py

import json
from google.cloud import aiplatform
import functions_framework

PROJECT_ID = "{PROJECT_ID}"
REGION = "{REGION}"
PIPELINE_ROOT = "{PIPELINE_ROOT_FULL}"
SERVICE_ACCOUNT_EMAIL = "{SERVICE_ACCOUNT_EMAIL}"
DISPLAY_NAME = "{DISPLAY_NAME}"

@functions_framework.http
def recsys_seq_rec_{COUNTRY_CODE}_{TRAINING_MODE}_vertex_ai_pipeline_job(request):
    """Processes the incoming HTTP request.

    Args:
     request (flask.Request): HTTP request object.

    Returns:
     The response text or any set of values that can be turned into a Response
     object using `make_response
     <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    import subprocess
    import os
    
    # decode http request payload and translate into JSON object
    request_str = request.data.decode('utf-8')
    request_json = json.loads(request_str)

    pipeline_spec_uri = request_json['pipeline_spec_uri']
    parameter_values = request_json['parameter_values']

    aiplatform.init(
        project=PROJECT_ID,
        location=REGION
    )

    job = aiplatform.PipelineJob(
        display_name=DISPLAY_NAME,
        template_path=pipeline_spec_uri,
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
        parameter_values=parameter_values,
    )

    job.submit(service_account=SERVICE_ACCOUNT_EMAIL)

    return "Job submitted"

In [None]:
%%writefile requirements.txt

# main.py imports functions_framework, so declare it explicitly
functions-framework
google-api-python-client>=1.7.8,<2
google-cloud-aiplatform
google-cloud-storage
PyYAML

In [None]:
# Directory that holds the Cloud Function source; exported for the %%bash cells below
FUNCTION_DIR = '../functions/deploy_pipeline'
os.environ.update(FUNCTION_DIR=FUNCTION_DIR)

In [None]:
%%bash

# Stop at the first failure so a failed mkdir does not lead to files being renamed instead of moved
set -euo pipefail

# Move the generated Cloud Function sources into the function directory
mkdir -p "$FUNCTION_DIR"
mv main.py "$FUNCTION_DIR/"
mv requirements.txt "$FUNCTION_DIR/"

In [None]:
%%bash

# Abort if cd fails; otherwise gcloud would deploy whatever is in the current directory
set -euo pipefail

cd "$FUNCTION_DIR"

# Deploy the HTTP-triggered function; the function name matches the entry point defined in main.py
gcloud functions deploy "recsys_seq_rec_${COUNTRY_CODE}_${TRAINING_MODE}_vertex_ai_pipeline_job" \
    --runtime python39 --trigger-http \
    --service-account "$SERVICE_ACCOUNT_EMAIL"

#### Cloud Scheduler

In [None]:
%%bash

# JSON payload posted to the Cloud Function on every scheduled run.
# NOTE: `read -d ''` returns non-zero when it reaches the end of the here-doc,
# so `set -e` is deliberately not enabled in this cell.
read -r -d '' message_body << EOM
{
    "pipeline_spec_uri": "${PIPELINE_ROOT}/${COMPILED_PIPELINE_FILENAME}",
    "parameter_values": {
        "deployment_threshold": {
            "hit_rate": 0.10
        },
        "resume_training": false
    }
}
EOM

# Run at 1:00AM every Monday
cron="0 1 * * 1"

# https://cloud.google.com/scheduler/docs/http-target-auth#using-gcloud
# NOTE(review): the region and project in --uri are hard-coded; confirm they
# match where the Cloud Function was actually deployed.

gcloud scheduler jobs create http "recsys_seq_rec_${COUNTRY_CODE}_${TRAINING_MODE}_vertex_ai_pipeline_job" \
    --schedule="$cron" \
    --uri="https://us-central1-seq-rec-gcp-project-id.cloudfunctions.net/recsys_seq_rec_${COUNTRY_CODE}_${TRAINING_MODE}_vertex_ai_pipeline_job" \
    --http-method=POST \
    --message-body="$message_body" \
    --time-zone="Asia/Singapore" \
    --oidc-service-account-email="$SERVICE_ACCOUNT_EMAIL" \
    --location="$REGION"

## Incremental Retraining

In [None]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT_PREFIX,
    # A name for the pipeline. Use to determine the pipeline Context.
    name=f"{PIPELINE_NAME}{COUNTRY_CODE.lower()}incrementalretraining",
)
def pipeline_incremental_retraining(
        deployment_threshold: dict,
        resume_training: bool = True,
    ):
    """ Define the pipeline components for incremental retraining

    Args:
        deployment_threshold (dict): dictionary of metrics and thresholds to deploy
        resume_training (bool): whether to pick up the model state from previous train and continue training
    """
    run_id = dsl.PIPELINE_JOB_ID_PLACEHOLDER
    run_name = dsl.PIPELINE_JOB_NAME_PLACEHOLDER
    resource_name = dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER
    run_status = '{{$.pipeline_job_status}} - {{$.pipeline_status}} - {{$.pipeline_worlflow_status}}'
    url = f"https://console.cloud.google.com/vertex-ai/locations/{REGION}/pipelines/runs/{run_name}?project={PROJECT_ID}"
    
    country_code = COUNTRY_CODE
    model_blob_name = f"{BUCKET_DIR}/{VERSION}/{country_code}/model/"
    checkpoint_blob_name = f"{BUCKET_DIR}/{VERSION}/{country_code}/checkpoint/"
    pipeline_root_full = os.path.join(PIPELINE_ROOT_PREFIX, country_code, "incremental_retraining")
    last_checkpoint_date_blob_name = os.path.join(checkpoint_blob_name, "last_checkpoint_date.txt")
    
    exit_text = (
        f"*Run name:* <{url}|{run_name}>",
        f"*Run ID:* {run_id}",
    )
    exit_text = '\n'.join(exit_text)
    message = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": "Kubeflow pipeline has completed!",
                "emoji": True
            }
        },
        {
            "type": "divider"
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": exit_text
            }
        }
    ]
    
    slack_noti_exit_task = slack_noti_exit_op(
        webhook_url=os.environ.get('SLACK_INCOMING_WEBHOOK'),  # Please declare the SLACK_INCOMING_WEBHOOK as a var in .env file
        message=message,
        run_name=run_name,
        job_status_file_name=JOB_STATUS_FILE_NAME,
        bucket_url=BUCKET_URL,
        folder=BUCKET_DIR,
        pipeline_stakeholders_slack_uids=PIPELINE_STAKEHOLDERS_SLACK_UIDS
    ).set_display_name("Exit Handler Slack Noti")
    
    # Could not get the job status via pipeline utils so create a small hack here
    # The below task should be triggered after the final tasks has completed
    record_job_status_task = (
        record_job_status_op(
            run_name=run_name,
            job_status_file_name=JOB_STATUS_FILE_NAME,
            bucket_name=BUCKET_NAME,
            folder=BUCKET_DIR,
        )
        .set_display_name("Record Job Status")
    )
    # Since the input of this task does not change as we overwrite the same file, we need to disable caching for this task.
    # record_job_status_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
    record_job_status_task.set_caching_options(False)

    with dsl.ExitHandler(slack_noti_exit_task):
        detect_resume_training_task = (
            detect_resume_training_op(
                resume_training=resume_training,
                checkpoint_bucket=BUCKET_NAME,
                last_checkpoint_date_blob_name=last_checkpoint_date_blob_name
            )
            .set_display_name("Detect and Prepare to Resume Training")
            .set_caching_options(False)
        )

        with dsl.Condition(
            detect_resume_training_task.outputs["start_training"] == "true",
            name="Start Training",
        ):
            bq_download_train_test_task = (
                tf_download_bq_table_op(
                    resume_training=detect_resume_training_task.outputs['resume_training'],
                    last_checkpoint_date=detect_resume_training_task.outputs['last_checkpoint_date'],
                    country_code=country_code
                )
                .set_cpu_limit("4")
                .set_memory_limit("16G")
                .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
                .set_gpu_limit("1")
                .set_display_name("Download Train and Test Data for Evaluation")
                .set_caching_options(False)
            )

            gen_schema_task = (
                generate_tfdv_schema_op(
                    x_train_file=bq_download_train_test_task.outputs['raw_train_ds_file']
                )
                .set_display_name("Generate Schema")
                .set_caching_options(False)
            )

            save_schema_if_not_exist_task = (
                copy_output_to_gcs_op(
                    output_obj=gen_schema_task.outputs['X_train_schema_file'],
                    source_bucket_name=BUCKET_NAME,
                    destination_bucket_name=BUCKET_NAME,
                    destination_blob_name=f"{BUCKET_DIR}/X_train_schema_file",
                    overwrite_if_exists=False
                )
                .set_display_name("Save Schema For Future Run Validation if not Exists")
                .set_caching_options(False)
            )

            gcs_import_schema_task = (
                kfp.dsl.importer(
                    artifact_uri=f"gs://{BUCKET_NAME}/{BUCKET_DIR}/X_train_schema_file",
                    artifact_class=Artifact,
                    reimport=False,
            )
                .set_display_name("Import Base Schema")
                .after(save_schema_if_not_exist_task)
                .set_caching_options(False)
            )

            validate_schema_task = (
                validate_tfdv_schema_op(
                    x_train_schema_base_file=gcs_import_schema_task.output,
                    x_train_file=bq_download_train_test_task.outputs['raw_train_ds_file']
                )
                .set_cpu_limit("2")
                .set_memory_limit("8G")
                .set_display_name("Validate Schema, Drift and Skewness")
                .set_caching_options(False)
            )

            with dsl.Condition(
                validate_schema_task.outputs["is_abnormal"] == "true",
                name="Input Is Abnormal",
            ):
                text_stakeholders = (f"<@{slack_uid}>" for slack_uid in PIPELINE_STAKEHOLDERS_SLACK_UIDS.values())
                text_stakeholders = ' '.join(text_stakeholders)
                text = (
                    f"*Run name:* <{url}|{run_name}>",
                    f"*Run ID:* {run_id}",
                    f"*Stakeholders:* {text_stakeholders}. Where are you now?"
                )
                text = '\n'.join(text)
                message = [
                    {
                        "type": "header",
                        "text": {
                            "type": "plain_text",
                            "text": ":exclamation: Kubeflow pipeline has abnormal input!",
                            "emoji": True
                        }
                    },
                    {
                        "type": "divider"
                    },
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": text
                        }
                    }
                ]
                slack_noti_task = slack_noti_op(
                    webhook_url=os.environ.get('SLACK_INCOMING_WEBHOOK'),
                    message=message
                ).set_display_name("Input Abnormal Slack Noti")

            preprocess_train_task = (
                preprocess_op(
                    bq_download_train_test_task.outputs['raw_train_ds_file'],
                    country_code=country_code
                )
                .set_cpu_limit("4")
                .set_memory_limit("16G")
                .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
                .set_gpu_limit("1")
                .set_display_name(f"Preprocess Train Data for {country_code}")
                .set_caching_options(False)
            )

            preprocess_test_task = (
                preprocess_op(
                    bq_download_train_test_task.outputs['raw_test_ds_file'],
                    country_code=country_code
                )
                .set_cpu_limit("4")
                .set_memory_limit("16G")
                .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
                .set_gpu_limit("1")
                .set_display_name(f"Preprocess Test Data for {country_code}")
                .set_caching_options(False)
            )

            gcs_import_checkpoint_task = (
                kfp.dsl.importer(
                    artifact_uri=f"gs://{BUCKET_NAME}/{checkpoint_blob_name}",
                    artifact_class=Artifact,
                    reimport=False,
            )
                .set_display_name("Import Checkpoint")
                .set_caching_options(False)
            )

            gcs_import_merchant_vocab_task = (
                kfp.dsl.importer(
                    artifact_uri=os.path.join("gs://", BUCKET_NAME, model_blob_name, "merchant_vocab"),
                    artifact_class=Dataset,
                    reimport=False,
            )
                .set_display_name("Import Checkpoint Merchant Vocab")
                .set_caching_options(False)
            )

            gcs_import_search_vocab_task = (
                kfp.dsl.importer(
                    artifact_uri=os.path.join("gs://", BUCKET_NAME, model_blob_name, "search_vocab"),
                    artifact_class=Dataset,
                    reimport=False,
            )
                .set_display_name("Import Checkpoint Search Vocab")
                .set_caching_options(False)
            )

            merchant_vocab_artifact = gcs_import_merchant_vocab_task.output
            search_vocab_artifact = gcs_import_search_vocab_task.output

            model_task = (
                model_op(
                    train_prep_ds_file=preprocess_train_task.outputs['prep_ds_file'],
                    test_prep_ds_file=preprocess_test_task.outputs['prep_ds_file'],
                    merchant_vocab_file=merchant_vocab_artifact,
                    search_vocab_file=search_vocab_artifact,
                    checkpoint_dir=gcs_import_checkpoint_task.output,
                    resume_training=detect_resume_training_task.outputs['resume_training'],
                    country_code=country_code,
                    cfg_overrides=['tfrs_gru.epochs=1']  # Limit 1 epochs to prevent overfit
                )
                .set_cpu_limit("4")
                .set_memory_limit("16G")
                .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
                .set_gpu_limit("1")
                .set_display_name(f"Model {country_code}")
                .set_caching_options(False)
            )

            evaluate_task = (
                evaluate_op(
                    model_dir=model_task.outputs['model_output_dir'],
                    test_prep_ds_file=preprocess_test_task.outputs['prep_ds_file'],
                    merchant_vocab_file=merchant_vocab_artifact,
                    country_code=country_code,
                    k=10,
                    deployment_threshold=deployment_threshold,
                )
                .set_cpu_limit("4")
                .set_memory_limit("16G")
                .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
                .set_gpu_limit("1")
                .set_display_name(f"Evaluate {country_code}")
                .set_caching_options(False)
            )

            with dsl.Condition(
                evaluate_task.outputs["dep_decision"] == "false",
                name="Evaluation Reports Degraded Accuracy! Deployment Cancelled!",
            ):
                text_stakeholders = (f"<@{slack_uid}>" for slack_uid in PIPELINE_STAKEHOLDERS_SLACK_UIDS.values())
                text_stakeholders = ' '.join(text_stakeholders)
                text = (
                    f"*Run name:* <{url}|{run_name}>",
                    f"*Run ID:* {run_id}",
                    f"*Stakeholders:* {text_stakeholders}. Where are you now?"
                )
                text = '\n'.join(text)
                message = [
                    {
                        "type": "header",
                        "text": {
                            "type": "plain_text",
                            "text": ":exclamation: Evaluation reports degraded accuracy! Deployment cancelled!",
                            "emoji": True
                        }
                    },
                    {
                        "type": "divider"
                    },
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": text
                        }
                    }
                ]
                slack_noti_task = slack_noti_op(
                    webhook_url=os.environ.get('SLACK_INCOMING_WEBHOOK'),
                    message=message
                ).set_display_name("Accuracy Degraded Slack Noti")

            with dsl.Condition(
                evaluate_task.outputs["dep_decision"] == "true",
                name="Deploy Decision",
            ):
                preprocess_full_task = (
                    preprocess_op(
                        bq_download_train_test_task.outputs['raw_full_ds_file'],
                        country_code=country_code
                    )
                    .set_cpu_limit("4")
                    .set_memory_limit("16G")
                    .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
                    .set_gpu_limit("1")
                    .set_display_name(f"Preprocess Full Training Data for {country_code}")
                    .set_caching_options(False)
                )

                model_full_data_task = (
                    model_op(
                        # Use test dataset here because this is incremental training, previously model has already been trained on train data
                        train_prep_ds_file=preprocess_test_task.outputs['prep_ds_file'],
                        # Use preprocess_train_task here as mock, since use_val_ds = False we don't really use this dataset
                        # However, if set it to preprocess_test_task then weird error train_prep_ds_file will be None.
                        test_prep_ds_file=preprocess_train_task.outputs['prep_ds_file'],
                        use_val_ds=False,
                        merchant_vocab_file=merchant_vocab_artifact,
                        search_vocab_file=search_vocab_artifact,
                        checkpoint_dir=gcs_import_checkpoint_task.output,
                        resume_training=detect_resume_training_task.outputs['resume_training'],
                        country_code=country_code,
                        cfg_overrides=['tfrs_gru.epochs=5']  # Limit 5 epochs to prevent overfit
                    )
                    .set_cpu_limit("4")
                    .set_memory_limit("16G")
                    .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
                    .set_gpu_limit("1")
                    .set_display_name(f"Model {country_code}")
                    .set_caching_options(False)
                )

                persist_model_task = (
                    copy_output_to_gcs_op(
                        output_obj=model_full_data_task.outputs['model_output_dir'],
                        source_bucket_name=BUCKET_NAME,
                        destination_bucket_name=BUCKET_NAME,
                        destination_blob_name=model_blob_name,
                        overwrite_if_exists=True
                    )
                    .set_display_name("Save Model Artifacts to GCS")
                    .set_caching_options(False)
                )

                deploy_model_task = (
                    deploy_model_to_gcp_endpoint_op(
                        model_bucket_name=BUCKET_NAME,
                        model_blob_name=model_blob_name,
                        model_name='seq_rec',
                        model_version=VERSION,
                        country_code=country_code,
                        endpoint_id="",
                        traffic_split={"0": 100},
                        undeploy_zero_traffic_models=True
                    )
                    .set_display_name("Deploy Model to GCP Endpoint")
                    .after(persist_model_task)
                    .set_caching_options(False)
                )

                record_last_checkpoint_date_task = (
                    record_last_checkpoint_date_op(
                        checkpoint_bucket=BUCKET_NAME,
                        last_checkpoint_date_blob_name=last_checkpoint_date_blob_name
                    )
                    .set_display_name("Record Last Checkpoint Date")
                    .after(deploy_model_task)
                    .set_caching_options(False)
                )

            record_job_status_task.after(deploy_model_task)
            record_job_status_task.after(evaluate_task)


### Compile the pipeline

In [None]:
import posixpath

from kfp.v2 import compiler  # noqa: F811

# Training flavour built by this section of the notebook. It namespaces the
# pipeline root, the compiled spec file name and the job display name.
TRAINING_MODE = 'incremental_retraining'

# The pipeline root lives under the bucket URL, and object-store paths always
# use forward slashes. posixpath.join keeps that true on every host OS, whereas
# os.path.join follows the local path separator.
PIPELINE_ROOT_FULL = posixpath.join(PIPELINE_ROOT_PREFIX, COUNTRY_CODE, TRAINING_MODE)
COMPILED_PIPELINE_FILENAME = f"recsys_{PIPELINE_NAME}_{COUNTRY_CODE}_{TRAINING_MODE}_pipeline.json"
DISPLAY_NAME = f"recsys_{PIPELINE_NAME}_{COUNTRY_CODE}_{VERSION}_{TRAINING_MODE}_pipeline"

# Compile the pipeline function into a JSON spec written to the working
# directory; later cells submit and upload this file.
compiler.Compiler().compile(
    pipeline_func=pipeline_incremental_retraining,
    package_path=COMPILED_PIPELINE_FILENAME,
)

### Run the pipeline

In [None]:
# Build (but do not yet submit) a Vertex AI pipeline job from the spec compiled
# above. The parameter values are the pipeline's runtime inputs for this
# manual run.
job = aip.PipelineJob(
    template_path=COMPILED_PIPELINE_FILENAME,
    pipeline_root=PIPELINE_ROOT_FULL,
    display_name=DISPLAY_NAME,
    location=REGION,
    parameter_values=dict(
        deployment_threshold=dict(hit_rate=0.10),
        resume_training=True,
    ),
)

In [None]:
# Submit the pipeline job created above, running it as the configured service account.
job.submit(service_account=SERVICE_ACCOUNT_EMAIL)

### Schedule

#### Upload pipeline job to GCS

In [None]:
# Export the notebook-level settings as environment variables so that the
# %%bash cells below can read them as shell variables.
os.environ.update({
    'TRAINING_MODE': TRAINING_MODE,
    'COUNTRY_CODE': COUNTRY_CODE,
    'COMPILED_PIPELINE_FILENAME': COMPILED_PIPELINE_FILENAME,
    'PIPELINE_ROOT': PIPELINE_ROOT_FULL,
    'SERVICE_ACCOUNT_EMAIL': SERVICE_ACCOUNT_EMAIL,
    'REGION': REGION,
})

In [None]:
%%bash

# Fail the cell on the first error or unset variable. Without this a failed
# upload is hidden, because the trailing echo makes the cell exit successfully.
set -euo pipefail

# Destination of the compiled pipeline spec, stored under the pipeline root.
spec_uri="${PIPELINE_ROOT}/${COMPILED_PIPELINE_FILENAME}"

gsutil cp "${COMPILED_PIPELINE_FILENAME}" "${spec_uri}"

# Print the uploaded location for reference.
echo "${spec_uri}"

#### Create Cloud Functions to submit job to Vertex AI Pipelines

In [None]:
# Show, one per line, the values that the main.py template below interpolates,
# so they can be checked before the file is generated.
print('\n'.join(map(str, (
    PROJECT_ID,
    REGION,
    PIPELINE_ROOT_FULL,
    SERVICE_ACCOUNT_EMAIL,
    DISPLAY_NAME,
))))

In [None]:
%%writetemplate main.py

import json
from google.cloud import aiplatform
import functions_framework

PROJECT_ID = "{PROJECT_ID}"
REGION = "{REGION}"
PIPELINE_ROOT = "{PIPELINE_ROOT_FULL}"
SERVICE_ACCOUNT_EMAIL = "{SERVICE_ACCOUNT_EMAIL}"
DISPLAY_NAME = "{DISPLAY_NAME}"

@functions_framework.http
def recsys_seq_rec_{COUNTRY_CODE}_{TRAINING_MODE}_pipeline_job(request):
    """Processes the incoming HTTP request.

    Args:
     request (flask.Request): HTTP request object.

    Returns:
     The response text or any set of values that can be turned into a Response
     object using `make_response
     <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    import subprocess
    import os
    
    # decode http request payload and translate into JSON object
    request_str = request.data.decode('utf-8')
    request_json = json.loads(request_str)

    pipeline_spec_uri = request_json['pipeline_spec_uri']
    parameter_values = request_json['parameter_values']

    aiplatform.init(
        project=PROJECT_ID,
        location=REGION
    )

    job = aiplatform.PipelineJob(
        display_name=DISPLAY_NAME,
        template_path=pipeline_spec_uri,
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
        parameter_values=parameter_values,
    )

    job.submit(service_account=SERVICE_ACCOUNT_EMAIL)

    return "Job submitted"

In [None]:
%%writefile requirements.txt

# main.py imports functions_framework, so declare it explicitly.
functions-framework
google-api-python-client>=1.7.8,<2
google-cloud-aiplatform
google-cloud-storage
PyYAML

In [None]:
# Directory that receives the generated Cloud Function files. It is exported
# so the %%bash cells below can read it as $FUNCTION_DIR.
FUNCTION_DIR = '../functions/deploy_pipeline'
os.environ.update(FUNCTION_DIR=FUNCTION_DIR)

In [None]:
%%bash

# Stop on the first error or unset variable, so that a failed mkdir cannot
# turn the move below into a rename onto an unexpected path.
set -euo pipefail

mkdir -p "$FUNCTION_DIR"

# Move the generated function source and its dependency list into place.
mv main.py requirements.txt "$FUNCTION_DIR"/

In [None]:
%%bash

# Abort if the function directory cannot be entered or a variable is unset;
# otherwise gcloud would deploy whatever source sits in the current directory.
set -euo pipefail

cd "$FUNCTION_DIR"

# Deploy the HTTP-triggered function under the same name as the function
# defined in the generated main.py.
gcloud functions deploy "recsys_seq_rec_${COUNTRY_CODE}_${TRAINING_MODE}_pipeline_job" \
    --runtime python39 --trigger-http \
    --service-account "$SERVICE_ACCOUNT_EMAIL"

#### Cloud Scheduler

In [None]:
%%bash

# Fail on the first error or unset variable so that a missing export cannot
# create a scheduler job with a malformed name, URI or payload.
set -euo pipefail

# Shared by the scheduler job name and the Cloud Function URL path.
job_name="recsys_seq_rec_${COUNTRY_CODE}_${TRAINING_MODE}_pipeline_job"

# JSON payload sent to the Cloud Function on every trigger.
# Captured with $(cat ...) rather than `read -d ''`, because the latter
# returns a non-zero status at end of input and would trip `set -e`.
message_body=$(cat << EOM
{
    "pipeline_spec_uri": "${PIPELINE_ROOT}/${COMPILED_PIPELINE_FILENAME}",
    "parameter_values": {
        "deployment_threshold": {
            "hit_rate": 0.10
        },
        "resume_training": true
    }
}
EOM
)

# Run at 4:00 AM every day (in the time zone passed below).
# This should run after the normal run of the full_training pipeline, because
# the incremental retraining can then be skipped for that same day.
cron="0 4 * * *"

# https://cloud.google.com/scheduler/docs/http-target-auth#using-gcloud

# NOTE(review): the function host below hard-codes the us-central1 region and a
# project id; confirm that it matches where the function was actually deployed.
gcloud scheduler jobs create http "${job_name}" \
    --schedule="$cron" \
    --uri="https://us-central1-seq-rec-gcp-project-id.cloudfunctions.net/${job_name}" \
    --http-method=POST \
    --message-body="$message_body" \
    --time-zone="Asia/Singapore" \
    --oidc-service-account-email="$SERVICE_ACCOUNT_EMAIL" \
    --location="$REGION"