In [None]:
from dask.distributed import LocalCluster

# Start a local Dask cluster with ten workers for the heavy resampling below.
# To fall back to Dask's default worker settings, use `LocalCluster()` instead.
cluster = LocalCluster(n_workers=10)  # Fully-featured local Dask cluster
# Client bound to the cluster above; used later for `client.persist(...)`.
client = cluster.get_client()
client  # last expression of the cell, so the notebook displays the client

In [None]:
import numpy as np


def compute_scale_and_offset_mm(min, max, n=16):
    """Derive ``scale_factor`` / ``add_offset`` for packing into n-bit signed ints.

    The interval ``[min, max]`` is stretched over every signed ``n``-bit
    code: ``min`` packs to ``-2**(n-1)`` and ``max`` packs to
    ``2**(n-1) - 1``.  No code is left unused, so any fill value chosen
    inside that interval coincides with a valid packed value.

    Parameters
    ----------
    min, max : float
        Smallest and largest value that must be representable.  (The names
        shadow the builtins; they are kept so keyword callers keep working.)
    n : int, default 16
        Bit width of the packed integer type.

    Returns
    -------
    tuple of (float, float)
        ``(scale_factor, add_offset)``.  For a constant field
        (``max == min``) this is ``(1.0, min)`` so the scale is never zero.
    """
    vmin = min
    vmax = max
    if vmax == vmin:
        # Degenerate range: the regular formula would yield scale_factor == 0,
        # and packing (value - add_offset) / scale_factor would divide by zero.
        # Any non-zero scale works for a constant field; it then packs to 0.
        scale_factor = 1.0
        add_offset = float(vmin)
    else:
        # stretch/compress data to the available packed range
        scale_factor = (vmax - vmin) / (2**n - 1)
        # translate the range to be symmetric about zero
        add_offset = vmin + 2 ** (n - 1) * scale_factor
    # Echo the result so it is visible in the notebook output.
    print(scale_factor, add_offset)
    return scale_factor, add_offset


def compute_scale_and_offset(na):
    """Return ``(scale_factor, add_offset)`` for the value range found in ``na``.

    The minimum and maximum are taken with NumPy, converted to plain Python
    scalars via ``.item()``, and handed to ``compute_scale_and_offset_mm``
    with its default bit width.
    """
    return compute_scale_and_offset_mm(np.min(na).item(), np.max(na).item())


def get_min_max_from_persist(pers_array):
    """Compute, print and return the ``(min, max)`` of a lazily evaluated array.

    Each reduction is triggered with ``.compute()`` and unwrapped to a Python
    scalar through ``.values.item()``; the minimum is evaluated first.
    """
    bounds = tuple(
        reduce().compute().values.item()
        for reduce in (pers_array.min, pers_array.max)
    )
    # Echo the bounds so they show up in the notebook output.
    print(*bounds)
    return bounds


def get_scale_offset_from_persist(pers_array):
    """Return ``(scale_factor, add_offset)`` matching the range of ``pers_array``.

    The bounds come from ``get_min_max_from_persist`` and are forwarded
    unchanged to ``compute_scale_and_offset_mm``.
    """
    return compute_scale_and_offset_mm(*get_min_max_from_persist(pers_array))

In [None]:
import xarray as xr

# One lazily loaded dataset of daily maxima per year, 1984 through 2023.
days = []
for cur_year in range(1984, 2024):
    year_path = f"/data/era5/raw/2m_temperature/2m_temperature-{cur_year}.nc"
    # Open with chunks of 24 steps along "time" so the work stays lazy.
    ds = xr.open_dataset(year_path, chunks={"time": 24})
    # Reduce each calendar day to its maximum.
    days.append(ds.resample(time="D").max())

# Join the yearly pieces into a single series along the time axis.
day_concet = xr.concat(days, dim="time")

In [None]:
# Persist the daily maxima on the Dask cluster, then derive the packing
# parameters from the value range of t2m.
persisted = client.persist(day_concet)
day_scale, day_offset = get_scale_offset_from_persist(persisted["t2m"])

# Store t2m as int16 with the scale/offset computed above.
# NOTE(review): compute_scale_and_offset_mm spreads the data over the full
# int16 range, so a value one step above the minimum packs to -32767, the
# same code declared as _FillValue here -- TODO confirm no valid data is
# masked when the file is read back.
day_t2m_encoding = {
    "dtype": "int16",
    "missing_value": -32767,
    "_FillValue": -32767,
    "scale_factor": day_scale,
    "add_offset": day_offset,
}
persisted.to_netcdf(
    "data/output/2m_temperature-day-max.nc",
    encoding={"t2m": day_t2m_encoding},
)

# month

In [None]:
# Monthly maxima built from the persisted daily maxima ("ME" resampling),
# persisted on the cluster before scanning its range and writing it out.
month = persisted.resample(time="ME").max()
persisted_month = client.persist(month)
month_scale, month_offset = get_scale_offset_from_persist(persisted_month["t2m"])

# Store t2m as int16 with the scale/offset computed above.
# NOTE(review): compute_scale_and_offset_mm spreads the data over the full
# int16 range, so a value one step above the minimum packs to -32767, the
# same code declared as _FillValue here -- TODO confirm no valid data is
# masked when the file is read back.
month_t2m_encoding = {
    "dtype": "int16",
    "missing_value": -32767,
    "_FillValue": -32767,
    "scale_factor": month_scale,
    "add_offset": month_offset,
}
persisted_month.to_netcdf(
    "data/output/2m_temperature-month-max.nc",
    encoding={"t2m": month_t2m_encoding},
)

# year

In [None]:
# Yearly maxima built from the persisted monthly maxima ("YE" resampling),
# persisted on the cluster before scanning its range and writing it out.
year = persisted_month.resample(time="YE").max()
persisted_year = client.persist(year)
year_scale, year_offset = get_scale_offset_from_persist(persisted_year["t2m"])

# Store t2m as int16 with the scale/offset computed above.
# NOTE(review): compute_scale_and_offset_mm spreads the data over the full
# int16 range, so a value one step above the minimum packs to -32767, the
# same code declared as _FillValue here -- TODO confirm no valid data is
# masked when the file is read back.
year_t2m_encoding = {
    "dtype": "int16",
    "missing_value": -32767,
    "_FillValue": -32767,
    "scale_factor": year_scale,
    "add_offset": year_offset,
}
persisted_year.to_netcdf(
    "data/output/2m_temperature-year-max.nc",
    encoding={"t2m": year_t2m_encoding},
)

# End

In [None]:
# Release resources in order: disconnect the client created alongside the
# cluster first, then shut the local cluster itself down.
client.close()
cluster.close()

# Check

In [None]:
import xarray as xr
# Re-open the daily-maximum file written earlier to inspect what was stored.
ds = xr.open_dataset("data/output/2m_temperature-day-max.nc")
ds  # last expression of the cell, so the notebook displays the dataset