# Developing scripts for data preparation

In [1]:
# Re-import edited modules (e.g. src.utils) automatically before each cell runs
%load_ext autoreload
%autoreload 2
# Load the lab_black extension (black formatting of code cells in JupyterLab)
%load_ext lab_black

# Cluster

In [2]:
from dask.distributed import Client
from dask_jobqueue import PBSCluster

# Resources requested for each PBS job; processes=1 gives one Dask worker per job
walltime = "02:00:00"
cores = 24
memory = "96GB"

job_directives = [
    f"-l ncpus={cores}",
    f"-l mem={memory}",
    "-P xv83",
    "-l jobfs=100GB",
    "-l storage=gdata/xv83+gdata/oi10",
]

cluster = PBSCluster(
    processes=1,
    walltime=walltime,
    cores=cores,
    memory=memory,
    job_extra=job_directives,
    # Worker scratch space on the job filesystem requested with "-l jobfs" above
    local_directory="$PBS_JOBFS",
    # Skip generated header lines containing "select"; ncpus and mem are
    # requested explicitly through job_directives instead
    header_skip=["select"],
)

  from distributed.utils import tmpfile


In [3]:
# Submit one PBS job and connect a client to the cluster. The repr may report
# 0 workers until the queued job actually starts.
cluster.scale(jobs=1)
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://10.6.44.3:8787/status,

0,1
Dashboard: http://10.6.44.3:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.6.44.3:40241,Workers: 0
Dashboard: http://10.6.44.3:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [49]:
# Helper functions for opening data in a common format

import glob

import dask
import xarray as xr

import yaml
from functools import reduce, partial

from src import utils


def _load_config(name):
    """Read and return the contents of the config .yaml file at path ``name``

    The file is parsed with PyYAML's safe loader, so only standard YAML tags
    are accepted.
    """
    with open(name) as config_file:
        return yaml.safe_load(config_file)


def _maybe_translate_variables(variables, translation_dict):
    """
    Translate variables using provided dictionary where possible
    """
    translated_variables = {}
    for realm, var in variables.items():
        translated_variables[realm] = []
        for v in var:
            try:
                translated_variables[realm].append(translation_dict[v])
            except KeyError as exception:
                translated_variables[realm].append(v)
    return translated_variables


def _maybe_rename(ds, rename):
    """
    Rename all variables etc that have an entry in rename
    """
    for k, v in rename.items():
        if v in ds:
            ds = ds.rename({v: k})
    return ds


def _scale_variables(ds, norm_dict):
    """
    Rescale variables in a dataset according to provided dictionary
    """
    for v in norm_dict.keys():
        if v in ds:
            ds[v] = float(norm_dict[v]) * ds[v]
    return ds


def _composite_function(function_dict):
    """
    Return a composite function of all functions specified in a processing
        step of a config .yaml
    """

    def composite(*funcs):
        def compose(f, g):
            return lambda x: g(f(x))

        return reduce(compose, funcs, lambda x: x)

    funcs = []
    for fn in function_dict.keys():
        kws = function_dict[fn]
        kws = {} if kws is None else kws
        funcs.append(partial(getattr(utils, fn), **kws))

    return composite(*funcs)


