# Combine exposure tiles to construct with-elevation, without-elevation, and area-by-elevation exposure parquets
Take all the 1-degree by 1-degree binned exposure tiles and combine them to form three global datasets:
1. "With-elevation" binned exposure: Includes all areas with elevations up to `sset.HIGHEST_WITHELEV_EXPOSURE_METERS`
2. "Without-elevation" binned exposure: Includes all global exposure
3. Area-by-elevation: For each elevation bin and segment-adm1 region (split by protection zone and by wetland vs. non-wetland), the total area, in square kilometers, that is closer to that segment than to any other segment

In [None]:
import random

import dask.dataframe as ddf
import dask.distributed as dd
import geopandas as gpd
import numpy as np
import pandas as pd
import rhg_compute_tools.kubernetes as rhgk
import rhg_compute_tools.utils as rhgu

from sliiders import settings as sset
from sliiders import spatial

# Project helper; presumably silences noisy warnings from the geospatial stack.
spatial.filter_spatial_warnings()

In [None]:
# Scratch directories for the intermediate batch parquets. exist_ok=False makes
# mkdir raise FileExistsError if a previous run left these directories behind;
# presumably intentional, because the merge cells glob every batch_*.parquet in
# them and would otherwise pick up stale batches.
dir_batches = sset.DIR_EXPOSURE_BINNED_TMP / "batches"
dir_batches.mkdir(exist_ok=False)

dir_seg_batches = sset.DIR_EXPOSURE_BINNED_TMP / "segment_area_batches"
dir_seg_batches.mkdir(exist_ok=False)

# Define batching function

In [None]:
@rhgu.block_globals
def run_batch(batch_num, batch_size, in_paths, dir_batches, include_tile_name=False):
    """Concatenate one batch of tile CSVs and write it as a single parquet file.

    Parameters
    ----------
    batch_num : int
        Zero-based batch index; selects
        ``in_paths[batch_num * batch_size : (batch_num + 1) * batch_size]``.
    batch_size : int
        Number of CSV paths per batch.
    in_paths : sequence of pathlib.Path
        All tile CSV paths being split into batches.
    dir_batches : pathlib.Path
        Output directory; the batch is written to ``batch_{batch_num}.parquet``.
    include_tile_name : bool, optional
        If True, add a ``filename`` column holding each source CSV's stem.

    Returns
    -------
    int
        1 if a parquet file was written. 0 if the batch had no rows to write,
        either because the batch index lies past the end of ``in_paths`` or
        because every CSV in it was an empty placeholder.
    """
    # Slicing clips at the end of the sequence, so no explicit min() is needed.
    batch_paths = in_paths[batch_num * batch_size : (batch_num + 1) * batch_size]

    frames = []
    for path in batch_paths:
        try:
            df = pd.read_csv(path, index_col=None, header=0)
        except pd.errors.EmptyDataError:
            # these are the placeholder CSVs; nothing to add
            continue
        if include_tile_name:
            df["filename"] = path.stem
        frames.append(df)

    # pd.concat raises ValueError on an empty list. That happens for trailing
    # batch indices that select no paths, or for batches of only placeholders.
    if not frames:
        return 0

    exp = pd.concat(frames, axis=0, ignore_index=True)
    if "wetland_flag" in exp.columns:
        exp["wetland_flag"] = exp["wetland_flag"].astype(bool)

    exp.to_parquet(dir_batches / f"batch_{batch_num}.parquet")

    return 1

## Start workers

In [None]:
# Number of dask workers to request. The batching cells below split the tile
# CSVs into nworkers * 2 batches.
nworkers = 32

In [None]:
# Start a dask cluster via the rhg_compute_tools helper; it returns the
# distributed client (used for submit/cancel) and the cluster handle.
client, cluster = rhgk.get_micro_cluster()

In [None]:
cluster.scale(nworkers)

# Show the cluster as the cell output.
cluster

## Combine 1-degree tile CSVs into batches

In [None]:
# Collect every 1-degree exposure tile CSV.
tile_paths = list(sset.DIR_EXPOSURE_BINNED_TMP_TILES.glob("*.csv"))

