In [1]:
import kfp
from kfp.components import load_component_from_url
import datetime

# Settings

In [2]:
# Passed to the pipeline as `n_writes` when the run is created; the pipeline
# uses it as the `stop` of range_component, i.e. the number of ParallelFor
# iterations (one put_to_minio call each).
n_parallel_writes = 10
# Size of the junk file that create_junk produces and every write uploads.
filesize_per_write = "200M"  # (in fallocate -l format, eg 200M, 3G, ...)

# Define components and pipeline

In [3]:
# Reusable component definitions, fetched from their component.yaml files.
# NOTE(review): both URLs point at the `master` branch, so the definitions are
# not pinned to a revision -- consider pinning to a commit for reproducibility.

# Used by the pipeline to produce a file of the requested size.
create_junk = load_component_from_url(
    "https://raw.githubusercontent.com/ca-scribner/kubeflow-pipelines-components/master/components/files/create_junk/component.yaml"
)

# Used by the pipeline to upload that file to a MinIO target path.
put_to_minio = load_component_from_url(
    "https://raw.githubusercontent.com/ca-scribner/kubeflow-pipelines-components/master/components/minio/put_to_minio/component.yaml"
)

In [4]:
@kfp.components.create_component_from_func
def range_component(start: int=0, stop: int=None, step: int=1) -> list:
    """Operates like Python's range, but in kfp as a component.

    Args:
        start: First value of the sequence.
        stop: Exclusive upper bound. Required, despite the ``None`` default.
        step: Increment between consecutive values.

    Returns:
        The values of ``range(start, stop, step)`` as a list.

    Raises:
        ValueError: If ``stop`` is not provided.
    """
    if stop is None:
        raise ValueError(f"Invalid value '{stop}' for stop")
    # Coerce all three bounds, not only start/stop, so a step that arrives as a
    # string or an integral float is handled the same way as the others.
    return list(range(int(start), int(stop), int(step)))

In [5]:
def get_current_namespace():
    """Return the namespace recorded in the in-cluster service account mount.

    Returns:
        str: Contents of the service account ``namespace`` file, with
        surrounding whitespace removed so a trailing newline cannot leak into
        paths or API arguments built from it.

    Raises:
        OSError: If the namespace file cannot be opened or read.
    """
    import kubernetes

    # Kept from the original implementation. NOTE(review): presumably this also
    # acts as a fail-fast check that we are running inside a cluster -- confirm
    # whether anything downstream relies on the loaded configuration.
    kubernetes.config.load_incluster_config()

    namespace_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    # Context manager guarantees the file handle is closed after reading.
    with open(namespace_path) as namespace_file:
        return namespace_file.read().strip()
# Resolved once at notebook level: the pipeline's MinIO target path and the
# run submission (run name and `namespace` argument) read this value.
namespace = get_current_namespace()

In [9]:
def pipeline(n_writes: int=10, filesize_per_write: str="200M"):
    """Create one junk file and upload it to MinIO once per loop iteration.

    Args:
        n_writes: Passed as ``stop`` to ``range_component``; one upload is
            issued for each value it yields.
        filesize_per_write: Size handed to ``create_junk``.
    """
    junk_file = create_junk(size=filesize_per_write)
    write_indices = range_component(stop=n_writes)

    # Fan out over the generated indices; every branch uploads the same file.
    with kfp.dsl.ParallelFor(write_indices.output) as iteration:
        # Run id and loop item are part of the path, so each upload gets its
        # own target.
        upload_target = f'{namespace}/debug_minio/{kfp.dsl.RUN_ID_PLACEHOLDER}/{iteration}'
        put_to_minio(
            source=junk_file.output,
            target=upload_target,
            tenant='standard',
        )

# Run pipeline

In [10]:
# Client built with default settings (no host or credentials passed here);
# presumably it discovers the in-cluster Pipelines endpoint -- TODO confirm.
c = kfp.Client()

In [11]:
# Timestamp goes into the run name so repeated submissions can be told apart.
now = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
# Left as the cell's last expression so the returned run result is displayed.
c.create_run_from_pipeline_func(
    pipeline,
    arguments={'n_writes': n_parallel_writes, 'filesize_per_write': filesize_per_write},
    run_name=f'MinIO Parallel Writes Test ({namespace}/{now})',
    # Plain string: this value has no placeholders, so no f-prefix is needed.
    experiment_name='MinIO Parallel Writes Test',
    namespace=namespace,
)

RunPipelineResult(run_id=8e0069d6-3409-40ef-bafe-536798d72167)