# Stitch and normalise

In [1]:
%load_ext nb_black

<IPython.core.display.Javascript object>

In [2]:
import datetime as dt
import glob
import os
import os.path
import re
import traceback
import warnings
from concurrent.futures import as_completed, ProcessPoolExecutor
from multiprocessing import Pool
from time import sleep

import netcdf_scm.retractions
import netcdf_scm.stitching
import pandas as pd
import scmdata
import tqdm.autonotebook as tqdman
import xarray as xr

import config

  import tqdm.autonotebook as tqdman


<IPython.core.display.Javascript object>

In [3]:
# Run identifier taken from the local `config` module; it is used as the
# prefix of the crunch directory names defined in this notebook.
ID = config.ID

<IPython.core.display.Javascript object>

In [4]:
# Set to True to execute the single-file debugging cells guarded by
# `if RUN_CHECK:`; with False those cells do nothing.
RUN_CHECK = False

<IPython.core.display.Javascript object>

In [5]:
# Single switch between the plain crunch directories (False) and the
# "-popn-weighted" crunch directories (True), so that only one set of paths is
# ever defined.
POPN_WEIGHTED = True
_DIR_SUFFIX = "-popn-weighted" if POPN_WEIGHTED else ""

# Directory searched for crunched input files
CRUNCH_DIR = "./{}-country-crunch{}".format(ID, _DIR_SUFFIX)
# Output directory for stitched (not normalised) files
STITCHED_DIR = "./{}-country-crunch-stitched{}".format(ID, _DIR_SUFFIX)
# Output directory for stitched and normalised files
STITCHED_NORMALISED_DIR = "./{}-country-crunch-stitched-normalised{}".format(
    ID, _DIR_SUFFIX
)

# Number of worker processes used when stitching files in parallel
MAX_WORKERS = 60

<IPython.core.display.Javascript object>

In [32]:
# Create the output directories, including missing parents; existing
# directories are left untouched (pure-Python equivalent of `mkdir -p`).
os.makedirs(STITCHED_DIR, exist_ok=True)
os.makedirs(STITCHED_NORMALISED_DIR, exist_ok=True)

<IPython.core.display.Javascript object>

In [6]:
display(CRUNCH_DIR)

# Recursively collect every *.nc file below CRUNCH_DIR whose path mentions
# "ssp", skipping paths that contain "ssp245-".
# NOTE(review): the "ssp245-" exclusion presumably drops ssp245 variant
# experiments (names starting with "ssp245-") - confirm. To keep every ssp
# file, remove that second condition.
ssp_files = list(
    filter(
        lambda path: "ssp" in path and "ssp245-" not in path,
        glob.iglob(os.path.join(CRUNCH_DIR, "**", "*.nc"), recursive=True),
    )
)
display(len(ssp_files))
ssp_files[:2]

'./20210416-country-crunch-popn-weighted'

22

['./20210416-country-crunch-popn-weighted/netcdf-scm-crunched/CMIP6/ScenarioMIP/BCC/BCC-CSM2-MR/ssp370/r1i1p1f1/Amon/tas/gn/v20190314/netcdf-scm_tas_Amon_BCC-CSM2-MR_ssp370_r1i1p1f1_gn_201501-210012.nc',
 './20210416-country-crunch-popn-weighted/netcdf-scm-crunched/CMIP6/ScenarioMIP/BCC/BCC-CSM2-MR/ssp585/r1i1p1f1/Amon/tas/gn/v20190314/netcdf-scm_tas_Amon_BCC-CSM2-MR_ssp585_r1i1p1f1_gn_201501-210012.nc']

<IPython.core.display.Javascript object>

In [7]:
ssp_files[-1]

'./20210416-country-crunch-popn-weighted/netcdf-scm-crunched/CMIP6/ScenarioMIP/NUIST/NESM3/ssp126/r1i1p1f1/Amon/tas/gn/v20190731/netcdf-scm_tas_Amon_NESM3_ssp126_r1i1p1f1_gn_201501-210012.nc'

<IPython.core.display.Javascript object>

