**Difficulty: Intermediate**

# Summary:

This example demonstrates how to build a pipeline from a mix of:
* lightweight components (functions defined here in Python code and built into components)
* off-the-shelf reusable components (defined by someone else and accessed directly from GitHub)

In doing this, we build a **shareable** pipeline - one that you can share with others and they can rerun on a new problem without needing this notebook.

In particular, we use off-the-shelf components to enable our pipeline to write data to MinIO without needing to know how the MinIO Python package/CLI works.

This example builds on concepts from a few others - see those notebooks for more detail: 
* The problem solved here is from [Compute Pi](../mapreduce-pipeline/Compute-Pi.ipynb) 
* We use lightweight components, which have some important [quirks](../kfp-basics/demo_kfp_lightweight_components.ipynb)

In [1]:
from typing import NamedTuple

import kfp
from kfp import dsl, compiler
from kfp.components import func_to_container_op
from kfp.components import load_component_from_file

# TODO: Move utilities to a central repo
from utilities import get_minio_credentials

# Problem Description

Our task is to compute an estimate of Pi by:
1. picking some random points
1. evaluating whether the points are inside a unit circle
1. aggregating (2) to estimate pi
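
As a rough local sketch of these three steps (plain Python, no pipeline involved): a point drawn uniformly from the square [-1, 1] x [-1, 1] lands inside the unit circle with probability pi/4, so scoring each hit as 4 and each miss as 0 gives an average that approaches pi. The helper names `sample_point` and `estimate_pi` are illustrative only and are not part of the pipeline built below.

```python
import random


def sample_point(seed: int) -> int:
    """Return 4 if the seeded random point lies inside the unit circle, else 0."""
    rng = random.Random(seed)
    x = rng.random() * 2 - 1  # x is uniform on [-1, 1)
    y = rng.random() * 2 - 1  # y is uniform on [-1, 1)
    return 4 if x ** 2 + y ** 2 <= 1 else 0


def estimate_pi(n_samples: int) -> float:
    """Average the per-sample scores; the mean approaches pi as n_samples grows."""
    results = [sample_point(seed) for seed in range(n_samples)]
    return sum(results) / len(results)


print(estimate_pi(5000))
```

Seeding each sample separately mirrors how the pipeline treats every sample as an independent, reproducible step.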

Our solution to this task here focuses on:
* making a fully reusable pipeline:
    * The pipeline should be shareable.  You should be able to hand other users the pipeline.yaml file **without** sharing this notebook
    * All user inputs are adjustable at runtime (no editing the YAML, changing hard-coded settings in the Python code, etc.)
* persisting data in MinIO
* using existing, reusable components where possible
    * Ex: rather than teach our sample function to store results in MinIO, we use an existing component to store results
    * This helps improve testability and reduces work when building new pipelines

# Pipeline pseudocode

To solve our problem, we need to: 
* Generate N random seeds
    * For each random seed, do a sample step
    * For each sample step, store the result to a location in MinIO
* Collect all sample results
* Compute pi (by averaging the results)
* Save the final result to MinIO

In pseudocode our pipeline looks like:

```python
def compute_pi(n_samples: int,
               output_location: str,
               minio_credentials, 
              ):
    seeds = create_seeds(n_samples)

    for seed in seeds:
        result = sample(seed)
        copy_to_minio(minio_credentials, result, output_location)
    
    all_sample_results = collect_all_results(minio_credentials,
                                             output_location
                                            )
    
    final_result = average(all_sample_results)
    
    copy_to_minio(minio_credentials, final_result, output_location)
```

where we've pulled anything the user might want to set at runtime (the number of samples, the location in MinIO for results to be placed, and their MinIO credentials) out as pipeline arguments.

Now let's fill in all the function calls with components.

# Define Components for our specific Business Logic

For any operation that is specific to our problem (for example, how we train a model, how we transform a data file, ...) we need a component that does our specific task.  These are defined below.

**NOTE:** We define the component Python code here, but you could pull these from .py files or elsewhere

## create_seeds

To create our random seeds, we define a component that takes `n_samples` and returns a list of `n_samples` seeds.  The seeds here are arbitrary (although we've defined them such that they will be unique and reproducible).

While it might feel like we could simply do this in `compute_pi()`: 
```
seeds = [i for i in range(n_samples)]
```
we cannot, because `n_samples` is a pipeline runtime argument.  When we run this notebook to define the pipeline, `n_samples` is a **placeholder** rather than an actual integer.  Thus, the generation of seeds must occur at pipeline runtime rather than in the pipeline definition itself.
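
To make the placeholder idea concrete, here is a minimal sketch using a made-up `Placeholder` class. It is a hypothetical stand-in, not the real KFP parameter class, and the `<<name>>` marker format is invented for illustration. It shows why integer operations fail at definition time while string formatting still works.

```python
class Placeholder:
    """Hypothetical stand-in for a pipeline parameter (NOT the real KFP class)."""

    def __init__(self, name: str):
        self.name = name

    def __str__(self) -> str:
        # Invented marker text; a real system would substitute the value at run time
        return f"<<{self.name}>>"


n_samples = Placeholder("n_samples")

try:
    seeds = [i for i in range(n_samples)]
except TypeError as err:
    # range() needs a real integer, which does not exist yet
    print(f"Cannot build seeds at definition time: {err}")

# String formatting only embeds the marker text, so it succeeds
print(f"output/{n_samples}")
```

The same reasoning explains why building path strings from pipeline arguments is fine, while looping over them is not.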

In [2]:
def create_seeds_func(n_samples: int) -> list:
    """
    Creates n_samples seeds and returns as a list

    Note: When used as an operation in a KF pipeline, the list is serialized
    to a string.  Can deserialize with strip and split or json package
    This sort of comma separated list will work natively with KF Pipelines'
    parallel for (we can feed this directly into a parallel for loop and it
    breaks into elements for us)

    """
    constant = 10  # just so I know something is happening
    return [constant + i for i in range(n_samples)]

By defining this function in Python first, we can test it here to make sure it works as expected (rigorous testing omitted here, but recommended for your own tasks).

In [3]:
# Very rigorous testing!
print(create_seeds_func(10))

[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
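
The docstring above mentions that the list reaches downstream steps as a string. As a small sketch, assuming the list is passed as JSON text, either of the two approaches it names recovers the integers:

```python
import json

seeds = [10, 11, 12]

# Assumed wire format: the list rendered as JSON text
serialized = json.dumps(seeds)
print(repr(serialized))

# Option 1: parse with the json package
restored = json.loads(serialized)

# Option 2: strip the brackets and split on commas
split_restored = [int(item) for item in serialized.strip("[]").split(",")]

print(restored == split_restored == seeds)
```

The `json` route is usually the safer choice, since it also handles nested lists and non-integer values.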


We can then convert our tested function to a task constructor using `func_to_container_op`.

In [4]:
# Define the base image our code will run from.
# This is reused in a few components
import sys
python_version_as_string = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
base_image_python = f"python:{python_version_as_string}-buster"
print(f"This example notebook was executed using python {python_version_as_string}")
print(f"Using base_image_python: {base_image_python}")

This example notebook was executed using python 3.8.8
Using base_image_python: python:3.8.8-buster


In [5]:
create_seeds_op = func_to_container_op(create_seeds_func,
                                       base_image=base_image_python,
                                       )

This task constructor `create_seeds_op` is what actually creates instances of these components in our pipeline.  

## sample

Similar to above, we define a sample function and its corresponding task constructor.

In [6]:
def sample_func(seed: int) -> NamedTuple('Outputs', [('x', float), ('y', float), ('result', int), ('seed', int)]):
    """
    Define the "sample" pipeline operation

    Args:
        seed (int): Integer seed used for random calculation

    Returns:
        The result of this operation will be a named tuple with:
        {
             "x" : x-coordinate,
             "y" : y-coordinate,
             "result" : 4 if in unit-circle, 0 otherwise,
             "seed" : the input seed value,
        }
    """
    from collections import namedtuple
    import random
    random.seed(seed)

    print("Pick random point")
    # x,y ~ Uniform([-1,1])
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    print(f"Sample selected: ({x}, {y})")

    if (x ** 2 + y ** 2) <= 1:
        print(f"Random point is inside the unit circle")
        result = 4
    else:
        print(f"Random point is outside the unit circle")
        result = 0
    # Return output in the same structure as defined in the NamedTuple type hint
    output_spec = namedtuple("output", ("x", "y", "result", "seed"))
    output = output_spec(x, y, result, seed)
    return output

In [7]:
# (insert your testing here)

In [8]:
sample_op = func_to_container_op(sample_func,
                                 base_image=base_image_python,
                                 )

## average

In [9]:
def average_func(numbers) -> float:
    """
    Computes the average value of a JSON list of numbers, returned as a float
    """
    import json
    print(numbers)
    print(type(numbers))
    numbers = json.loads(numbers)
    return sum(numbers) / len(numbers)

In [10]:
# (and even more testing here!)
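
As one possible starting point for that testing, the sketch below checks the averaging logic on a JSON string, which is the form the component receives its input in. `average_of_json` is a stand-in that mirrors `average_func` so the snippet runs on its own; it is not a separate component.

```python
import json


def average_of_json(numbers: str) -> float:
    """Stand-in mirroring average_func: parse a JSON list and return its mean."""
    values = json.loads(numbers)
    return sum(values) / len(values)


# Three hits (4) and one miss (0) average to 3.0
assert average_of_json("[4, 0, 4, 4]") == 3.0

# An empty list has no mean, so the function raises rather than returning a value
try:
    average_of_json("[]")
except ZeroDivisionError:
    print("Empty input is rejected")
```

The empty-list case is worth knowing about: if no sample results are found, the averaging step fails instead of reporting a misleading number.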

In [11]:
average_op = func_to_container_op(average_func,
                                  base_image=base_image_python,
                                  )

# Use Reusable Components for the rest

For any generic operations in our pipeline, we can reuse components that were defined by others.  In particular, interactions with MinIO here are good candidates for generic, reusable components.  

Reusable components can be loaded directly from compiled YAML files - we just have to point to them.  Think of them roughly as imported functions.  They can come from local text files, or be imported directly from GitHub or elsewhere on the internet.

These yaml files are approachable - open them up to see how they work!

**TODO: Move reusables to github**

In [12]:
# Component that takes a file and puts it into minio
copy_to_minio_op = load_component_from_file('./components/copy_to_minio.yaml')

# Component that does an "mc find" operation, finding files in minio that 
# match a pattern
mc_find_op = load_component_from_file('./components/minio_find.yaml')

# Component that takes a list of files and concatenates their contents to a JSON
# list
mc_cat_files_to_json_op = load_component_from_file('./components/minio_cat_files_to_json.yaml')

# Define and Compile Pipeline

With our component constructors defined, we build our full pipeline.  Remember that while we use a Python function to define our pipeline here, anything that depends on a KFP-specific entity (an input argument, a component result, etc.) is computed at runtime in Kubernetes.  This means we can't do things like:
```
for seed in seeds:
    sample_op = sample_op(seed)
```
because Python would try to interpret seeds, which is a *placeholder* object for a future value, as an iterable.

In [13]:
@dsl.pipeline(
    name="Estimate Pi w/Minio",
    description="Extension of the Map-Reduce example using dynamic number of samples and Minio for storage"
)
def compute_pi(n_samples: int, output_location: str, minio_url,
               minio_access_key: str, minio_secret_key: str):
    seeds = create_seeds_op(n_samples)

    # We add the KFP RUN_ID here in the output location so that we don't
    # accidentally overwrite another run.  There's lots of ways to manage
    # data, this is just one possibility.
    this_run_output_location = f"{str(output_location).rstrip('/')}" \
                               f"/{kfp.dsl.RUN_ID_PLACEHOLDER}"

    sample_output_location = f"{this_run_output_location}/seeds"

    copy_ops = []
    with kfp.dsl.ParallelFor(seeds.output) as seed:
        sample_op_ = sample_op(seed)

        # NOTE: A current limitation of the ParallelFor loop in KFP is that it
        # does not give us an easy way to collect the results afterwards.  To
        # get around this problem, we store results in a known place in minio
        # and later glob the result files back out
        #
        # Save the result from this sample to minio in
        # ./seeds/{seed}/result.out.  We save with {seed} in the filepath to
        # prevent different paths from overwriting each other.  Note that
        # this relies on seed being unique
        #
        # TODO: Could we do an append-to-file-in-minio and concatenate them 
        # on the fly? Would minio have issues with simultaneous writes?
        copy_sample = copy_to_minio_op(
            minio_url,
            minio_access_key,
            minio_secret_key,
            sample_op_.outputs['result'],
            f"{sample_output_location}/{seed}/result.out",
        )

        # Make a list of copy_ops so we can do result collection after they finish
        copy_ops.append(copy_sample)

    # Collect all result.out files in the sample_output_location and concatenate
    # their contents as a json list
    search_pattern = r'/result.out'
    files_to_cat = mc_find_op(
        minio_url,
        minio_access_key,
        minio_secret_key,
        sample_output_location,
        search_pattern,
    )

    # files_to_cat requires all sample_ops to be done before running (all
    # results must be generated first).  Enforce this by setting files_to_cat
    # to be .after() all copy_op tasks
    for op in copy_ops:
        files_to_cat.after(op)

    all_samples = mc_cat_files_to_json_op(
        minio_url,
        minio_access_key,
        minio_secret_key,
        files_to_cat.output,
    )

    final_result = average_op(all_samples.output)

    copy_average = copy_to_minio_op(
        minio_url,
        minio_access_key,
        minio_secret_key,
        final_result.output,
        f"{this_run_output_location}/result.out",
    )
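
The fan-out/fan-in workaround used in the pipeline (write each result to a known path, then find and concatenate the files) can be sketched locally with a temporary directory standing in for MinIO. This is an illustration of the pattern only: `write_result` and `collect_results` are hypothetical helpers, the run directory name is made up, and the layout follows the `seeds/<seed>/result.out` convention described in the pipeline comments.

```python
import json
import tempfile
from pathlib import Path


def write_result(root: Path, seed: int, result: int) -> Path:
    """Store one sample result at <root>/seeds/<seed>/result.out."""
    target = root / "seeds" / str(seed) / "result.out"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(str(result))
    return target


def collect_results(root: Path) -> str:
    """Find every result.out under <root>/seeds and join the contents as a JSON list."""
    files = sorted((root / "seeds").glob("*/result.out"))
    return json.dumps([json.loads(f.read_text()) for f in files])


with tempfile.TemporaryDirectory() as tmp:
    # Made-up run directory, playing the role of output_location/RUN_ID
    run_root = Path(tmp) / "map-reduce-output" / "example-run"

    # Fan-out: each sample writes to its own seed-specific path
    for seed, result in [(10, 4), (11, 0), (12, 4), (13, 4)]:
        write_result(run_root, seed, result)

    # Fan-in: gather all results, then average them
    numbers = json.loads(collect_results(run_root))
    print(sum(numbers) / len(numbers))
```

Because each seed gets its own directory, parallel writers never touch the same file, which is why the pattern relies on seeds being unique.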

Compile our pipeline into a reusable package (a zip archive containing the pipeline YAML).

In [14]:
experiment_name = "compute-pi-with-reusables"
experiment_yaml_zip = experiment_name + '.zip'
compiler.Compiler().compile(
    compute_pi,
    experiment_yaml_zip
)
print(f"Exported pipeline definition to {experiment_yaml_zip}")

Exported pipeline definition to compute-pi-with-reusables.zip


# Run

Use our above pipeline definition to do our task.  Note that anything below here can be done **without** the above code.  All we need is the yaml file from the last step.  We can even do this from the Kubeflow Pipelines UI or from a terminal.
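
As a sketch of running from the compiled package alone, the snippet below builds the argument dictionary and submits the package with `kfp.Client.create_run_from_pipeline_package`. The helper names `build_arguments` and `submit_compiled_pipeline` are hypothetical, and the sketch assumes the same KFP SDK and cluster access that the rest of this notebook uses; nothing is submitted until `submit_compiled_pipeline` is called.

```python
from typing import Any, Dict


def build_arguments(n_samples: int, output_location: str,
                    minio_settings: Dict[str, str]) -> Dict[str, Any]:
    """Map user settings onto the pipeline's parameter names."""
    return {
        "n_samples": n_samples,
        "output_location": output_location,
        "minio_url": minio_settings["url"],
        "minio_access_key": minio_settings["access_key"],
        "minio_secret_key": minio_settings["secret_key"],
    }


def submit_compiled_pipeline(package_path: str, arguments: Dict[str, Any]):
    """Submit a compiled pipeline package; requires a reachable KFP cluster."""
    import kfp  # imported lazily so the sketch can be read without a cluster

    client = kfp.Client()
    return client.create_run_from_pipeline_package(package_path, arguments=arguments)


# Example (not executed here):
# args = build_arguments(10, "my-bucket/map-reduce-output", minio_settings)
# submit_compiled_pipeline("compute-pi-with-reusables.zip", args)
```

Keeping the argument construction separate from the submission makes it easy to check the parameter names against the pipeline signature before anything is sent to the cluster.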

## User settings
Update the next block to match your own setup.  `bucket` will be your namespace (likely your firstname-lastname), and `output_location` is where inside the bucket you want to put your results.

In [15]:
import os
bucket = os.environ['NB_NAMESPACE']
# Python Minio SDK expects bucket and output_location to be separate
output_location = bucket + "/map-reduce-output"
n_samples = 10
minio_tenant = "standard"  # probably can leave this as is

In [16]:
n_samples = 10

# Get minio credentials using a helper
minio_settings = get_minio_credentials(minio_tenant, strip_http=False)
minio_url = minio_settings["url"]
minio_access_key = minio_settings["access_key"]
minio_secret_key = minio_settings["secret_key"]

Trying to access minio credentials from:
/vault/secrets/minio-standard-tenant-1.json
Trying to access minio credentials from:
/vault/secrets/minio-standard-tenant-1.json


In [17]:
client = kfp.Client()
result = client.create_run_from_pipeline_func(
    compute_pi,
    arguments={"n_samples": n_samples,
               "output_location": output_location,
               "minio_url": minio_url,
               "minio_access_key": minio_access_key,
               "minio_secret_key": minio_secret_key,
               },
)

(Optional)

Wait for the run to complete, then print that it is done

In [18]:
wait_result = result.wait_for_run_completion(timeout=300)

In [19]:
print(f"Run {wait_result.run.id}\n\tstarted at \t{wait_result.run.created_at}\n\tfinished at \t{wait_result.run.finished_at}\n\twith status {wait_result.run.status}")

Run eb6f86cd-8117-4158-9f50-d4dd5478dd79
	started at 	2021-06-16 18:14:00+00:00
	finished at 	2021-06-16 18:15:26+00:00
	with status Succeeded
