# Operationalize product data and content enrichment using GenAI and AutoSxS

### Overview

This series of notebooks showcases an end-to-end workflow for improving product catalog data using Generative AI and MLOps. The core focus is to operationalize the process for enriching product descriptions, a key element for effective product discovery and recommendation systems.

Building on the previous notebook, where we enhanced product descriptions with a GenAI model (Gemma), we now compare those responses with responses from Google's text-bison models using AutoSxS, Vertex AI's automatic side-by-side evaluation.

In this notebook, the AutoSxS results determine which downstream process runs: either the winning descriptions are logged to a data repository, or model tuning is triggered. The workflow is orchestrated and managed within Vertex AI Pipelines, providing a repeatable and scalable process for continuous product data improvement.

### Objective

The steps performed include:

* Parameters, variables, and helper functions are defined
* A base Docker image containing dependencies for pipeline components is created and stored in Artifact Registry for future use
* Component Definition:
    * A component for generating product descriptions using the chosen GenAI model is defined.
    * An AutoSxS component is defined to evaluate the quality of generated descriptions.
    * Conditional downstream tasks, such as model retuning or data logging, are defined as separate components based on evaluation results.
* Components are integrated into a Vertex AI Pipeline, which is then submitted as a pipeline job to automate and manage the workflow.

### (prereq) Dataset Information

The 'Input Feed' data is a random sample of 1,000 rows from the public BigQuery dataset 'theLook eCommerce'. The data, including text and enriched attributes, was generated by [FeedGen](https://github.com/google-marketing-solutions/feedgen) and extracted as a CSV from the FeedGen [input feed - template sheet](https://docs.google.com/spreadsheets/d/19eKTJrbZaUfipAvL5ZQmq_hoxEbLQIlDqURKFJA2OBU/edit#gid=1661242997).

## (Optional) Create an endpoint for the incumbent model

## Install additional packages
Install the following packages required to execute this notebook.

In [None]:
# # Install the packages
# ! pip3 install --upgrade --quiet google-cloud-aiplatform \
#                                  google-cloud-storage \
#                                  google-cloud-bigquery \
#                                  google-cloud-language \
#                                  kfp \
#                                  google-cloud-pipeline-components

## Import Libraries & Define Parameters

In [1]:
import os
import random
import string
import sys
import time
from datetime import datetime
from typing import NamedTuple, Tuple

import pandas as pd
import vertexai
from google.cloud import aiplatform, bigquery, language, storage
from google_cloud_pipeline_components.preview import model_evaluation
from kfp import compiler, dsl
from kfp.dsl import Input, Output, component, pipeline
from vertexai.preview.language_models import TextGenerationModel




### Set Parameters and Variables


* `PROJECT_ID`: The ID of your Google Cloud project where the pipeline and resources reside
* `REGION`: The Google Cloud region where pipeline components are executed and resources are located
* `ARTIFACT_REPO`: The name of the repository used for storing pipeline artifacts and container images in Artifact Registry
* `BUCKET_URI`: The URI of the Google Cloud Storage bucket where data and pipeline artifacts are stored (e.g., "gs://passage-gen-test").
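* `BUCKET_NAME`: The bucket root derived from `BUCKET_URI`, including the `gs://` prefix (e.g., "gs://passage-gen-test"). Helper functions strip the prefix when they need the bare bucket name.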
* `MODEL_RESOURCE`: The resource name of the baseline language model used for comparison in AutoSxS (e.g., "publishers/google/models/text-bison-32k@002").
* `IMAGE_URI`: The URI of the container image used for running the pipeline components. It's dynamically constructed using the REGION, PROJECT_ID, and ARTIFACT_REPO.
* `input_feed_data`: The GCS path to the input CSV file containing product data for generating descriptions
* `evaluation_dataset_name`: The base name for the generated evaluation dataset files (without extension), used for both CSV and JSONL formats.
* `judgement_threshold`: The minimum win rate required for either model (custom or baseline) to be considered for updating the product catalog (e.g., 0.75).
* `SERVICE_ACCOUNT`: The service account used for running the pipeline components and accessing Google Cloud resources.
* `PIPELINE_ROOT`: The GCS location where pipeline metadata and execution information are stored. It's set to the same value as BUCKET_URI.
* `DISPLAY_NAME`: A unique name for the pipeline run, constructed from `ARTIFACT_REPO` and a random string. It is used as the pipeline job ID and as the run's folder under `BUCKET_URI`.
* `DATASET_ID`: The ID of the BigQuery dataset where evaluation results or other data might be stored.
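The derived values above are plain string manipulation, so they can be checked without touching Google Cloud. The sketch below reproduces the `BUCKET_NAME` and `IMAGE_URI` expressions, plus the `data/<name>.<ext>` object layout that the upload helpers write and the AutoSxS component reads. The function names (`bucket_root`, `image_uri`, `dataset_location`) and the project and bucket values are hypothetical placeholders for illustration.

```python
from typing import Tuple


def bucket_root(bucket_uri: str) -> str:
    """Same expression as BUCKET_NAME: keep only "gs://<bucket>" from a GCS URI."""
    return "/".join(bucket_uri.split("/")[:3])


def image_uri(region: str, project_id: str, artifact_repo: str) -> str:
    """Same f-string as IMAGE_URI: an Artifact Registry Docker image path."""
    return f"{region}-docker.pkg.dev/{project_id}/{artifact_repo}/passage_gen_image:latest"


def dataset_location(bucket_name: str, dataset_name: str, ext: str) -> Tuple[str, str, str]:
    """Hypothetical helper: return (bare bucket, object name, full URI) for a dataset file.

    The upload helpers pass the bare bucket name to storage.Client().bucket()
    and write the file to data/<dataset_name>.<ext>.
    """
    bucket = bucket_name.removeprefix("gs://")  # Python 3.9+
    blob_name = f"data/{dataset_name}.{ext}"
    return bucket, blob_name, f"gs://{bucket}/{blob_name}"


# Placeholder values, not real resources
root = bucket_root("gs://my-project-passage-gen-test/pipeline-runs")
print(root)  # gs://my-project-passage-gen-test
print(image_uri("us-central1", "my-project", "passage-gen-example"))
print(dataset_location(root, "evaluation_dataset_pipe", "jsonl")[2])
```

Because `BUCKET_NAME` keeps the `gs://` prefix, `f"{BUCKET_NAME}/data/{evaluation_dataset_name}.jsonl"` in the AutoSxS component is already a complete GCS URI.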

In [2]:
PROJECT_ID = "sandbox-401718"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}
ARTIFACT_REPO = "passage-gen-example"

BUCKET_URI = f"gs://{PROJECT_ID}-passage-gen-test"  # @param {type:"string"}
BUCKET_NAME = "/".join(BUCKET_URI.split("/")[:3])
MODEL_RESOURCE = "publishers/google/models/text-bison-32k@002"
# MODEL_RESOURCE = "publishers/google/models/gemini-1.0-pro-001"
random_str = "".join(random.choices(string.ascii_lowercase + string.digits, k=5))
IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{ARTIFACT_REPO}/passage_gen_image:latest"

input_feed_data = "gs://passage-gen-test/FeedGen-Input-Feed.csv"  # INPUT DATA GCS path
evaluation_dataset_name = "evaluation_dataset_pipe"
# endpoint_resource_name = "projects/757654702990/locations/us-central1/endpoints/2501228424492744704"  # if not specified, text-bison@001 is used

judgement_threshold = 0.75

SERVICE_ACCOUNT = (
    "757654702990-compute@developer.gserviceaccount.com"  # @param {type:"string"}
)

PIPELINE_ROOT = BUCKET_URI
DISPLAY_NAME = ARTIFACT_REPO + random_str
DATASET_ID = "passage_gen_autosxs"
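`DISPLAY_NAME` is later passed as the pipeline `job_id`, and the AutoSxS component builds its own job ID from `utils.generate_uuid()`, so both must satisfy Vertex AI's job-ID rules. The sketch below assumes the rule described for `aiplatform.PipelineJob` job IDs (a lowercase letter followed by up to 127 lowercase letters, digits, or hyphens) and checks a few candidates; `JOB_ID_PATTERN` and `is_valid_job_id` are hypothetical names, and the exact pattern should be confirmed against the SDK version you use.

```python
import random
import re
import string

# Assumed Vertex AI job-ID rule: a lowercase letter, then up to 127 of [a-z0-9-]
JOB_ID_PATTERN = re.compile(r"[a-z][-a-z0-9]{0,127}")


def generate_uuid(length: int = 8) -> str:
    """Same logic as utils.generate_uuid: random lowercase letters and digits."""
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


def is_valid_job_id(job_id: str) -> bool:
    """Hypothetical check: True if job_id fully matches the assumed pattern."""
    return JOB_ID_PATTERN.fullmatch(job_id) is not None


print(is_valid_job_id("passage-gen-examplenibpw"))  # True (the job ID from the run log below)
print(is_valid_job_id(f"examples-resp-model-full-32k-{generate_uuid()}"))  # True
print(is_valid_job_id("gemma_20240101_120000"))  # False: underscores are not allowed
```

`get_job_name_with_datetime` in `utils.py` joins its prefix and timestamp with underscores, so its output suits display names but would fail this check if used as a job ID.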


**Only if your bucket doesn't already exist:** run the following cell to create your Cloud Storage bucket.

In [None]:
# ! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

## Create Python File with Helper Functions

In [3]:
%%writefile ./utils.py
    
import pandas as pd
import os
import sys
from datetime import datetime
from typing import Tuple
import time
import random
import string

from google.cloud import aiplatform, language
from google.cloud import bigquery
from google.cloud import storage
from google_cloud_pipeline_components.preview import model_evaluation
from kfp import compiler

import vertexai
from vertexai.preview.language_models import TextGenerationModel

def prompt_func(prompt_input: str):
    """Prompts designed to enrich Product description information"""
    prompt = f"""
        You are a leading digital marketer working for a top retail organization. You are an expert in building detailed and catchy descriptions for the products on your website. 

        Context: {prompt_input}

        Generate ONLY the product description in English that highlights the product's features using the above "Context" information. 
        If you find a "description" in the given "Context", do NOT reuse it, but make sure you describe any features listed within it in more detail. 
        Do NOT repeat sentences. The generated description should strictly be about the provided product. 
        Correct product type, number of items contained in the product as well as product features such as color should be followed. 
        Any product features that are not present in the input should not be present in the generated description.
        Hyperbolic text, over promising or guarantees are to be avoided.
        The generated description should be at least 50 words long, preferably at least 150. 
        The generated description MUST NOT use special characters or any Markdown or JSON syntax. 

        New Detailed Product Description:"""
    return prompt



# Generate a UUID-like string of a specified length (default=8)
def generate_uuid(length: int = 8) -> str:
    """Generates a custom-length UUID-like string.

    Uses a combination of lowercase letters and digits to create a 
    randomized string resembling a shortened UUID (Universally Unique Identifier).

    Args:
        length (int, optional): The desired length of the generated string. 
                                Defaults to 8 characters.

    Returns:
        str: The generated UUID-like string.
    """
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))

UUID = generate_uuid()


# Gemma deployment

def get_job_name_with_datetime(prefix: str) -> str:
    """Gets the job name with date time when triggering deployment jobs."""
    return prefix + datetime.now().strftime("_%Y%m%d_%H%M%S")


def deploy_model_vllm(
    model_name: str,
    model_id: str,
    service_account: str,
    vllm_docker_uri: str,
    machine_type: str = "g2-standard-12",
    accelerator_type: str = "NVIDIA_L4",
    accelerator_count: int = 1,
    max_model_len: int = 8192,
    dtype: str = "bfloat16",
) -> Tuple[aiplatform.Model, aiplatform.Endpoint]:
    """Deploys a model with vLLM on GPU in Vertex AI.

    `vllm_docker_uri` is the URI of the vLLM serving container image.
    """
    endpoint = aiplatform.Endpoint.create(display_name=f"{model_name}-endpoint")

    vllm_args = [
        "--host=0.0.0.0",
        "--port=7080",
        f"--model={model_id}",
        f"--tensor-parallel-size={accelerator_count}",
        "--swap-space=16",
        "--gpu-memory-utilization=0.9",
        f"--max-model-len={max_model_len}",
        f"--dtype={dtype}",
        "--disable-log-stats",
    ]

    env_vars = {
        "MODEL_ID": model_id,
    }
    # if HF_TOKEN:
    #     env_vars["HF_TOKEN"] = HF_TOKEN

    model = aiplatform.Model.upload(
        display_name=model_name,
        serving_container_image_uri=vllm_docker_uri,
        serving_container_command=["python", "-m", "vllm.entrypoints.api_server"],
        serving_container_args=vllm_args,
        serving_container_ports=[7080],
        serving_container_predict_route="/generate",
        serving_container_health_route="/ping",
        serving_container_environment_variables=env_vars,
        serving_container_shared_memory_size_mb=(16 * 1024),  # 16 GB
        serving_container_deployment_timeout=7200,
    )

    model.deploy(
        endpoint=endpoint,
        machine_type=machine_type,
        accelerator_type=accelerator_type,
        accelerator_count=accelerator_count,
        deploy_request_timeout=1800,
        service_account=service_account,
        sync=True,
        enable_access_logging=True,
    )
    return model, endpoint


def save_csv_gcs(BUCKET_NAME: str, evaluation_dataset_name: str):
    """
    Uploads a local CSV file to a Google Cloud Storage bucket.

    Reads `<evaluation_dataset_name>.csv` from the working directory and writes it
    to `data/<evaluation_dataset_name>.csv` in the bucket.

    Args:
        BUCKET_NAME (str):  The GCS bucket root, including the 'gs://' prefix (e.g. 'gs://my-bucket').
        evaluation_dataset_name (str):  The filename (without extension) of the CSV to upload.
    """
    # save to GCS (storage.Client().bucket() expects the bare bucket name)
    storage_client = storage.Client()
    bucket = storage_client.bucket(BUCKET_NAME.removeprefix("gs://"))
    blob = bucket.blob(f"data/{evaluation_dataset_name}.csv")
    blob.upload_from_filename(f"{evaluation_dataset_name}.csv")

    print(f"File uploaded to cloud storage in {BUCKET_NAME}/data/{evaluation_dataset_name}.csv")


def save_jsonl_gcs(BUCKET_NAME: str, evaluation_dataset_name: str):
    """
    Uploads a local JSON Lines (.jsonl) file to a Google Cloud Storage bucket.

    Reads `<evaluation_dataset_name>.jsonl` from the working directory and writes it
    to `data/<evaluation_dataset_name>.jsonl` in the bucket.

    Args:
        BUCKET_NAME (str):  The GCS bucket root, including the 'gs://' prefix (e.g. 'gs://my-bucket').
        evaluation_dataset_name (str):  The filename (without extension) of the JSONL file to upload.
    """
    # save to GCS (storage.Client().bucket() expects the bare bucket name)
    storage_client = storage.Client()
    bucket = storage_client.bucket(BUCKET_NAME.removeprefix("gs://"))
    blob = bucket.blob(f"data/{evaluation_dataset_name}.jsonl")
    blob.upload_from_filename(f"{evaluation_dataset_name}.jsonl")

    print(f"File uploaded to cloud storage in {BUCKET_NAME}/data/{evaluation_dataset_name}.jsonl")

Overwriting ./utils.py
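`prompt_func` interpolates its argument directly into the `Context:` line. The generation component passes `row.to_dict()`, so the model sees a Python dict repr, including columns whose value is NaN (how pandas represents empty CSV cells). The sketch below shows that behaviour next to `format_context`, a hypothetical alternative that renders one `field: value` line per non-empty column. It is an illustration rather than part of the original workflow, and the product row is made up.

```python
def format_context(row: dict) -> str:
    """Hypothetical helper: render a product row as 'field: value' lines.

    Empty values (None, NaN, blank strings) are skipped so the model is not
    prompted with blank attributes.
    """
    lines = []
    for key, value in row.items():
        # value != value is True only for NaN
        if value is None or value != value or str(value).strip() == "":
            continue
        lines.append(f"{key}: {value}")
    return "\n".join(lines)


# Made-up product row for illustration
row = {"Item ID": "123", "Title": "Cotton crew-neck T-shirt", "Color": "navy", "Size": float("nan")}
print(f"Context: {row}")  # dict repr, as prompt_func receives it today
print(format_context(row))
```

Passing `format_context(row.to_dict())` to `prompt_func` would keep the prompt template unchanged while giving the model a cleaner context block.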


## Create Dockerfile

In [4]:
%%writefile Dockerfile

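FROM python:3.10-slim

# Set the working directory first so the files below are copied into /app
WORKDIR /app

RUN apt-get update && apt-get install -y gcc libffi-dev && rm -rf /var/lib/apt/lists/*

COPY ./requirements.txt ./requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Let pipeline components `import utils` regardless of their working directory
COPY ./utils.py ./utils.py
ENV PYTHONPATH=/app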

Overwriting Dockerfile


In [5]:
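# (Only if the Artifact Registry repository doesn't already exist)
# ! gcloud artifacts repositories create {ARTIFACT_REPO} --repository-format=docker --location={REGION}
! docker build -t {IMAGE_URI} .
! gcloud auth configure-docker {REGION}-docker.pkg.dev --quiet
! docker push {IMAGE_URI}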


## Define Components

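Components are the building blocks of KFP pipelines. Each component below runs in its own container built from `IMAGE_URI`, which is why the helper module `utils.py` is baked into that image.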

In [3]:
@component(
    base_image=IMAGE_URI,
    packages_to_install=["google-cloud-aiplatform","google-cloud-language", "fsspec", "gcsfs"],
)
def gen_model_endpoint_response(
    BUCKET_NAME: str,
    input_feed_data: str,
    evaluation_dataset_name: str,
    max_tokens: int = 1000,
    temperature: float = 0.5,
    top_p: float = 0.5,
    top_k: int = 10,
    endpoint_resource_name: str = None,
):

    """Generates enriched product descriptions using a language model and saves results.

    This component reads product data from a CSV file, builds a marketing-oriented
    prompt for each product, queries a language model, and saves the generated
    descriptions with relevant metadata as CSV and JSONL files in GCS.

    Args:
        BUCKET_NAME (str): GCS bucket root, including the 'gs://' prefix.
        input_feed_data (str): Path to the CSV file containing product data
                             (columns like 'Title', 'Item ID', etc.).
        evaluation_dataset_name (str): Name for the dataset of results, used when
                                     saving to cloud storage.
        max_tokens (int):  Maximum number of tokens in generated responses (default: 1000).
        temperature (float): Controls the randomness of generated text (default: 0.5).
        top_p (float): Nucleus (top-p) sampling parameter (default: 0.5).
        top_k (int): Number of highest-probability tokens considered for top-k sampling (default: 10).
        endpoint_resource_name (str): Resource name of a custom language model endpoint. If not provided, the text-bison@001 base model is used.
    """

    import os
    import pandas as pd
    from google.cloud import storage
    from google.cloud import aiplatform, language
    import random
    import utils
    import vertexai
    from vertexai.preview.language_models import TextGenerationModel
    
    # Load the input feed; only the first 5 rows are processed to keep the run short
    df = (
        pd.read_csv(input_feed_data)
        .drop(['Link', 'Image Link'], axis=1)
        .head(5)
    )
    
    eval_df = []

    # Generate description for each product row
    for index, row in df.iterrows():
        if index % 5 == 0:
            print("Processing row:", index+1)

        # prompt_input = row.result
        prompt_input = row.to_dict()
        prompt = utils.prompt_func(prompt_input)
        instances = [
            {
                "prompt": prompt,
                "max_tokens": max_tokens,
                "temperature": temperature,
                "top_p": top_p,
                "top_k": top_k,
                "raw_response": True,
            },
        ]

        if endpoint_resource_name:
            # Query the custom model endpoint (e.g. Gemma served with vLLM)
            endpoint_vllm = aiplatform.Endpoint(endpoint_resource_name)
            response = endpoint_vllm.predict(instances=instances)
            prediction = response.predictions[0]
        else:
            # Fall back to the text-bison@001 foundation model
            model = TextGenerationModel.from_pretrained("text-bison@001")
            prediction = model.predict(
                prompt,
                temperature=temperature,
                max_output_tokens=max_tokens,
                top_k=top_k,
                top_p=top_p,
            ).text

        # Append results
        eval_df.append(
            {
                "prompt_id": prompt_input,
                "prompt": prompt,
                "response": prediction,
                "name": prompt_input["Title"],
                "id": prompt_input["Item ID"],
            }
        )

    # Build the results DataFrame once, after all rows are processed
    eval_df_ = pd.DataFrame(eval_df)

    # save locally
    eval_df_.to_csv(f"{evaluation_dataset_name}.csv", index=False)

    # save to GCS
    utils.save_csv_gcs(BUCKET_NAME, evaluation_dataset_name)

    eval_df_.to_json(f"{evaluation_dataset_name}.jsonl", orient="records", lines=True)

    # save to GCS
    utils.save_jsonl_gcs(BUCKET_NAME, evaluation_dataset_name)
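The AutoSxS job refers to dataset columns by name, so every line of the JSONL file written above should contain the columns named in its parameters: `prompt` (used for `id_columns`, the inference instruction and model A's prompt), `name` (the inference context) and `response` (model B's pre-generated answer). The sketch below uses only the standard library to write records in the same one-object-per-line layout and to check for missing columns. `REQUIRED_COLUMNS`, `write_jsonl`, `read_jsonl` and `missing_columns` are hypothetical names, and the record is made up.

```python
import io
import json
from typing import Dict, Iterable, List, Set

# Columns referenced by the AutoSxS parameters in this notebook
REQUIRED_COLUMNS = {"prompt", "name", "response"}


def write_jsonl(records: Iterable[Dict], stream) -> None:
    """Write one JSON object per line (the layout produced by to_json(..., lines=True))."""
    for record in records:
        stream.write(json.dumps(record) + "\n")


def read_jsonl(stream) -> List[Dict]:
    """Parse a JSON Lines stream, skipping blank lines."""
    return [json.loads(line) for line in stream if line.strip()]


def missing_columns(records: List[Dict]) -> Set[str]:
    """Return the required columns that are absent from at least one record."""
    missing: Set[str] = set()
    for record in records:
        missing |= REQUIRED_COLUMNS - set(record)
    return missing


# Made-up record with the keys the generation component writes
records = [{
    "prompt": "Generate a product description for a navy crew-neck T-shirt.",
    "response": "This navy crew-neck T-shirt is made from soft cotton.",
    "name": "Crew-neck T-shirt",
    "id": "123",
}]
buffer = io.StringIO()
write_jsonl(records, buffer)
buffer.seek(0)
print(missing_columns(read_jsonl(buffer)))  # set()
```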

In [4]:
@component(
    base_image=IMAGE_URI,
    packages_to_install=["google-cloud-aiplatform","google-cloud-language", "fsspec", "gcsfs"],
)
def autoSxS_compare_base_model(
    BUCKET_NAME: str,
    evaluation_dataset_name: str,
    PROJECT_ID: str,
    REGION: str,
    BUCKET_URI: str,
    # MODEL_RESOURCE: str = "publishers/google/models/text-bison-32k@002",
    MODEL_RESOURCE: str = "publishers/google/models/gemini-1.0-pro-001",
    context_column: str = "name",
    question_column: str = "prompt",
    response_column: str = "response",
    model_prompt: str = "prompt",
) -> NamedTuple(
    "Outputs", [("judgements_path", str), ("win_rate_a", float), ("win_rate_b", float)]
):
    """
    Executes an AutoSxS model evaluation job on Vertex AI, comparing the custom
    model's responses with those of a baseline model.

    This component performs the following steps:
        1. Configures the AutoSxS job parameters.
        2. Initializes an AutoSxS PipelineJob from the published template.
        3. Submits the job to Vertex AI and waits for completion.
        4. Extracts the judgments path and win rates and returns them.

    Args:
        BUCKET_NAME (str): GCS bucket root, including the 'gs://' prefix.
        evaluation_dataset_name (str): Name of the evaluation dataset (a JSONL file under data/ in GCS).
        PROJECT_ID (str): Project ID for the Google Cloud project.
        REGION (str): Region where the AutoSxS job will run.
        BUCKET_URI (str):  URI of a GCS bucket for staging pipeline artifacts.
        MODEL_RESOURCE (str): Resource name of the baseline model, evaluated as model A (default: 'publishers/google/models/gemini-1.0-pro-001').
        context_column (str): Column in the dataset containing contextual information for prompts.
        question_column (str):  Column in the dataset containing the prompts.
        response_column (str): Column containing the custom model's pre-generated responses, evaluated as model B.
        model_prompt (str): Column whose values are sent as the prompt to the baseline model (model A).

    Returns:
        NamedTuple('Outputs'):
            judgements_path (str): Path to the judgments.jsonl file (GCS location).
            win_rate_a (float): Win rate of the baseline model (model A).
            win_rate_b (float): Win rate of the custom model (model B).
    """

    import os
    import pandas as pd
    from google.cloud import storage
    from google.cloud import aiplatform, language
    import random
    from typing import NamedTuple
    import utils

    UUID = utils.generate_uuid()
    display_name = f"examples-resp-model-full-32k-{UUID}"
    model_resource = MODEL_RESOURCE

    parameters = {
        "evaluation_dataset": f"{BUCKET_NAME}/data/{evaluation_dataset_name}.jsonl",
        "id_columns": [question_column],
        "autorater_prompt_parameters": {
            "inference_context": {"column": context_column},
            "inference_instruction": {"column": question_column},
        },
        "task": "question_answering@001",
        "model_a": model_resource,
        "model_a_prompt_parameters": {"prompt": {"column": model_prompt}},
        "response_column_b": response_column,
    }

    aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)
    job = aiplatform.PipelineJob(
        job_id=display_name,
        display_name=display_name,
        pipeline_root=os.path.join(BUCKET_URI, display_name),
        # template_path=template_uri,
        template_path=(
            "https://us-kfp.pkg.dev/ml-pipeline/google-cloud-registry/autosxs-template/default"
        ),
        parameter_values=parameters,
        enable_caching=False,
    )
    job.run(sync=True)
    print("Pipeline Complete")

    # Aggregate Metrics

    for details in job.task_details:
        if details.task_name == "model-evaluation-text-generation-pairwise":
            break
    # Extract judgements output path

    string = str(details.__dict__)

    items = string.strip("{}").split("\n")
    for item in items:
        if "judgments.jsonl" in item:
            _, value = item.split('string_value: "')
            judgements_path = value.strip('"')
    # Extract win rates

    win_rate = pd.DataFrame([details.outputs["autosxs_metrics"].artifacts[0].metadata])
    win_rate_a = win_rate["autosxs_model_a_win_rate"][0]
    win_rate_b = win_rate["autosxs_model_b_win_rate"][0]

    return (judgements_path, win_rate_a, win_rate_b)
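The win rates returned above come from the `autosxs_metrics` artifact. As a sanity check, they can be approximated from the judgments file itself. The sketch below assumes each judgment row has a `choice` field set to `"A"` or `"B"` (the `choice` column that `update_catalog` later drops) and counts the fraction of rows each model wins. AutoSxS's own aggregation may differ, so treat the result as an approximation; `approx_win_rates` is a hypothetical name and the judgments are toy data. In this pipeline, model A is the baseline `MODEL_RESOURCE` and model B is the custom model's `response` column.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def approx_win_rates(judgments: Iterable[Dict]) -> Tuple[float, float]:
    """Hypothetical helper: fraction of judgments choosing model A and model B.

    Rows whose `choice` is neither "A" nor "B" count toward the total but
    toward neither model.
    """
    counts = Counter(row.get("choice") for row in judgments)
    total = sum(counts.values())
    if total == 0:
        return 0.0, 0.0
    return counts["A"] / total, counts["B"] / total


# Toy judgments, not real AutoSxS output
toy = [{"choice": "A"}, {"choice": "B"}, {"choice": "B"}, {"choice": "B"}]
print(approx_win_rates(toy))  # (0.25, 0.75)
```

With `judgement_threshold = 0.75`, this toy set would make model B (the custom model) eligible for the catalog update.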


In [5]:
@component(
    base_image=IMAGE_URI,
    packages_to_install=["google-cloud-aiplatform","google-cloud-language", "fsspec", "gcsfs"],
)
def update_catalog(
    BUCKET_NAME: str,
    evaluation_dataset_name: str,
    judgements_uri: str,
    win_rate_a: float,
    win_rate_b: float,
    judgement_threshold: float,
):
    """
    Updates a catalog based on evaluation results and a win-rate threshold.

    Reads the evaluation dataset and the AutoSxS judgments, keeps the responses of
    the model whose win rate meets the threshold, and saves them to GCS.

    Args:
        BUCKET_NAME (str): The GCS bucket root, including the 'gs://' prefix.
        evaluation_dataset_name (str): Filename of the evaluation dataset (without extension).
        judgements_uri (str): GCS URI of the judgments file produced by the AutoSxS job.
        judgement_threshold (float): Minimum win rate a model needs for its responses to be kept (the notebook uses 0.75).
        win_rate_a (float): Win rate of model A (the baseline model).
        win_rate_b (float): Win rate of model B (the custom model).
    """
    import os
    import pandas as pd
    from google.cloud import storage
    import utils

    # Fetch Judgements from AutoSxS job
    judgements_df = pd.read_json(judgements_uri, lines=True)
    eval_df_ = pd.read_csv(f"{BUCKET_NAME}/data/{evaluation_dataset_name}.csv")
    response_df = pd.merge(
        eval_df_[["prompt", "id"]], judgements_df, on="prompt", how="left"
    ).dropna()

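    # Keep the winning model's responses; drop the other model's response column
    if win_rate_a >= judgement_threshold:
        drop_column = "response_b"
    elif win_rate_b >= judgement_threshold:
        drop_column = "response_a"
    else:
        raise ValueError(
            f"Neither win rate ({win_rate_a}, {win_rate_b}) meets the threshold {judgement_threshold}"
        )
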
    # Write winner to GCS

    response_df = response_df.drop(
        ["explanation", "choice", "confidence", drop_column], axis=1
    )
    response_df.to_json(
        f"{evaluation_dataset_name}_win_responses.jsonl", orient="records", lines=True
    )
    utils.save_jsonl_gcs(BUCKET_NAME, f"{evaluation_dataset_name}_win_responses")
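The branching in `update_catalog` can be isolated into a small pure function, which makes the no-winner case explicit and easy to unit-test. `losing_response_column` below is a hypothetical helper that uses the same threshold comparison and judgment column names as the component; it is a sketch, not part of the original pipeline.

```python
from typing import Optional


def losing_response_column(win_rate_a: float, win_rate_b: float, threshold: float) -> Optional[str]:
    """Return the judgments column to drop, or None if neither model clears the threshold.

    Model A is the baseline and model B the custom model; the winner's
    response column is the one that is kept.
    """
    if win_rate_a >= threshold:
        return "response_b"  # baseline wins, keep response_a
    if win_rate_b >= threshold:
        return "response_a"  # custom model wins, keep response_b
    return None  # no clear winner: the pipeline's tuning branch applies instead


print(losing_response_column(0.2, 0.8, 0.75))  # response_a
print(losing_response_column(0.5, 0.5, 0.75))  # None
```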


In [6]:
@component(base_image=IMAGE_URI)
def hello_world():
    """Placeholder component standing in for a model tuning job."""
    print("hello world")

## Define and Run Pipeline

This Kubeflow pipeline automates product description enrichment and evaluation. It uses a language model in Vertex AI to generate detailed descriptions from the input product data, then uses AutoSxS to compare those descriptions against a baseline model. If either model's win rate reaches `judgement_threshold`, the winning responses are written to GCS; otherwise a placeholder tuning step runs.

* A component for generating product descriptions using the chosen GenAI model is defined.
* An AutoSxS component is defined to evaluate the quality of generated descriptions.
* Conditional downstream tasks, such as model tuning or data logging, are defined as separate components based on evaluation results.

Reference notebook for fine tuning Gemma from the Google Cloud repository: [model_garden_gemma_finetuning_on_vertex.ipynb](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/model_garden/model_garden_gemma_finetuning_on_vertex.ipynb)


![pipeline.png](./imgs/passage-gen-pipeline.png)

In [7]:
@dsl.pipeline(pipeline_root=PIPELINE_ROOT, name="passage-gen-example")
def pipeline():

    generate_responses = gen_model_endpoint_response(
        BUCKET_NAME=BUCKET_NAME,
        input_feed_data=input_feed_data,
        evaluation_dataset_name=evaluation_dataset_name,
    )

    autosxs_evals = autoSxS_compare_base_model(
        BUCKET_NAME=BUCKET_NAME,
        MODEL_RESOURCE=MODEL_RESOURCE,
        evaluation_dataset_name=evaluation_dataset_name,
        PROJECT_ID=PROJECT_ID,
        REGION=REGION,
        BUCKET_URI=BUCKET_URI,
        context_column="name",
        question_column="prompt",
        response_column="response",
        model_prompt="prompt",
    ).after(generate_responses)

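    # Python's `or` cannot combine KFP conditions, so each winning model gets its own branch
    with dsl.If(autosxs_evals.outputs["win_rate_a"] >= judgement_threshold):
        # Baseline model (A) wins
        update_catalog(
            BUCKET_NAME=BUCKET_NAME,
            evaluation_dataset_name=evaluation_dataset_name,
            judgements_uri=autosxs_evals.outputs["judgements_path"],
            judgement_threshold=judgement_threshold,
            win_rate_a=autosxs_evals.outputs["win_rate_a"],
            win_rate_b=autosxs_evals.outputs["win_rate_b"],
        )
    with dsl.Elif(autosxs_evals.outputs["win_rate_b"] >= judgement_threshold):
        # Custom model (B) wins
        update_catalog(
            BUCKET_NAME=BUCKET_NAME,
            evaluation_dataset_name=evaluation_dataset_name,
            judgements_uri=autosxs_evals.outputs["judgements_path"],
            judgement_threshold=judgement_threshold,
            win_rate_a=autosxs_evals.outputs["win_rate_a"],
            win_rate_b=autosxs_evals.outputs["win_rate_b"],
        )
    with dsl.Else():
        hello_world().set_display_name("Trigger Tuning Job")  # Placeholder component for tuning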
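Python's `or` operator cannot combine KFP conditions. A comparison such as `autosxs_evals.outputs["win_rate_a"] >= judgement_threshold` builds a condition object rather than a boolean, and `x or y` returns `x` whenever `x` is truthy, so the second comparison is silently discarded. The stand-in classes below (`FakeChannel` and `FakeCondition`, hypothetical and not the KFP types) reproduce that behaviour in plain Python; in KFP, either-or branches are expressed with separate `dsl.If` / `dsl.Elif` / `dsl.Else` blocks.

```python
from dataclasses import dataclass


@dataclass
class FakeCondition:
    """Hypothetical stand-in for a pipeline condition: records an operator, always truthy."""
    left: str
    operator: str
    right: float


class FakeChannel:
    """Hypothetical stand-in for a task output channel; comparisons build conditions."""

    def __init__(self, name: str):
        self.name = name

    def __ge__(self, other: float) -> FakeCondition:
        return FakeCondition(self.name, ">=", other)


cond_a = FakeChannel("win_rate_a") >= 0.75
cond_b = FakeChannel("win_rate_b") >= 0.75
combined = cond_a or cond_b
print(combined)  # FakeCondition(left='win_rate_a', operator='>=', right=0.75)
print(combined is cond_a)  # True: cond_b was never used
```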


In [8]:
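template_uri = 'pipeline.yaml'
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=template_uri,
)

# Point the SDK at the project, region and staging bucket defined above
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

job = aiplatform.PipelineJob(
    job_id=DISPLAY_NAME,
    display_name="pipeline-passage-gen",
    pipeline_root=os.path.join(BUCKET_URI, DISPLAY_NAME),
    template_path=template_uri,
    enable_caching=False,
)
# Run the pipeline as the configured service account
job.run(service_account=SERVICE_ACCOUNT, sync=True)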

Creating PipelineJob
PipelineJob created. Resource name: projects/757654702990/locations/us-central1/pipelineJobs/passage-gen-examplenibpw
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/757654702990/locations/us-central1/pipelineJobs/passage-gen-examplenibpw')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/passage-gen-examplenibpw?project=757654702990
PipelineJob projects/757654702990/locations/us-central1/pipelineJobs/passage-gen-examplenibpw current state:
PipelineState.PIPELINE_STATE_RUNNING