In [9]:
# Parameters for automatic testing
kedro_env = 'base'

In [10]:
relative_project_path = "..//"

## Loads kedro yml data

In [11]:
from kedro.config import ConfigLoader
from kedro.framework.project import settings

conf_path = str(relative_project_path + settings.CONF_SOURCE)
conf_loader = ConfigLoader(conf_source=conf_path, env=kedro_env)
parameters = conf_loader['parameters']

In [12]:
gcp_params = parameters['gcp']

params_bucket_entry = 'bucket'
params_vertexai_entry = 'vertex_ai'
params_kedro_job = 'kedro_job'

project_id = gcp_params['project_id']
bucket_name = gcp_params[params_bucket_entry]['name']

vertex_ai_job_name = gcp_params[params_vertexai_entry]['job_name']
vertex_ai_region = gcp_params[params_vertexai_entry]['region']
vertex_ai_docker_uri = gcp_params[params_vertexai_entry]['docker_image_uri']

kedro_pipeline_name = gcp_params[params_kedro_job]['pipeline_name']
kedro_env_name = gcp_params[params_kedro_job]['env_name']

## Runs vertex ai custom job

In [14]:
from google.cloud import aiplatform

def run_custom_container_kedro_job(project_id, region, display_name, docker_image_uri, bucket_name,
        pipeline_name, env_name,
        machine_type="n1-standard-4", replica_count = 1):
    # Initialize the Vertex AI client
    aiplatform.init(project=project_id, location=region, staging_bucket=bucket_name)

    # Specify the machine specification for the training job
    machine_spec = aiplatform.gapic.MachineSpec(machine_type=machine_type)

    # Specify the environment variables for the container
    env_vars = {"pipeline_name": pipeline_name, "env_name": env_name}

    # Specify the worker pool specification
    worker_pool_spec = aiplatform.gapic.WorkerPoolSpec(
        machine_spec=machine_spec,
        replica_count=replica_count,
        container_spec=aiplatform.gapic.ContainerSpec(image_uri=docker_image_uri, env=env_vars),
        # You can also pass command-line arguments to the Docker container here
        # "args": ["--arg1", "value1"],
    )

    # Initialize a Vertex AI custom job
    job = aiplatform.CustomJob(
        display_name=display_name,
        worker_pool_specs=[worker_pool_spec],
    )

    # Run the job
    job.run(sync=False)

# Call the function
run_custom_container_kedro_job(project_id, vertex_ai_region, vertex_ai_job_name, vertex_ai_docker_uri, bucket_name,
        kedro_pipeline_name, kedro_env_name)