In [None]:
# Paths per batch. The +1 guarantees batch_size * (nworkers * 2) > len(tile_paths),
# so the nworkers * 2 batches cover every tile. When len(tile_paths) is small
# relative to (nworkers * 2) ** 2, trailing batch indices can select no paths.
batch_size = int(len(tile_paths) / (nworkers * 2)) + 1

In [None]:
# Shuffling the paths helps ensure each worker gets CSV batches of about the
# same total size; the fixed seed keeps the batch assignment reproducible.
random.seed(1)
random.shuffle(tile_paths)

In [None]:
# Submit one run_batch task per batch index, writing into dir_batches.
batch_futures = [
    client.submit(run_batch, i, batch_size, tile_paths, dir_batches)
    for i in range(nworkers * 2)
]

In [None]:
dd.progress(batch_futures)

## Combine 1-degree segment-area tile CSVs into batches

In [None]:
# Collect every 1-degree segment-area tile CSV.
seg_tile_paths = list(sset.DIR_EXPOSURE_BINNED_TMP_TILES_SEGMENT_AREA.glob("*.csv"))

In [None]:
# Same batching rule as for the exposure tiles: nworkers * 2 batches cover all
# paths, and trailing batches can be empty when there are few tiles.
batch_size = int(len(seg_tile_paths) / (nworkers * 2)) + 1

In [None]:
# Shuffling the paths helps ensure each worker gets CSV batches of about the
# same total size; the fixed seed keeps the batch assignment reproducible.
random.seed(1)
random.shuffle(seg_tile_paths)

In [None]:
# Submit one run_batch task per batch index, writing into dir_seg_batches.
batch_futures = [
    client.submit(run_batch, i, batch_size, seg_tile_paths, dir_seg_batches)
    for i in range(nworkers * 2)
]

In [None]:
dd.progress(batch_futures)

# Merge tile batches

In [None]:
# Lazily read every exposure batch parquet into one dask dataframe.
exp_ddf = ddf.read_parquet(str(dir_batches / f"batch_*.parquet"))

In [None]:
exp_ddf = exp_ddf.rename(columns={"value": "asset_value"})

In [None]:
exp_ddf

In [None]:
# Target dtypes for the exposure columns (narrow ints/floats, presumably to
# keep the persisted frame small in cluster memory).
column_dtypes = {
    "z_ix": np.int32,
    "seg_adm": str,
    "protection_zone": np.int16,
    "area_km": np.float32,
    "asset_value": np.float32,
    "pop_landscan": np.float32,
}

In [None]:
# Cast and persist, so the later with-/without-elevation aggregations reuse the
# in-cluster result instead of re-reading the batch parquets.
exp_ddf = exp_ddf.astype(column_dtypes).persist()

In [None]:
exp_ddf

# Merge segment-area tile batches

In [None]:
seg_area_ddf = ddf.read_parquet(str(dir_seg_batches / f"batch_*.parquet"))

In [None]:
area_by_elev = seg_area_ddf.groupby(
    ["z_ix", "seg_adm", "protection_zone", "wetland_flag"]
)["area_km"].sum()

In [None]:
area_by_elev = area_by_elev.persist()
dd.progress(area_by_elev)

In [None]:
area_by_elev = area_by_elev.reset_index(drop=False)

In [None]:
highest_z_ix = (
    int(sset.HIGHEST_WITHELEV_EXPOSURE_METERS / sset.EXPOSURE_BIN_WIDTH_H) - 1
)
area_by_elev = area_by_elev[area_by_elev["z_ix"] <= highest_z_ix]

area_by_elev

In [None]:
ciam = (
    area_by_elev.groupby(["z_ix", "seg_adm", "protection_zone", "wetland_flag"])[
        "area_km"
    ]
    .sum()
    .reset_index(drop=False)
)

In [None]:
# Output dtypes for the area-by-elevation dataset. land_area_km and
# wetland_area_km do not exist yet; they are created when the wetland flag is
# split into columns below.
area_by_elev_dtypes = {
    "z_ix": np.int16,
    "seg_adm": "category",
    "protection_zone": "category",
    "wetland_flag": bool,
    "area_km": np.float32,
    "land_area_km": np.float32,
    "wetland_area_km": np.float32,
}

