# Vertex AI Pipelines: Lightweight Python function-based components, and component I/O

[[github](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/lightweight_functions_component_io_kfp.ipynb) - 2a25be6]

In [None]:
from typing import NamedTuple

import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, 
                        Model, Output, OutputPath, component)

## Define preprocess component

In [None]:
@component
def preprocess(
    message: str,
    output_dataset_one: Output[Dataset],
    output_dataset_two_path: OutputPath("Dataset"),
    output_parameter_path: OutputPath(str)
):
    """Write `message` to two dataset outputs and one parameter output.

    Args:
        message: Text written verbatim to every output file.
        output_dataset_one: Dataset artifact; the file at its `path`
            receives `message` and its metadata gets a "hello" entry.
        output_dataset_two_path: File path of the second "Dataset" output.
        output_parameter_path: File path of the string output parameter.
    """
    # Store a plain string. A trailing comma after the value would turn it
    # into the 1-tuple ("there",) and record the wrong metadata type.
    output_dataset_one.metadata["hello"] = "there"

    # All three outputs receive the same content, in declaration order.
    destinations = (
        output_dataset_one.path,
        output_dataset_two_path,
        output_parameter_path,
    )
    for destination in destinations:
        with open(destination, "w") as f:
            f.write(message)

## Define train component

In [None]:
@component
def train(
    message: str,
    dataset_one_path: InputPath("Dataset"),
    dataset_two: Input[Dataset],
    imported_dataset: Input[Dataset],
    model: Output[Model],
    num_steps: int = 3
) -> NamedTuple(
    "Outputs",
    [
        ("output_message", str),
        ("generic_artifact", Artifact)
    ]
):
    """Read the input datasets, write a dummy model file, and build outputs.

    Args:
        message: Text that is repeated to form the output message.
        dataset_one_path: File path of the first input dataset.
        dataset_two: Second input dataset, read from its `path`.
        imported_dataset: Imported dataset; its contents are only printed.
        model: Output model artifact; receives the text "My Model" and
            three metadata entries.
        num_steps: How many times `message` is repeated.

    Returns:
        A 2-tuple `(output_message, generic_artifact)`: `message` repeated
        `num_steps` times separated by spaces, and the contents of the two
        datasets joined by a newline.
    """
    with open(dataset_one_path) as source:
        first_text = source.read()

    with open(dataset_two.path) as source:
        second_text = source.read()

    with open(model.path, "w") as sink:
        sink.write("My Model")

    with open(imported_dataset.path) as source:
        imported_text = source.read()

    print("Imported Dataset:", imported_text)

    # Metadata recorded on the model artifact, set in the original order.
    model_metadata = (
        ("accuracy", 0.9),
        ("framework", "Tensorflow"),
        ("time_to_train_in_seconds", 257),
    )
    for key, value in model_metadata:
        model.metadata[key] = value

    combined_contents = "\n".join((first_text, second_text))
    repeated_message = " ".join([message] * num_steps)
    return (repeated_message, combined_contents)

## Define read artifact input component

In [None]:
@component
def read_artifact_input(
    generic: Input[Artifact]
):
    """Print the text stored in the file behind the `generic` artifact.

    Args:
        generic: Input artifact whose `path` points at a readable text file.
    """
    with open(generic.path) as artifact_file:
        text = artifact_file.read()

    print(f"generic contents: {text}")

## Define pipeline

In [None]:
@dsl.pipeline(name="metadata-pipeline-v2")
def pipeline(message: str):
    """Chain importer -> preprocess -> train -> read_artifact_input.

    Args:
        message: Text handed to the preprocess step.
    """
    # Bring an existing GCS text file into the pipeline as a Dataset.
    imported = kfp.dsl.importer(
        artifact_uri="gs://ml-pipeline-playground/shakespeare1.txt",
        artifact_class=Dataset,
        reimport=False,
    )

    preprocess_step = preprocess(message=message)
    preprocessed = preprocess_step.outputs

    # train consumes all three preprocess outputs plus the imported dataset.
    train_step = train(
        dataset_one_path=preprocessed["output_dataset_one"],
        dataset_two=preprocessed["output_dataset_two_path"],
        imported_dataset=imported.output,
        message=preprocessed["output_parameter_path"],
        num_steps=5,
    )

    # The returned task is not used further, so it is not bound to a name.
    read_artifact_input(generic=train_step.outputs["generic_artifact"])

## Run the pipeline locally

In [None]:
# Path of the compiled pipeline package; passed as `package_path` to the
# compiler and as the package file to the client in the cells that follow.
pipeline_file = "lightweight-pipeline.json"

In [None]:
# Compile the pipeline function with the v2 compiler, writing the result to
# `pipeline_file`.
kfp.v2.compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=pipeline_file,
)

In [13]:
# Attempt the same compilation with the v1 compiler in V2_COMPATIBLE mode.
# The recorded output of this cell is a ValueError: the pipeline uses
# dsl.importer, which the v1 compiler does not support.
kfp.compiler.Compiler(
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
).compile(
    pipeline_func=pipeline,
    package_path=pipeline_file,
)

ValueError: dsl.importer is not supported with v1 compiler.

In [None]:
kfp_endpoint = "http://localhost:8080/pipeline"

# create_run_from_pipeline_func compiles the pipeline itself, so it must be
# given the pipeline function rather than the path of a compiled package.
# NOTE(review): compiling in V2_COMPATIBLE mode presumably raises the same
# "dsl.importer is not supported with v1 compiler" ValueError recorded for
# the explicit V2_COMPATIBLE compile cell -- confirm before relying on this.
kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
    pipeline,
    arguments={"message": "Hello, World"},
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
    enable_caching=False,
)

In [12]:
kfp_endpoint = "http://localhost:8080/pipeline"

# Submit the already-compiled package file. The recorded output of this cell
# is a ValueError: the client accepts only .tar.gz, .tgz, .zip, .yaml or .yml
# packages, while `pipeline_file` ends in .json.
# The `mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE` argument is not
# passed in this variant.
client = kfp.Client(host=kfp_endpoint)
client.create_run_from_pipeline_package(
    pipeline_file,
    arguments={"message": "Hello, World"},
    enable_caching=False,
)

ValueError: The package_file lightweight-pipeline.json should end with one of the following formats: [.tar.gz, .tgz, .zip, .yaml, .yml]