In [None]:
from dotenv import load_dotenv
import os
load_dotenv("s3.env")
from hera.workflows import models, CronWorkflow, Workflow, script, Artifact, Parameter, DAG, Steps, Step, NoneArchiveStrategy
from hera.shared import global_config

# Argo Workflows server connection. The token is read from the environment
# (populated by load_dotenv) and is None when `argo_token` is unset.
global_config.host = "https://dev.services.eodc.eu/workflows/"
global_config.namespace = "default"
global_config.token = os.getenv("argo_token")
# Default container image for the @script templates, none of which set their own.
global_config.image = "ghcr.io/oscipal/image_zarr:latest"

# Pod security context passed to the workflow (uid / gid to run as).
security_context = {
    "runAsUser": 74268,
    "runAsGroup": 71473,
}

# Volume backed by the `eodc-nfs-claim` PVC; the scripts mount it at /eodc
# by referring to its name "eodc-mount".
nfs_volume = [
    models.Volume(
        name="eodc-mount",
        persistent_volume_claim={"claimName": "eodc-nfs-claim"},
    )
]

In [None]:
@script(volume_mounts=[models.VolumeMount(name="eodc-mount", mount_path="/eodc")])

def add_timestamps2zarr(store_path: str = "/eodc/private/openeo_platform/zarr_nacho/INCA_test.zarr"):
    """Grow the hourly time axis of the Zarr store up to the current UTC hour.

    Computes the number of whole hours between the fixed origin
    2011-03-15T00:00 and now (UTC). It then resizes along the first axis:
      - the ``time`` coordinate, and
      - every data array (all arrays except ``time``, ``x``, ``y``).
    Afterwards it re-consolidates the metadata and rewrites ``time`` as the
    hour offsets ``0 .. n-1`` from the origin.

    The axis is never shrunk. If the store is already at least as long as
    the computed length, its current length is kept. Zarr's ``resize`` would
    otherwise delete the chunks beyond the new shape.

    Imports live inside the function because Hera ships this function's body
    as the inline script source that runs in the Argo pod.
    """
    import datetime
    import numpy as np
    import zarr

    # Explicit UTC so the hour count does not depend on the pod's local timezone.
    # NOTE(review): assumes the INCA timestamps written by inca_write are UTC.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    now_np = np.datetime64(now_utc).astype("datetime64[h]")
    origin = np.datetime64("2011-03-15T00:00:00").astype("datetime64[h]")
    elapsed_hours = int((now_np - origin).astype(int))

    store = zarr.storage.LocalStore(store_path)
    group = zarr.group(store=store)

    # Never shrink: resizing to a smaller shape would irreversibly drop data.
    current_len = group["time"].shape[0]
    new_shape = max(elapsed_hours, current_len)
    new_extent = np.arange(0, new_shape, 1)

    data_arrays = set(group.array_keys()) - {"time", "x", "y"}

    group["time"].resize(new_shape)
    for array in data_arrays:
        # Only the leading (time) axis changes; the remaining dimensions are kept.
        group[array].resize((new_shape, *group[array].shape[1:]))

    zarr.consolidate_metadata(store)
    store = zarr.storage.LocalStore(store_path)
    group = zarr.group(store=store)

    group["time"][:] = new_extent


In [None]:
@script(outputs=Artifact(name="inca-file", path="/tmp/INCA_{{inputs.parameters.variable}}.nc", archive=NoneArchiveStrategy()))

def inca_download(variable: str):
    """Download the monthly hourly INCA NetCDF file for ``variable`` into /tmp.

    The month is taken from the date eight days before today, formatted as
    YYYYMM. The file is saved at the path from which the ``inca-file`` output
    artifact is collected.
    """
    import datetime
    from urllib.request import urlretrieve

    # NOTE(review): the 8-day lag presumably allows for publication delay on
    # the data hub; confirm.
    reference_day = datetime.date.today() - datetime.timedelta(days=8)
    ym = reference_day.strftime("%Y%m")
    print(ym)

    destination = f"/tmp/INCA_{variable}.nc"
    url = f"https://public.hub.geosphere.at/datahub/resources/inca-v1-1h-1km/filelisting/{variable}/INCAL_HOURLY_{variable}_{ym}.nc"
    urlretrieve(url, destination)

In [None]:
@script(inputs=Artifact(name="inca-file", path="/tmp/INCA_{{inputs.parameters.variable}}.nc"),
        volume_mounts=[models.VolumeMount(name="eodc-mount", mount_path="/eodc")])