In [None]:
# Only cast the columns currently present.
ciam = ciam.astype({k: v for k, v in area_by_elev_dtypes.items() if k in ciam.columns})

In [None]:
ciam = ciam.persist()

In [None]:
# Pull the (already aggregated) result into local pandas memory.
ciam_local = ciam.compute()

In [None]:
def divide_area_by_elev_into_wetland_and_non_wetland(area_by_elev_local, dtypes=None):
    """Pivot the ``wetland_flag`` rows into separate land and wetland area columns.

    Rows with ``wetland_flag == False`` supply ``land_area_km``, and rows with
    ``wetland_flag == True`` supply ``wetland_area_km``. The two sets are
    outer-joined on every other column (except ``area_km``). A key present for
    only one flag gets 0 for the missing area.

    Parameters
    ----------
    area_by_elev_local : pandas.DataFrame
        Must contain a boolean ``wetland_flag`` column and an ``area_km``
        column. All remaining columns are treated as join keys. Not modified.
    dtypes : dict, optional
        Column -> dtype mapping applied to the output; entries for columns
        absent from the output are ignored. Defaults to the module-level
        ``area_by_elev_dtypes``.

    Returns
    -------
    pandas.DataFrame
        One row per key combination: the key columns followed by
        ``land_area_km`` and ``wetland_area_km``, with a fresh RangeIndex.
    """
    if dtypes is None:
        dtypes = area_by_elev_dtypes

    group_cols = [
        c for c in area_by_elev_local.columns if c not in ("wetland_flag", "area_km")
    ]
    is_wetland = area_by_elev_local["wetland_flag"]

    # The flag is constant within each side, so drop it before merging instead
    # of dropping two suffixed copies afterwards.
    land = area_by_elev_local.loc[~is_wetland].drop(columns="wetland_flag")
    wetland = area_by_elev_local.loc[is_wetland].drop(columns="wetland_flag")

    out = pd.merge(
        land,
        wetland,
        on=group_cols,
        suffixes=("_no_wetland", "_wetland"),
        how="outer",
    ).rename(
        columns={
            "area_km_no_wetland": "land_area_km",
            "area_km_wetland": "wetland_area_km",
        }
    )

    # Keys seen on only one side of the outer join get NaN for the other area.
    for col in ("land_area_km", "wetland_area_km"):
        out[col] = out[col].fillna(0)

    out = out.astype({k: v for k, v in dtypes.items() if k in out.columns})

    return out.reset_index(drop=True)

In [None]:
# Replace the wetland_flag / area_km pair with land_area_km and wetland_area_km.
ciam_local = divide_area_by_elev_into_wetland_and_non_wetland(ciam_local)

#### Remove any old versions

In [None]:
# Show the output path and whether a previous version exists; these cells only
# inspect, they do not delete anything.
sset.PATH_EXPOSURE_AREA_BY_CIAM_AND_ELEVATION

In [None]:
sset.PATH_EXPOSURE_AREA_BY_CIAM_AND_ELEVATION.exists()

In [None]:
# Create the output directory if needed (without parents=True, its own parent
# must already exist).
sset.PATH_EXPOSURE_AREA_BY_CIAM_AND_ELEVATION.parent.mkdir(exist_ok=True)

#### Save parquet

In [None]:
ciam_local.to_parquet(sset.PATH_EXPOSURE_AREA_BY_CIAM_AND_ELEVATION, index=False)

In [None]:
# Release cluster memory held for the area aggregation.
# NOTE(review): area_by_elev was reassigned after .persist() (reset_index and
# filter); confirm this cancels the persisted futures as intended.
client.cancel(area_by_elev)

## Create without-elevation dataframe from with-elevation tiles

In [None]:
exp_ddf

In [None]:
# Without-elevation exposure: asset value, population and area per segment-adm1
# region, summed over all elevation bins and protection zones.
withoutelev_ddf = exp_ddf.groupby(
    ["seg_adm"],
)[["asset_value", "pop_landscan", "area_km"]].sum()

In [None]:
withoutelev_ddf = withoutelev_ddf.reset_index(drop=False)

In [None]:
withoutelev_ddf = withoutelev_ddf.persist()
dd.progress(withoutelev_ddf)

