Import libraries

In [1]:
from typing import NamedTuple

import google.cloud.aiplatform as aip
from google.cloud import storage  # noqa: F401

from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (component,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics, 
                        OutputPath,
                        InputPath)





  from kfp.v2 import compiler


Initialise the Vertex AI SDK for Python

Define pipeline components

In [3]:
@component(
    packages_to_install=["pandas", "gcsfs", "scikit-learn", "numpy"],
    output_component_file="pre_processing_data.yaml",
    base_image="python:3.9")
def pre_processed_bf_data(
    train_name: str,
    test_name: str,
    BUCKET_URI: str,
    raw_folder: str,
    pre_processed_folder: str
    ) -> str:
    """Read the raw train/test CSVs from GCS, pre-process them, and write them back.

    Args:
        train_name: File name of the training CSV inside ``raw_folder``.
        test_name: File name of the test CSV inside ``raw_folder``.
        BUCKET_URI: Bucket URI prefix. It must end with "/" because paths are
            built by plain string concatenation.
        raw_folder: Folder under ``BUCKET_URI`` (with trailing "/") holding the
            raw CSVs.
        pre_processed_folder: Folder under ``BUCKET_URI`` passed to
            ``write_csv_GCS`` as the destination for the processed CSVs.

    Returns:
        ``BUCKET_URI + pre_processed_folder``, the location the processed files
        are written under. It is exposed as this component's string output.
    """
    # Imports must live inside the function: a KFP lightweight component only
    # ships this function's source to the container.
    # NOTE(review): `pre_processing_modules` and `data_utils` are not listed in
    # packages_to_install and are not part of the python:3.9 image. They must be
    # made importable some other way (custom base_image, pip-installable
    # package, ...). TODO confirm: this task failed in the run recorded below.
    from pre_processing_modules import basic_preprocessing, feature_engineering
    from data_utils import read_csv_GCS, write_csv_GCS

    # Raw-data locations; relies on BUCKET_URI and raw_folder ending in "/".
    df_train_path = BUCKET_URI + raw_folder + train_name
    df_test_path = BUCKET_URI + raw_folder + test_name

    df_train, df_test = read_csv_GCS(df_train_path, df_test_path)

    df_train, df_test = basic_preprocessing(df_train, df_test)
    df_train, df_test = feature_engineering(df_train, df_test)

    write_csv_GCS(df_train, df_test, BUCKET_URI, pre_processed_folder, train_name, test_name)

    # The component is declared `-> str` and the KFP executor type-checks the
    # returned value. The original implicitly returned None, which cannot be
    # published as a str output, so return the output location explicitly.
    return BUCKET_URI + pre_processed_folder

  @component(


In [4]:
# The @dsl.pipeline decorator arguments and the default argument values below
# are evaluated when this cell runs, so these names must already exist.
# Previously they were only defined (BUCKET_URI) or never defined
# (PIPELINE_ROOT) later in the notebook, which breaks Restart & Run All.
# The values mirror the Vertex AI job cell further down.
BUCKET_URI = "gs://pipeline_black_friday/"
PIPELINE_ROOT = f"{BUCKET_URI}pipeline-root/"


# NOTE(review): name/description are carried over from the intro template.
# The Vertex AI run IDs are derived from `name`, so renaming changes them.
@dsl.pipeline(
    name="intro-pipeline-unique",
    description="A simple intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    train_name: str = "train.csv",
    test_name: str = "test.csv",
    BUCKET_URI: str = BUCKET_URI,
    raw_folder: str = "raw_data/",
    pre_processed_folder: str = "pre_processed_data/",
):
    """Black Friday pipeline: a single pre-processing step over the raw CSVs.

    Every parameter is forwarded unchanged to ``pre_processed_bf_data``.
    Folder defaults end in "/" because the component concatenates paths.
    """
    pre_processed_bf_data(
        train_name=train_name,
        test_name=test_name,
        BUCKET_URI=BUCKET_URI,
        raw_folder=raw_folder,
        pre_processed_folder=pre_processed_folder,
    )
    
    

NameError: name 'train_name' is not defined

In [5]:
# Serialise the pipeline definition into the JSON template that
# aip.PipelineJob(template_path="bf_pipeline.json") loads below.
bf_compiler = compiler.Compiler()
bf_compiler.compile(
    pipeline_func=pipeline,
    package_path="bf_pipeline.json",
)

In [6]:
import google.cloud.aiplatform as aip

# Vertex AI project / storage configuration for the Black Friday pipeline run.
PROJECT_ID = "prj-dev-mlbf-flt-01-29e5"
BUCKET_URI = "gs://pipeline_black_friday/"
REGION = "europe-west2"
DISPLAY_NAME = "bf_pipeline_job_unique"
# Named constant for the artifact root. Later cells pass
# `pipeline_root=PIPELINE_ROOT`, and no other visible cell defines it.
PIPELINE_ROOT = f"{BUCKET_URI}pipeline-root/"

aip.init(project=PROJECT_ID, location=REGION)

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="bf_pipeline.json",  # written by the compile cell above
    pipeline_root=PIPELINE_ROOT,
)