In [8]:
# Unique values of path component 6 of every scenario file.
# NOTE(review): with CRUNCH_DIR of the form "./<dir>" this component is the
# climate model directory; the index has to change if CRUNCH_DIR gets deeper.
cms = {path.split(os.sep)[6] for path in ssp_files}
display(len(cms))
print(*sorted(cms), sep="\n")

5

ACCESS-CM2
BCC-CSM2-MR
E3SM-1-1
IITM-ESM
NESM3


<IPython.core.display.Javascript object>

In [9]:
# TODO: move this into netcdf_scm
# The id passed to the retraction check for each file is made of its path
# components from index 3 up to (but excluding) the filename, joined with ".".
retracted_ids = netcdf_scm.retractions.check_retractions(
    [".".join(path.split(os.sep)[3:-1]) for path in ssp_files],
    esgf_query_batch_size=20,
)

# Map every retracted id back to the crunched file in its directory; each
# such directory is expected to hold exactly one file.
retracted_files = []
for retracted_id in retracted_ids:
    retracted_dir = os.path.join(
        CRUNCH_DIR, "netcdf-scm-crunched", retracted_id.replace(".", os.sep)
    )
    dir_contents = os.listdir(retracted_dir)
    assert len(dir_contents) == 1
    retracted_files.append(os.path.join(retracted_dir, dir_contents[0]))

sorted(retracted_files)

Querying ESGF (submitting jobs):   0%|          | 0/2 [00:00<?, ?it/s]

Retrieving results from ESGF jobs:   0%|          | 0/2 [00:00<?, ?it/s]

[]

<IPython.core.display.Javascript object>

In [10]:
# Keep only the scenario files that were not identified as retracted
ssp_files = list(filter(lambda path: path not in retracted_files, ssp_files))
display(len(ssp_files))

22

<IPython.core.display.Javascript object>

In [11]:
def get_bcc_csm2_mr_hack_extension(picontrol, norm_years, extension_years=399):
    """
    Extend a piControl run beyond its last year with constant values

    For every timeseries, the mean over the last ``norm_years`` years of
    ``picontrol`` is appended as a single point at 16 January 12:00 of the
    year following the last piControl year. The run is then interpolated onto
    its existing time points plus ``extension_years`` further years of time
    points, using constant extrapolation.

    Parameters
    ----------
    picontrol : :obj:`scmdata.ScmRun`
        piControl data to extend

    norm_years : int
        Number of years at the end of ``picontrol`` to average over

    extension_years : int
        Number of years of time points to add after the last piControl year.
        The default reproduces the previously hard-coded horizon.

    Returns
    -------
    :obj:`scmdata.ScmRun`
        Extended run carrying the metadata of ``picontrol``
    """
    # Plain ``int`` so the value can be used for both ``range`` and
    # ``dt.datetime`` without relying on the type returned by ``.max()``
    picontrol_last_year = int(picontrol["year"].max())

    # ``range`` excludes its stop value, so this window covers exactly
    # ``norm_years`` years ending with the last piControl year
    last_nyear_mean = (
        picontrol.filter(
            year=range(picontrol_last_year - norm_years + 1, picontrol_last_year + 1)
        )
        .timeseries()
        .mean(axis=1)
    )
    # The Series name becomes the column label, i.e. the time of the new point
    last_nyear_mean.name = dt.datetime(picontrol_last_year + 1, 1, 16, 12)
    hack_extension = pd.concat([picontrol.timeseries(), last_nyear_mean], axis=1)
    hack_extension = scmdata.ScmRun(hack_extension)

    # Time points of the final piControl year act as the month/day/hour
    # template for every extension year
    template_times = hack_extension.filter(year=picontrol_last_year)["time"].tolist()
    extension_times = [
        dt.datetime(y, v.month, v.day, v.hour)
        for y in range(picontrol_last_year + 1, picontrol_last_year + 1 + extension_years)
        for v in template_times
    ]

    hack_extension = hack_extension.interpolate(
        # NOTE(review): the first generated time point is dropped, presumably
        # because it coincides with the point appended above (this assumes the
        # run's first time point of the year is 16 January 12:00) - confirm
        hack_extension["time"].tolist() + extension_times[1:],
        extrapolation_type="constant",
    )
    hack_extension.metadata = picontrol.metadata

    return hack_extension

