# Write neXtSIM-OPA and ERA5 data to zarr archives.

This notebook is used to prepare the data for further use.

Before processing, the data needs to be downloaded from [SASIP GitHub](https://github.com/sasip-climate/catalog-shared-data-SASIP/blob/main/outputs/OPA-neXtSIM_CREG025-ILBOXE140.md), e.g. for January 1995 [OpenDAP](https://ige-meom-opendap.univ-grenoble-alpes.fr/thredds/dodsC/meomopendap/extract/SASIP/model-outputs/OPA-neXtSIM_CREG025/OPA-neXtSIM_CREG025-ILBOXE140-S/1995/nextsim/Moorings_1995m01.nc).

The path to this data is set with the *nextsim_path* variable.

Additionally, the ERA5 data has to be linked. An examplary data loading from Google servers is included. Yet, loading from disk will be likely more efficient.

The previously built auxiliary data is linked via the *auxiliary_path* variable.

For further use, the data will be stored in zarr archives, one for the states and forcings for each year, under the folder *store_path*. These archives will be used in the next script.

This script efficiently works with a __dask cluster__. Its settings have to be tuned to your configuration.

In [None]:
nextsim_path = "*** MISSING ***"
auxiliary_path = "../data/auxiliary/ds_auxiliary.nc"

store_path = "*** MISSING ***"

# Import and define cluster for processing

In [None]:
from typing import Tuple

import logging
from itertools import product
import xarray as xr
import numpy as np
from distributed import LocalCluster, Client
from zarr.storage import DirectoryStore, ZipStore
import zarr
import shutil
from numcodecs import Blosc
from tqdm.notebook import tqdm
import pandas as pd

import cartopy.crs as ccrs

In [None]:
logging.basicConfig(level=logging.INFO)

In [None]:
cluster = LocalCluster(local_directory="/tmp", n_workers=32, threads_per_worker=1, memory_limit="16 GB")
client = Client(cluster)
client

In [None]:
source_grid = ccrs.PlateCarree()
target_grid = ccrs.NorthPolarStereo()

# Load auxiliary

In [None]:
ds_aux = xr.open_dataset(auxiliary_path)

In [None]:
mask = ds_aux.stack(grid=("y", "x"))["mask"].astype(bool).values

#Â Load neXtSIM data

In [None]:
years = range(1995, 2019)
months = range(1, 13)

In [None]:
nc_files = [f"{nextsim_path:s}/Moorings_{y:d}m{m:02d}.nc" for y, m in product(years, months)]

In [None]:
dataset = xr.open_mfdataset(
    nc_files,
    parallel=True,
    engine="netcdf4",
    drop_variables=["sea_ice_drift"]
)
dataset = dataset.isel(y=slice(91, None), x=slice(8, -8))
dataset = dataset.chunk({"time": 1})
dataset = dataset.rename_vars({"damage": "sid"})

## Write neXtSIM

In [None]:
state_vars = ["sit", "sic", "sid", "siu", "siv", "snt"]

In [None]:
ds_nextsim = dataset[state_vars].stack(grid=("y", "x")).isel(grid=mask)
ds_nextsim = ds_nextsim.drop_vars(["grid", "y", "x"])

In [None]:
for v in list(ds_nextsim.coords.keys()):
    if ds_nextsim.coords[v].dtype == object:
        ds_nextsim.coords[v] = ds_nextsim.coords[v].astype("unicode")
        
for v in list(ds_nextsim.variables.keys()):
    if ds_nextsim[v].dtype == object:
        ds_nextsim[v] = ds_nextsim[v].astype("unicode")
compressor = Blosc()
encoding = {}
for data_var in ds_nextsim.data_vars:
   encoding[data_var] = {"compressor": compressor}

In [None]:
for year in tqdm(years):
    curr_data = ds_nextsim.sel(time=f"{year:04d}")
    curr_data.to_zarr(
        f"{store_path:s}/states_creg025_v02_{year:04d}.zarr",
        mode="w", consolidated=True,
        encoding=encoding
    )

# Load the forcing

## Exemplary ERA5 data loading from Google cloud

## Prepare forcings

In [None]:
ds_era5 = ds_era5.sel(time=pd.date_range("1993-01-01 03:00", "2019-01-01 03:00", freq="6h"))

In [None]:
lat_mask = np.logical_and(
    90 >= ds_era5["latitude"].values,
    ds_era5["latitude"].values >= 40
)

In [None]:
ds_era5 = ds_era5.isel(latitude=lat_mask)
ds_era5 = ds_era5.stack(grid=["latitude", "longitude"])

## Interpolate to needed data

In [None]:
def determine_nearest_idx(
        dataset: xr.Dataset,
        ds_aux: xr.Dataset,
        lon_lat_names: Tuple[str, str] = ("longitude", "latitude")
) -> Tuple[np.ndarray, np.ndarray]:
    dataset_points = target_grid.transform_points(
        source_grid,
        dataset[lon_lat_names[0]].values.flatten(),
        dataset[lon_lat_names[1]].values.flatten()
    )[..., :2]
    nextsim_points = target_grid.transform_points(
        source_grid,
        ds_aux["longitude"].values,
        ds_aux["latitude"].values
    )[..., :2]
    
    dataset_tree = KDTree(dataset_points)
    nearest_dist, nearest_idx = dataset_tree.query(nextsim_points, workers=-1)
    return nearest_idx, nearest_dist

def interpolate_to_nextsim(
        dataset: xr.Dataset,
        ds_aux: xr.Dataset,
        lon_lat_names: Tuple[str, str] = ("longitude", "latitude")
) -> xr.Dataset:
    target_grid = ds_aux[["longitude", "latitude"]].stack(grid=["y", "x"])
    nearest_idx, _ = _determine_nearest_idx(
        dataset, target_grid, lon_lat_names
    )
    dataset = dataset.drop_vars(lon_lat_names)
    dataset = dataset.isel(grid=nearest_idx)
    dataset = dataset.assign_coords(
        xr.Coordinates.from_pandas_multiindex(
            target_grid.indexes["grid"], "grid"
        )
    )
    dataset = dataset.assign_coords(
        longitude=target_grid["longitude"],
        latitude=target_grid["latitude"],
    )
    dataset = dataset.unstack("grid")
    dataset = dataset.chunk({"time": 1, "y": -1, "x": -1})
    return dataset

In [None]:
ds_era5 = interpolate_to_nextsim(ds_era5, ds_aux)

## Rotated to x- and y-direction

In [None]:
def estimate_rotation_matrices(
        ds_aux: xr.Dataset
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Estimates the rotation from vectors in lat/lon into curvilinear coordinates,
    as given by the sea-ice simulations. The function is inspired by
    the "geo2ocean" module from the NEMO model "/src/OCE/SBC/geo2ocean.F90".
    Returns the sin and cosine of the rotation for the direct application on
    U and V.
    """
    mesh = np.stack(
        (ds_aux["x_coord"].values, ds_aux["y_coord"].values),
        axis=0
    )

    x_north = -mesh[0]
    y_north = -mesh[1]

    x_diff, y_diff = calculate_differences(mesh)
    normalizer = calculate_normalizer(x_north, y_north, x_diff, y_diff)
    sin_rot, cos_rot = calculate_rotation_arrays(
        x_north, y_north, x_diff, y_diff, normalizer
    )
    lon_diff = calculate_lon_diff(ds_aux)
    sin_rot, cos_rot = apply_lon_mask(sin_rot, cos_rot, lon_diff)
    return sin_rot, cos_rot


def rotate_wind(
        wind_arrays: Tuple[np.ndarray, np.ndarray],
        rot_arrays: Tuple[np.ndarray, np.ndarray],
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Apply the rotation to u and v with given sine and cosine of the rotation
    angle.
    Returns the rotated U and V.
    """
    u_rot = wind_arrays[0] * rot_arrays[1] + wind_arrays[1] * rot_arrays[0]
    v_rot = wind_arrays[1] * rot_arrays[1] - wind_arrays[0] * rot_arrays[0]
    return u_rot, v_rot

In [None]:
def apply_rotation(
        dataset: xr.Dataset,
        ds_aux: xr.Dataset,
        var_names: Tuple[str, str] = ('uas', 'vas'),
) -> xr.Dataset:
    rot_arrays = estimate_rotation_matrices(ds_aux)

    def rotate_velocities(uas, vas):
        return rotate_wind((uas, vas), rot_arrays=rot_arrays)
    
    ds_vel_rotated = xr.apply_ufunc(
        rotate_velocities,
        dataset[var_names[0]], dataset[var_names[1]],
        input_core_dims=(("y", "x"), ("y", "x")),
        output_core_dims=(("y", "x"), ("y", "x")),
        vectorize=True,
        dask="parallelized",
        output_dtypes=[np.float32, np.float32],
    )
    ds_rotated = dataset.assign({
        var_names[0]: ds_vel_rotated[0],
        var_names[1]: ds_vel_rotated[1],
    })
    return ds_rotated

In [None]:
ds_era5 = apply_rotation(
    ds_era5,
    ds_aux,
    ["10m_u_component_of_wind", "10m_v_component_of_wind"]
)

In [None]:
ds_era5 = ds_era5.stack(grid=("y", "x")).isel(grid=mask)

## Extract variables

### Humidity

In [None]:
RD = 287.0597
RV = 461.51
EPSILON = _RD / _RV


MAGNUS_SETTINGS = dict(
    a1_w = 611.21,
    a3_w = 17.502,
    a4_w = 32.19,
    a1_i = 611.21,
    a3_i = 22.587,
    a4_i = -0.7,
    t0 = 273.16,
    tice = 250.16,
)


def estimate_sat(temp, a1, a3, a4):
    return a1 * np.exp(a3 * ((temp-MAGNUS_SETTINGS["t0"])/(temp-a4)))


def interp_sat(temp):
    sat_w = estimate_sat(
        temp,
        MAGNUS_SETTINGS["a1_w"],
        MAGNUS_SETTINGS["a3_w"],
        MAGNUS_SETTINGS["a4_w"]
    )
    sat_i = estimate_sat(
        temp,
        MAGNUS_SETTINGS["a1_i"],
        MAGNUS_SETTINGS["a3_i"],
        MAGNUS_SETTINGS["a4_i"]
    )
    alpha = (
        (temp - MAGNUS_SETTINGS["tice"])/
        (MAGNUS_SETTINGS["t0"] - MAGNUS_SETTINGS["tice"])
    )**2
    alpha = np.clip(alpha, 0, 1)
    return alpha * sat_w + (1-alpha) * sat_i


def estimate_relative_humidity(
        dataset: xr.Dataset, t2m_name: str = "t2m", d2m_name: str = "d2m"
) -> xr.Dataset:
    """
    From ECMWF's IFS documentation Part IV Physical processes
    """
    sat_t2m = xr.apply_ufunc(
        interp_sat, dataset[t2m_name], dask="allowed"
    )
    sat_d2m = xr.apply_ufunc(
        interp_sat, dataset[d2m_name], dask="allowed"
    )
    dataset = dataset.assign(rhus=(sat_d2m/sat_t2m).rename("rhus") * 100.)
    return dataset


def estimate_specific_hum(
        dataset: xr.Dataset, d2m_name: str = "d2m", pressure_name: str = "ps"
) -> xr.Dataset:
    sat_d2m = xr.apply_ufunc(
        interp_sat, dataset[d2m_name], dask="allowed"
    )
    denom = dataset[pressure_name] + sat_d2m * (EPSILON - 1)
    specific_hum = sat_d2m * EPSILON / denom
    dataset = dataset.assign(huss=specific_hum.rename("huss"))
    return dataset

In [None]:
ds_era5 = estimate_specific_hum(
    ds_era5,
    "2m_dewpoint_temperature", "surface_pressure"
)
ds_era5 = estimate_relative_humidity(
    ds_era5, "2m_temperature", "2m_dewpoint_temperature"
)

### Degree days

In [None]:
default_avg_len = dict(
    month=60,
    year=732
)


def estimate_degree_days(
        dataset: xr.Dataset,
        t2m_name: str = "t2m",
        t2m_threshold: float = 271.35,
        avg_len: Dict[str, int] = None
) -> xr.Dataset:
    if avg_len is None:
        avg_len = default_avg_len
    diff_t2m = dataset[t2m_name] - t2m_threshold
    pdd = diff_t2m.clip(min=0)
    avg_pdd = {
        f"pdd_{suffix}": pdd.rolling(
            time=l, min_periods=l, center=False
        ).mean(dim="time")
        for suffix, l in avg_len.items()
    }
    fdd = -diff_t2m.clip(max=0)
    avg_fdd = {
        f"fdd_{suffix}": fdd.rolling(
            time=l, min_periods=l, center=False
        ).mean(dim="time")
        for suffix, l in avg_len.items()
    }
    dataset = dataset.assign(**avg_pdd)
    dataset = dataset.assign(**avg_fdd)
    return dataset

In [None]:
ds_era5 = estimate_degree_days(
    ds_era5,
    "2m_temperature",
    t2m_threshold=271.35,
    avg_len={
        "month": 120,
        "year": 1464
    }
)

## Combine all needed variables

In [None]:
ds_era5 = ds_era5.rename({
    "2m_temperature": "tus",
    "10m_u_component_of_wind": "uas",
    "10m_v_component_of_wind": "vas"
})

In [None]:
ds_era5 = ds_era5.drop_vars(["2m_dewpoint_temperature", "surface_pressure"])

## Slice the time to dataset time

In [None]:
ds_era5 = ds_era5.sel(time=dataset.indexes["time"])

## Write the forcing

In [None]:
ds_era5 = ds_era5.drop_vars(["grid", "y", "x"])

In [None]:
ds_era5 = ds_era5.chunk({"time": 1, "grid": -1})

In [None]:
for v in list(ds_era5.coords.keys()):
    if ds_era5.coords[v].dtype == object:
        ds_era5.coords[v] = ds_era5.coords[v].astype("unicode")
        
for v in list(ds_era5.variables.keys()):
    if ds_era5[v].dtype == object:
        ds_era5[v] = ds_era5[v].astype("unicode")
compressor = Blosc()
encoding = {}
for data_var in ds_era5.data_vars:
   encoding[data_var] = {"compressor": compressor}

In [None]:
for year in tqdm(years):
    curr_data = ds_era5.sel(time=f"{year:04d}")
    curr_data.to_zarr(
        f"{store_path:s}/forcings_creg025_v02_{year:04d}.zarr",
        mode="w", consolidated=True,
        encoding=encoding
    )