# Combine data layers

1. Get elevation relative to sea level
2. Match to nearest country, impact region, protection zone (e.g. levees)
3. Uniformly distribute exposure over all surface area > 0 elevation within a 30" pixel
4. Aggregate both surface area and exposure up to adm1 X coastal segment X protection zone X wetland flag X .1-meter elevation bin

In [None]:
import warnings
from types import SimpleNamespace

from pathlib import Path
import dask.distributed as dd
import geopandas as gpd
import numpy as np
import pandas as pd
import regionmask
import rhg_compute_tools.gcs as rhgcs
import rhg_compute_tools.kubernetes as rhgk
import rhg_compute_tools.utils as rhgu
import xarray as xr
from shapely.geometry import box

from sliiders import settings as sset
from sliiders import spatial as spatial

spatial.filter_spatial_warnings()


@rhgu.block_globals
def load_exposure(bbox, sset):
    """Get asset value and population within the bounds defined by `bbox`"""
    llon, llat, ulon, ulat = bbox.bounds

    # Get corners of `bbox` by their indices
    lx_ix, ux_ix = spatial.grid_val_to_ix(
        np.array([llon, ulon]),
        sset.LITPOP_GRID_WIDTH,
    )

    ly_ix, uy_ix = spatial.grid_val_to_ix(
        np.array([llat, ulat]),
        sset.LITPOP_GRID_WIDTH,
    )

    # Define filters for reading parquet (saves computation and memory)
    parquet_filters = [
        [
            ("x_ix", ">=", lx_ix),
            ("x_ix", "<", ux_ix),
            ("y_ix", ">=", ly_ix),
            ("y_ix", "<", uy_ix),
        ]
    ]

    exp_filters = [parquet_filters[0] + [("value", ">", 0)]]
    pop_filters = [parquet_filters[0] + [("population", ">", 0)]]

    # asset value
    exp = pd.read_parquet(
        sset.PATH_EXPOSURE_BLENDED,
        columns=["value", "x_ix", "y_ix"],
        filters=exp_filters,
    )

    pop_landscan = pd.read_parquet(
        sset.PATH_LANDSCAN_INT,
        columns=["population", "x_ix", "y_ix"],
        filters=pop_filters,
    ).rename(columns={"population": "pop_landscan"})

    exp = pd.merge(
        exp,
        pop_landscan,
        how="outer",
        left_on=["x_ix", "y_ix"],
        right_on=["x_ix", "y_ix"],
    )

    exp["value"] = exp["value"].fillna(0)
    exp["pop_landscan"] = exp["pop_landscan"].fillna(0)

    return exp


@rhgu.block_globals
def get_protected_area_matches(elev_tile, bbox, sset):
    """
    Get IDs of protected areas in `bbox`, returning a flattened array
    corresponding to the flattened indices of `elev_tile`
    """
    protected_areas = gpd.read_parquet(sset.PATH_GLOBAL_PROTECTED_AREAS)

    return spatial.get_partial_covering_matches(
        elev_tile, bbox, protected_areas, id_name="protection_zone_id"
    )


@rhgu.block_globals
def get_wetland_matches(elev_tile, bbox, sset):
    """
    Get flag indicating existence of wetlands in `bbox`, returning a flattened array
    corresponding to the flattened indices of `elev_tile`
    """
    wetlands = gpd.read_file(sset.PATH_WETLANDS_INT, bbox=(bbox.bounds))

    return spatial.get_partial_covering_matches(elev_tile, bbox, wetlands)

@rhgu.block_globals
def get_seg_adm(elev_tile, bbox, sset):
    seg_adm = gpd.read_file(
        sset.PATH_CIAM_ADM1_VORONOI_INTERSECTIONS_SHP,
        bbox=box(*bbox.buffer(0.1).bounds),
    )

    return spatial.get_vor_matches(elev_tile, bbox, seg_adm, "seg_adm", "seg_adm", None)

@rhgu.block_globals
def match_elev_pixels_to_shapes(elev_tile, this_exp, bbox, sset):

    out_df = get_empty_exp_grid(elev_tile, this_exp, sset.LITPOP_GRID_WIDTH)

    out_df["seg_adm"] = get_seg_adm(elev_tile, bbox, sset)
    out_df["seg_adm"] = out_df["seg_adm"].astype("category")

    out_df["protection_zone"] = get_protected_area_matches(elev_tile, bbox, sset)
    out_df["protection_zone"] = out_df["protection_zone"].astype("category")

    out_df["wetland_flag"] = get_wetland_matches(elev_tile, bbox, sset)
    out_df["wetland_flag"] = out_df["wetland_flag"].astype(bool)

    return out_df


