## Build Pipeline

In [2]:
import kfp 
import kfp.components as comp

In [3]:
import glob
import pandas as pd
import tarfile
import urllib.request

In [6]:
def download_and_merge_csv(url: str, output_csv: str, data_dir: str = 'data'):
    """Download a gzipped tarball of CSV files and merge them into one CSV.

    The archive at ``url`` is streamed and extracted into ``data_dir``. Every
    ``*.csv`` file directly inside ``data_dir`` is then read without a header
    row. The files are concatenated in sorted filename order and written to
    ``output_csv`` without an index or header row.

    Args:
        url: Location of a ``.tar.gz`` archive (any URL ``urllib`` can open).
        output_csv: Path of the merged CSV file to write.
        data_dir: Directory the archive is extracted into (default ``'data'``).
            Any ``*.csv`` already present there is merged as well.

    Raises:
        ValueError: If no CSV files are found in ``data_dir`` after extraction.
    """
    with urllib.request.urlopen(url) as res:
        # "r|gz" reads the HTTP response as a forward-only gzip stream, so the
        # archive does not need to be saved to disk first.
        with tarfile.open(fileobj=res, mode="r|gz") as tar:
            if hasattr(tarfile, 'data_filter'):
                # The 'data' filter rejects absolute paths, '..' components and
                # special files that could let a hostile archive escape data_dir.
                tar.extractall(data_dir, filter='data')
            else:
                # NOTE(review): this Python has no extraction filters; only use
                # archives from trusted sources here.
                tar.extractall(data_dir)

    # Sort so the merged row order is deterministic; raw glob order depends on
    # the filesystem.
    csv_files = sorted(glob.glob(f'{glob.escape(data_dir)}/*.csv'))
    if not csv_files:
        raise ValueError(f'no CSV files found in {data_dir!r} after extracting {url!r}')

    df = pd.concat([pd.read_csv(csv_file, header=None) for csv_file in csv_files])

    # header=False is the documented way to omit the header row (header=None
    # only worked because None is falsy).
    df.to_csv(output_csv, index=False, header=False)

In [7]:
# Smoke-test download_and_merge_csv locally: fetch the iris CSV tarball and
# write the merged rows to merged_data.csv.
download_and_merge_csv(
    'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz',
    'merged_data.csv',
)

In [9]:
def merge_csv(file_path: comp.InputPath('Tarball'),
              output_csv: comp.OutputPath('CSV')):
    """Extract a gzipped tarball of CSV files and merge them into one CSV.

    Every ``*.csv`` file extracted into ``data/`` is read without a header
    row. The files are concatenated in sorted filename order and written to
    ``output_csv`` without an index or header row.

    Raises:
        ValueError: If the tarball contains no CSV files.
    """
    # Imports live inside the function because kfp serializes only this
    # function's source into the component; notebook-level imports are not
    # available in the container.
    import glob
    import pandas as pd
    import tarfile

    # Open by path (name=) as a forward-only gzip stream (mode=); the context
    # manager closes the file once extraction is done.
    with tarfile.open(name=file_path, mode="r|gz") as tar:
        if hasattr(tarfile, 'data_filter'):
            # Reject absolute paths, '..' components and special files.
            tar.extractall('data', filter='data')
        else:
            # NOTE(review): no extraction filters on this Python (e.g. the
            # python:3.7 image); the input must come from a trusted source.
            tar.extractall('data')

    # Sort so the merged row order does not depend on filesystem listing order.
    csv_files = sorted(glob.glob('data/*.csv'))
    if not csv_files:
        raise ValueError('no CSV files found in the input tarball')

    df = pd.concat([pd.read_csv(csv_file, header=None) for csv_file in csv_files])
    df.to_csv(output_csv, index=False, header=False)

In [11]:
# Wrap merge_csv as a pipeline component that runs on the python:3.7 image
# with pandas 1.1.4 installed; the component spec is also saved to
# component.yaml.
create_step_merge_csv = comp.create_component_from_func(
    merge_csv,
    base_image='python:3.7',
    packages_to_install=['pandas==1.1.4'],
    output_component_file='component.yaml',
)

In [14]:
# Build the pipeline: reuse the published "web download" component, whose
# 'data' output my_pipeline feeds into the merge step.
# NOTE(review): this URL tracks the `master` branch, so the component can change
# underneath this notebook; consider pinning a release tag or commit.
web_downloader_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component-sdk-v2.yaml'
)

In [17]:
def my_pipeline(url):
    """Download a CSV tarball from ``url`` and merge its CSV files into one CSV."""
    download_task = web_downloader_op(url=url)
    # The merge task's handle is never referenced again, so it is not kept.
    # `file` is the input name kfp derives from merge_csv's `file_path` parameter.
    create_step_merge_csv(file=download_task.outputs['data'])

In [18]:
# Compile and run the pipeline.

# Option 1: compile to pipeline.yaml, then upload and run it from the UI.
# https://www.kubeflow.org/docs/components/pipelines/overview/quickstart/
kfp.compiler.Compiler().compile(my_pipeline, 'pipeline.yaml')

In [19]:
# Option 2: submit the pipeline directly with the Kubeflow Pipelines SDK client.
# NOTE(review): no host is passed, so the client has to locate the KFP API on its
# own (presumably in-cluster/default config); pass host=... when running elsewhere — confirm.
client = kfp.Client()

In [20]:
# Start a run of my_pipeline on the cluster, pointing it at the iris CSV tarball.
client.create_run_from_pipeline_func(
    my_pipeline,
    arguments=dict(
        url='https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz',
    ),
)

RunPipelineResult(run_id=2b765abd-4a78-4a98-9550-1e2cd06c1fca)