# Example of Creating a Pipeline

In [6]:
import kfp
from kfp import dsl
# kfp-server-api==0.1.18.3

In [2]:

def gcs_download_op(url):
    """Build a container step that copies a GCS object to a local file and logs it.

    Args:
        url: ``gs://`` URL of the object to download (may be a pipeline parameter).

    Returns:
        A ``dsl.ContainerOp`` whose ``'data'`` output is the contents of
        ``/tmp/results.txt``.
    """
    return dsl.ContainerOp(
        name='GCS - Download',
        image='google/cloud-sdk:272.0.0',
        command=['sh', '-c'],
        # `sh -c SCRIPT ARG0 ARG1` binds ARG0 to $0 and ARG1 to $1 inside SCRIPT.
        # Write the file first and only then print it: in `gsutil cat | tee` the
        # pipeline's exit status is tee's, so a failed download would be reported
        # as a successful step with an empty output. Quoting avoids word splitting.
        arguments=['gsutil cat "$0" > "$1" && cat "$1"', url, '/tmp/results.txt'],
        file_outputs={
            'data': '/tmp/results.txt',
        },
    )


def echo2_op(text1, text2):
    """Build a container step that prints two text values, one per line.

    Args:
        text1: Value printed after the ``Text 1:`` label.
        text2: Value printed after the ``Text 2:`` label.

    Returns:
        The ``dsl.ContainerOp`` for the echo step.
    """
    # `sh -c SCRIPT ARG0 ARG1` binds ARG0 to $0 and ARG1 to $1 inside SCRIPT.
    script = 'echo "Text 1: $0"; echo "Text 2: $1"'
    return dsl.ContainerOp(
        name='echo',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        arguments=[script, text1, text2],
    )


@dsl.pipeline(
    name='Parallel pipeline',
    # NOTE(review): echo2_op prints the two texts on separate lines rather than
    # concatenating them; the description text is kept as-is.
    description='Download two messages in parallel and prints the concatenated result.',
)
def download_and_join(
    url1='gs://ml-pipeline-playground/shakespeare1.txt',
    url2='gs://ml-pipeline-playground/shakespeare2.txt',
):
    """Download two GCS files, then echo both of their contents.

    The two download steps do not depend on each other, so they can run in
    parallel; the echo step consumes both outputs and therefore runs last.

    Args:
        url1: GCS URL of the first file to download.
        url2: GCS URL of the second file to download.
    """
    first, second = gcs_download_op(url1), gcs_download_op(url2)
    # Passing the download outputs is what makes the echo step wait for both.
    echo2_op(first.output, second.output)


* Documentation: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html

In [4]:
# Compile the pipeline function into a package archive that can be uploaded to Kubeflow Pipelines.
kfp.compiler.Compiler().compile(
    pipeline_func=download_and_join,
    package_path='test3.tar.gz',
)

In [5]:
# Register the compiled package on the cluster as a named, reusable pipeline;
# the call returns the created pipeline's metadata (shown in the cell output).
kfp.Client().upload_pipeline(
    pipeline_package_path='test3.tar.gz',
    pipeline_name='MyFirstPipeline223',
)

{'created_at': datetime.datetime(2020, 4, 8, 17, 58, 21, tzinfo=tzlocal()),
 'description': None,
 'error': None,
 'id': '51699106-36de-4d96-9ebb-6fd1eaf09e3b',
 'name': 'MyFirstPipeline223',
 'parameters': [{'name': 'url1',
                 'value': 'gs://ml-pipeline-playground/shakespeare1.txt'},
                {'name': 'url2',
                 'value': 'gs://ml-pipeline-playground/shakespeare2.txt'}],
 'url': None}

In [14]:
# Submit a one-off run straight from the pipeline function. Unlike
# upload_pipeline, this does not register a reusable pipeline template.
# `arguments={}` overrides none of the pipeline's parameters.
kfp.Client().create_run_from_pipeline_func(
    download_and_join,
    arguments={},
    run_name=None,
    experiment_name='bleepbloop',
)

RunPipelineResult(run_id=54376811-f0f2-47ed-ae05-7eea139a8a3f)