@rhgu.block_globals
def get_valid_points_df(
    elev_tile,
    bbox,
    all_points,
    sset,
):
    elev_array = elev_tile.values.flatten()

    all_points["z_ix"] = spatial.grid_val_to_ix(
        elev_array, sset.EXPOSURE_BIN_WIDTH_V, map_nans=0
    )

    all_points["valid"] = (~np.isnan(elev_array)) & (
        (all_points["z_ix"] >= 0) | (all_points["protection_zone"] != -1)
    )

    all_points["area_km"] = spatial.get_cell_size_km(elev_tile, bbox)

    out_types = {
        "x_ix": np.int16,
        "y_ix": np.int16,
        "z_ix": np.int32,
        "seg_adm": "category",
        "protection_zone": "category",
        "wetland_flag": bool,
        "area_km": np.float32,
    }

    # compress
    all_points = all_points.astype(
        {k: v for k, v in out_types.items() if k in all_points.columns}
    )

    poselev_pts = (
        all_points[all_points["valid"]].drop(columns=["valid"]).reset_index(drop=True)
    )

    negelev_pts = (
        all_points[(~all_points["valid"]) & (all_points["wetland_flag"])]
        .drop(columns=["valid"])
        .reset_index(drop=True)
    )

    return poselev_pts, negelev_pts

@rhgu.block_globals
def get_agg_fields():
    """Get fields to aggregate over"""
    return [
        "z_ix",
        "seg_adm",
        "protection_zone",
    ]


@rhgu.block_globals
def write_empty_csv(out_path):
    # write CSV placeholder to indicate this tile has been processed, but doesn't have exposure
    pd.DataFrame().to_csv(out_path, index=False)
    return out_path


@rhgu.block_globals
def get_tile_out_path(tile_name, sset):
    """Get output path from the coastalDEM input path"""
    return sset.DIR_EXPOSURE_BINNED_TMP_TILES / f"{tile_name}.csv"


@rhgu.block_globals
def get_exp_noland_out_path(tile_name, sset):
    """Get output path for exposure that couldn't be matched to land within its 1-degree elevation tile"""
    return sset.DIR_EXPOSURE_BINNED_TMP_TILES_NOLAND / f"{tile_name}.csv"


@rhgu.block_globals
def get_seg_area_out_path(tile_name, sset):
    """Get output path for segment areas"""
    return sset.DIR_EXPOSURE_BINNED_TMP_TILES_SEGMENT_AREA / f"{tile_name}.csv"

@rhgu.block_globals
def merge_exposure_to_highres_grid(this_exp, out, sset):
    agg_fields = get_agg_fields()

    ix_merge = pd.merge(
        this_exp[["x_ix", "y_ix"]],
        out[["x_ix", "y_ix", "seg_adm"]].drop_duplicates(),
        left_on=["x_ix", "y_ix"],
        right_on=["x_ix", "y_ix"],
        how="left",
    )

    missing_exp_tiles = ix_merge[ix_merge["seg_adm"].isnull()].drop(columns=["seg_adm"])
    valid_exp_tiles = ix_merge[ix_merge["seg_adm"].notnull()].drop(columns=["seg_adm"])

    if valid_exp_tiles.shape[0] == 0:
        valid_exp_tiles = out[["x_ix", "y_ix"]].drop_duplicates()

    missing_exp_tiles["lon"] = spatial.grid_ix_to_val(
        missing_exp_tiles["x_ix"], sset.LITPOP_GRID_WIDTH
    )
    missing_exp_tiles["lat"] = spatial.grid_ix_to_val(
        missing_exp_tiles["y_ix"], sset.LITPOP_GRID_WIDTH
    )

    valid_exp_tiles["lon"] = spatial.grid_ix_to_val(
        valid_exp_tiles["x_ix"], sset.LITPOP_GRID_WIDTH
    )
    valid_exp_tiles["lat"] = spatial.grid_ix_to_val(
        valid_exp_tiles["y_ix"], sset.LITPOP_GRID_WIDTH
    )

    exp_ix_mappings = (
        spatial.get_closest_valid_exp_tiles(missing_exp_tiles, valid_exp_tiles)
        if missing_exp_tiles.shape[0] > 0
        else None
    )

    if exp_ix_mappings is not None:
        this_exp = pd.merge(
            this_exp,
            exp_ix_mappings,
            left_on=["x_ix", "y_ix"],
            right_on=["x_ix", "y_ix"],
            how="left",
        )

        this_exp["x_ix"] = this_exp["valid_x_ix"].fillna(this_exp["x_ix"]).astype(int)
        this_exp["y_ix"] = this_exp["valid_y_ix"].fillna(this_exp["y_ix"]).astype(int)

        this_exp = (
            this_exp.groupby(["x_ix", "y_ix"])[["value", "pop_landscan"]]
            .sum()
            .reset_index(drop=False)
        )

    exp_tile_areas = (
        out.groupby(["x_ix", "y_ix"])[["area_km"]]
        .sum()
        .rename(columns={"area_km": "tile_area_km"})
    )

    out = out.join(exp_tile_areas, on=["x_ix", "y_ix"])

    out = pd.merge(
        out,
        this_exp,
        how="left",
        left_on=["x_ix", "y_ix"],
        right_on=["x_ix", "y_ix"],
    ).reset_index(drop=True)

    out = out.drop(columns=["x_ix", "y_ix"])

    out["value"] = out["value"] * out["area_km"] / out["tile_area_km"]
    out["pop_landscan"] = out["pop_landscan"] * out["area_km"] / out["tile_area_km"]

    out = out.drop(columns=["tile_area_km"])

    out["value"] = out["value"].fillna(0)
    out["pop_landscan"] = out["pop_landscan"].fillna(0)

    assert out.notnull().all().all()

    out = out.drop(columns=["lon", "lat"])

    out = out.groupby(agg_fields, observed=True).sum().reset_index()

    # make sure no exposure was dropped or added from the original exposure within tile (within some margin of float error)
    # include very low sums for 0 / 0 division (areas where there is no exposure, but we calculate anyway for diva areas)
    assert (
        this_exp["value"].sum() < 0.00001
        or np.abs(this_exp["value"].sum() / out["value"].sum() - 1) < 0.00001
    )

    return out