In [None]:
# Re-apply the narrow dtypes for the columns that survive the aggregation
# (z_ix and protection_zone are gone).
withoutelev_ddf = withoutelev_ddf.astype(
    {k: v for k, v in column_dtypes.items() if k in withoutelev_ddf.columns}
)

In [None]:
withoutelev_ddf = withoutelev_ddf.persist()
dd.progress(withoutelev_ddf)

#### Remove any old versions

In [None]:
sset.PATH_EXPOSURE_BINNED_WITHOUTELEV.exists()

#### Save parquet

In [None]:
withoutelev_pq_out = withoutelev_ddf.to_parquet(
    sset.PATH_EXPOSURE_BINNED_WITHOUTELEV,
    engine="pyarrow",
    write_index=False,
    compute=False,
).persist()

In [None]:
dd.progress(withoutelev_ddf)

## Create with-elevation parquet

In [None]:
# Keep only elevation bins up to highest_z_ix (the same cutoff applied to the
# area-by-elevation output).
withelev_ddf = exp_ddf[exp_ddf["z_ix"] <= highest_z_ix]

In [None]:
withelev_ddf

In [None]:
# With-elevation exposure: totals per (elevation bin, segment-adm1, protection zone).
withelev_ddf = withelev_ddf.groupby(["z_ix", "seg_adm", "protection_zone"])[
    ["area_km", "asset_value", "pop_landscan"]
].sum()

In [None]:
withelev_ddf = withelev_ddf.reset_index(drop=False)

In [None]:
withelev_ddf = withelev_ddf.persist()
dd.progress(withelev_ddf)

#### Remove any old versions

In [None]:
# Show the output path and whether a previous version exists; nothing is
# deleted by these cells.
sset.PATH_EXPOSURE_BINNED_WITHELEV

In [None]:
sset.PATH_EXPOSURE_BINNED_WITHELEV.exists()

#### Save parquet

In [None]:
# Build the parquet write lazily; it is started by the persist() below.
withelev_pq_out = withelev_ddf.to_parquet(
    sset.PATH_EXPOSURE_BINNED_WITHELEV,
    engine="pyarrow",
    write_index=False,
    compute=False,
)

In [None]:
withelev_pq_out = withelev_pq_out.persist()

In [None]:
dd.progress(withelev_pq_out)

### Shut down cluster

In [None]:
# Release the dask cluster; the remaining cells work with in-memory pandas
# frames read back from the saved parquets.
# NOTE(review): confirm the persisted parquet writes above have finished before
# running this, or they may be cut off.
client.close()
cluster.close()

## Make some final adjustments and checks

In [None]:
# Reload the three outputs written above as in-memory pandas frames.
withelev = pd.read_parquet(sset.PATH_EXPOSURE_BINNED_WITHELEV)

withoutelev = pd.read_parquet(sset.PATH_EXPOSURE_BINNED_WITHOUTELEV)

area_ciam = pd.read_parquet(sset.PATH_EXPOSURE_AREA_BY_CIAM_AND_ELEVATION)

In [None]:
exp_dtypes = {
    "z_ix": np.int32,
    "seg_adm": "category",
    "protection_zone": np.int16,
    "area_km": np.float32,
    "asset_value": np.float32,
    "pop_landscan": np.float32,
}

# Step through fields one-by-one to prevent memory explosion copying the whole dataframe
for field, field_type in exp_dtypes.items():
    withelev[field] = withelev[field].astype(field_type)

# withoutelev was aggregated by seg_adm only, so z_ix and protection_zone are
# absent; cast only the columns it has.
for field, field_type in exp_dtypes.items():
    if field in withoutelev.columns:
        withoutelev[field] = withoutelev[field].astype(field_type)

In [None]:
# Drop rows with neither asset value nor population (rows carrying only area
# are dropped too).
withelev = withelev[
    (withelev["asset_value"] > 0) | (withelev["pop_landscan"] > 0)
].reset_index(drop=True)

withoutelev = withoutelev[
    (withoutelev["asset_value"] > 0) | (withoutelev["pop_landscan"] > 0)
].reset_index(drop=True)

