In [1]:
import sys
import os
#import torch
import zarr
import fsspec
import lilio
import xarray as xr
import numpy as np
import pandas as pd
import pyarrow as pa
import matplotlib.pyplot as plt

#from torch import nn
from pathlib import Path

sys.path.append(os.path.expanduser('~/Documents/Medley'))
from scripts.prepare_monthly_ts_data import get_monthly_data

### Storage of SPEIcube
from cloud to cluster

In [34]:
ds = xr.open_zarr(fsspec.get_mapper('https://s3.bgc-jena.mpg.de:9000/xaida/SPEICube.zarr'), consolidated=True)

In [3]:
# Adding some information, and re-chunking
for running_window in [30,90,180]:
    ds[f'spei_{running_window}'].encoding.pop('chunks')
    ds[f'spei_{running_window}'].encoding.pop('preferred_chunks')
    ds[f'spei_{running_window}'].attrs.update({'long_name':'antecedent_water_surplus_or_deficit',
                                               'units':'mm',
                                               'calculation':'SUM(TP-PET)',
                                               'window_length_days':running_window,
                                               'potential_evap':'Penman_Monteith',
                                               'source':'ERA5_and_Max_Planck_MPG',
                                               'project':'XAIDA',
                                               'workpackage':3,})
ds = ds.chunk({'latitude':50,'longitude':50})

In [35]:
localpath = Path('/scistor/ivm/data_catalogue/reanalysis/ERA5_0.25/WSD/1979_2021_water_surplus_deficit_daily.zarr')
# Writing out daily data for everyone.
#ds.to_zarr(localpath)

In [3]:
def monthly_resample_func(ds: xr.Dataset, how = 'mean') -> xr.Dataset:
    resampled_object = ds.resample(time = 'M', label = 'left')
    f = getattr(resampled_object, how)
    newds = f(skipna = False)
    newds.coords['time'] = newds.time + pd.Timedelta('1D') # Shifting due to weird label behaviour.
    for varname in (varname for varname in newds.variables if ('time' in newds[varname].dims)):
        newds[varname].attrs.update({'resample':f'monthly_{how}'})
    return newds

def chunk_func(ds: xr.Dataset, chunks = {'latitude':50, 'longitude':50}) -> xr.Dataset:
    #ds = ds.transpose("time","latitude", "longitude")
    chunks.update({'time':len(ds.time)}) # Needs to be specified as well, otherwise chunk of size 1.
    return ds.chunk(chunks)

In [12]:
# creating a monthly mediterranean subset.
# It is already aggregated/accumulated, but still sampling to monthly mean (mean of preceeding drought within a month, potentially extending until before that month)
ds = xr.open_zarr(localpath)
ds = ds.sel(latitude = slice(65,20),longitude = slice(-20,55))
outpath = Path('/scistor/ivm/jsn295/Medi/monthly/1979_2021_monthly_water_surplus_deficit.zarr')
ds = monthly_resample_func(ds, how = 'mean')
ds = chunk_func(ds)
ds.to_zarr(outpath)

<xarray.backends.zarr.ZarrStore at 0x1554096bc120>

### Label and event cubes
Downloaded manually from https://nextcloud.bgc-jena.mpg.de/s/MTc2BNJNkoSMxdH. Cannot be resampled to months because consists of integers. Just combining them into a single store.

In [2]:
downloadpath = Path('/scistor/ivm/jsn295/Medi/daily/cloud_download/')

In [49]:
key_value = {}
for window in [30,90,180]:
    da = xr.open_zarr(downloadpath / f'EventCube_Univariate_spei_{window}_0.05.zarr', consolidated = False)['layer']
    da = da.sel(latitude = slice(65,20),longitude = slice(-20,55))
    da.encoding.pop('chunks')
    da.encoding.pop('preferred_chunks')
    da.attrs.update({'window_length_days':window,'threshold':0.05})
    key_value.update({f'spei_{window}':da})
