# Summary

Uses the OpenAddresses tooling to 

* download data based on a user-defined JSON source specification
* save the data in a user-defined location in MinIO

Usage: 

* Edit the settings below and run the notebook

In [1]:
import json

import kfp
from kfp.components import load_component_from_file
from kfp.compiler import Compiler
from kfp import dsl

# Component/Pipeline definitions

In [2]:
# Build component factories from their YAML specs; each returned callable is
# invoked inside the pipeline function to create a step.
openaddresses_get_op = load_component_from_file("./components/openaddresses_get_data.yaml")
copy_to_minio_op = load_component_from_file("./components/copy_to_minio.yaml")

In [3]:
# Two-step pipeline: run the OpenAddresses download component, then copy the
# `data` output it produced to the requested MinIO destination.
#
# Parameters:
#   source_json        - forwarded to the download component as `source_json`
#   minio_output_uri   - forwarded to the copy component as `minio_destination`
#   minio_url, minio_access_key, minio_secret_key
#                      - forwarded unchanged to the copy component under the
#                        same names
#   openaddresses_args - forwarded to the download component as `args`
#
# NOTE(review): documented with comments instead of a docstring because the KFP
# compiler may pick a docstring up as the pipeline description -- confirm
# before converting.
@dsl.pipeline(name="Download OpenAddresses Data to Minio")
def pipeline(
    source_json,
    minio_output_uri: str,
    # TODO: Handle these automatically once multitenancy is available
    minio_url,
    minio_access_key: str,
    minio_secret_key: str,
    openaddresses_args: str = "",
):
    def _with_fixed_resources(task):
        # Both steps use the same CPU/memory values for request and limit.
        return (
            task
            .set_cpu_request('1000m')
            .set_cpu_limit('1000m')
            .set_memory_request('2G')
            .set_memory_limit('2G')
        )

    # Step 1: download the data; the image pull policy is forced to "Always"
    # for this step only.
    download_task = _with_fixed_resources(
        openaddresses_get_op(
            source_json=source_json,
            args=openaddresses_args,
        ).set_image_pull_policy("Always")
    )

    # Step 2: copy the downloaded data to MinIO.
    upload_task = _with_fixed_resources(
        copy_to_minio_op(
            local_source=download_task.outputs['data'],
            minio_destination=minio_output_uri,
            minio_url=minio_url,
            minio_access_key=minio_access_key,
            minio_secret_key=minio_secret_key,
            flags="--recursive",  # Because outputs['data'] is a directory
        )
    )

    # Give every step its human-readable display name.
    for label, task in (('Get Data', download_task), ('Store Data', upload_task)):
        task.set_display_name(label)

# Compile pipeline into a package file (`OpenAddressesPipeline.tar.gz`)

In [4]:
# Compile the pipeline function into the package archive that is uploaded later.
Compiler().compile(
    pipeline_func=pipeline,
    package_path='OpenAddressesPipeline.tar.gz',
)

In [5]:
# Upload pipeline to server

In [7]:
# Upload the compiled package to the pipelines server. The call stays as the
# cell's last expression so its return value is displayed.
kfp.Client().upload_pipeline(
    pipeline_package_path='OpenAddressesPipeline.tar.gz',
    pipeline_name="OpenAddressesPipeline_1CPU",
    description="Read in json input file to run OpenAddresses processing scripts to download and process address data",
)

{'created_at': datetime.datetime(2021, 1, 14, 20, 41, 40, tzinfo=tzlocal()),
 'default_version': {'code_source_url': None,
                     'created_at': datetime.datetime(2021, 1, 14, 20, 41, 40, tzinfo=tzlocal()),
                     'id': '9a0c0248-0448-4b59-9345-57581d2bbb7c',
                     'name': 'OpenAddressesPipeline_1CPU',
                     'package_url': None,
                     'parameters': [{'name': 'source_json', 'value': None},
                                    {'name': 'minio_output_uri', 'value': None},
                                    {'name': 'minio_url', 'value': None},
                                    {'name': 'minio_access_key', 'value': None},
                                    {'name': 'minio_secret_key', 'value': None},
                                    {'name': 'openaddresses_args',
                                     'value': None}],
                     'resource_references': [{'key': {'id': '9a0c0248-0448-4b59-9345-57581d2bbb7c'