In [None]:
def parse_adm1(df):
    """Add ``adm1`` and ``ISO`` columns derived from ``seg_adm``, in place.

    ``adm1`` is everything after the first 15 characters of ``seg_adm``.
    ``ISO`` is the first three characters of ``adm1``. The same (mutated)
    frame is returned so calls can be written as ``df = parse_adm1(df)``.
    """
    # NOTE(review): 15 is assumed to be the fixed-width segment prefix of
    # seg_adm; confirm against how seg_adm is constructed.
    adm1_codes = df["seg_adm"].str.slice(start=15)
    df["adm1"] = adm1_codes
    df["ISO"] = adm1_codes.str.slice(stop=3)
    return df

In [None]:
# Add adm1 and ISO columns (parsed from seg_adm) to all three frames.
area_ciam = parse_adm1(area_ciam)
withelev = parse_adm1(withelev)
withoutelev = parse_adm1(withoutelev)

#### Check against PWT 10.0

In [None]:
# Country-level reference table (capital stock and population).
ktable_full = pd.read_parquet(sset.PATH_COUNTRY_LEVEL_EXPOSURE)

In [None]:
ktable_full = ktable_full.reset_index(drop=False)

In [None]:
# Keep only 2019 rows, indexed by country code.
ktable_full = ktable_full[ktable_full["year"] == 2019].set_index("ccode")[
    ["cn_19", "pop"]
]

In [None]:
# NOTE(review): the 1e6 factor presumably converts from millions to absolute
# units; confirm against the source table's units.
ktable = ktable_full["cn_19"] * 1e6

In [None]:
pop = ktable_full["pop"] * 1e6

In [None]:
replacements = {"XAD": "GBR", "XKO": "KO-", "XNC": "CYP", "XPI": "CHN"}

# Remap these ISO codes, presumably so they match the reference table's ccode
# values (checked in the next cells). Codes not listed are left unchanged.
for frame in (area_ciam, withelev, withoutelev):
    frame["ISO"] = frame["ISO"].apply(lambda code: replacements.get(code, code))

In [None]:
# Reference countries with no remaining exposure rows (informational).
set(ktable.index) - set(withoutelev["ISO"].unique())

In [None]:
# Exposure ISO codes missing from the reference table.
set(withoutelev["ISO"].unique()) - set(ktable.index)

In [None]:
# Must be empty: the rescaling below joins on ISO, and unmatched codes would
# get NaN scaling factors.
assert len(set(withoutelev["ISO"].unique()) - set(ktable.index)) == 0

### Rescale asset value if needed

In [None]:
# Per-country asset totals from the without-elevation exposure.
country_totals = withoutelev.groupby("ISO")["asset_value"].sum()
country_totals.name = "country_asset_value"

In [None]:
check = pd.DataFrame(ktable).join(country_totals, on="ccode")

In [None]:
# Ratio of reference capital stock to exposure total; 1.0 means they agree.
check["diff"] = check["cn_19"] / check["country_asset_value"]

If rescaling, scale asset values so that each country's total matches the PWT capital stock (`cn_19`):

In [None]:
scaling = check[["diff"]]

scaling["diff"].max(), scaling["diff"].min()

withoutelev = withoutelev.join(scaling, on="ISO")
withelev = withelev.join(scaling, on="ISO")

withoutelev["asset_value"] = withoutelev["asset_value"] * withoutelev["diff"]
withelev["asset_value"] = withelev["asset_value"] * withelev["diff"]

withoutelev = withoutelev.drop(columns=["diff"])
withelev = withelev.drop(columns=["diff"])

### Rescale population if needed

In [None]:
# Per-country LandScan population totals from the without-elevation exposure.
country_totals_landscan = withoutelev.groupby("ISO")["pop_landscan"].sum()
country_totals_landscan.name = "country_population_landscan"

In [None]:
# Ratio of reference population to LandScan total; 1.0 means they agree.
check = pd.DataFrame(pop).join(country_totals_landscan, on="ccode")
check["diff_landscan"] = check["pop"] / check["country_population_landscan"]

If rescaling, scale population so that each country's total matches the PWT population (`pop`):

In [None]:
scaling = check[["diff_landscan"]]

# Range of the per-country population scaling factors (sanity check).
scaling["diff_landscan"].max(), scaling["diff_landscan"].min()

