# Preprocessing

In [1]:
import numpy as np
import pandas as pd

In [13]:
# Read
df = pd.read_csv("../data/simplemaps_worldcities_basicv1.76/worldcities.csv")

# Cities > 1,000,000 population
df_pop = df.query("population > 1000000")

# Drop duplicates (take more populous entry)
df_pop = df_pop.sort_values(by="population", ascending=False).drop_duplicates(subset=["city_ascii"])

In [14]:
# Ensure largest city in each country is added to database
df_country_maxs = df.loc[df.dropna(subset="population").groupby("country")["population"].idxmax()].reset_index(
    drop=True
)

df_pop = pd.concat([df_pop, df_country_maxs]).drop_duplicates()

In [15]:
# Include some others
fun_cities = [
    ("Glasgow", "United Kingdom"),
    ("Edinburgh", "United Kingdom"),
    ("Dundee", "United Kingdom"),
    ("Aberdeen", "United Kingdom"),
    ("Inverness", "United Kingdom"),
    ("Elgin", "United Kingdom"),
    ("Heidelberg", "Germany"),
    ("Dortmund", "Germany"),
    ("Urbana", "United States"),
    ("Champaign", "United States"),
    ("Carbondale", "United States"),
    ("Ithaca", "United States"),
]

for city, country in fun_cities:
    if city in ["Urbana", "Carbondale"]:
        city_info = df[(df.city_ascii == city) & (df.country == country) & (df.admin_name == "Illinois")]
    else:
        city_info = df[(df.city_ascii == city) & (df.country == country)]

    df_pop = pd.concat([df_pop, city_info])

In [16]:
# Out
df_out = df_pop[["city_ascii", "country", "lat", "lng"]]

df_out.to_csv("../data/simplemaps_worldcities_basicv1.76/worldcities_subset.csv", index=False)

# Calculations

In [1]:
import os

import dask
import numpy as np
import pandas as pd
import xarray as xr

In [2]:
###############################
# Set paths
# UPDATE THIS FOR REPRODUCTION
###############################
out_path = "/storage/home/dcl5300/work/msd-live_dashboard_lafferty-sriver-2023/data/csv/"  # where to store intermediate data files to speed up plotting

nex_in = "/gpfs/group/kaf26/default/dcl5300/lafferty-sriver_inprep_tbh_DATA/metrics/nex-gddp/"  # location of NEX-GDDP metrics
cil_in = "/gpfs/group/kaf26/default/dcl5300/lafferty-sriver_inprep_tbh_DATA/metrics/cil-gdpcir/"  # location of CIL-GDPCIR metrics
isi_in = "/gpfs/group/kaf26/default/dcl5300/lafferty-sriver_inprep_tbh_DATA/metrics/isimip3b/regridded/conservative/"  # location of *regridded* ISIMIP metrics
cbp_in = "/gpfs/group/kaf26/default/dcl5300/lafferty-sriver_inprep_tbh_DATA/metrics/carbonplan/"  # location of carbonplan metrics

In [3]:
###################
# Models
###################
#######################################
############## NEX & CIL ##############
#######################################

ssp_all = ["ssp126", "ssp245", "ssp370", "ssp585"]
ssp_missing370 = ["ssp126", "ssp245", "ssp585"]
ssp_missing585 = ["ssp126", "ssp245", "ssp370"]

nex_ssp_dict = {
    "ACCESS-ESM1-5": ssp_all,
    "BCC-CSM2-MR": ssp_all,
    "CanESM5": ssp_all,
    "CMCC-ESM2": ssp_all,
    "CNRM-CM6-1": ssp_all,
    "CNRM-ESM2-1": ssp_all,
    "EC-Earth3": ssp_all,
    "EC-Earth3-Veg-LR": ssp_all,
    "GFDL-ESM4": ssp_all,
    "HadGEM3-GC31-LL": ssp_missing370,
    "INM-CM4-8": ssp_all,
    "INM-CM5-0": ssp_all,
    "IPSL-CM6A-LR": ssp_all,
    "MIROC-ES2L": ssp_all,
    "MIROC6": ssp_all,
    "MPI-ESM1-2-HR": ssp_all,
    "MPI-ESM1-2-LR": ssp_all,
    "MRI-ESM2-0": ssp_all,
    "NESM3": ssp_missing370,
    "NorESM2-LM": ssp_all,
    "NorESM2-MM": ssp_all,
    "UKESM1-0-LL": ssp_all,
}

cil_ssp_dict = {
    "ACCESS-ESM1-5": ssp_missing585,
    "BCC-CSM2-MR": ssp_all,
    "CanESM5": ssp_all,
    "CMCC-ESM2": ssp_all,
    "EC-Earth3": ssp_all,
    "EC-Earth3-Veg-LR": ssp_all,
    "GFDL-ESM4": ssp_all,
    "HadGEM3-GC31-LL": ssp_missing370,
    "INM-CM4-8": ssp_all,
    "INM-CM5-0": ssp_all,
    "MIROC-ES2L": ssp_all,
    "MIROC6": ssp_all,
    "MPI-ESM1-2-LR": ssp_all,
    "NESM3": ssp_missing370,
    "NorESM2-LM": ssp_all,
    "NorESM2-MM": ssp_all,
    "UKESM1-0-LL": ssp_all,
}