class _open:
    """
    Class containing the dataset-specific code for opening each available dataset

    Each method is named after the dataset it opens; this name is what the
    'name' entry of a config file must match. The methods are used as plain
    functions on the class and are never called on an instance.

    Signatures differ between datasets. Some openers take a leading ``path``
    argument, ``(path, realm, variables, preprocess)``, while others hard-code
    their data location and take ``(realm, variables, preprocess)``. Only
    CAFEf6 concatenates files and uses ``preprocess``; the others ignore it.
    """

    @staticmethod
    def JRA55(path, realm, variables, _):
        """Open JRA55 variables from the ``{realm}.zarr.zip`` store under path"""
        return xr.open_dataset(
            f"{path}/{realm}.zarr.zip",
            engine="zarr",
            chunks={},
            use_cftime=True,
        )[variables]

    @staticmethod
    def HadISST(realm, variables, _):
        """Open HadISST variables from specified realm

        Values that are not greater than -1000 are masked to NaN (presumably
        fill values -- TODO confirm against the HadISST documentation).
        """
        path = "/g/data/xv83/reanalyses/HadISST/"
        ds = xr.open_dataset(
            f"{path}/{realm}.zarr",
            engine="zarr",
            chunks={},
            use_cftime=True,
        )[variables]
        return ds.where(ds > -1000)

    @staticmethod
    def EN422(path, _, variables, __):
        """Open EN.4.2.2 variables from all netcdf files directly under path"""
        return xr.open_mfdataset(
            f"{path}/*.nc",
            parallel=True,
            use_cftime=True,
        )[variables]

    @staticmethod
    def CAFEf6(realm, variables, preprocess):
        """Open CAFE-f6 variables from specified realm, applying preprocess to
        each forecast prior to concatenating forecasts
        """
        path = "/g/data/xv83/dcfp/CAFE-f6/"
        files = sorted(
            glob.glob(f"{path}/c5-d60-pX-f6-????1101/{realm}.zarr.zip")
        )  # Skip May starts

        return xr.open_mfdataset(
            files,
            compat="override",
            preprocess=preprocess,
            engine="zarr",
            coords="minimal",
            parallel=True,
        )[variables]

    @staticmethod
    def CAFEf5(path, realm, variables, _):
        """Open CAFE-f5 November-start variables from specified realm

        NOTE(review): this does not append CAFE-f6 members for the 2020
        forecast, although an earlier docstring said it did -- TODO confirm
        whether that is still required.
        """
        return xr.open_dataset(
            f"{path}/NOV/{realm}.zarr.zip", engine="zarr", chunks={}
        )[variables]

    @staticmethod
    def CAFE60v1(path, realm, variables, _):
        """Open CAFE60v1 variables from the ``{realm}.zarr.zip`` store under path"""
        return xr.open_dataset(f"{path}/{realm}.zarr.zip", engine="zarr", chunks={})[
            variables
        ]

    @staticmethod
    def CAFE_hist(realm, variables, _):
        """Open CAFE historical run variables from specified realm, with the
        control run's monthly drift removed
        """
        path = "/g/data/xv83/users/ds0092/data/CAFE/historical/WIP/"
        hist = xr.open_dataset(
            f"{path}/c5-d60-pX-hist-19601101/ZARR/{realm}.zarr.zip",
            engine="zarr",
            chunks={},
        )[variables]

        ctrl = xr.open_dataset(
            f"{path}/c5-d60-pX-ctrl-19601101/ZARR/{realm}.zarr.zip",
            engine="zarr",
            chunks={},
        )[variables]

        hist = utils.truncate_latitudes(hist)
        ctrl = utils.truncate_latitudes(ctrl)

        # Drift: for each calendar month, the control ensemble mean minus its
        # own time mean. Subtracting it removes the control's evolution from hist.
        drift = (
            ctrl.mean("ensemble")
            .groupby("time.month")
            .map(lambda x: x - x.mean(["time"]))
        )
        return hist - drift

    @staticmethod
    def CanESM5(path, realm, variables, _):
        """Open CanESM5 dcppA-hindcast variables from specified realm

        Every (init year, member) file is opened lazily and stacked into
        leading "init" and "member" dimensions. The merged result is then
        loaded into memory with ``.compute()``.
        """

        def _CanESM5_file(y, m, v):
            # File for initialisation directory s{y-1}, member m and variable v,
            # covering January y to December y+9
            version = "v20190429"
            return f"{path}/s{y-1}-r{m}i1p2f1/{realm}/{v}/gn/{version}/{v}_{realm}_CanESM5_dcppA-hindcast_s{y-1}-r{m}i1p2f1_gn_{y}01-{y+9}12.nc"

        @dask.delayed
        def _open_CanESM5_delayed(y, m, v):
            file = _CanESM5_file(y, m, v)
            ds = xr.open_dataset(file, chunks={})[v]
            return ds

        def _open_CanESM5(y, m, v, d0):
            # Lazily wrap one file, assuming it matches the template's shape/dtype
            var_data = _open_CanESM5_delayed(y, m, v).data
            return dask.array.from_delayed(var_data, d0.shape, d0.dtype)

        years = range(1981, 2018)  # CanESM5 ocean files end in 2017
        members = range(1, 40 + 1)

        ds = []
        for v in variables:
            # The first file serves as the template for dims, coords and attrs
            f0 = _CanESM5_file(years[0], members[0], v)
            d0 = utils.convert_time_to_lead(xr.open_dataset(f0, chunks={}))[v]

            delayed = []
            for y in years:
                delayed.append(
                    dask.array.stack(
                        [_open_CanESM5(y, m, v, d0) for m in members], axis=0
                    )
                )
            delayed = dask.array.stack(delayed, axis=0)

            # One January-start per init year, each with 120 monthly valid times
            init = xr.cftime_range(
                str(years[0]), str(years[-1]), freq="YS", calendar="julian"
            )
            time = [
                xr.cftime_range(i, periods=120, freq="MS", calendar="julian")
                for i in init
            ]
            ds.append(
                xr.DataArray(
                    delayed,
                    dims=["init", "member", *d0.dims],
                    coords={
                        "member": members,
                        "init": init,
                        **d0.coords,
                        "time": (["init", "lead"], time),
                    },
                    attrs=d0.attrs,
                ).to_dataset(name=v)
            )
        return xr.merge(ds).compute()

    @staticmethod
    def CanESM5_hist(path, realm, variables, _):
        """Open CanESM5 historical variables from specified realm

        Members are stacked along a new leading "member" dimension whose
        coordinate is the realisation number parsed from each file name.
        The merged result is loaded into memory with ``.compute()``.

        Raises
        ------
        FileNotFoundError
            If no files are found for a requested variable
        """
        import os
        import re

        member_regex = re.compile(r"_r(\d+)i1p2f1_")

        def _member_number(file):
            # Realisation number N from a "..._rNi1p2f1_..." file name
            return int(member_regex.search(os.path.basename(file)).group(1))

        @dask.delayed
        def _open_CanESM5_hist_delayed(f, v):
            ds = xr.open_dataset(f, chunks={})[v]
            return ds

        def _open_CanESM5_hist(f, v, d0):
            # Lazily wrap one member, assuming it matches the template's shape/dtype
            var_data = _open_CanESM5_hist_delayed(f, v).data
            return dask.array.from_delayed(var_data, d0.shape, d0.dtype)

        ds = []
        for v in variables:
            pattern = f"{path}/r*i1p2f1/{realm}/{v}/gn/v20190429/{v}_{realm}_CanESM5_historical_r*i1p2f1_gn_185001-201412.nc"
            # Sort numerically by member: a lexicographic sort puts r10 before r2,
            # which mislabels members when they are paired with 1..40 in order
            files = sorted(glob.glob(pattern), key=_member_number)
            if not files:
                raise FileNotFoundError(
                    f"No CanESM5 historical files match {pattern}"
                )
            members = [_member_number(f) for f in files]

            d0 = xr.open_dataset(
                files[0],
                chunks={},
            )[v]

            delayed = dask.array.stack(
                [_open_CanESM5_hist(f, v, d0) for f in files], axis=0
            )

            ds.append(
                xr.DataArray(
                    delayed,
                    dims=["member", *d0.dims],
                    coords={
                        "member": members,
                        **d0.coords,
                    },
                    attrs=d0.attrs,
                ).to_dataset(name=v)
            )

        return xr.merge(ds).compute()

