<a href="https://colab.research.google.com/github/BahodirML/Coding_Practices/blob/main/pipeline_intro.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [40]:
# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                         google-cloud-storage \
                         kfp \
                         google-cloud-pipeline-components

Collecting google-cloud-storage
  Downloading google_cloud_storage-2.16.0-py2.py3-none-any.whl (125 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m125.6/125.6 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
Collecting google-cloud-pipeline-components
  Downloading google_cloud_pipeline_components-2.13.1-py3-none-any.whl (1.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
Collecting google-api-core<3.0.0dev,>=2.15.0 (from google-cloud-storage)
  Downloading google_api_core-2.18.0-py3-none-any.whl (138 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m138.3/138.3 kB[0m [31m16.1 MB/s[0m eta [36m0:00:00[0m
INFO: pip is looking at multiple versions of google-api-core[grpc] to determine which version is compatible with other requirements. This could take a while.
Installing collected packages: google-api-core, google-cloud-storage, google-cloud-pipeline-components
  Attempting

In [1]:
# Automatically restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)


{'status': 'ok', 'restart': True}

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 2.7.0


In [20]:
PROJECT_ID = "poised-conduit-420907"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [3]:
REGION = "asia-northeast3"  # @param {type: "string"}


In [4]:

from google.colab import auth
auth.authenticate_user()

In [5]:
BUCKET_URI = f"gs://intro_pip-{PROJECT_ID}-unique"  # @param {type:"string"}

In [21]:
SERVICE_ACCOUNT = "709551633939-compute@developer.gserviceaccount.com"  # @param {type:"string"}

In [22]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "709551633939-compute@developer.gserviceaccount.com"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    else:  # IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

In [8]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator

In [9]:
from typing import NamedTuple

import google.cloud.aiplatform as aip
from kfp import compiler, dsl
from kfp.dsl import component

In [10]:
# API service endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

In [23]:
RUN_ID = "pipeline_run_123"  # Replace with actual run ID
PIPELINE_ROOT = "{}/pipeline_runs/{}/intro".format(BUCKET_URI, RUN_ID)


In [24]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

In [33]:
from google.cloud import aiplatform
aiplatform.googleapis.com/custom_model_training_cpus

AttributeError: module 'google.cloud.aiplatform' has no attribute 'googleapis'

In [44]:
@component(base_image="python:3.9")
def hello_world(text: str) -> str:
    print(text)
    return text


compiler.Compiler().compile(hello_world, "hw.yaml")

In [27]:
@component(packages_to_install=["google-cloud-storage"])
def two_outputs(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # Return parameters
        ("output_two", str),
    ],
):
    # the import is not actually used for this simple example, but the import
    # is successful, as it was included in the `packages_to_install` list.
    from google.cloud import storage  # noqa: F401

    o1 = f"output one from text: {text}"
    o2 = f"output two from text: {text}"
    print("output one: {}; output_two: {}".format(o1, o2))
    return (o1, o2)

In [62]:
@component()
def consumer(text1: str, text2: str, text3: str) -> str:
    print(f"text1: {text1}; text2: {text2}; text3: {text3}")
    return f"text1: {text1}; text2: {text2}; text3: {text3}"

  return component_factory.create_component_from_func(


In [63]:
@dsl.pipeline(
    name="intro-pipeline-unique",
    description="A simple intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "hi there"):
    hw_task = hello_world(text=text)
    two_outputs_task = two_outputs(text=text)
    consumer_task = consumer(  # noqa: F841
        text1=hw_task.output,
        text2=two_outputs_task.outputs["output_one"],
        text3=two_outputs_task.outputs["output_two"],
    )

In [64]:
compiler.Compiler().compile(pipeline_func=pipeline, package_path="intro_pipeline.yaml")

In [67]:
from google.cloud.aiplatform import pipeline_jobs
DISPLAY_NAME = "intro_pipeline_job_unique"
job = pipeline_jobs.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="intro_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT
)
job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/709551633939/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240421160626
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/709551633939/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240421160626')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/intro-pipeline-unique-20240421160626?project=709551633939


In [None]:
# DISPLAY_NAME = "intro_pipeline_job_unique"

# job = aip.PipelineJob(
#     display_name=DISPLAY_NAME,
#     template_path="intro_pipeline.yaml",
#     pipeline_root=PIPELINE_ROOT,
# )

# job.run()