# Combined pluvial and fluvial flood risk

In [None]:
# Import packages
import subprocess
from pathlib import Path

from hydroflows import Workflow, WorkflowConfig
from hydroflows.log import setuplog
from hydroflows.methods import discharge, fiat, rainfall, sfincs, wflow
from hydroflows.utils.example_data import fetch_data

# Set up logging at INFO level through the hydroflows setuplog helper
logger = setuplog(level="INFO")

In [None]:
# Fetch the global build data; the returned directory is used further down
# to locate the data catalog (data_catalog.yml)
cache_dir = fetch_data(data="global-data")

In [None]:
# Case name and the folders this example works in
name = "pluvial_fluvial_hazard"
# absolute path of the current working directory
pwd = Path(".").resolve()
# everything the workflow produces is written below <pwd>/cases/<name>
case_root = pwd / "cases" / name

In [None]:
# Workflow configuration; the rules further down read these values through
# wf.get_ref("$config.<key>")
hydromt_cfg_dir = pwd / "hydromt_config"  # hydromt build configs per model
exe_dir = pwd / "bin"  # model executables
config = WorkflowConfig(
    # general settings
    region=pwd / "data" / "build" / "region.geojson",
    catalog_path=Path(cache_dir) / "data_catalog.yml",
    plot_fig=True,
    start_date="2014-01-01",
    end_date="2021-12-31",
    # sfincs settings
    hydromt_sfincs_config=hydromt_cfg_dir / "sfincs_config.yml",
    sfincs_exe=exe_dir / "sfincs_v2.1.1" / "sfincs.exe",
    # wflow settings
    hydromt_wflow_config=hydromt_cfg_dir / "wflow_config.yml",
    wflow_exe=exe_dir / "wflow_v0.8.1" / "bin" / "wflow_cli.exe",
    # fiat settings
    hydromt_fiat_config=hydromt_cfg_dir / "fiat_config.yml",
    fiat_exe=exe_dir / "fiat" / "fiat.exe",
    risk=True,
    # design events settings
    rps=[5, 10, 25],
)


In [None]:
# Create an empty workflow from the config, case name and output root defined above
wf = Workflow(config=config, name=name, root=case_root)

In [None]:
# SFINCS model setup; region, hydromt config, data catalog and plot switch
# are all references into the workflow config
region_ref = wf.get_ref("$config.region")
sfincs_cfg_ref = wf.get_ref("$config.hydromt_sfincs_config")
catalog_ref = wf.get_ref("$config.catalog_path")
plot_ref = wf.get_ref("$config.plot_fig")

sfincs_build = sfincs.SfincsBuild(
    region=region_ref,
    config=sfincs_cfg_ref,
    sfincs_root="models/sfincs",
    catalog_path=catalog_ref,
    plot_fig=plot_ref,
)
wf.add_rule(sfincs_build, rule_id="sfincs_build")

In [None]:
# Wflow model setup for the region produced by the SFINCS build.
# The SFINCS discharge source points are passed in as additional "gauges".
sfincs_src_points = Path(sfincs_build.params.sfincs_root) / "gis" / "src.geojson"

wflow_build = wflow.WflowBuild(
    region=sfincs_build.output.sfincs_region,
    wflow_root="models/wflow",
    config=wf.get_ref("$config.hydromt_wflow_config"),
    catalog_path=wf.get_ref("$config.catalog_path"),
    gauges=sfincs_src_points,
    plot_fig=wf.get_ref("$config.plot_fig"),
)
wf.add_rule(wflow_build, rule_id="wflow_build")


In [None]:
# FIAT model setup; region and ground elevation both come from the SFINCS
# build output, catalog and hydromt config from the workflow config
fiat_build_inputs = {
    "region": sfincs_build.output.sfincs_region,
    "ground_elevation": sfincs_build.output.sfincs_subgrid_dep,
    "fiat_root": "models/fiat",
    "catalog_path": wf.get_ref("$config.catalog_path"),
    "config": wf.get_ref("$config.hydromt_fiat_config"),
}
fiat_build = fiat.FIATBuild(**fiat_build_inputs)
wf.add_rule(fiat_build, rule_id="fiat_build")


In [None]:
# Fluvial branch: update the Wflow forcing, run Wflow and derive fluvial
# design events by postprocessing the Wflow output

# Step 1 - forcing update for the period set in the workflow config
sim_start = wf.get_ref("$config.start_date")
sim_end = wf.get_ref("$config.end_date")
wflow_update = wflow.WflowUpdateForcing(
    wflow_toml=wflow_build.output.wflow_toml,
    catalog_path=wf.get_ref("$config.catalog_path"),
    start_time=sim_start,
    end_time=sim_end,
)
wf.add_rule(wflow_update, rule_id="wflow_update")

# Step 2 - Wflow simulation using the toml written by the forcing update
wflow_exe_ref = wf.get_ref("$config.wflow_exe")
wflow_run = wflow.WflowRun(
    wflow_toml=wflow_update.output.wflow_out_toml,
    wflow_bin=wflow_exe_ref,
)
wf.add_rule(wflow_run, rule_id="wflow_run")