#######################################
############### ISIMIP3b ##############
#######################################

ssp_missing245 = ["ssp126", "ssp370", "ssp585"]

isimip_ssp_dict = {
    "CanESM5": ssp_missing245,
    "CNRM-CM6-1": ssp_missing245,
    "CNRM-ESM2-1": ssp_missing245,
    "EC-Earth3": ssp_missing245,
    "GFDL-ESM4": ssp_all,
    "IPSL-CM6A-LR": ssp_all,
    "MIROC6": ssp_missing245,
    "MPI-ESM1-2-HR": ssp_all,
    "MRI-ESM2-0": ssp_all,
    "UKESM1-0-LL": ssp_all,
}

########################################
############## carbonplan ##############
########################################

# GARD-SV
ssp_missing126 = ["ssp245", "ssp370", "ssp585"]
var_all = ["tasmin", "tasmax", "pr"]
var_missing_pr = ["tasmin", "tasmax"]

gardsv_ssp_dict = {
    "BCC-CSM2-MR": ssp_missing126,
    "CanESM5": ssp_missing126,
    "MIROC6": ssp_missing126,
    "MPI-ESM1-2-HR": ssp_missing126,
}

gardsv_var_dict = {
    "BCC-CSM2-MR": var_missing_pr,
    "CanESM5": var_all,
    "MIROC6": var_missing_pr,
    "MPI-ESM1-2-HR": var_all,
}

# DeepSD-BC
deepsdbc_dict = {
    "CanESM5": {"ssp245": var_all, "ssp370": var_all, "ssp585": var_all},
    "MRI-ESM2-0": {"ssp245": var_all, "ssp370": var_all, "ssp585": var_missing_pr},
}

In [4]:
# Models dict
nex_models = list(nex_ssp_dict.keys())
cil_models = list(cil_ssp_dict.keys())
isi_models = list(isimip_ssp_dict.keys())
cbp_gard_models = list(gardsv_ssp_dict.keys())
cbp_gard_precip_models = [model for model in cbp_gard_models if "pr" in gardsv_var_dict[model]]
cbp_deep_models = list(deepsdbc_dict.keys())

In [5]:
#########################################################
# Get raw timeseries for given lat/lon and write to file
# using dask for speedup
#########################################################
def get_timeseries_latlon(lat, lon, metric, var_id, out_str):
    ## Subfunction to read and process each ensemble
    def read_and_process(ensemble, path_in, model, metric, submetric, submetric_var, lat, lon):
        if metric in ["hot"]:
            model_str = (
                model
                + "_"
                + var_id.replace("_count", "").replace("_streak", "").replace("q99", "").replace("rp10", "")[:-5]
            )
        else:
            model_str = model

        # Read netcdf or zarr
        if ensemble in ["NEX", "ISIMIP", "GARD-SV"]:
            ds = xr.open_dataset(path_in + metric + "/" + model_str + ".nc")
        elif ensemble in ["CIL", "DeepSD-BC"]:
            ds = xr.open_dataset(path_in + metric + "/" + model, engine="zarr")

        # Select submetric
        ds = ds[submetric]

        # Select lat
        ds = ds.sel(lat=lat, method="nearest")

        # Common preprocessing
        ds["time"] = ds.indexes["time"].year
        if ds.lon.max() > 180:
            ds["lon"] = np.where(ds["lon"] > 180, ds["lon"] - 360, ds["lon"])
            ds = ds.sortby("lon")

        # Select lon
        ds = ds.sel(lon=lon, method="nearest")

        # Construct dataframe
        df_tmp = ds.to_dataframe().drop(columns=["lat", "lon"]).reset_index()
        df_tmp["ensemble"] = ensemble
        df_tmp["model"] = model

        # DeepSD-BC still has member_id
        if "member_id" in df_tmp.columns:
            df_tmp = df_tmp.drop(columns="member_id")

        return df_tmp

    ## Read all combinations
    delayed_df = []
    # df = pd.DataFrame(columns = ['time', 'model', 'ensemble', 'ssp', var_id])

    # NEX
    for model in nex_models:
        df_tmp = dask.delayed(read_and_process)("NEX", nex_in, model, metric, var_id, False, lat, lon)
        delayed_df.append(df_tmp)

    # CIL
    for model in cil_models:
        df_tmp = dask.delayed(read_and_process)("CIL", cil_in, model, metric, var_id, False, lat, lon)
        delayed_df.append(df_tmp)

    # ISIMIP
    for model in isi_models:
        df_tmp = dask.delayed(read_and_process)("ISIMIP", isi_in, model, metric, var_id, False, lat, lon)
        delayed_df.append(df_tmp)

    # GARD-SV
    if metric in ["wet", "dry", "hotdry"] or var_id == "pr":
        model_list = cbp_gard_precip_models
    else:
        model_list = cbp_gard_models
    for model in model_list:
        df_tmp = dask.delayed(read_and_process)(
            "GARD-SV",
            cbp_in + "regridded/conservative/GARD-SV/",
            model,
            metric,
            var_id,
            False,
            lat,
            lon,
        )
        delayed_df.append(df_tmp)

    # DeepSD-BC
    for model in cbp_deep_models:
        df_tmp = dask.delayed(read_and_process)(
            "DeepSD-BC",
            cbp_in + "native_grid/DeepSD-BC/",
            model,
            metric,
            var_id,
            False,
            lat,
            lon,
        )
        delayed_df.append(df_tmp)

    # Compute
    if not os.path.isfile(out_path + out_str + ".csv"):
        df = dask.compute(*delayed_df)

        # Store
        pd.concat(df).to_csv(out_path + out_str + ".csv", index=False)
    else:
        return None

