# 

In [1]:
# Set the two EVALUATION_SYSTEM_* environment variables for this kernel session.
# NOTE(review): both values are absolute, site-specific paths — adjust them when running elsewhere.
%env EVALUATION_SYSTEM_CONFIG_DIR=/work/ch1187/clint/nextgems/freva/
%env EVALUATION_SYSTEM_CONFIG_FILE=/work/ch1187/clint/nextgems/freva/evaluation_system.conf

env: EVALUATION_SYSTEM_CONFIG_DIR=/work/ch1187/clint/nextgems/freva/
env: EVALUATION_SYSTEM_CONFIG_FILE=/work/ch1187/clint/nextgems/freva/evaluation_system.conf


In [None]:
from freva_client import databrowser

In [None]:
# Query the databrowser metadata, restricted to entries with fs_type "s3".
databrowser.metadata_search(fs_type="s3")

In [None]:
# Show only the 'project' entry of the S3-restricted metadata search.
databrowser.metadata_search(fs_type="s3")['project']

In [None]:
# Print the values of a few selected facets from the S3-restricted metadata search.
selected_facets = ('realm', 'time_frequency', 'experiment')
s3_metadata = databrowser.metadata_search(fs_type='s3')
for facet, values in s3_metadata.items():
    # Skip every facet we are not interested in.
    if facet not in selected_facets:
        continue
    print(facet, '\n\t', values)

In [None]:
# Facet constraints reused by the following searches.
search_keys = dict(
    fs_type='s3',
    project='cesm2-le',
    experiment='historical',
    realm='atm',
    time_frequency='monthly',
)
# Check whether the variable 'ts' is offered for these constraints.
available_variables = databrowser.metadata_search(**search_keys)['variable']
'ts' in available_variables

In [None]:
# Build a databrowser search for variable 'ts' using the constraints in search_keys.
db = databrowser(variable='ts', **search_keys)
# Display the search object together with its first five entries.
db,list(db)[:5]

In [None]:
import xarray as xr
import numpy as np
def field_mean(
    data: "xr.DataArray",
    lat_name: str = "lat",
    lon_name: str = "lon",
    mean_dims: "tuple[str, ...] | None" = None,
) -> "xr.DataArray":
    """
    Compute an area-weighted mean using cosine-of-latitude weights.

    Parameters
    ----------
    data : xr.DataArray
        Input data array that has a latitude coordinate named ``lat_name``.
    lat_name : str, optional
        Name of the latitude coordinate (in degrees). Default is "lat".
    lon_name : str, optional
        Name of the longitude coordinate. Default is "lon".
    mean_dims : Sequence[str], optional
        Dimensions over which to compute the mean. When omitted, the mean is
        taken over ``(lat_name, lon_name)``, which is ("lat", "lon") for the
        default coordinate names.

    Returns
    -------
    xr.DataArray
        Weighted mean of the input data over the requested dimensions, with
        the input attributes kept.
    """
    # Derive the reduction dimensions from the coordinate names so that
    # non-default names (e.g. "latitude"/"longitude") work without having to
    # pass mean_dims separately.
    if mean_dims is None:
        mean_dims = (lat_name, lon_name)

    # Cosine of latitude (converted from degrees to radians) as weights.
    weights = np.cos(np.deg2rad(data[lat_name]))

    # Normalize the weights so they sum to 1 along latitude.
    weights = weights / weights.sum()

    # Weighted mean over the requested dimensions, keeping the attributes.
    return data.weighted(weights).mean(dim=mean_dims, keep_attrs=True)

In [None]:
# Open the first search result with xarray's Zarr reader, using anonymous S3 access.
xr.open_zarr(list(db)[0], storage_options={ 'anon':True })

In [None]:
s3_opts = { 'anon':True }
from pathlib import Path
time_series = {}

for fileurl in db:
    print(f"Opening {fileurl}...")
    engine = {'engine':'zarr'} if fileurl.endswith('zarr') else {}
    %time ds = xr.open_dataset(fileurl, **engine, storage_options=s3_opts)
    ds = ds.rename({'member_id':'ensemble'})
    ds
    
    # Going through all ensembles might take some time
    # let's make a cut at 5 member for demo purposes
    # Memory usage might spike to ~70GB 
    members=ds.ensemble[:5]
    ds = ds.sel(ensemble=members)    
    
    dataset_name=Path(fileurl).stem   

    
    mean_ts = field_mean(ds["TS"])
    mean_ts.attrs['source_dataset'] = fileurl
    time_series[dataset_name] = mean_ts
   

In [None]:
import matplotlib.pyplot as plt