<IPython.core.display.Javascript object>

In [12]:
def stitch_and_normalise(
    f, catch=True, norm_years=21, normalise=True, verbose=False, force=False
):
    """
    Load a file as a continuous timeseries, optionally normalise it, and save it

    The file is loaded with
    ``netcdf_scm.stitching.get_continuous_timeseries_with_meta``. If
    ``normalise`` is ``True`` the result is normalised against piControl with
    ``NormaliserRunningMean`` (after a number of model-specific fixes to the
    piControl data or branch time) and written to ``STITCHED_NORMALISED_DIR``;
    otherwise the stitched data is written to ``STITCHED_DIR``.

    Parameters
    ----------
    f : str
        Path of the crunched file to process. Its basename must follow the
        ``netcdf-scm_<variable>_<table>_..._<grid>_<timerange>.nc`` pattern
        because the table and grid are parsed from it.

    catch : bool
        If ``True``, warnings are ignored and any failure is re-raised as a
        ``ValueError`` naming ``f`` (with the original exception chained).
        If ``False``, exceptions propagate unchanged.

    norm_years : int
        Number of years passed to ``NormaliserRunningMean`` and to the
        piControl hack extension

    normalise : bool
        Whether to normalise against piControl before saving

    verbose : bool
        Whether to print progress messages

    force : bool
        Whether to overwrite an output file that already exists. If ``False``
        and the output file exists, nothing is written.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        ``catch`` is ``True`` and processing ``f`` failed
    """

    def get_result():
        if verbose:
            print(f"Loading and stitching {f}")
        (
            scmrun,
            picontrol_branching_time,
            picontrol_file,
        ) = netcdf_scm.stitching.get_continuous_timeseries_with_meta(
            f, drs="CMIP6Output", return_picontrol_info=normalise
        )

        variable = scmrun.get_unique_meta("variable", True)
        climate_model = scmrun.get_unique_meta("climate_model", True)
        scenario = scmrun.get_unique_meta("scenario", True)
        member_id = scmrun.get_unique_meta("member_id", True)

        min_time = scmrun["time"].min()
        start_year = min_time.year
        start_month = min_time.month

        max_time = scmrun["time"].max()
        end_year = max_time.year
        end_month = max_time.month

        # The output name mirrors the input name but with the time range of
        # the stitched data. The table is taken from the input filename rather
        # than being hard-coded, so non-Amon inputs keep their own table name.
        filename_parts = os.path.basename(f).split("_")
        table = filename_parts[2]
        grid = filename_parts[-2]
        out_name = (
            f"netcdf-scm_{variable}_{table}_{climate_model}_{scenario}_{member_id}"
            f"_{grid}_{start_year}{start_month:02d}-{end_year}{end_month:02d}.nc"
        )

        if normalise:
            out_file = os.path.join(STITCHED_NORMALISED_DIR, out_name)
        else:
            out_file = os.path.join(STITCHED_DIR, out_name)

        # The existence check can only happen after loading because the
        # output name depends on the stitched time range
        if os.path.isfile(out_file):
            if verbose:
                print(f"Out file already exists: {out_file}")

            if force:
                if verbose:
                    print("Force over-writing")
            else:
                return None

        if normalise:
            if verbose:
                print(f"Loading {picontrol_file}")

            picontrol_scmrun = netcdf_scm.io.load_scmrun(picontrol_file)
            picontrol_scmrun.metadata["netcdf-scm crunched file"] = picontrol_file

            # Model-specific fixes to the piControl data and/or branch time
            if climate_model == "BCC-CSM2-MR":
                if verbose:
                    print("Performing hack extension of piControl")

                picontrol_scmrun = get_bcc_csm2_mr_hack_extension(
                    picontrol_scmrun, norm_years
                )

            elif climate_model == "CAMS-CSM1-0":
                # Interpret the raw branch time metadata as a year
                bt_raw = scmrun.metadata["(parent) branch_time_in_parent"]
                if verbose:
                    print("Branch time in parent: {}".format(bt_raw))
                picontrol_branching_time = dt.datetime(int(bt_raw), 1, 1)
                if verbose:
                    print(
                        "Updating branch time to: {}".format(picontrol_branching_time)
                    )
            elif climate_model in ["FGOALS-f3-L", "CAS-ESM2-0"]:
                # Branch times are assumed per ensemble member rather than
                # read from the file metadata
                if climate_model == "FGOALS-f3-L":
                    member_ids = {
                        "r1i1p1f1": dt.datetime(600, 1, 1),
                        "r2i1p1f1": dt.datetime(650, 1, 1),
                        "r3i1p1f1": dt.datetime(700, 1, 1),
                    }

                elif climate_model == "CAS-ESM2-0":
                    member_ids = {
                        "r1i1p1f1": dt.datetime(100, 1, 1),
                        "r2i1p1f1": dt.datetime(150, 1, 1),
                        "r3i1p1f1": dt.datetime(200, 1, 1),
                        "r4i1p1f1": dt.datetime(250, 1, 1),
                    }

                if verbose:
                    print("Over-writing branch time metadata")
                # A member that is not listed above raises KeyError
                picontrol_branching_time = member_ids[member_id]

                if verbose:
                    print("Assumed branch time: {}".format(picontrol_branching_time))

            elif climate_model == "GFDL-CM4":
                assert (
                    scmrun.metadata["(parent) parent_experiment_id"] == "piControl"
                ), scmrun.metadata["(parent) parent_experiment_id"]
                assert (
                    scmrun.metadata["(parent) parent_time_units"]
                    == "days since 0001-1-1"
                )
                # Offset the branch year by the first piControl year
                picontrol_branching_time = dt.datetime(
                    picontrol_scmrun["year"].min()
                    - 1  # so that we start from zero (as confirmed via email)
                    + picontrol_branching_time.year,
                    picontrol_branching_time.month,
                    picontrol_branching_time.day,
                    picontrol_branching_time.hour,
                )

                if verbose:
                    print("Assumed branch time: {}".format(picontrol_branching_time))

            elif climate_model == "TaiESM1":
                # Recompute the branch time from the parent file after
                # overriding its parent time units
                source = netcdf_scm.io.load_scmrun(f)
                parent_replacements = netcdf_scm.stitching.get_parent_replacements(
                    source
                )
                parent_file = netcdf_scm.stitching.get_parent_file_path(
                    f, parent_replacements, "CMIP6Output"
                )
                parent = netcdf_scm.io.load_scmrun(parent_file)
                # email from group, they will fix later
                parent.metadata["parent_time_units"] = "days since 0201-01-01"
                picontrol_branching_time = netcdf_scm.stitching.get_branch_time(parent)
                if verbose:
                    print("Assumed branch time: {}".format(picontrol_branching_time))

                if verbose:
                    print("Performing hack extension of piControl")

                picontrol_scmrun = get_bcc_csm2_mr_hack_extension(
                    picontrol_scmrun, norm_years
                )

            if verbose:
                print(f"Normalising using {norm_years} years")

            normaliser = netcdf_scm.normalisation.NormaliserRunningMean(
                nyears=norm_years
            )

            out = normaliser.normalise_against_picontrol(
                scmrun, picontrol_scmrun, picontrol_branching_time
            )
        else:
            out = scmrun

        out["grid"] = grid

        # Metadata keys are written to disk without parentheses
        out_to_disk = out.copy()
        out_to_disk.metadata = {
            k.replace("(", "").replace(")", ""): v
            for k, v in out_to_disk.metadata.items()
        }

        if verbose:
            print(f"Saving to {out_file}")

        out_to_disk.to_nc(out_file)

        return None

    if catch:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")

            try:
                return get_result()
            except Exception as exc:
                # The message format is parsed later to collect failed files
                raise ValueError("File failed: {}".format(f)) from exc
    else:
        return get_result()

