<img width="50" src="https://carbonplan-assets.s3.amazonaws.com/monogram/dark-small.png" style="margin-left:0px;margin-top:20px"/>

# TERRACLIMATE to Zarr

_by Joe Hamman (CarbonPlan), June 29, 2020_

This notebook converts the raw TERRACLIMATE dataset to Zarr format.

**Inputs:**

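- intake catalog: `climate.gridmet_opendap` (note: the cells below download the NetCDF files directly from `https://climate.northwestknowledge.net/TERRACLIMATE-DATA/`)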

**Outputs:**

- Cloud copy of TERRACLIMATE

**Notes:**

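- No reprojection is done in this notebook. Processing is limited to renaming variables/dimensions, masking values at or above per-variable thresholds, dropping the source NetCDF encoding, and rechunking.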


In [None]:
import os
import fsspec
import xarray as xr

import dask
from dask.distributed import Client
from dask_gateway import Gateway
from typing import List
import urlpath
from tqdm import tqdm

In [None]:
# --- notebook-wide options ---
name = "terraclimate"

# chunk layout used for the per-file Zarr stores and for the final store
chunks = dict(lat=1440, lon=1440, time=12)

# years to fetch (range end is exclusive, so this covers 1958 through 2019)
years = [*range(1958, 2020)]

# scratch area for the raw NetCDF downloads and their per-file Zarr copies
cache_location = f"gs://carbonplan-scratch/{name}-cache/"
# final consolidated output store
target_location = f"gs://carbonplan-data/raw/{name}/4000m/raster.zarr"

In [None]:
# Start a Dask Gateway cluster for the download/convert stage: single-core
# workers with a large memory allowance (units follow the gateway's option
# schema -- presumably GiB, confirm), adaptively scaling from 0 to 40 workers.
# NOTE(review): the memory size is presumably chosen because ``download`` and
# ``nc2zarr`` each hold a whole file in memory.
gateway = Gateway()
options = gateway.cluster_options()
options.worker_cores = 1
options.worker_memory = 42
cluster = gateway.new_cluster(cluster_options=options)
cluster.adapt(minimum=0, maximum=40)
client = cluster.get_client()
cluster
# local alternative when no gateway is available:
# client = Client(n_workers=2)

In [None]:
# show the distributed client's summary in the notebook output
client

In [None]:
import gcsfs

# Delete any previous output store so the final write starts from a clean
# target. A FileNotFoundError only means there is nothing to delete yet.
# ``fs`` is reused by later cells.
fs = gcsfs.GCSFileSystem()
try:
    _ = fs.rm(target_location, recursive=True)
except FileNotFoundError:
    pass

In [None]:
# Optionally remove every intermediate per-file Zarr store in the scratch
# cache, forcing ``nc2zarr`` to rebuild them from the cached NetCDF files.
# Previously this ran unconditionally (despite the "uncomment to enable"
# comment), wiping the cache that ``nc2zarr`` checks for on every run.
clear_zarr_cache = False

if clear_zarr_cache:
    zarrs = [fn + ".zarr" for fn in fs.glob(cache_location + "*.nc")]
    # not every .nc file necessarily has a .zarr sibling yet; removing a
    # missing path would raise, so only delete stores that exist
    zarrs = [z for z in zarrs if fs.exists(z)]
    if zarrs:
        fs.rm(zarrs, recursive=True)

In [None]:
# TERRACLIMATE variables to convert; each is fetched as one NetCDF file per year.
variables = [
    "aet",
    "def",
    "pet",
    "ppt",
    "q",
    "soil",
    "srad",
    "swe",
    "tmax",
    "tmin",
    "vap",
    "ws",
    "vpd",
    "PDSI",
]

# Variables renamed by ``preproc`` (original name -> new name).
rename_vars = {"PDSI": "pdsi"}

# Masking rules consumed by ``apply_mask``: ``(op, value)`` where ``"lt"`` keeps
# only values strictly below ``value`` (``"neq"`` is also supported there).
# ``None`` means the variable is left unmasked.
mask_opts = {
    "PDSI": ("lt", 10),
    "aet": ("lt", 32767),
    "def": ("lt", 32767),
    "pet": ("lt", 32767),
    "ppt": ("lt", 32767),
    "ppt_station_influence": None,
    "q": ("lt", 2147483647),
    "soil": ("lt", 32767),
    "srad": ("lt", 32767),
    "swe": ("lt", 10000),
    "tmax": ("lt", 200),
    "tmax_station_influence": None,
    "tmin": ("lt", 200),
    "tmin_station_influence": None,
    "vap": ("lt", 300),
    "vap_station_influence": None,
    "vpd": ("lt", 300),
    "ws": ("lt", 200),
}

# ``postproc`` applies masks *after* ``preproc`` has renamed variables, so a
# rule keyed only by the original name (e.g. "PDSI") would never be found and
# that variable would silently go unmasked. Register each rule under its
# renamed key as well (the original keys are kept for compatibility).
mask_opts.update(
    {new: mask_opts[old] for old, new in rename_vars.items() if old in mask_opts}
)

In [None]:
def apply_mask(key, da):
    """Mask ``da`` according to the rule registered for ``key`` in ``mask_opts``.

    A rule is an ``(op, value)`` pair. ``"lt"`` keeps elements strictly below
    ``value``, and ``"neq"`` keeps elements different from ``value``. Masked-out
    elements become missing via ``da.where``. A key with no rule, or with a
    falsy rule such as ``None``, and any unrecognised ``op`` all return ``da``
    unchanged.
    """
    rule = mask_opts.get(key, None)
    if not rule:
        return da

    op, threshold = rule
    if op == "lt":
        return da.where(da < threshold)
    if op == "neq":
        return da.where(da != threshold)
    return da


def preproc(ds):
    """Normalise one raw TERRACLIMATE dataset.

    - A ``station_influence`` variable, if present, is detached and re-added
      as ``{var}_station_influence``. Here ``var`` is the first data variable
      left after the detach.
    - ``var`` is renamed when it appears in ``rename_vars``.
    - A ``day`` coordinate is renamed to ``time``.

    The input is not modified in place when a rename or drop occurs.
    """
    influence = ds.get("station_influence", None)
    has_influence = influence is not None
    if has_influence:
        ds = ds.drop_vars("station_influence")

    primary = list(ds.data_vars)[0]

    renames = {}
    if primary in rename_vars:
        renames[primary] = rename_vars[primary]
    if "day" in ds.coords:
        renames["day"] = "time"

    if has_influence:
        # keyed by the pre-rename name, matching the *_station_influence
        # entries in mask_opts
        ds[f"{primary}_station_influence"] = influence

    return ds.rename(renames) if renames else ds


def postproc(ds):
    """Mask each data variable and strip its source-file encoding.

    Every data variable is replaced by ``apply_mask(name, variable)`` with
    attributes preserved. The encoding keys listed below are then removed so
    they are not carried over when the dataset is written to Zarr.
    """
    stale_encoding_keys = (
        "chunksizes",
        "fletcher32",
        "shuffle",
        "zlib",
        "complevel",
        "dtype",
        "_Unsigned",
        "missing_value",
        "_FillValue",
        "scale_factor",
        "add_offset",
    )
    for var_name in ds.data_vars:
        with xr.set_options(keep_attrs=True):
            masked = apply_mask(var_name, ds[var_name])
            ds[var_name] = masked
        encoding = ds[var_name].encoding
        for enc_key in stale_encoding_keys:
            encoding.pop(enc_key, None)

    return ds


def get_encoding(ds):
    """Build a ``to_zarr`` encoding that sets a Blosc compressor on every data variable.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset whose data variables will be written.

    Returns
    -------
    dict
        Mapping of each data-variable name to ``{"compressor": Blosc()}``.
        A single compressor instance is shared by all variables.
    """
    # ``Blosc`` had no import anywhere in this notebook, so calling this helper
    # raised NameError. zarr (already used by this notebook) re-exports the
    # numcodecs Blosc codec in zarr 2.x.
    from zarr import Blosc

    compressor = Blosc()
    return {key: {"compressor": compressor} for key in ds.data_vars}


@dask.delayed
def download(source_url: str, cache_location: str) -> str:
    """
    Download a remote file to a cache, skipping files that are already cached.

    Parameters
    ----------
    source_url : str
        Path or url to the source file.
    cache_location : str
        Path or url of the cache directory. Its protocol prefix (text before
        the first ``:``) selects the fsspec filesystem used for the cache.

    Returns
    -------
    target_url : str
        Path or url in the form of ``{cache_location}/{basename(source_url)}``.
    """
    fs = fsspec.get_filesystem_class(cache_location.split(":")[0])(
        token="cloud"
    )

    name = urlpath.URL(source_url).name
    target_url = os.path.join(cache_location, name)

    # Cache hit from a previous run. ``exists`` is used instead of opening the
    # file, which leaked an unclosed file handle.
    if fs.exists(target_url):
        return target_url

    # Fetch the whole source before opening the target, so a failed download
    # never creates a (partial or empty) cache object that the check above
    # would treat as valid on the next run.
    with fsspec.open(source_url, mode="rb") as source:
        data = source.read()
    with fs.open(target_url, mode="wb") as target:
        target.write(data)
    return target_url


@dask.delayed(pure=True, traverse=False)
def nc2zarr(source_url: str, cache_location: str) -> str:
    """Convert one cached NetCDF file into a consolidated Zarr store.

    The store is written next to the source as ``{source_url}.zarr``. If that
    store already contains ``.zmetadata``, it is assumed to come from an
    earlier, finished run, and the conversion is skipped.

    Parameters
    ----------
    source_url : str
        Path or url of the NetCDF file. Its protocol prefix selects the
        fsspec filesystem.
    cache_location : str
        Not used; kept for signature compatibility with ``download``.

    Returns
    -------
    target_url : str
        Path or url of the Zarr store.

    Raises
    ------
    ValueError
        If the NetCDF file cannot be opened or pre/post-processed. The
        underlying exception is chained as ``__cause__``.
    """
    fs = fsspec.get_filesystem_class(source_url.split(":")[0])(token="cloud")
    print(source_url)

    target_url = source_url + ".zarr"

    if fs.exists(f"{target_url}/.zmetadata"):
        return target_url

    # Run any dask work triggered inside this task with the local synchronous
    # scheduler (presumably to avoid submitting nested tasks from a worker).
    with dask.config.set(scheduler="single-threaded"):
        try:
            # Close both the dataset and the underlying file once the data is
            # loaded into memory; previously both handles were leaked.
            with fs.open(source_url) as source_file, xr.open_dataset(
                source_file, engine="h5netcdf"
            ) as raw:
                ds = (
                    raw.pipe(preproc)
                    .pipe(postproc)
                    .load()
                    .chunk(chunks)
                )
        except Exception as err:
            raise ValueError(source_url) from err

        mapper = fs.get_mapper(target_url)
        ds.to_zarr(mapper, mode="w", consolidated=True)

    return target_url

In [None]:
source_url_pattern = "https://climate.northwestknowledge.net/TERRACLIMATE-DATA/TerraClimate_{var}_{year}.nc"

# one URL per (variable, year) pair, ordered variable-major
source_urls = [
    source_url_pattern.format(var=var, year=year)
    for var in variables
    for year in years
]
source_urls[:4]

In [None]:
# Build one lazy download task per source URL and submit them all to the
# cluster, retrying each failed task once.
downloads = [download(url, cache_location) for url in source_urls]
download_futures = client.compute(downloads, retries=1)

In [None]:
# Block on each future in submission order to collect the cached file paths.
downloaded_files = [future.result() for future in download_futures]
downloaded_files[:4]

In [None]:
# Convert every cached NetCDF file into its own Zarr store.
zarrs = [nc2zarr(path, cache_location) for path in downloaded_files]
# ``dask.compute`` returns one result per positional argument, i.e. a 1-tuple
# here; unpack it so ``zarr_urls`` is the list of store paths.
# NOTE(review): ``retries`` is a distributed-scheduler option; with
# scheduler="single-threaded" it is most likely ignored -- confirm.
(zarr_urls,) = dask.compute(zarrs, retries=1, scheduler="single-threaded")
zarr_urls[:4]

In [None]:
# Open every per-year Zarr store written by ``nc2zarr`` and concatenate each
# variable's years along ``time``, producing one dataset per variable.
ds_list = []
for var in variables:
    temp = []
    for year in tqdm(years):
        # Same location ``download`` + ``nc2zarr`` write to. Derived from
        # ``cache_location`` rather than hard-coded, so changing the option
        # cannot silently point this cell at a different cache.
        mapper = fsspec.get_mapper(
            os.path.join(cache_location, f"TerraClimate_{var}_{year}.nc.zarr")
        )
        temp.append(xr.open_zarr(mapper, consolidated=True))
    print(f"concat {var}")
    ds_list.append(
        xr.concat(temp, dim="time", coords="minimal", compat="override")
    )

In [None]:
# Tear down the conversion cluster and start a new one with a different
# worker shape (4 cores each) for the merge-and-write stage, keeping at
# least one worker alive.
client.close()
cluster.close()

options.worker_cores = 4
options.worker_memory = 16
cluster = gateway.new_cluster(cluster_options=options)
cluster.adapt(minimum=1, maximum=40)
client = cluster.get_client()
cluster

In [None]:
import zarr  # used below for consolidate_metadata

# Combine the per-variable datasets into a single dataset and rechunk it to
# the target layout.
ds = xr.merge(
    ds_list,
    compat="override",
).chunk(chunks)

In [None]:
# inspect the merged dataset before writing it
ds

In [None]:
# Build the write lazily, execute it with up to 4 retries per failed task,
# and consolidate the store's metadata only after every chunk is written.
mapper = fsspec.get_mapper(target_location)
task = ds.to_zarr(mapper, mode="w", compute=False)
dask.compute(task, retries=4)
zarr.consolidate_metadata(mapper)

In [None]:
# release the client and shut down the gateway cluster
client.close()
cluster.close()