In [66]:
def prepare_dataset(config, save_dir):
    """
    Prepare a dataset according to a provided config file and save as zarr

    Parameters
    ----------
    config : str
        Path to a config .yaml file. Required entries:

        - 'name': the `_open` method that opens the data
        - 'prepare': a mapping from each output variable to a spec with
          'uses' and, optionally, 'preprocess' and 'apply'

        Optional entries are 'path', 'rename' and 'scale_variables'.
    save_dir : str
        Directory in which a ``{name}.{variable}.zarr`` store is written for
        each output variable (written with mode="w", i.e. overwritten)

    Raises
    ------
    ValueError
        If 'name' or 'prepare' is missing, if no `_open` method matches
        'name', or if the matching method takes a ``path`` argument but the
        config has no 'path' entry
    """
    import inspect

    cfg = _load_config(config)

    # List of datasets that have open methods implemented
    methods = [
        method_name
        for method_name in dir(_open)
        if callable(getattr(_open, method_name)) and "__" not in method_name
    ]

    if "name" not in cfg:
        raise ValueError(
            f"Please provide an entry for 'name' in the config file so that I know how to open the data. Available options are {methods}"
        )

    if "prepare" not in cfg:
        raise ValueError("No variables were specified to prepare")

    if not hasattr(_open, cfg["name"]):
        raise ValueError(
            f"There is no method available to open '{cfg['name']}'. Please ensure that the 'name' entry in the config file matches an existing method in src.data._open, or add a new method for this data. Available methods are {methods}"
        )
    opener = getattr(_open, cfg["name"])

    # Some openers take the data location as a leading `path` argument while
    # others hard-code it, so cfg["path"] is supplied only to those that expect it
    leading_args = ()
    if "path" in inspect.signature(opener).parameters:
        if "path" not in cfg:
            raise ValueError(
                f"The method to open '{cfg['name']}' requires a 'path' entry in the config file"
            )
        leading_args = (cfg["path"],)

    # Loop over output variables
    for variable, spec in cfg["prepare"].items():
        input_variables = spec["uses"]

        if "rename" in cfg:
            input_variables = _maybe_translate_variables(
                input_variables, cfg["rename"]
            )

        if "preprocess" in spec:
            preprocess = _composite_function(spec["preprocess"])
        else:
            preprocess = None

        # Open every realm this output variable draws on and combine them
        ds = xr.merge(
            [
                opener(*leading_args, realm, var, preprocess)
                for realm, var in input_variables.items()
            ]
        )

        if "rename" in cfg:
            ds = _maybe_rename(ds, cfg["rename"])

        if "scale_variables" in cfg:
            ds = _scale_variables(ds, cfg["scale_variables"])

        if "apply" in spec:
            ds = _composite_function(spec["apply"])(ds)

        ds.to_zarr(f"{save_dir}/{cfg['name']}.{variable}.zarr", mode="w")