<IPython.core.display.Javascript object>

In [13]:
# Single file processed by the `if RUN_CHECK:` debugging cells.
# NOTE(review): this path is hard-coded under ./country-crunch rather than
# being built from CRUNCH_DIR - confirm it still points at an existing file.
checker = "./country-crunch/netcdf-scm-crunched/CMIP6/ScenarioMIP/AS-RCEC/TaiESM1/ssp245/r1i1p1f1/Amon/tas/gn/v20201124/netcdf-scm_tas_Amon_TaiESM1_ssp245_r1i1p1f1_gn_201501-210012.nc"
checker

'./country-crunch/netcdf-scm-crunched/CMIP6/ScenarioMIP/AS-RCEC/TaiESM1/ssp245/r1i1p1f1/Amon/tas/gn/v20201124/netcdf-scm_tas_Amon_TaiESM1_ssp245_r1i1p1f1_gn_201501-210012.nc'

<IPython.core.display.Javascript object>

In [14]:
# RUN_CHECK = True
# %pdb off

<IPython.core.display.Javascript object>

In [15]:
if RUN_CHECK:
    import xarray as xr
    from netcdf_scm.iris_cube_wrappers import ScmCube

    def _load_helper_and_scm_cubes(path):
        """
        Load only the "World" region of the file at ``path``

        Debugging replacement for ``netcdf_scm.io._load_helper_and_scm_cubes``.

        Parameters
        ----------
        path : str
            File to open with ``xarray.open_dataset``

        Returns
        -------
        loaded, scm_cubes
            The first cube that was built, and a dictionary mapping region
            name to cube (only ever containing "World")
        """
        dataset = xr.open_dataset(path)
        dataset.load()  # bring everything into memory

        # Work-around that has to stay until
        # https://github.com/pandas-dev/pandas/issues/37071 is solved
        time_encoding = dataset["time"].encoding
        if time_encoding["units"] == "days since 1-01-01 00:00:00":
            time_encoding["units"] = "days since 0001-01-01 00:00:00"

        scm_cubes = {}
        for darray in dataset.data_vars.values():
            # Variables without a "region" attribute (bnds or some other
            # unclassified variable) give ``None`` here and are skipped along
            # with every region other than "World"
            region = darray.attrs.get("region")
            if region != "World":
                continue

            region_cube = ScmCube()
            scm_cubes[region] = region_cube

            region_cube.cube = darray.to_iris()
            region_cube.cube.attributes = {
                **region_cube.cube.attributes,
                **dataset.attrs,
            }

        # Any cube will do as the base for now; how to handle this properly is
        # still open, so this helper is only meant to become public once that
        # has been worked out
        loaded = list(scm_cubes.values())[0]

        return loaded, scm_cubes

    netcdf_scm.io._load_helper_and_scm_cubes = _load_helper_and_scm_cubes

