# Parallel execution

In [None]:
import digitalhub as dh

# Project that owns every function, workflow and run created in this notebook.
# Per the SDK call's name, an existing project with this name is fetched,
# otherwise a new one is created.
p_name = "cdt-parallelization"
project = dh.get_or_create_project(p_name)

# Parallelize scenarios only

In [None]:
# Register a Python function "run" whose code comes from the local file
# parallel_run.py; "main" is presumably the entry function defined in that
# file (not visible here — confirm).
# NOTE(review): the requirement points at "@feat-parallelization", which looks
# like a git branch rather than a tag/commit, so rebuilt environments may pick
# up different code over time.
func = project.new_function(name="run",
                            kind="python",
                            python_version="PYTHON3_12",
                            code_src="parallel_run.py",
                            requirements=["git+https://github.com/fbk-most/civic-digital-twins@feat-parallelization"],
                            handler="main")

In [None]:
# Fetch the already-registered "run" function from the project; presumably
# this lets the notebook be resumed without re-running the creation cell.
func = project.get_function("run")

In [None]:
# Scenario name -> overrides, passed to each job as its "config" parameter.
scenarios = {
    "Base": {},
    "GoodWeather": {"CV_weather": ["good", "unsettled"]},
    "BadWeather": {"CV_weather": ["bad"]},
}

# Submit one asynchronous job per scenario (wait=False, so they can run in
# parallel). Every run handle is kept in `runs`, keyed by scenario name, so
# results can be inspected later; `run` still ends up bound to the last
# submission, as before.
runs = {}
for name, config in scenarios.items():
    run = func.run("job",
                   parameters={"name": name, "config": config},
                   wait=False)
    runs[name] = run
    print(f"Started scenario {name}:", run.id)

# Run the DAG: scenarios and ensemble members in parallel

In [None]:
# Register the Hera DAG defined in parallel_workflow.py as the project
# workflow "cdt-dag"; its Python source is embedded inline in the spec.
# Read it explicitly as UTF-8 (the default encoding of Python source files)
# rather than relying on the platform's locale encoding.
with open("parallel_workflow.py", encoding="utf-8") as f:
    workflow_source = f.read()

wf = project.new_workflow(
    name="cdt-dag",
    kind="hera",
    source={
        "code": workflow_source,
        "lang": "python",
        # presumably the name of the function in parallel_workflow.py that
        # builds the DAG — confirm against that file
        "handler": "handler",
        "requirements": ["git+https://github.com/fbk-most/civic-digital-twins@feat-parallelization"],
    },
)

print("Workflow saved successfully!")

In [None]:
# Fetch the already-registered "cdt-dag" workflow from the project; presumably
# this lets the notebook be resumed without re-running the registration cell.
wf = project.get_workflow("cdt-dag")

In [None]:
import json

# Scenario name -> overrides; serialized to JSON for the workflow.
# Redefined here so this cell does not depend on the earlier section.
scenarios = {
    "Base": {},
    "GoodWeather": {"CV_weather": ["good", "unsettled"]},
    "BadWeather": {"CV_weather": ["bad"]},
}

# Ensemble: a list of (weight, cvs) pairs with uniform weights. The weight is
# derived from the size so both stay in sync if the size is changed.
ENSEMBLE_SIZE = 20
ensemble = [
    (1 / ENSEMBLE_SIZE, {"CV_weather": "good"}) for _ in range(ENSEMBLE_SIZE)
]

# Build the workflow first. wait=True is passed explicitly so the build run
# completes before the pipeline is submitted below; without it the two runs
# could race (the SDK's default for `wait` is not visible here).
run = wf.run(
    action="build",
    parameters={},
    wait=True,
)

run2 = wf.run(
    action="pipeline",
    parameters={
        "SCENARIOS_JSON": json.dumps(scenarios),
        # json.dumps encodes each (weight, cvs) tuple as a JSON array
        "ENSEMBLE_JSON": json.dumps(ensemble),
        # passed as strings in case the workflow reads them from env vars
        "USE_BATCHING": "true",
        "BATCH_SIZE": "5",
    },
    wait=False,  # set to True to block until the DAG finishes
)

print("Started pipeline run:", run2.id)