In [91]:
# Project locations: config .yaml files and the directory for processed output
project_dir = "/g/data/xv83/users/ds0092/active_projects/Squire_2022_CAFE-f6"
config_path = f"{project_dir}/config/"
save_dir = f"{project_dir}/data/processed"

# prepare_dataset writes {name}.{variable}.zarr stores into save_dir and
# returns None, so its result is not assigned
prepare_dataset(f"{config_path}HadISST.yaml", save_dir)

  return self.array[key]


In [92]:
# Read back the HadISST sst store written by prepare_dataset, reusing save_dir
# rather than repeating the hard-coded directory
test = xr.open_zarr(f"{save_dir}/HadISST.sst.zarr")

In [93]:
# Display the prepared HadISST dataset read back from zarr
test

Unnamed: 0,Array,Chunk
Bytes,29.91 MiB,29.91 MiB
Shape,"(121, 180, 360)","(121, 180, 360)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 29.91 MiB 29.91 MiB Shape (121, 180, 360) (121, 180, 360) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",360  180  121,

Unnamed: 0,Array,Chunk
Bytes,29.91 MiB,29.91 MiB
Shape,"(121, 180, 360)","(121, 180, 360)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray


In [None]:
def make_datasets(datasets, yaml_suffix):
    """Process datasets according to specifications in a provided set of config
    files and save output to ../processed

    Placeholder: not implemented yet. Call ``prepare_dataset`` once per
    config file in the meantime.

    Raises
    ------
    NotImplementedError
        Always
    """
    raise NotImplementedError(
        "make_datasets is not implemented yet; call prepare_dataset for each config file"
    )