<IPython.core.display.Javascript object>

In [16]:
if RUN_CHECK:
    # Process the single `checker` file with catch=False so that the original
    # exception (not the wrapping ValueError) is raised, and with progress
    # messages switched on. The function always returns None, so the display
    # call only confirms that it completed.
    tmp = stitch_and_normalise(checker, catch=False, verbose=True)
    display(tmp)

<IPython.core.display.Javascript object>

In [17]:
if RUN_CHECK:
    # Load the `checker` file on its own for inspection
    source = netcdf_scm.io.load_scmrun(checker)
    display(source)

<IPython.core.display.Javascript object>

In [18]:
if RUN_CHECK:
    # Show the parent replacements netcdf_scm derives from `source`; they are
    # used in a later debugging cell to locate the parent file
    parent_replacements = netcdf_scm.stitching.get_parent_replacements(source)
    display(parent_replacements)

<IPython.core.display.Javascript object>

In [19]:
if RUN_CHECK:
    # Resolve the path of the parent file for `checker` from the parent
    # replacements, using the same "CMIP6Output" setting as the main pipeline
    parent_file = netcdf_scm.stitching.get_parent_file_path(
        checker, parent_replacements, "CMIP6Output"
    )
    display(parent_file)

<IPython.core.display.Javascript object>

