In [1]:
!pip install "kfp<2.0" --upgrade -q

In [2]:
import kfp
import kfp.components as comp
from kfp import dsl

import glob
import pandas as pd
import tarfile
import urllib.request

In [3]:
def merge_csv(file_path: comp.InputPath('Tarball'),
              output_csv: comp.OutputPath('CSV')):
    """Extract a gzipped tarball and merge its top-level CSV files into one CSV.

    Every *.csv file found directly under the extraction directory is read
    without a header row. The files are concatenated in sorted file-name
    order so the merged output is reproducible across runs.

    Args:
        file_path: Path of the gzip-compressed tar archive to read.
        output_csv: Path the merged CSV is written to (no header, no index).

    Raises:
        ValueError: If no top-level *.csv file is found after extraction.
    """
    # The function is packaged as a standalone component, so it imports
    # everything it needs locally instead of relying on notebook-level imports.
    import glob
    import pandas as pd
    import tarfile

    extract_dir = 'data'

    # NOTE(review): extractall() trusts the member paths stored in the archive.
    # The input comes from a download step, so consider validating members (or
    # using an extraction filter where the runtime's tarfile supports it).
    # The context manager guarantees the archive handle is closed, even when
    # extraction fails part-way.
    with tarfile.open(name=file_path, mode="r|gz") as archive:
        archive.extractall(extract_dir)

    # glob() returns files in filesystem order; sort for a stable row order.
    csv_files = sorted(glob.glob(extract_dir + '/*.csv'))
    if not csv_files:
        # pd.concat([]) would raise a ValueError too, but with a message that
        # does not point at the real cause.
        raise ValueError(
            f"No CSV files found at the top level of archive {file_path!r}")

    merged = pd.concat(
        [pd.read_csv(csv_file, header=None) for csv_file in csv_files])
    merged.to_csv(output_csv, index=False, header=False)

In [4]:
# Wrap merge_csv as a reusable pipeline step. The step runs in the python:3.7
# image with pandas installed at the pinned version; the generated spec is
# also saved to component.yaml (optional) for future use.
create_step_merge_csv = comp.create_component_from_func(
    func=merge_csv,
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'],
    output_component_file='component.yaml',
)

In [5]:
# Load the contrib web "Download" component spec instead of defining one here.
# NOTE(review): the spec is fetched from the `master` branch, so its contents
# can change between runs — consider pinning the URL to a commit.
web_downloader_op = comp.load_component_from_url(
    url='https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml',
)

In [6]:
from kubernetes.client.models import V1Toleration, V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement

@dsl.pipeline(
    name="base_pipeline",
    description="my base pipeline",
)
def my_pipeline(url):
    # Two-step pipeline: download the archive at `url`, then pass the
    # downloaded data to the CSV-merge step. Both steps receive identical
    # scheduling settings (toleration, node affinity and retry policy).
    download_task = web_downloader_op(url=url)
    merge_task = create_step_merge_csv(file=download_task.outputs['data'])

    spot_key = "kubernetes.azure.com/scalesetpriority"
    spot_value = "spot"

    # Tolerate the NoSchedule taint keyed on the scale-set priority.
    spot_toleration = V1Toleration(
        key=spot_key,
        operator="Equal",
        value=spot_value,
        effect="NoSchedule",
    )

    # Hard scheduling requirement: only nodes labelled with the same key/value.
    spot_requirement = V1NodeSelectorRequirement(
        key=spot_key,
        operator='In',
        values=[spot_value],
    )
    spot_term = V1NodeSelectorTerm(match_expressions=[spot_requirement])
    spot_affinity = V1Affinity(
        node_affinity=V1NodeAffinity(
            required_during_scheduling_ignored_during_execution=V1NodeSelector(
                node_selector_terms=[spot_term],
            ),
        ),
    )

    # Apply the shared settings to each step; retry up to 5 times with a
    # "5m" backoff duration.
    for task in (download_task, merge_task):
        task.add_toleration(spot_toleration)
        task.add_affinity(spot_affinity)
        task.set_retry(num_retries=5, backoff_duration="5m")

In [7]:
# Create the Kubeflow Pipelines API client. No host or credentials are passed,
# so the client uses its defaults — presumably the in-cluster endpoint when
# run from a Kubeflow notebook; confirm for other environments.
client = kfp.Client()

In [8]:
# Submit my_pipeline as a run, pointing `url` at the iris CSV archive.
# Kept as the cell's last expression so the returned run handle is displayed.
client.create_run_from_pipeline_func(
    pipeline_func=my_pipeline,
    arguments={'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'},
)

RunPipelineResult(run_id=fd871f30-d0b8-48b6-8df8-dce561649372)