In [140]:
def JRA55(realm, variables, config="JRA55"):
    """Open JRA55 data following specifications in JRA55.yaml

    Legacy opener; ``_open.JRA55`` with ``prepare_dataset`` opens the same store.

    Parameters
    ----------
    realm : str
        Name of the ``{realm}.zarr.zip`` store under the config's 'path'
    variables : str or list of str
        Variable(s) to open; names that are keys of the config's 'rename'
        mapping are translated before opening and renamed back afterwards
    config : str, optional
        Config file passed to ``_load_config``, which opens it verbatim.
        Defaults to "JRA55".
    """
    import warnings

    cfg = _load_config(config)

    if isinstance(variables, str):
        variables = [variables]

    if "rename" in cfg:
        # _maybe_translate_variables expects a {realm: [names]} mapping, so the
        # flat list is translated directly
        variables = [cfg["rename"].get(v, v) for v in variables]

    if "preprocess" in cfg:
        warnings.warn(
            "preprocess functions were provided but not used because the data does not require concatenation"
        )

    ds = xr.open_dataset(
        f"{cfg['path']}/{realm}.zarr.zip",
        engine="zarr",
        chunks={},
        use_cftime=True,
    )[variables]

    if "rename" in cfg:
        ds = _maybe_rename(ds, cfg["rename"])

    if "scale_variables" in cfg:
        ds = _scale_variables(ds, cfg["scale_variables"])

    if "postprocess" in cfg:
        ds = _composite_function(cfg["postprocess"])(ds)

    return ds


def HadISST(variables, config="HadISST"):
    """Open HadISST data following specifications in HadISST.yaml

    Legacy opener; ``_open.HadISST`` with ``prepare_dataset`` covers the same data.

    Parameters
    ----------
    variables : str or list of str
        Variable(s) to open from ``ocean_month.zarr`` under the config's 'path'.
        Names that are keys of 'rename' are translated before opening and
        renamed back afterwards.
    config : str, optional
        Config file passed to ``_load_config``, which opens it verbatim.
        Defaults to "HadISST".
    """
    import warnings

    cfg = _load_config(config)

    if isinstance(variables, str):
        variables = [variables]

    if "rename" in cfg:
        # _maybe_translate_variables expects a {realm: [names]} mapping, so the
        # flat list is translated directly
        variables = [cfg["rename"].get(v, v) for v in variables]

    if "preprocess" in cfg:
        warnings.warn(
            "preprocess functions were provided but not used because the data does not require concatenation"
        )

    ds = xr.open_dataset(
        f"{cfg['path']}/ocean_month.zarr",
        engine="zarr",
        chunks={},
        use_cftime=True,
    )[variables]
    # Mask values not greater than -1000 (presumably fill values -- TODO confirm)
    ds = ds.where(ds > -1000)

    if "rename" in cfg:
        ds = _maybe_rename(ds, cfg["rename"])

    if "normalise" in cfg:
        # 'normalise' holds per-variable scale factors
        ds = _scale_variables(ds, cfg["normalise"])

    if "postprocess" in cfg:
        ds = _composite_function(cfg["postprocess"])(ds)

    return ds


def EN422(variables, config="EN422"):
    """Open EN.4.2.2 data following specifications in EN422.yaml

    Legacy opener; ``_open.EN422`` with ``prepare_dataset`` covers the same data.

    Parameters
    ----------
    variables : str or list of str
        Variable(s) to open from the netcdf files under the config's 'path'.
        Names that are keys of 'rename' are translated before opening and
        renamed back afterwards.
    config : str, optional
        Config file passed to ``_load_config``, which opens it verbatim.
        Defaults to "EN422".
    """
    import warnings

    cfg = _load_config(config)

    if isinstance(variables, str):
        variables = [variables]

    if "rename" in cfg:
        # _maybe_translate_variables expects a {realm: [names]} mapping, so the
        # flat list is translated directly
        variables = [cfg["rename"].get(v, v) for v in variables]

    if "preprocess" in cfg:
        warnings.warn(
            "preprocess functions were provided but not used because the data does not require concatenation"
        )

    # The data location comes from the config, as for the other openers
    # (the undefined PATHS mapping was used here before)
    ds = xr.open_mfdataset(
        f"{cfg['path']}/*.nc",
        parallel=True,
        use_cftime=True,
    )[variables]

    if "rename" in cfg:
        ds = _maybe_rename(ds, cfg["rename"])

    if "normalise" in cfg:
        # 'normalise' holds per-variable scale factors
        ds = _scale_variables(ds, cfg["normalise"])

    if "postprocess" in cfg:
        ds = _composite_function(cfg["postprocess"])(ds)

    return ds


