# E2E Vertex AI Training Pipeline

## Importing required libraries

In [1]:
import os
import warnings
from datetime import datetime
from functools import partial
from typing import Any, Callable, Dict, Optional

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp
from google_cloud_pipeline_components.v1.wait_gcp_resources import WaitGcpResourcesOp
from jinja2 import Template
from kfp.v2 import compiler, dsl

warnings.filterwarnings("ignore")

## Defining parameters

In [3]:
sample_size = 50000
lemmatize_flag = "True"
dim_reduction_algo = "PCA"

In [4]:
# GCP / Vertex AI settings for this run.
project_id = "quantiphi-buzzwords"
region = "us-central1"

# A second-resolution timestamp suffix gives each run a distinct job id.
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
job_id = "e2e-training-pipeline-" + timestamp

## Loading Custom Components


Specify the components directory and load the custom components (data preparation and training) from their `component.yaml.jinja` files.

The component.yaml.jinja file contains:

- Inputs and Outputs of the component
- Artifact registry path of container image
- Entrypoint command of docker container

In [5]:
def _load_custom_component(
    components_dir: str,
    component_name: str,
    template_vars: Optional[Dict[str, Any]] = None,
) -> Callable[..., Any]:
    """Render a component's ``component.yaml.jinja`` template and load it as a KFP component.

    Args:
        components_dir: Directory containing one sub-directory per component.
        component_name: Name of the component sub-directory, e.g. ``"data_prep"``.
        template_vars: Optional values passed to the Jinja template's ``render``.
            When omitted, the template is rendered with no context variables.

    Returns:
        The component factory produced by ``kfp.components.load_component_from_text``.
        Calling it inside a pipeline function creates a pipeline task.

    Raises:
        FileNotFoundError: If ``<components_dir>/<component_name>/component.yaml.jinja``
            does not exist.
    """
    component_path = os.path.join(components_dir, component_name, "component.yaml.jinja")
    # Read explicitly as UTF-8 so loading doesn't depend on the platform's default encoding.
    with open(component_path, "r", encoding="utf-8") as template_file:
        component_text = Template(template_file.read()).render(**(template_vars or {}))
    return kfp.components.load_component_from_text(component_text)


In [6]:
# Path is relative to the notebook's working directory. Depending on where the
# notebook is launched from, this may need to be "../components/" instead.
components_dir = "../../components/"
load_custom_component = partial(_load_custom_component, components_dir=components_dir)

# Load the two custom components, in order: data preparation, then training.
data_prep_op, training_op = (
    load_custom_component(component_name=name) for name in ("data_prep", "training")
)

## Defining the E2E Training Pipeline

In [7]:
@dsl.pipeline(name="e2e-training-pipeline")
def pipeline(
    sample_size: int,
    lemmatize_flag: str,
    job_id: str,
    dim_reduction_algo: str,
):
    # Step 1: data preparation, driven by all four pipeline parameters.
    prep = data_prep_op(
        sample_size=sample_size,
        lemmatize_flag=lemmatize_flag,
        job_id=job_id,
        dim_reduction_algo=dim_reduction_algo,
    )
    # Step 2: training. It consumes no outputs of `prep`, so ordering is made
    # explicit with `.after()`. Both steps receive the same `job_id`, presumably
    # so training can locate data-prep outputs (TODO confirm in component code).
    training_op(job_id=job_id).after(prep)

## Compiling Pipeline

The pipeline is compiled using the Compiler, and all the relevant information is saved in a JSON file.

In [8]:
# Compile the pipeline function into a JSON spec that Vertex AI can execute.
pipeline_spec_path = "../pipeline_spec/e2e_training_pipeline.json"
# Make sure the output directory exists; otherwise compilation fails when it
# tries to write the spec on a fresh checkout.
os.makedirs(os.path.dirname(pipeline_spec_path), exist_ok=True)
compiler.Compiler().compile(pipeline_func=pipeline, package_path=pipeline_spec_path)

## Defining the PipelineJob

The PipelineJob is defined with the pipeline-related details (job ID, display name), the location, and the compiled JSON file used to run the job. A pipeline root folder for storing pipeline artifacts/metrics can also be provided via `pipeline_root`; it is not set explicitly in the cell below.

In [9]:
# Build the Vertex AI PipelineJob from the compiled pipeline spec.
pipeline_job = aiplatform.PipelineJob(
    display_name="e2e-training-pipeline",
    template_path="../pipeline_spec/e2e_training_pipeline.json",
    job_id=job_id,
    # Keys must match the parameter names of the `pipeline` function.
    parameter_values={
        "sample_size": sample_size,
        "lemmatize_flag": lemmatize_flag,
        "job_id": job_id,
        "dim_reduction_algo": dim_reduction_algo,
    },
    # Pass the configured project explicitly so the job does not silently run in
    # whatever default project the environment's credentials resolve to.
    project=project_id,
    location=region,
    # Disable caching so every run re-executes all steps.
    enable_caching=False,
)

Submitting the pipeline job

## Submitting the Pipeline job

The PipelineJob is submitted with `submit()`, and its logs can be viewed in the Vertex AI console.

In [10]:
# Submit the job to Vertex AI. Per the aiplatform docs, `submit()` returns once the
# job is created (unlike `run()`, which waits for completion). The cell output
# links to the run in the console.
pipeline_job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/431547520072/locations/us-central1/pipelineJobs/e2e-training-pipeline-20221110072122
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/431547520072/locations/us-central1/pipelineJobs/e2e-training-pipeline-20221110072122')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/e2e-training-pipeline-20221110072122?project=431547520072