ds = xr.Dataset(key_value)
ds = chunk_func(ds)
ds.to_zarr(downloadpath.parent / '1979_2021_water_surplus_deficit_daily_EventCube.zarr', consolidated=True)

<xarray.backends.zarr.ZarrStore at 0x1553ddb7b1b0>

In [10]:
# Labelcubes are already only limited to Europe.
key_value = {}
for window in [30,90,180]:
    da = xr.open_zarr(downloadpath / f'LabelCube_Univariate_spei_{window}_0.05_Europe_1979_2021.zarr', consolidated = False)['layer']
    da.encoding.pop('chunks')
    da.encoding.pop('preferred_chunks')
    da.attrs.update({'window_length_days':window,'threshold':0.05})
    key_value.update({f'spei_{window}':da})
ds = xr.Dataset(key_value)
ds = chunk_func(ds)
ds.to_zarr(downloadpath.parent / '1979_2021_water_surplus_deficit_daily_LabelCube.zarr', consolidated=True)
#test = xr.open_zarr(downloadpath / 'LabelCube_Univariate_spei_180_0.05_Europe_1979_2021.zarr', consolidated=False)

<xarray.backends.zarr.ZarrStore at 0x15540a1f5690>

#### creating a land-sea mask for the WSD variables from WP3

In [20]:
example = xr.open_zarr('/scistor/ivm/jsn295/Medi/monthly/1979_2021_monthly_water_surplus_deficit.zarr').isel(time = 0)
whereland = xr.open_dataarray('/scistor/ivm/jsn295/ERA5/sst_nhplus.nc')[0,:,:].isnull()
whereland.name = 'land'
whereland = whereland.reindex_like(example, method = 'nearest').drop('time')

In [22]:
whereland.to_netcdf('/scistor/ivm/jsn295/Medi/landseamask_wp3.nc',encoding={'land':{'dtype':'byte'}})

### Storage of E-OBS

In [4]:
# varnmane currently unused argument.
def chunk_and_to_zarr(ncpath: Path, varname: str , to_monthly: bool = False) -> tuple[xr.DataArray,Path]:
    """ Returns array and suggested zarrpath """
    assert ncpath.exists(), 'please provide an existing path'
    zarrpath = ncpath.parent / (ncpath.parts[-1][:-3] + '.zarr') # Stripping .nc and replacing with zarr
    daskarray = xr.open_dataset(ncpath).drop_vars('time_bnds', errors = 'ignore') # Keeping as dataset.
    if to_monthly:
        daskarray = monthly_resample_func(daskarray, how = 'sum')
    daskarray = chunk_func(daskarray)
    # Writing and filling
    return daskarray, zarrpath

In [5]:
newdir = Path('/scistor/ivm/jsn295/Medi/monthly/')
# daily to monthly obs.
daskarray, zarrpath = chunk_and_to_zarr(ncpath = Path('/scistor/ivm/data_catalogue/observations/EOBS/rr_ens_mean_0.1deg_reg_v26.0e.nc'),
                             varname = 'rr',
                             to_monthly = True)
daskarray.to_zarr(newdir / f'{zarrpath.name[:3]}mon_{zarrpath.name[3:]}', consolidated=True) # adding month to name

<xarray.backends.zarr.ZarrStore at 0x155409480c10>

In [15]:
# Only changing the time axis (from mid-stamped to left-stamped). SPI is already monthly.
daskarray, zarrpath = chunk_and_to_zarr(ncpath = Path('/scistor/ivm/data_catalogue/observations/EOBS/spi3_mon_0.1deg_reg_ens_median_E-OBSv23.1e.nc'),
                             varname = 'spi3ETCCDI',
                             to_monthly = False)
daskarray.coords['time'] = pd.date_range(pd.Timestamp(daskarray.time.values[0]).strftime('%Y-%m-01'), freq = 'MS', periods = len(daskarray.time))
daskarray.to_zarr(newdir / zarrpath.name)

<xarray.backends.zarr.ZarrStore at 0x15540a8e77d0>