In [20]:
if RUN_CHECK:
    # Load the parent file whose path is held in `parent_file`
    parent = netcdf_scm.io.load_scmrun(parent_file)
# Disabled override of the parent's time units, kept for debugging:
#     parent.metadata["parent_time_units"] = "days since 0001-01-01"

<IPython.core.display.Javascript object>

In [21]:
if RUN_CHECK:
    # Compare the branch time computed with parent=True against the one from
    # the default call
    display(netcdf_scm.stitching.get_branch_time(parent, parent=True))
    display(netcdf_scm.stitching.get_branch_time(parent))

<IPython.core.display.Javascript object>

In [22]:
if RUN_CHECK:
    # Print the header of the parent file (ncdump -h) and keep only the lines
    # that mention "parent"
    !ncdump -h {parent_file} | grep parent

<IPython.core.display.Javascript object>

In [23]:
if RUN_CHECK:
    # Load the TaiESM1 piControl file from a hard-coded path.
    # NOTE(review): like `checker`, this path is under ./country-crunch rather
    # than CRUNCH_DIR - confirm it exists before enabling RUN_CHECK.
    picontrol = netcdf_scm.io.load_scmrun(
        "./country-crunch/netcdf-scm-crunched/CMIP6/CMIP/AS-RCEC/TaiESM1/piControl/r1i1p1f1/Amon/tas/gn/v20200211/netcdf-scm_tas_Amon_TaiESM1_piControl_r1i1p1f1_gn_020101-070012.nc"
    )

<IPython.core.display.Javascript object>

In [24]:
if RUN_CHECK:
    # Inspect the loaded piControl run
    display(picontrol)

<IPython.core.display.Javascript object>

In [25]:
if RUN_CHECK:
    # Re-label the piControl time axis so that piControl year 671 lines up
    # with 1850: subtracting year_shift (671 - 1850) adds 1179 to every column.
    # NOTE(review): 671 is presumably the assumed branch year - confirm.
    picontrol_new_time = picontrol.timeseries(time_axis="year-month")
    year_shift = 671 - 1850
    # year_shift = 3030 - 1850
    picontrol_new_time.columns = picontrol_new_time.columns.map(
        lambda x: x - year_shift
    )
    # picontrol_new_time = picontrol_new_time.rolling(
    #     window=21 * 12, center=True, axis="columns"
    # ).mean()
    picontrol_new_time = scmdata.ScmRun(picontrol_new_time)
    display(picontrol_new_time)

<IPython.core.display.Javascript object>

In [26]:
if RUN_CHECK:
    # Plot the scenario file, its parent and the time-shifted piControl
    # together for the World region over 1840-1854, one line style per
    # climate model. The commented-out lines are alternative inputs/filters.
    ax = (
        #         scmdata.run_append([source, parent, picontrol])
        scmdata.run_append([source, parent, picontrol_new_time])
        #         scmdata.run_append([source, parent, hack_extension])
        .filter(region="World", year=range(1840, 1855))  # .time_mean("AC")
        #     .filter(year=range(1850, 1950))
        .lineplot(style="climate_model")
    )
#     ax.set_xlim([1840, 1855])

<IPython.core.display.Javascript object>

In [27]:
# ssp_files = [f for f in ssp_files if "TaiESM1" in f]
# ssp_files

<IPython.core.display.Javascript object>

In [34]:
# Run options (edit by hand between runs)
normalise = False  # True: normalise and write to STITCHED_NORMALISED_DIR
force = False  # True: overwrite output files that already exist
verbose = False  # True: print progress messages from the workers

# Paths of every file that failed, extracted from the collected tracebacks
all_errors = []
# Tracebacks gathered since the last report
errors = []