# Step 3 - fluvial design events from the Wflow output time series
fluvial_event_inputs = {
    "discharge_nc": wflow_run.output.wflow_output_timeseries,
    "rps": wf.get_ref("$config.rps"),
    "wildcard": "fluvial_events",
    "event_root": "data/events",
    "index_dim": "Q_gauges_bounds",
}
fluvial_events = discharge.FluvialDesignEvents(**fluvial_event_inputs)
wf.add_rule(fluvial_events, rule_id="fluvial_events")

In [None]:
# Pluvial branch: get ERA5 rainfall for the SFINCS region, then derive
# pluvial design events from that precipitation

# Rainfall data for the period set in the workflow config
era5_inputs = {
    "region": sfincs_build.output.sfincs_region,
    "data_root": "data/era5",
    "start_date": wf.get_ref("$config.start_date"),
    "end_date": wf.get_ref("$config.end_date"),
}
pluvial_data = rainfall.GetERA5Rainfall(**era5_inputs)
wf.add_rule(pluvial_data, rule_id="pluvial_data")
precip_nc = pluvial_data.output.precip_nc

# Design events derived from the precipitation file
rps_ref = wf.get_ref("$config.rps")
pluvial_events = rainfall.PluvialDesignEvents(
    precip_nc=precip_nc,
    rps=rps_ref,
    wildcard="pluvial_events",
    event_root="data/events",
)
wf.add_rule(pluvial_events, rule_id="pluvial_events")

In [None]:
# Merge the pluvial and fluvial event wildcards into a single "all_events"
# wildcard, so that SFINCS can be run for all events in parallel
pluvial_names = wf.wildcards.get("pluvial_events")
fluvial_names = wf.wildcards.get("fluvial_events")
all_events = pluvial_names + fluvial_names
wf.wildcards.set("all_events", all_events)


In [None]:
# SFINCS chain for the combined events (update forcing -> run -> postprocess),
# parameterised with the {all_events} wildcard

# Forcing update of the built model with the event definition
event_yaml_pattern = "data/events/{all_events}.yml"
sfincs_update = sfincs.SfincsUpdateForcing(
    sfincs_inp=sfincs_build.output.sfincs_inp,
    event_yaml=event_yaml_pattern,
)
wf.add_rule(sfincs_update, rule_id="sfincs_update")

# SFINCS simulation using the updated input file
sfincs_exe_ref = wf.get_ref("$config.sfincs_exe")
sfincs_run = sfincs.SfincsRun(
    sfincs_inp=sfincs_update.output.sfincs_out_inp,
    sfincs_exe=sfincs_exe_ref,
)
wf.add_rule(sfincs_run, rule_id="sfincs_run")

# Postprocessing of the SFINCS map output
event_name_pattern = "{all_events}"
sfincs_post = sfincs.SfincsPostprocess(
    sfincs_map=sfincs_run.output.sfincs_map,
    event_name=event_name_pattern,
)
wf.add_rule(sfincs_post, rule_id="sfincs_post")

In [None]:
# FIAT chain (update hazard -> run), parameterised with the {event_set}
# wildcard that covers both the fluvial and the pluvial event set
event_sets = ["fluvial_design_events", "pluvial_design_events"]
wf.wildcards.set("event_set", event_sets)

# Hazard update with the postprocessed SFINCS water level maps
fiat_hazard_inputs = {
    "fiat_cfg": fiat_build.output.fiat_cfg,
    "event_set_yaml": "data/events/{event_set}.yml",
    "map_type": "water_level",
    "hazard_maps": sfincs_post.output.sfincs_zsmax,
    "risk": wf.get_ref("$config.risk"),
}
fiat_update = fiat.FIATUpdateHazard(**fiat_hazard_inputs)
wf.add_rule(fiat_update, rule_id="fiat_update")

# FIAT run using the config written by the hazard update
fiat_exe_ref = wf.get_ref("$config.fiat_exe")
fiat_run = fiat.FIATRun(
    fiat_cfg=fiat_update.output.fiat_out_cfg,
    fiat_exe=fiat_exe_ref,
)
wf.add_rule(fiat_run, rule_id="fiat_run")

In [None]:
# Test the workflow by performing a dry run
wf.dryrun()

In [None]:
# Export the workflow as a Snakefile
wf.to_snakemake()

# List what is now present in the case directory (path shown relative to pwd)
case_dir = wf.root
print(f"{case_dir.relative_to(pwd)}:")
for entry in case_dir.iterdir():
    print("- " + entry.name)

In [None]:
from IPython.display import SVG

# Render the directed acyclic graph (DAG) of the exported workflow to dag.svg.
# Requires snakemake and Graphviz `dot` to be available on PATH.
# The two tools are run as separate processes (no shell pipeline) so that a
# failing `snakemake --dag` call raises CalledProcessError instead of being
# hidden behind the exit status of `dot`.
dag_dot = subprocess.run(
    ["snakemake", "--dag"],
    cwd=wf.root,
    stdout=subprocess.PIPE,  # capture the dot-format graph; stderr stays visible
    check=True,
)
subprocess.run(
    ["dot", "-Tsvg", "-o", "dag.svg"],  # written relative to cwd, i.e. wf.root
    cwd=wf.root,
    input=dag_dot.stdout,
    check=True,
)

# show the dag
SVG(Path(wf.root, "dag.svg").as_posix())

In [None]:
# uncomment to run the workflow
# subprocess.run(["snakemake", "-c", "1"], cwd=wf.root)