def inca_write(variable: str, store_path: str="/eodc/private/openeo_platform/zarr_nacho/INCA_test.zarr"):
    """Write one INCA variable from the downloaded NetCDF file into the Zarr store.

    Steps:
      1. Read ``/tmp/INCA_<variable>.nc`` (the ``inca-file`` input artifact)
         with ``mask_and_scale=False``, so values are copied without decoding.
      2. Locate the file's x/y extent inside the store's ``x``/``y`` coordinates.
      3. Convert its time range to hour offsets from 2011-03-15T00:00.
      4. Write the values into ``group[variable][time, y, x]``.

    Raises:
        ValueError: in any of these cases:
            - the file's first or last x/y value is not present in the store;
            - its x/y coordinates do not map onto a contiguous store slice of
              the same length;
            - its time range lies outside the store's time axis (for example,
              because the axis was not extended first).
    """
    import xarray as xr
    import numpy as np
    import zarr

    artifact_path = f"/tmp/INCA_{variable}.nc"

    def get_idx(array1, array2):
        # Half-open index range [min_idx, max_idx) that array2 occupies in
        # array1, located by exact equality of array2's first and last values.
        first_hits = np.where(array1 == array2[0])[0]
        last_hits = np.where(array1 == array2[-1])[0]
        if first_hits.size == 0 or last_hits.size == 0:
            raise ValueError(
                f"coordinate range [{array2[0]}, {array2[-1]}] not found in store coordinates"
            )
        min_idx = int(first_hits[0])
        max_idx = int(last_hits[0]) + 1
        # Reversed ordering or gaps would otherwise surface later as an
        # opaque shape-mismatch error on the write.
        if max_idx - min_idx != len(array2):
            raise ValueError(
                f"coordinates map to store slice [{min_idx}:{max_idx}] of length "
                f"{max_idx - min_idx}, expected {len(array2)}"
            )
        return min_idx, max_idx

    # Load only what is needed into memory, then close the NetCDF file.
    with xr.open_dataset(artifact_path, mask_and_scale=False) as data:
        values = data[variable].values
        times = data["time"].values
        x_coords = data["x"].values
        y_coords = data["y"].values

    store = zarr.storage.LocalStore(store_path)
    group = zarr.group(store=store)

    x_min, x_max = get_idx(group["x"][:], x_coords)
    y_min, y_max = get_idx(group["y"][:], y_coords)

    # Hour offsets from the store's time origin; the end is exclusive (+1 hour).
    # Assumes the file's timestamps are sorted ascending.
    origin = np.datetime64("2011-03-15T00:00:00").astype("datetime64[h]")
    time_min = times[0].astype("datetime64[h]")
    time_max = times[-1].astype("datetime64[h]") + 1
    time_delta_min = int((time_min - origin).astype("int64"))
    time_delta_max = int((time_max - origin).astype("int64"))

    target = group[variable]
    # A negative start would be read as an offset from the end of the axis;
    # an end past the axis means the time axis was not extended far enough.
    if time_delta_min < 0 or time_delta_max > target.shape[0]:
        raise ValueError(
            f"time offsets [{time_delta_min}:{time_delta_max}] outside store time axis "
            f"of length {target.shape[0]}"
        )
    target[time_delta_min:time_delta_max, y_min:y_max, x_min:x_max] = values

In [None]:
# INCA variables to process; one "pipeline" run is fanned out per entry.
items = ["RR", "T2M", "TD2M", "P0", "UU", "VV", "RH2M", "GL"]

# Daily CronWorkflow (cron "58 9 * * *"). It first extends the Zarr time axis,
# then downloads and writes every INCA variable in parallel.
# NOTE(review): no timezone is set, so the schedule presumably follows the
# Argo controller's timezone; confirm.
with CronWorkflow(
    generate_name="inca-zarr-",
    schedule="58 9 * * *",
    volumes = nfs_volume,
    security_context=security_context,
    entrypoint="workflow"
) as w:
    # Per-variable DAG template. It receives the variable name as the `item`
    # input parameter, downloads the monthly file, then writes it to the store.
    with DAG(name="pipeline", inputs=[Parameter(name="item")]) as pipeline:
        
        download = inca_download(arguments={"variable":"{{inputs.parameters.item}}"},
                                 )

        # Hand the download's "inca-file" output to inca_write as its "inca-file" input artifact.
        process = inca_write(arguments=[{"variable": "{{inputs.parameters.item}}"}, 
                                        download.get_artifact("inca-file").with_name("inca-file")],
                                        )

        # Explicit dependency: the write runs only after the download has finished.
        download >> process

    # Entrypoint steps, run in order: extend the time axis, then fan out one
    # "pipeline" run per entry of `items` (withParam).
    with Steps(name="workflow"):
        add_timestamps2zarr()
        Step(name="parallel-pipelines", template=pipeline, with_param=items, arguments={"item":"{{item}}"})

In [None]:
# Save the rendered CronWorkflow manifest to hera_workflow.yaml in the current
# working directory. UTF-8 is set explicitly so the output does not depend on
# the platform's locale encoding.
with open("hera_workflow.yaml", "w", encoding="utf-8") as f:
    f.write(w.to_yaml())

In [None]:
# Submit the CronWorkflow to the Argo server configured in global_config (host/namespace/token).
w.create()