@rhgu.block_globals
def process_tile(
    tile_name,
    sset,
    calc_elev=True,
    calc_exp=True,
):
    warnings.filterwarnings("ignore", message="Geometry is in a geographic CRS")
    warnings.filterwarnings("ignore", message="CRS mismatch between the CRS")
    warnings.filterwarnings(
        "ignore", message="Sequential read of iterator was interrupted"
    )

    out_path = get_tile_out_path(tile_name, sset)
    bbox = spatial.get_bbox(tile_name)

    this_exp = load_exposure(bbox, sset) if calc_exp else None

    # Set the "higher than coastal" value. This makes groupby calls faster and reduces
    # memory footprint.
    CAP = sset.HIGHEST_WITHELEV_EXPOSURE_METERS + 1

    if calc_elev:
        elev_tile = (
            xr.open_rasterio(sset.DIR_MSS / f"{tile_name}.tif")
            .squeeze("band")
            .drop("band")
        )
        # Bundle higher-than-coastal elevation values into one to simplify later data processing
        elev_tile = xr.where(elev_tile > CAP, CAP, elev_tile)
    else:
        elev_tile = spatial.get_granular_grid(bbox, cap=CAP)

    # match tile points with countries, impact regions, protection zones
    out = match_elev_pixels_to_shapes(elev_tile, this_exp, bbox, sset)

    # get points on land, assign impact regions and countries at exposure grid level
    out, negelev_pts = get_valid_points_df(elev_tile, bbox, out, sset)

    # if calc_elev:
    seg_areas = out.groupby(
        ["seg_adm", "protection_zone", "wetland_flag", "z_ix"],
        as_index=False,
        observed=True,
    )["area_km"].sum()

    negelev_areas = negelev_pts.groupby(
        ["seg_adm", "protection_zone", "wetland_flag"],
        as_index=False,
        observed=True,
    )["area_km"].sum()
    negelev_areas["z_ix"] = -1

    seg_areas = pd.concat([seg_areas, negelev_areas], ignore_index=True)

    seg_areas = seg_areas[
        (seg_areas["z_ix"] <= 200) & (seg_areas["protection_zone"] == -1)
    ]

    seg_out_path = get_seg_area_out_path(tile_name, sset)
    seg_areas.to_csv(seg_out_path, index=False)
    if not calc_exp:
        return seg_out_path

    if out.shape[0] == 0:
        if calc_exp:
            this_exp.to_csv(get_exp_noland_out_path(tile_name, sset), index=False)
        return write_empty_csv(out_path)

    out = (
        out[~out["wetland_flag"]].drop(columns=["wetland_flag"]).reset_index(drop=True)
    )

    out = merge_exposure_to_highres_grid(this_exp, out, sset)

    out.to_csv(out_path, index=False)

    return out_path

#### Copy CIAM seg shapefiles if they haven't been updated for this version of the exposure grid

In [None]:
def get_maj_min(vers_name):
    major, minor = vers_name.split(".")
    return int(major), int(minor)


exp_vers_maj, exp_vers_min = get_maj_min(sset.EXPOSURE_BINNED_VERS[1:])

dir_shp = sset.DIR_CIAM_VORONOI.parent

existing_vers = [get_maj_min(p.name[1:]) for p in list(dir_shp.glob("v*.*"))]