# Create a color cycle for different experiments
colors = plt.rcParams["axes.prop_cycle"].by_key()["color"]

fig, ax = plt.subplots(figsize=(12, 5))

for i, (exp, data) in enumerate(time_series.items()):
    # One color per dataset, shared by its shading and its mean line.
    color = colors[i % len(colors)]

    # Yearly means, then min / max / mean across the ensemble members.
    ts = data.resample(time="1YE").mean()
    ts_min = ts.min(dim="ensemble").squeeze()
    ts_max = ts.max(dim="ensemble").squeeze()
    ts_mean = ts.mean(dim="ensemble").squeeze()
    time_values = np.array(ts.time.values, dtype='datetime64[ns]')

    # Plot min–max shading (no legend entry)
    ax.fill_between(
        time_values,
        ts_min.values,
        ts_max.values,
        color=color,
        alpha=0.2,
        label=None,
    )
    # Plot mean line
    ax.plot(
        time_values,
        ts_mean.values,
        color=color,
        linewidth=2,
        label=f"{exp}",
    )

# Add plot decorations
ax.set_title("Ensemble Mean and Spread for Each Dataset")
ax.set_xlabel("Time")
# The y-label is taken from the attributes of the last dataset plotted.
ax.set_ylabel(f"{data.attrs['long_name']} [{data.attrs['units']}]")
ax.legend()
ax.grid(True)
fig.tight_layout()

# We need to save the figure before showing
import io
fig_buf = io.BytesIO()
fig_format = 'png'
fig.savefig(fig_buf, format=fig_format)

# Finally plot
plt.show()

# Let's save the plot on S3, as well as the datasets!

In [None]:
from os import environ # to get USER env variable
USERNAME = environ['USER']
s3_config = {
    'bucket' : 'freva',
    'endpoint' :'https://s3.eu-dkrz-1.dkrz.cloud', # DKRZ Minio S3
    # Per-user prefix so that users do not overwrite each other's objects.
    # USERNAME is reused here: nesting the same quote type inside an f-string
    # is a SyntaxError on Python versions before 3.12.
    'prefix' : f'workshop/{USERNAME}',
    'access_key_id' : "s3handson", # Only valid during the workshop
    'secret_access_key' : "s3handson", # Only valid during the workshop
    'region' : 'eu-dkrz-1',
}

import s3fs
# File-system handle used by the following cells for all S3 reads and writes.
s3 = s3fs.S3FileSystem(
    key=s3_config['access_key_id'],
    secret=s3_config['secret_access_key'],
    client_kwargs={'endpoint_url': s3_config['endpoint']},
)

# "<bucket>/<prefix>" under which every object of this notebook is stored.
full_prefix = f"{s3_config['bucket']}/{s3_config['prefix']}"

# Write a tiny test object to confirm that the credentials allow writing.
testobj = f'{full_prefix}/hi.txt'
s3.write_bytes(testobj, b'Hi!\n')

print(f"Writing data to S3 works! Test it with:\ncurl {s3_config['endpoint']}/{testobj}")

In [None]:
# Fetch the hi.txt test object over HTTPS with curl.
!curl https://s3.eu-dkrz-1.dkrz.cloud/freva/workshop/${USER}/hi.txt

## Save the figure on S3
 - Save the figure into a buffer instead of a file
 - Make a `PUT` request to write the _in-memory_ image into an S3 object

In [None]:
# Object key for the figure below the per-user prefix; the extension is the format the figure was saved in.
figure_path = f'{full_prefix}/figure-ts-mean.{fig_format}'
fig_buf.seek(0)  # rewind the buffer to the beginning
# Upload the in-memory image bytes as one S3 object.
s3.write_bytes(figure_path, fig_buf.getvalue())
# Print the URL built from the endpoint and the object key.
print(f"{s3_config['endpoint']}/{figure_path}")

### Let's write the data to S3 as both NetCDF and Zarr