In [6]:
###################
# Dask
###################
from dask_jobqueue import PBSCluster

cluster = PBSCluster(
    cores=1,
    memory="15GB",
    resource_spec="pmem=15GB",
    worker_extra_args=["#PBS -l feature=rhel7"],
    walltime="00:10:00",
)

cluster.scale(jobs=20)  # ask for jobs

from dask.distributed import Client

client = Client(cluster)

client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: /proxy/8787/status,

0,1
Dashboard: /proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.102.201.236:33748,Workers: 0
Dashboard: /proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [17]:
# Read city list
city_list = pd.read_csv("../data/simplemaps_worldcities_basicv1.76/worldcities_subset.csv")

city_list["city_ascii"] = city_list["city_ascii"].apply(lambda x: x.replace(" ", "X-X"))
city_list["country"] = city_list["country"].apply(lambda x: x.replace(" ", "X-X"))

In [22]:
%%time

##################################
# Create and store all necessary
# timeseries for the figures
##################################
## Averages
metric = "avg"
for var_id in ["tas", "pr"]:
    for city in city_list["city_ascii"]:
        city_info_all = city_list[city_list.city_ascii == city]
        for country in city_info_all["country"]:
            city_info = city_list[(city_list.city_ascii == city) & (city_list.country == country)]
            lat, lon = city_info["lat"].values[0], city_info["lng"].values[0]
            get_timeseries_latlon(lat, lon, metric, var_id, city + "_" + country + "_" + metric + "_" + var_id)

## 1-day Max
metric = "max"
for var_id in ["pr", "tasmax"]:
    for city in city_list["city_ascii"]:
        city_info_all = city_list[city_list.city_ascii == city]
        for country in city_info_all["country"]:
            city_info = city_list[(city_list.city_ascii == city) & (city_list.country == country)]
            lat, lon = city_info["lat"].values[0], city_info["lng"].values[0]
            get_timeseries_latlon(lat, lon, metric, var_id, city + "_" + country + "_" + metric + "_" + var_id)

## Dry days
metric = "dry"
for var_id in ["count_lt_1"]:
    for city in city_list["city_ascii"]:
        city_info_all = city_list[city_list.city_ascii == city]
        for country in city_info_all["country"]:
            city_info = city_list[(city_list.city_ascii == city) & (city_list.country == country)]
            lat, lon = city_info["lat"].values[0], city_info["lng"].values[0]
            get_timeseries_latlon(lat, lon, metric, var_id, city + "_" + country + "_" + metric + "_" + var_id)

## Extremely Hot Days
metric = "hot"
for var_id in ["tasmax_q99gmfd_count"]:
    for city in city_list["city_ascii"]:
        city_info_all = city_list[city_list.city_ascii == city]
        for country in city_info_all["country"]:
            city_info = city_list[(city_list.city_ascii == city) & (city_list.country == country)]
            lat, lon = city_info["lat"].values[0], city_info["lng"].values[0]
            get_timeseries_latlon(lat, lon, metric, var_id, city + "_" + country + "_" + metric + "_" + var_id)

## Extremely Wet Days
metric = "wet"
for var_id in ["pr_q99gmfd_count"]:
    for city in city_list["city_ascii"]:
        city_info_all = city_list[city_list.city_ascii == city]
        for country in city_info_all["country"]:
            city_info = city_list[(city_list.city_ascii == city) & (city_list.country == country)]
            lat, lon = city_info["lat"].values[0], city_info["lng"].values[0]
            get_timeseries_latlon(lat, lon, metric, var_id, city + "_" + country + "_" + metric + "_" + var_id)

CPU times: user 1min 13s, sys: 3.33 s, total: 1min 16s
Wall time: 5min 11s