def CAFEf6(realm, variables, config="CAFEf6"):
    """Open CAFEf6 forecast data following specifications in CAFEf6.yaml

    Legacy opener; ``_open.CAFEf6`` with ``prepare_dataset`` covers the same data.

    Parameters
    ----------
    realm : str
        Name of the ``{realm}.zarr.zip`` store in each November-start forecast
        directory under the config's 'path'
    variables : str or list of str
        Variable(s) to open; names that are keys of 'rename' are translated
        before opening and renamed back afterwards
    config : str, optional
        Config file passed to ``_load_config``, which opens it verbatim.
        Defaults to "CAFEf6".
    """
    cfg = _load_config(config)

    if isinstance(variables, str):
        variables = [variables]

    if "rename" in cfg:
        # _maybe_translate_variables expects a {realm: [names]} mapping, so the
        # flat list is translated directly
        variables = [cfg["rename"].get(v, v) for v in variables]

    # Applied to each forecast before concatenation
    if "preprocess" in cfg:
        preprocess = _composite_function(cfg["preprocess"])
    else:
        preprocess = None

    files = sorted(
        glob.glob(f"{cfg['path']}/c5-d60-pX-f6-????1101/{realm}.zarr.zip")
    )  # Skip May starts

    ds = xr.open_mfdataset(
        files,
        compat="override",
        preprocess=preprocess,
        engine="zarr",
        coords="minimal",
        parallel=True,
    )[variables]

    if "rename" in cfg:
        ds = _maybe_rename(ds, cfg["rename"])

    if "normalise" in cfg:
        # 'normalise' holds per-variable scale factors
        ds = _scale_variables(ds, cfg["normalise"])

    if "postprocess" in cfg:
        ds = _composite_function(cfg["postprocess"])(ds)

    return ds


def CAFEf5(realm, variables, config="CAFEf5", f6_config="CAFEf6"):
    """Open CAFE-f5 forecast data following specifications in CAFEf5.yaml

    The 2020 forecast is appended along "init" using the first 10 ensemble
    members of the CAFE-f6 2020-11-01 start.

    Parameters
    ----------
    realm : str
        Name of the ``{realm}.zarr.zip`` store to open
    variables : str or list of str
        Variable(s) to open; names that are keys of the CAFE-f5 config's
        'rename' are translated before opening and renamed back afterwards
    config : str, optional
        CAFE-f5 config file passed to ``_load_config`` (opened verbatim).
        Defaults to "CAFEf5".
    f6_config : str, optional
        CAFE-f6 config file giving the 'path' of the 2020 forecast.
        Defaults to "CAFEf6".
    """
    import warnings

    cfg = _load_config(config)

    if isinstance(variables, str):
        variables = [variables]

    if "rename" in cfg:
        # _maybe_translate_variables expects a {realm: [names]} mapping, so the
        # flat list is translated directly
        variables = [cfg["rename"].get(v, v) for v in variables]

    if "preprocess" in cfg:
        warnings.warn(
            "preprocess functions were provided but not used because the data does not require concatenation"
        )

    ds = xr.open_dataset(
        f"{cfg['path']}/NOV/{realm}.zarr.zip", engine="zarr", chunks={}
    )[variables]

    # Append 2020 forecast from CAFE-f6
    cfg_f6 = _load_config(f6_config)

    ds_2020 = xr.open_dataset(
        f"{cfg_f6['path']}/c5-d60-pX-f6-20201101/{realm}.zarr.zip",
        engine="zarr",
        chunks={},
    )[variables]
    ds_2020 = ds_2020.isel(ensemble=range(10))
    ds_2020 = utils.convert_time_to_lead(ds_2020)

    # Both parts are processed with the CAFE-f5 config so they match
    if "rename" in cfg:
        ds = _maybe_rename(ds, cfg["rename"])
        ds_2020 = _maybe_rename(ds_2020, cfg["rename"])

    if "normalise" in cfg:
        # 'normalise' holds per-variable scale factors
        ds = _scale_variables(ds, cfg["normalise"])
        ds_2020 = _scale_variables(ds_2020, cfg["normalise"])

    if "postprocess" in cfg:
        ds = _composite_function(cfg["postprocess"])(ds)
        ds_2020 = _composite_function(cfg["postprocess"])(ds_2020)

    return xr.concat([ds, ds_2020], dim="init")