In [None]:
def write_dataset_to_s3(name, dataset: "xr.Dataset", file_format='nc'):
    """
    Write a dataset to S3 below ``full_prefix`` as NetCDF or Zarr.

    Relies on the notebook globals ``full_prefix``, ``USERNAME``, ``s3`` and
    ``s3_config``, and on ``write_zarr`` for the Zarr case.

    Parameters
    ----------
    name : str
        Object name without extension; the object is stored as
        ``{full_prefix}/{name}.{file_format}``.
    dataset : xr.Dataset
        Dataset to write.
    file_format : str, optional
        Either 'nc' or 'zarr'; dots are removed, so '.nc' is accepted too.
        Default is 'nc'.

    Raises
    ------
    ValueError
        If ``file_format`` is not one of the supported formats.
    """
    import os
    import shutil

    _supported_types = ('nc', 'zarr')
    file_format = file_format.replace('.', '')
    if file_format not in _supported_types:
        raise ValueError(
            f"Unsupported file format {file_format!r}, use one of {_supported_types}"
        )

    s3_path = f'{full_prefix}/{name}.{file_format}'
    if file_format == 'nc':
        # NetCDF is first written to a local scratch file and then uploaded.
        tmp_name = f"/scratch/{USERNAME[0]}/{USERNAME}/{name}.nc"
        try:
            dataset.to_netcdf(tmp_name, engine='h5netcdf')

            ## Copy to s3 in chunks instead of reading the whole file into memory
            with open(tmp_name, 'rb') as tmpf, s3.open(s3_path, 'wb') as s3file:
                shutil.copyfileobj(tmpf, s3file)
        finally:
            ## Remove the local copy, also when writing or uploading failed
            if os.path.exists(tmp_name):
                os.remove(tmp_name)

        ## Check if the uploaded object can be opened with Xarray
        with s3.open(s3_path, 'rb') as s3file:
            with xr.open_dataset(s3file, engine='h5netcdf'):
                pass

        print(
            f"Try running:\n\t"
            f"/fastdata/k20200/k202186/public/bin/ncdump -h {s3_config['endpoint']}/{s3_path}#mode=s3,bytes\n\t"
            f"/fastdata/k20200/k202186/public/bin/cdo sinfo {s3_config['endpoint']}/{s3_path}#mode=s3,bytes"
        )

    elif file_format == 'zarr':
        # Zarr is written directly to S3; pass the credentials and endpoint along.
        storage_options = {
            'key': s3_config['access_key_id'],
            'secret': s3_config['secret_access_key'],
            'client_kwargs': {'endpoint_url': s3_config['endpoint']},
        }
        write_zarr(f's3://{s3_path}', dataset, s3_options=storage_options)
        print(
            f"Try running:\n\t"
            f"/fastdata/k20200/k202186/public/bin/ncdump -h {s3_config['endpoint']}/{s3_path}#mode=s3,zarr\n\t"
            f"/fastdata/k20200/k202186/public/bin/cdo sinfo {s3_config['endpoint']}/{s3_path}#mode=s3,zarr"
        )
        

In [None]:
def write_zarr(store, data: "xr.Dataset | xr.DataArray", s3_options=None):
    """
    Write ``data`` to ``store`` as a Zarr format 2 store, overwriting it.

    Parameters
    ----------
    store : str
        Target passed to ``Dataset.to_zarr`` (e.g. an ``s3://`` URL).
    data : xr.Dataset or xr.DataArray
        Data to write; a DataArray is converted to a Dataset first.
    s3_options : dict, optional
        Passed to ``to_zarr`` as ``storage_options``. Defaults to an empty
        dict.
    """
    import numcodecs
    import zarr

    if isinstance(data, xr.DataArray):
        data = data.to_dataset()
    # A fresh dict per call instead of a shared mutable default argument.
    if s3_options is None:
        s3_options = {}

    # NOTE: this changes the process-wide zarr default format to 2.
    zarr.config.set(default_zarr_format=2)

    # Every variable gets the same Blosc compressor and "/"-separated chunk keys.
    codec = numcodecs.Blosc(shuffle=1, clevel=6)
    key_encoding = zarr.core.chunk_key_encodings.V2ChunkKeyEncoding(separator="/")
    encoding = {
        var: {
            "compressors": codec,
            "chunk_key_encoding": key_encoding.to_dict(),
        }
        for var in data.variables
    }

    data.to_zarr(
        store,
        mode='w',  # OVERWRITES!!
        encoding=encoding,
        consolidated=True,  # consolidate metadata for fast access
        storage_options=s3_options,
    )

In [None]:
for name, data in time_series.items():
    ## for CDO it is important that time is the first dimension
    ## also it cannot have indexers of type string
    ensemble = data.ensemble.values
    ensemble_id = np.arange(len(ensemble))

    # Put time first, swap the ensemble dimension for an integer ensemble_id
    # and keep the original member labels as a dataset attribute.
    da = (
        data.to_dataset()
        .transpose('time', ...)
        .assign_coords(ensemble_id=('ensemble', ensemble_id))
        .swap_dims({'ensemble': 'ensemble_id'})
        .reset_coords(drop=True)
        # map(str, ...) so that non-string member labels can be joined too
        .assign_attrs(ensembles=', '.join(map(str, ensemble)))
    )

    write_dataset_to_s3(name, da, file_format='nc')
    write_dataset_to_s3(name, da, file_format='zarr')
da