In [None]:
!pip install kfp
from kfp import dsl
from kfp import compiler

# Ignore FutureWarnings in kfp
import warnings
warnings.filterwarnings("ignore",
                        category=FutureWarning,
                        module='kfp.*')

In [None]:
### Simple example: component 1
@dsl.component
def say_hello(name: str) -> str:
    hello_text = f'Hello, {name}!'

    return hello_text

In [None]:
hello_task = say_hello(name="Erwin")
print(hello_task)

In [None]:
print(hello_task.output)

In [None]:
# this will give an error and ask you to specify the parameter name
hello_task = say_hello("Erwin")

In [None]:
### Simple example: component 2
@dsl.component
def how_are_you(hello_text: str) -> str:

    how_are_you = f"{hello_text}. How are you?"

    return how_are_you

In [None]:
how_task = how_are_you(hello_text=hello_task.output)
print(how_task)
print(how_task.output)

In [None]:
# This will give an error and ask you to pass in a built-in data type
how_task = how_are_you(hello_text=hello_task)
print(how_task)
print(how_task.output)

In [None]:
### Simple example: pipeline
@dsl.pipeline
def hello_pipeline(recipient: str) -> str:

    # notice, just recipient and not recipient.output
    hello_task = say_hello(name=recipient)

    # notice .output
    how_task = how_are_you(hello_text=hello_task.output)

    # notice .output
    return how_task.output

In [None]:
pipeline_output = hello_pipeline(recipient="Erwin")
print(pipeline_output)

In [None]:
### Pipeline with wrong return value type
@dsl.pipeline
def hello_pipeline_with_error(recipient: str) -> str:
    hello_task = say_hello(name=recipient)
    how_task = how_are_you(hello_text=hello_task.output)

    return how_task
    # returning the PipelineTask object itself will give you an error


In [None]:
compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')

In [None]:
pipeline_arguments = {
    "recipient": "World!",
}

In [None]:
!cat pipeline.yaml

In [None]:
```Python
pipeline_root "./"

job = PipelineJob(
        ### path of the yaml file to execute
        template_path=template_path,
        ### name of the pipeline
        display_name=f"deep_learning_ai_pipeline-{date}",
        ### pipeline arguments (inputs)
        parameter_values=pipeline_arguments,
        ### region of execution
        location=REGION,
        ### root is where temporary files are being
        ### stored by the execution engine
        pipeline_root=pipeline_root,
        ### enable_caching=True will save the outputs
        ### of components for re-use, and will only re-run those
        ### components for which the code or data has changed.
        enable_caching=True,
)

### submit for execution
job.submit()

### check to see the status of the job
job.state
```