# Waits for the run to finish, polling its state. A failed run surfaces as
# the RuntimeError shown in the output below.
job.run()


Creating PipelineJob
PipelineJob created. Resource name: projects/573415920784/locations/europe-west2/pipelineJobs/intro-pipeline-unique-20240624180815
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/573415920784/locations/europe-west2/pipelineJobs/intro-pipeline-unique-20240624180815')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west2/pipelines/runs/intro-pipeline-unique-20240624180815?project=573415920784
PipelineJob projects/573415920784/locations/europe-west2/pipelineJobs/intro-pipeline-unique-20240624180815 current state:
2
PipelineJob projects/573415920784/locations/europe-west2/pipelineJobs/intro-pipeline-unique-20240624180815 current state:
3
PipelineJob projects/573415920784/locations/europe-west2/pipelineJobs/intro-pipeline-unique-20240624180815 current state:
3
PipelineJob projects/573415920784/locations/europe-west2/pipelineJobs/intro-pipeline-unique-20240624180815 current state:
3
PipelineJo

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [pre-processed-bf-data].; Job (project_id = prj-dev-mlbf-flt-01-29e5, job_id = 3244964477799497728) is failed due to the above error.; Failed to handle the job: {project_number = 573415920784, job_id = 3244964477799497728}"


In [23]:
@component(packages_to_install=["google-cloud-storage"])
def two_outputs(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("output_one", str),  # named string outputs of the component
        ("output_two", str),
    ]
):
    """Derive two labelled strings from ``text`` and emit both as outputs.

    Returns:
        A 2-tuple mapped positionally onto the ``output_one`` and
        ``output_two`` outputs declared in the return annotation.
    """
    first = f"output one from text: {text}"
    second = f"output two from text: {text}"
    print(f"output one: {first}; output_two: {second}")
    return first, second

  return component_factory.create_component_from_func(


In [24]:
@component
def consumer(text1: str, text2: str, text3: str) -> str:
    """Join the three inputs into one labelled line, log it, and return it."""
    summary = f"text1: {text1}; text2: {text2}; text3: {text3}"
    print(summary)
    return summary

Define a pipeline that uses the components

Compile the pipeline

In [26]:
from kfp.v2 import compiler  # noqa: F811



Run the pipeline

In [27]:
DISPLAY_NAME = "intro_pipeline_job_unique"

# NOTE(review): "intro_pipeline.json" is not compiled anywhere in this notebook
# (the "Compile the pipeline" cell only imports the compiler). Compile the intro
# pipeline first, or this cell fails on a fresh kernel. TODO confirm.
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="intro_pipeline.json",
    # PIPELINE_ROOT is not defined by any visible earlier cell. Derive the
    # root from BUCKET_URI the same way the Black Friday job cell does.
    pipeline_root=f"{BUCKET_URI}pipeline-root/",
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/573415920784/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240618112329
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/573415920784/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240618112329')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/intro-pipeline-unique-20240618112329?project=573415920784
PipelineJob projects/573415920784/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240618112329 current state:
2
PipelineJob projects/573415920784/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240618112329 current state:
3
PipelineJob projects/573415920784/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240618112329 current state:
3
PipelineJob projects/573415920784/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240618112329 current state:
3


RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [consumer].; Job (project_id = prj-dev-mlbf-flt-01-29e5, job_id = 8695161428190953472) is failed due to the above error.; Failed to handle the job: {project_number = 573415920784, job_id = 8695161428190953472}"


In [28]:
# Clean up: delete the PipelineJob resource created by the previous cell.
finished_job = job
finished_job.delete()

Deleting PipelineJob : projects/573415920784/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240618112329
Delete PipelineJob  backing LRO: projects/573415920784/locations/us-central1/operations/8122597379456106496
PipelineJob deleted. . Resource name: projects/573415920784/locations/us-central1/pipelineJobs/intro-pipeline-unique-20240618112329