In [None]:
# Multiply every bin's population by its country's factor so withoutelev's
# country totals match the reference population.
withoutelev = withoutelev.join(scaling, on="ISO")
withelev = withelev.join(scaling, on="ISO")

withoutelev["pop_landscan"] = withoutelev["pop_landscan"] * withoutelev["diff_landscan"]
withelev["pop_landscan"] = withelev["pop_landscan"] * withelev["diff_landscan"]

withoutelev = withoutelev.drop(columns=["diff_landscan"])
withelev = withelev.drop(columns=["diff_landscan"])

In [None]:
withelev["asset_value"].sum() / 1e12

In [None]:
withoutelev["asset_value"].sum() / 1e12

In [None]:
withelev["pop_landscan"].sum() / 1e9

In [None]:
withoutelev["pop_landscan"].sum() / 1e9

In [None]:
withoutelev["ISO"] = withoutelev["ISO"].astype("category")
withelev["ISO"] = withelev["ISO"].astype("category")

withoutelev["asset_value"] = withoutelev["asset_value"].astype(np.float32)
withelev["asset_value"] = withelev["asset_value"].astype(np.float32)

In [None]:
# NOTE(review): the dask to_parquet calls above presumably wrote these paths as
# directories of part files, while pandas.to_parquet below writes a single
# file. That is likely why the existing path must be deleted first (per the
# "Delete" note); confirm before re-running.
sset.PATH_EXPOSURE_BINNED_WITHELEV.exists()

Delete

In [None]:
sset.PATH_EXPOSURE_BINNED_WITHELEV

In [None]:
# Overwrite with the filtered, rescaled frame (now also carrying adm1 and ISO).
withelev.to_parquet(sset.PATH_EXPOSURE_BINNED_WITHELEV, index=False)

In [None]:
sset.PATH_EXPOSURE_BINNED_WITHOUTELEV.exists()

In [None]:
sset.PATH_EXPOSURE_BINNED_WITHOUTELEV

In [None]:
withoutelev.to_parquet(sset.PATH_EXPOSURE_BINNED_WITHOUTELEV, index=False)

In [None]:
# Re-save the area output with the adm1 and ISO columns added above.
area_ciam.to_parquet(sset.PATH_EXPOSURE_AREA_BY_CIAM_AND_ELEVATION, index=False)

### Add `lowelev` field to CIAM-Adm1 intersections file to indicate inclusion in elevation processing

In [None]:
ciam_adm1 = gpd.read_parquet(sset.PATH_CIAM_ADM1_VORONOI_INTERSECTIONS)
# True for regions with at least one with-elevation row remaining after the
# positive-asset-or-population filter applied above.
ciam_adm1["lowelev"] = ciam_adm1["seg_adm"].isin(withelev["seg_adm"].unique())
# Apply the same ISO code remapping used for the exposure frames.
ciam_adm1["ISO"] = ciam_adm1["ISO"].apply(
    lambda c: replacements[c] if c in replacements else c
)

In [None]:
# Overwrites the input file in place; row_group_size is passed through to the
# parquet writer.
ciam_adm1.to_parquet(
    sset.PATH_CIAM_ADM1_VORONOI_INTERSECTIONS, index=False, row_group_size=500
)

### Check that it looks good

#### withelev

In [None]:
# Re-read each saved output and spot-check its contents.
withelev_out = pd.read_parquet(sset.PATH_EXPOSURE_BINNED_WITHELEV)

In [None]:
withelev_out.head()

In [None]:
# Should equal the withelev asset total (in trillions) computed before saving.
withelev_out["asset_value"].sum() / 1e12

#### withoutelev

In [None]:
withoutelev_out = pd.read_parquet(sset.PATH_EXPOSURE_BINNED_WITHOUTELEV)

In [None]:
withoutelev_out.head()

In [None]:
# Should equal the withoutelev asset total (in trillions) computed before saving.
withoutelev_out["asset_value"].sum() / 1e12

#### CIAM area-by-elevation

In [None]:
area = pd.read_parquet(sset.PATH_EXPOSURE_AREA_BY_CIAM_AND_ELEVATION)

In [None]:
area.head()