### Creating an Argo Workflow with Hera for INCA

This notebook shows how to create an Argo Workflow that writes INCA data to a Zarr store. The resulting workflow writes new data to the store once a day. To learn more about Hera, visit https://hera.readthedocs.io/en/v4/index.html and https://hera.readthedocs.io/en/stable/

### Necessary imports

In [20]:
from dotenv import load_dotenv
import os
from hera.workflows import models, CronWorkflow, script, Artifact, Parameter, DAG, Steps, Step, NoneArchiveStrategy, Workflow
from hera.shared import global_config

load_dotenv("/home/otto/s1_zarr/.env")

True

### Global Settings

First, the host, token, namespace and image are set globally, as they don't change between the steps in this notebook. You can look at on how to create an image. It should have all the dependencies you need to execute your scripts in the workflow.

In [21]:
global_config.host = "https://services.eodc.eu/workflows/"
global_config.namespace = "inca"
global_config.token = os.getenv("argo_token_prod")
global_config.image = "ghcr.io/oscipal/image_zarr:latest"
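`os.getenv` returns `None` without complaint when a variable is missing, so a typo in the `.env` file only shows up later as an authentication error. The following is a minimal sketch of an explicit check; the helper `require_env` is hypothetical and not part of Hera or python-dotenv, and the demo uses a stand-in dictionary instead of the real environment.

```python
import os

def require_env(name: str, env=os.environ) -> str:
    """Return the value of `name`, failing loudly if it is missing or empty."""
    value = env.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set")
    return value

# Demo with a stand-in mapping; in the notebook you would call
# require_env("argo_token_prod") after load_dotenv(...).
demo_env = {"argo_token_prod": "dummy-token"}
print(require_env("argo_token_prod", demo_env))  # dummy-token
```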

To get access to the EODC NFS, you need to define how the volume is accessed. This is not the default for all namespaces and might have to be set up first. To get write access, you also need to define the correct security context: the runAsUser value is the user ID that owns the folder you want to write to, and the runAsGroup value is its group ID.
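The two IDs can be read from the target folder itself. This is a minimal sketch, assuming it is run on a machine where the NFS is mounted; the helper name `owner_ids` is hypothetical, and the demo uses a temporary directory in place of the real folder.

```python
import os
import tempfile

def owner_ids(path: str) -> dict:
    """Return the owner's UID and GID of `path` in security-context form."""
    info = os.stat(path)
    return {"runAsUser": info.st_uid, "runAsGroup": info.st_gid}

# Demo on a temporary directory; replace it with the folder you want to write to.
with tempfile.TemporaryDirectory() as demo_dir:
    demo_context = owner_ids(demo_dir)
    print(demo_context)
```

On the command line, `ls -ldn <folder>` shows the same numeric IDs.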

In [22]:
nfs_volume = [models.Volume(
    name="eodc-mount",
    persistent_volume_claim={"claimName": "eodc-nfs-claim"},
    )]

security_context = {"runAsUser": 74268,
                    "runAsGroup": 71473}

### Scripts

There are four ways to store data in a workflow: *empty directories*, *NFS*, *artifacts* and *OpenStack via Cinder CSI*. Here we only cover the artifact and NFS options. Data stored as an Artifact can be passed between pods in the workflow, but artifact storage is temporary: the file is deleted once the workflow has completed.

Python scripts used in the workflow are written as functions under the `@script` decorator. Any storage options, inputs and outputs the script uses have to be declared in the decorator. More on that later.

As a first step we want to extend the time dimension in the zarr store to be able to append new data. We need to access the NFS to have access to the zarr store, so we have to define the volume mount in the `@script` decorator.

In [23]:
@script(volume_mounts=[models.VolumeMount(name="eodc-mount", mount_path="/eodc")])
def extend_time_dimension(store_path: str = "/eodc/products/eodc/geosphere_inca/INCA.zarr"):
    import datetime
    import numpy as np
    import zarr

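    # Current time and store origin, both truncated to whole hours
    now = datetime.datetime.now()
    now_np = np.datetime64(now).astype('datetime64[h]')
    origin = np.datetime64("2011-03-15T00:00:00").astype("datetime64[h]")

    # Number of hourly steps since the origin; the time axis holds these hour offsets
    new_shape = int((now_np-origin).astype(int))
    new_extent = np.arange(0,new_shape,1)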

    store = zarr.storage.LocalStore(store_path)
    group = zarr.group(store=store)

    array_names=set(group.array_keys())
    coords = {"time", "x", "y"}
    data_arrays = array_names-coords

    group["time"].resize(new_shape)
    for array in data_arrays:
        group_shape  = group[array].shape
        group[array].resize((new_shape, group_shape[1], group_shape[2]))

    zarr.consolidate_metadata(store)
    store = zarr.storage.LocalStore(store_path)
    group = zarr.group(store=store)

    group["time"][:]=new_extent
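The size of the time dimension is simply the number of whole hours between the origin and the current time. A small worked example of that arithmetic, using a hypothetical "current" hour instead of `datetime.now()` so that the result is reproducible:

```python
import numpy as np

origin = np.datetime64("2011-03-15T00", "h")       # first time step of the store
example_now = np.datetime64("2011-03-16T06", "h")  # hypothetical "current" hour

# Number of hourly steps between the origin and the chosen time
n_steps = int((example_now - origin).astype(int))
time_index = np.arange(0, n_steps, 1)

print(n_steps)         # 30
print(time_index[-1])  # 29
print(origin + np.timedelta64(int(time_index[-1]), "h"))  # 2011-03-16T05
```

Note that the index runs from 0 to `n_steps - 1`, so the last slot corresponds to the hour before the current one.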


Next we want to download the data. This step doesn't need the NFS, but it does need to create an Artifact to pass the downloaded file to the next step. Again, this is declared in the `@script` decorator; as the file is an output of the step, it goes under `outputs`. The name of the Artifact works like a variable name and can be chosen freely, it just has to be referenced correctly when creating the workflow (more on that later). The path to the file has to be under `/tmp`; multiple files can also be downloaded to a folder under this path. Use `NoneArchiveStrategy()` so that the file is not compressed to .tgz format, as this can cause issues.

In [None]:
@script(outputs=Artifact(name="inca-file", path="/tmp/INCA.nc", archive=NoneArchiveStrategy()))
def inca_download(param: str):
    from urllib.request import urlretrieve
    import datetime

    ym = (datetime.date.today()-datetime.timedelta(days=1)).strftime("%Y%m")
    print(ym)
    url = f"https://public.hub.geosphere.at/datahub/resources/inca-v1-1h-1km/filelisting/{param}/INCAL_HOURLY_{param}_{ym}.nc"
    urlretrieve(url, "/tmp/INCA.nc")
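The download script always requests the monthly file that contains yesterday's data. The sketch below isolates the URL construction in a hypothetical helper `inca_url` (not part of the notebook) that takes the date as an argument, so the behaviour at a month boundary can be checked without downloading anything.

```python
import datetime

def inca_url(param: str, today: datetime.date) -> str:
    """Build the monthly file URL for the day before `today` (hypothetical helper)."""
    ym = (today - datetime.timedelta(days=1)).strftime("%Y%m")
    return (
        "https://public.hub.geosphere.at/datahub/resources/inca-v1-1h-1km/"
        f"filelisting/{param}/INCAL_HOURLY_{param}_{ym}.nc"
    )

# On the first day of a month the previous month's file is requested.
first_of_month = inca_url("T2M", datetime.date(2025, 8, 1))
mid_month = inca_url("T2M", datetime.date(2025, 8, 5))
print(first_of_month.rsplit("/", 1)[-1])  # INCAL_HOURLY_T2M_202507.nc
print(mid_month.rsplit("/", 1)[-1])       # INCAL_HOURLY_T2M_202508.nc
```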

As a last step we want to write the data to the zarr store on the NFS. So we need to mount the NFS like in the first step, and, as we also need the artifact from the download step, we need to pass this under the `inputs` parameter. The name of the artifact does not have to be the same as in the download step. 

In [25]:
@script(inputs=Artifact(name="inca-file", path="/tmp/INCA.nc"),
        volume_mounts=[models.VolumeMount(name="eodc-mount", mount_path="/eodc")])
def inca_write(param: str, store_path: str="/eodc/products/eodc/geosphere_inca/INCA.zarr"):
    import xarray as xr
    import numpy as np
    import zarr
    import pandas as pd

    artifact_path = "/tmp/INCA.nc"

    def get_idx(array1, array2):
        min_idx = np.where(array1 == array2[0])[0][0]
        max_idx = np.where(array1 == array2[-1])[0][0] + 1
        return min_idx, max_idx

    store = zarr.storage.LocalStore(store_path)
    group = zarr.group(store=store)

    dtype = group[param].dtype
    fill_value = group[param].attrs.get('_FillValue')
    freq = group.attrs.get('freq')

    x_extent = group["x"][:]
    y_extent = group["y"][:]
    origin = xr.open_zarr(store_path).time[0].values

    data = xr.open_dataset(artifact_path, mask_and_scale=False).load()

    x_min, x_max = get_idx(x_extent, data["x"].values)
    y_min, y_max = get_idx(y_extent, data["y"].values)

    time_min, time_max = data.time.values[0], data.time.values[-1]
    # Offsets of the first and last time step in hours since the origin of the store
    origin_h = origin.astype("datetime64[h]")
    time_delta_min = (time_min.astype("datetime64[h]") - origin_h).astype("int64")
    time_delta_max = (time_max.astype("datetime64[h]") - origin_h).astype("int64") + 1

    full_range = pd.date_range(time_min, time_max, freq=freq).values.astype("datetime64[ns]")

    for value in data.time.values:
        if value in set(full_range):
            continue
        else:
            empty_array = np.full((full_range.shape[0], data["x"].values.shape[0], data["y"].values.shape[0]),
                                fill_value=fill_value, dtype=dtype)

            template = xr.Dataset({f"{param}": (("time", "x", "y"), empty_array)},
                                  coords={
                                    "time": full_range,
                                    "x": data["x"].values,
                                    "y": data["y"].values
                                  }
                                  )

            data = data.combine_first(template)
            break

    group[param][time_delta_min:time_delta_max, y_min:y_max, x_min:x_max] = data[param].values
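The write step places the file into the store by translating coordinates into index ranges. A small worked example of that lookup, using made-up coordinates and time stamps rather than real INCA values; `get_idx` is copied from the script above.

```python
import numpy as np

def get_idx(array1, array2):
    min_idx = np.where(array1 == array2[0])[0][0]
    max_idx = np.where(array1 == array2[-1])[0][0] + 1
    return min_idx, max_idx

# Hypothetical coordinates: the store covers 0..9, the file covers 3..6
store_x = np.arange(10)
file_x = np.array([3, 4, 5, 6])
x_min, x_max = get_idx(store_x, file_x)

# Hypothetical time stamps: store origin and the first/last hour in the file
origin = np.datetime64("2011-03-15T00", "h")
t_first = np.datetime64("2011-03-15T05", "h")
t_last = np.datetime64("2011-03-15T08", "h")
t_min = (t_first - origin).astype("int64")
t_max = (t_last - origin).astype("int64") + 1

print(x_min, x_max)  # 3 7
print(t_min, t_max)  # 5 9
```

The lookup relies on exact equality, so `get_idx` raises an `IndexError` if the first or last coordinate of the file does not occur in the store.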

### Creating the workflow

As the scripts only act as templates, we still need to define a workflow to let Argo know how to execute them.

As we have 8 different parameters for our INCA data, we want them to be processed in parallel. The Hera syntax can be quite confusing, but the code below creates a CronWorkflow that achieves exactly that.

In [None]:
inca_parameters = ["RR", "T2M", "TD2M", "P0", "UU", "VV", "RH2M", "GL"]

# First, a CronWorkflow is created with the necessary inputs.
with CronWorkflow(
    generate_name="inca-zarr-",
    schedule="0 6 * * *",
    volumes = nfs_volume,
    security_context=security_context,
    entrypoint="workflow"
) as w:
    
    # Secondly, a DAG is defined which shall be executed for each parameter. The inputs are defined in the Steps below. So this DAG acts like a function being defined and executed in a different step.
    with DAG(name="pipeline", inputs=[Parameter(name="args")]) as pipeline:
        
        # The arguments for the scripts are passed as a dictionary, whereas the value of 'param' is taken from the input of the DAG
        download = inca_download(arguments={"param":"{{inputs.parameters.args}}"},)

        # The Artifact written with download also has to be given to the function.
        process = inca_write(arguments=[{"param": "{{inputs.parameters.args}}"}, 
                                        download.get_artifact("inca-file").with_name("inca-file")],)

        # Here the sequence of the steps is defined
        download >> process

    # "workflow" is the entrypoint of the CronWorkflow, so this part is executed first. In contrast to a DAG, Steps are executed in the order in which they are written.
    with Steps(name="workflow"):
        # First the time dimension is extended in the zarr store
        extend_time_dimension()

        # Now the DAG is executed as a template. Passing inca_parameters as with_param and using "{{item}}" in arguments runs the DAG in parallel, once for each parameter.
        Step(name="parallel-pipelines", template=pipeline, with_param=inca_parameters, arguments={"args":"{{item}}"})
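The way the parameter name travels from the list into both scripts can be pictured in plain Python. This is only a conceptual illustration of the placeholder substitution, not how Argo or Hera implement it; the `render` helper is hypothetical.

```python
inca_parameters = ["RR", "T2M", "TD2M", "P0", "UU", "VV", "RH2M", "GL"]

def render(template: str, values: dict) -> str:
    """Replace {{key}} placeholders, loosely imitating Argo's templating."""
    for key, value in values.items():
        template = template.replace("{{" + key + "}}", value)
    return template

# One pipeline invocation per list entry: {{item}} becomes the DAG input "args",
# which in turn becomes the "param" argument of both scripts.
invocations = []
for item in inca_parameters:
    args = render("{{item}}", {"item": item})
    param = render("{{inputs.parameters.args}}", {"inputs.parameters.args": args})
    invocations.append({"download": param, "write": param})

print(len(invocations))  # 8
print(invocations[1])    # {'download': 'T2M', 'write': 'T2M'}
```

In the real workflow the eight invocations run as separate pods in parallel rather than in a loop.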

The workflow can be written to a YAML file:

In [8]:
with open("INCA_workflow.yaml", "w") as f:
    f.write(w.to_yaml())

Or submitted directly to Argo Workflows:

In [29]:
w.create()

CronWorkflow(api_version=None, kind=None, metadata=ObjectMeta(annotations=None, cluster_name=None, creation_timestamp=Time(__root__=datetime.datetime(2025, 8, 5, 6, 15, 2, tzinfo=datetime.timezone.utc)), deletion_grace_period_seconds=None, deletion_timestamp=None, finalizers=None, generate_name='inca-monthly-', generation=1, labels={'workflows.argoproj.io/creator': 'system-serviceaccount-default-jenkins'}, managed_fields=[ManagedFieldsEntry(api_version='argoproj.io/v1alpha1', fields_type='FieldsV1', fields_v1=FieldsV1(), manager='argo', operation='Update', subresource=None, time=Time(__root__=datetime.datetime(2025, 8, 5, 6, 15, 2, tzinfo=datetime.timezone.utc)))], name='inca-monthly-ws44l', namespace='inca', owner_references=None, resource_version='37097073', self_link=None, uid='36f6b29b-09e7-4204-8134-8e0c13240a52'), spec=CronWorkflowSpec(concurrency_policy=None, failed_jobs_history_limit=None, schedule='0 6 10 * *', starting_deadline_seconds=None, successful_jobs_history_limit=None