def CAFE60v1(realm, variables, config="CAFE60v1"):
    """Open CAFE60v1 data following specifications in CAFE60v1.yaml

    Legacy opener; ``_open.CAFE60v1`` with ``prepare_dataset`` covers the same data.

    Parameters
    ----------
    realm : str
        Name of the ``{realm}.zarr.zip`` store under the config's 'path'
    variables : str or list of str
        Variable(s) to open; names that are keys of 'rename' are translated
        before opening and renamed back afterwards
    config : str, optional
        Config file passed to ``_load_config``, which opens it verbatim.
        Defaults to "CAFE60v1".
    """
    import warnings

    cfg = _load_config(config)

    if isinstance(variables, str):
        variables = [variables]

    if "rename" in cfg:
        # _maybe_translate_variables expects a {realm: [names]} mapping, so the
        # flat list is translated directly
        variables = [cfg["rename"].get(v, v) for v in variables]

    if "preprocess" in cfg:
        warnings.warn(
            "preprocess functions were provided but not used because the data does not require concatenation"
        )

    ds = xr.open_dataset(f"{cfg['path']}/{realm}.zarr.zip", engine="zarr", chunks={})[
        variables
    ]

    if "rename" in cfg:
        ds = _maybe_rename(ds, cfg["rename"])

    if "normalise" in cfg:
        # 'normalise' holds per-variable scale factors
        ds = _scale_variables(ds, cfg["normalise"])

    if "postprocess" in cfg:
        ds = _composite_function(cfg["postprocess"])(ds)

    return ds


def CAFE_hist(realm, variables, config="CAFE_hist"):
    """Open CAFE historical data following specifications in CAFE_hist.yaml

    The control run's monthly drift (ensemble mean minus its own time mean,
    per calendar month) is subtracted from the historical run.

    Parameters
    ----------
    realm : str
        Name of the ``{realm}.zarr.zip`` store in the hist and ctrl run directories
    variables : str or list of str
        Variable(s) to open; names that are keys of 'rename' are translated
        before opening and renamed back afterwards
    config : str, optional
        Config file passed to ``_load_config``, which opens it verbatim.
        Defaults to "CAFE_hist".
    """
    import warnings

    cfg = _load_config(config)

    if isinstance(variables, str):
        variables = [variables]

    if "rename" in cfg:
        # _maybe_translate_variables expects a {realm: [names]} mapping, so the
        # flat list is translated directly
        variables = [cfg["rename"].get(v, v) for v in variables]

    if "preprocess" in cfg:
        warnings.warn(
            "preprocess functions were provided but not used because the data does not require concatenation"
        )

    hist = xr.open_dataset(
        f"{cfg['path']}/c5-d60-pX-hist-19601101/ZARR/{realm}.zarr.zip",
        engine="zarr",
        chunks={},
    )[variables]

    ctrl = xr.open_dataset(
        f"{cfg['path']}/c5-d60-pX-ctrl-19601101/ZARR/{realm}.zarr.zip",
        engine="zarr",
        chunks={},
    )[variables].mean("ensemble")

    if "rename" in cfg:
        hist = _maybe_rename(hist, cfg["rename"])
        ctrl = _maybe_rename(ctrl, cfg["rename"])

    if "normalise" in cfg:
        # 'normalise' holds per-variable scale factors
        hist = _scale_variables(hist, cfg["normalise"])
        ctrl = _scale_variables(ctrl, cfg["normalise"])

    if "postprocess" in cfg:
        hist = _composite_function(cfg["postprocess"])(hist)
        ctrl = _composite_function(cfg["postprocess"])(ctrl)

    drift = ctrl.groupby("time.month").map(lambda x: x - x.mean(["time"]))
    return hist - drift

