# Summary

Uses the OpenAddresses tooling to:

* download data based on a user-defined JSON source specification
* save the data in a user-defined location in MinIO

Usage: 

* Edit the settings below, then run the notebook.

In [None]:
import json

import kfp
from kfp.components import load_component_from_file
from kfp import dsl

# User settings:

In [None]:
# MinIO bucket that receives the downloaded data (replace the placeholder).
minio_bucket_name = "FIRSTNAME-LASTNAME"
# Path of the JSON source specification read by this notebook.
json_source_file = "myfile.json"
# Tenant name handed to the MinIO credentials helper.
minio_tenant = "minimal"
# Extra arguments forwarded to the OpenAddresses component.
openaddresses_args = "--layer addresses --layersource city"
# Storage destination, built as <bucket>/<path inside the bucket>.
minio_output_uri = minio_bucket_name + "/path/to/storage/location"

# Additional Inputs for Debugging/Testing:

If testing/debugging, you can use the following demo JSON file: uncomment the cell below and run it to write the demo to `json_source_file`.  

You can also use this method to write your own JSON file from the notebook if that makes sense for your workflow. 

In [None]:
# %%writefile $json_source_file
# {
#     "coverage": {
#         "country": "ca",
#         "state": "ab",
#         "city": "Calgary",
#         "geometry": {
#             "type": "Point",
#             "coordinates": [
#                 -114.08,
#                 51.08
#             ]
#         }
#     },
#     "schema": 2,
#     "layers": {
#         "addresses": [
#             {
#                 "name": "city",
#                 "data": "https://data.calgary.ca/api/views/uwj2-d2wc/rows.csv?accessType=DOWNLOAD",
#                 "website": "https://data.calgary.ca/",
#                 "license": {
#                     "url": "https://data.calgary.ca/stories/s/Open-Calgary-Terms-of-Use/u45n-7awa",
#                     "text": "Contains information licensed under the Open Government Licence – City of Calgary.",
#                     "attribution name": "City of Calgary"
#                 },
#                 "protocol": "http",
#                 "conform": {
#                     "format": "csv",
#                     "lat": "latitude",
#                     "lon": "longitude",
#                     "number": {
#                         "function": "join",
#                         "fields": [
#                             "HOUSE_NUMBER",
#                             "HOUSE_ALPHA"
#                         ],
#                         "separator": ""
#                     },
#                     "street": [
#                         "STREET_NAME",
#                         "STREET_TYPE",
#                         "STREET_QUAD"
#                     ],
#                     "str_name": "STREET_NAME",
#                     "str_type": "STREET_TYPE",
#                     "str_dir": "STREET_QUAD",
#                     "full_addr": "ADDRESS"
#                 },
#                 "attribution": "City of Calgary"
#             }
#         ]
#     }
# }

# Component/Pipeline definitions

In [None]:
# Load the two reusable components from their YAML definitions. The returned
# objects are called inside the pipeline function to create its steps.
openaddresses_get_op = load_component_from_file("./components/openaddresses_get_data.yaml")
copy_to_minio_op = load_component_from_file("./components/copy_to_minio.yaml")

In [None]:
# Two-step pipeline: fetch data with the OpenAddresses component, then copy the
# resulting directory to MinIO. Documentation is kept in comments rather than a
# docstring so the pipeline function object itself is unchanged.
@dsl.pipeline(name="Download OpenAddresses Data to Minio")
def pipeline(
    source_json,
    minio_output_uri: str,
    # TODO: Handle these automatically once multitenancy is available
    minio_url,
    minio_access_key: str,
    minio_secret_key: str,
    openaddresses_args: str = "",
):
    # Step 1: run the OpenAddresses component on the given source specification.
    # The image pull policy is "Always", so the image is pulled on every run.
    get_data_step = openaddresses_get_op(
        source_json=source_json,
        args=openaddresses_args,
    ).set_image_pull_policy("Always")

    # Step 2: copy the output of step 1 to the requested MinIO destination.
    # "--recursive" is required because outputs['data'] is a directory.
    store_data_step = copy_to_minio_op(
        local_source=get_data_step.outputs['data'],
        minio_destination=minio_output_uri,
        minio_url=minio_url,
        minio_access_key=minio_access_key,
        minio_secret_key=minio_secret_key,
        flags="--recursive",
    )

    # Give every step a human-readable display name.
    labelled_steps = (
        ('Get Data', get_data_step),
        ('Store Data', store_data_step),
    )
    for label, step in labelled_steps:
        step.set_display_name(label)

# Build pipeline arguments

## Load the JSON source file

In [None]:
# Parse the source specification here so that malformed JSON fails in the
# notebook rather than after the pipeline run has been submitted.
# The file is decoded explicitly as UTF-8: without an encoding argument open()
# uses the platform default, which can garble or reject non-ASCII text such as
# the en dash in the demo file's license string.
with open(json_source_file, 'r', encoding='utf-8') as fin:
    source_json = json.load(fin)

## Get MinIO credentials from the Notebook Server (you could also specify the URL and keys manually)

In [None]:
# Get minio credentials using a helper
from utilities import get_minio_credentials

minio_settings = get_minio_credentials(minio_tenant, strip_http=False)
minio_url = minio_settings["url"]
minio_access_key = minio_settings["access_key"]
minio_secret_key = minio_settings["secret_key"]

arguments = dict(
    source_json=json.dumps(source_json),
    openaddresses_args=openaddresses_args,
    minio_output_uri=minio_output_uri,
    minio_url=minio_url,
    minio_access_key=minio_access_key,
    minio_secret_key=minio_secret_key,
)

# Run the pipeline

In [None]:
# Create a client with default settings and submit the pipeline function as a
# run, using the arguments assembled above.
kfp_client = kfp.Client()
pipeline_run = kfp_client.create_run_from_pipeline_func(
    pipeline,
    arguments=arguments,
    run_name="openaddresses-get-store-data",
)