existing_vers.sort(key=lambda s: s[1])
existing_vers.sort(key=lambda f: f[0])

latest_vers_maj, latest_vers_min = existing_vers[-1]

if (exp_vers_maj, exp_vers_min) not in existing_vers:

    src_dir = dir_shp / ("v" + str(latest_vers_maj) + "." + str(latest_vers_min))
    dst_dir = dir_shp / ("v" + str(exp_vers_maj) + "." + str(exp_vers_min))

    rhgcs.cp(src_dir, dst_dir, flags=["r"])

#### Prepare output directories

In [None]:
sset.DIR_EXPOSURE_BINNED.mkdir(exist_ok=True)

sset.DIR_EXPOSURE_BINNED_TMP.mkdir(exist_ok=True)

sset.DIR_EXPOSURE_BINNED_TMP_TILES.mkdir(exist_ok=True)
sset.DIR_EXPOSURE_BINNED_TMP_TILES_NOLAND.mkdir(exist_ok=True)
sset.DIR_EXPOSURE_BINNED_TMP_TILES_SEGMENT_AREA.mkdir(exist_ok=True)

#### Get list of tiles to process

In [None]:
tile_meta_path = sset.PATH_EXPOSURE_TILE_LIST

tile_meta = pd.read_parquet(tile_meta_path)

tile_groups = tile_meta.groupby("PROCESSING_SET")["tile_name"].unique().to_dict()

In [None]:
all_tiles = np.concatenate(list(tile_groups.values()))

finished_tiles = [t[:-4][:7] for t in rhgcs.ls(sset.DIR_EXPOSURE_BINNED_TMP_TILES)]
finished_segs = [
    t[:-4][:7] for t in rhgcs.ls(sset.DIR_EXPOSURE_BINNED_TMP_TILES_SEGMENT_AREA)
]

remaining_tiles = [
    t for t in all_tiles if (t not in finished_tiles and t not in finished_segs)
]

print(len(remaining_tiles))

In [None]:
client, cluster = rhgk.get_standard_cluster()

In [None]:
nworkers = 200
cluster.scale(nworkers)
cluster

In [None]:
import os
import zipfile
from pathlib import Path

from sliiders import __file__

sliiders_dir = Path(__file__).parent
zipf = zipfile.ZipFile("sliiders.zip", "w", zipfile.ZIP_DEFLATED)
for root, dirs, files in os.walk(sliiders_dir):
    for file in files:
        zipf.write(
            os.path.join(root, file),
            os.path.relpath(os.path.join(root, file), os.path.join(sliiders_dir, "..")),
        )
zipf.close()
client.upload_file("sliiders.zip")

# Without elevation

Note: when running the below three cells, one occasionally may run into the Dask cluster being stuck on making a progress. We find that this occurrence is not tile-specific. In such cases, we advise the user to follow these steps:


1. Close the current Dask cluster and client by running `client.restart(); cluster.scale(0); client.close(); cluster.close()`
2. Once the Dask cluster and client have successfully closed, restart the notebook kernel.
3. Run all of the codes up to this section, and the cell directly below. Make sure that the Dask cluster is successfully running.
4. Since we only need to remaining tiles that has not been processed, run (in via `client.map`) `process_tile` on these remaining ones. This can be done by running the below cells again, since the already-processed tiles would not be included in `remaining_tiles` anymore.

In [None]:
withoutelev_tiles = np.array(
    [t for t in tile_groups["WITHOUTELEV"] if t in remaining_tiles]
)

withoutelev_tiles.shape[0]

In [None]:
withoutelev_futures = client.map(
    process_tile, withoutelev_tiles, sset=sset, calc_elev=False
)

In [None]:
dd.progress(withoutelev_futures)

# With elevation

Note: similar to the without elevation cases, there could be cases in which Dask becomes stuck on making a progress. In such cases, we advise the user to follow similar steps to those explained above (but without having to re-run the steps involving without elevation workflow (i.e., after restarting the notebook, run all except the three cells under **Without elevation**, and work on the remaining tiles).

### With Exposure

In [None]:
withelev_tiles = np.array([t for t in tile_groups["WITHELEV"] if t in remaining_tiles])

withelev_tiles.shape[0]

In [None]:
withelev_futures = client.map(
    process_tile,
    withelev_tiles,
    sset=sset,
)

In [None]:
dd.progress(withelev_futures)

#### No exposure (CIAM)

In [None]:
ciam_tiles = np.array([t for t in tile_groups["CIAM"] if t in remaining_tiles])
ciam_tiles.shape[0]

In [None]:
ciam_futures = client.map(
    process_tile,
    ciam_tiles,
    sset=sset,
    calc_exp=False,
)

In [None]:
dd.progress(ciam_futures)

## Shutdown workers

In [None]:
client.close()
cluster.close()