# The context manager shuts the worker processes down once the block is left,
# so no idle workers are left behind by this cell
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as pool:
    futures = [
        pool.submit(
            stitch_and_normalise, f, normalise=normalise, verbose=verbose, force=force
        )
        for f in tqdman.tqdm(ssp_files)
    ]

    for i, future in tqdman.tqdm(
        enumerate(as_completed(futures, timeout=None)), total=len(futures)
    ):
        try:
            future.result()
        except Exception:
            errors.append(traceback.format_exc())

        # Report in batches: after the 11th completed future, every 50
        # thereafter, and once more when the last future has completed
        if i % 50 == 10 or i == len(futures) - 1:
            print("\n\n".join(errors))
            # A failed file can appear more than once in a traceback, hence
            # the de-duplication; sorting keeps the result deterministic.
            # The pattern matches the message raised by stitch_and_normalise.
            all_errors += sorted(
                {
                    v
                    for e in errors
                    for v in re.findall(r"File failed: (.*\.nc)", e)
                }
            )
            # To stop at the first batch that contains failures, insert
            # `if errors: break` here
            errors = []

  0%|          | 0/22 [00:00<?, ?it/s]

  0%|          | 0/22 [00:00<?, ?it/s]

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "<ipython-input-12-aa7800c49b33>", line 169, in stitch_and_normalise
    return get_result()
  File "<ipython-input-12-aa7800c49b33>", line 11, in get_result
    ) = netcdf_scm.stitching.get_continuous_timeseries_with_meta(
  File "/data/ubuntu-znicholls/miniconda3/envs/cmip6-country-level-processing/lib/python3.8/site-packages/netcdf_scm/stitching.py", line 238, in get_continuous_timeseries_with_meta
    parent_file_path = get_parent_file_path(
  File "/data/ubuntu-znicholls/miniconda3/envs/cmip6-country-level-processing/lib/python3.8/site-packages/netcdf_scm/stitching.py", line 373, in get_parent_file_path
    raise IOError(
OSError: No parent data (historical) available for ./20210416-country-crunch-popn-weighted/netcdf-scm-crunched/CMIP6/ScenarioMIP/E3SM-Project/E3SM-1-1/ssp245/r2i1p1f1/Amon/tas/gr/v20201110/netcdf-scm_tas_Amon_E3SM-1-1_ssp245_r2i1p1f1_gr_202001-202412.nc, we looked in ./202




<IPython.core.display.Javascript object>

In [35]:
all_errors

['./20210416-country-crunch-popn-weighted/netcdf-scm-crunched/CMIP6/ScenarioMIP/BCC/BCC-CSM2-MR/ssp245/r1i1p1f1/Amon/tas/gn/v20190314/netcdf-scm_tas_Amon_BCC-CSM2-MR_ssp245_r1i1p1f1_gn_201501-210012.nc',
 './20210416-country-crunch-popn-weighted/netcdf-scm-crunched/CMIP6/ScenarioMIP/E3SM-Project/E3SM-1-1/ssp245/r3i1p1f1/Amon/tas/gr/v20201110/netcdf-scm_tas_Amon_E3SM-1-1_ssp245_r3i1p1f1_gr_202001-202412.nc',
 './20210416-country-crunch-popn-weighted/netcdf-scm-crunched/CMIP6/ScenarioMIP/BCC/BCC-CSM2-MR/ssp126/r1i1p1f1/Amon/tas/gn/v20190314/netcdf-scm_tas_Amon_BCC-CSM2-MR_ssp126_r1i1p1f1_gn_201501-210012.nc',
 './20210416-country-crunch-popn-weighted/netcdf-scm-crunched/CMIP6/ScenarioMIP/BCC/BCC-CSM2-MR/ssp585/r1i1p1f1/Amon/tas/gn/v20190314/netcdf-scm_tas_Amon_BCC-CSM2-MR_ssp585_r1i1p1f1_gn_201501-210012.nc',
 './20210416-country-crunch-popn-weighted/netcdf-scm-crunched/CMIP6/ScenarioMIP/E3SM-Project/E3SM-1-1/ssp245/r10i1p1f1/Amon/tas/gr/v20201110/netcdf-scm_tas_Amon_E3SM-1-1_ssp245_r10i

<IPython.core.display.Javascript object>

In [36]:
len(all_errors)

9

<IPython.core.display.Javascript object>