# LakeCREST notebooks
*Version: 06.04.2022 17:25*
## 1. Unpack ESA CCI Lakes (single lake, from disk)
In this script we will use [**xarray**](https://docs.xarray.dev/en/latest/index.html) and [**dask**](https://dask.org/) to load, mask and subset the [**ESA CCI Lakes v1.1**](https://catalogue.ceda.ac.uk/uuid/ef1627f523764eae8bbb6b81bf1f7a0a). The unpacking in this script is based on loading the pre-downloaded dataset from a local disk. Using xarray and dask, two free and open-source libraries, allows us to fully utilize the computing power of our machine in parallelized workflows scaled to the system at hand.

### 1.1 Importing modules
First, we import the Python modules needed for the following steps.

In [1]:
import pathlib
import xarray as xr
import numpy as np
import time
import pandas as pd
from dask.distributed import Client, LocalCluster

### 1.2 Define ROI
Here we define the desired lake and the data paths. These are the only user inputs needed to export the LSWT and LIC subsets. Depending on the system, the Dask settings in *1.3 Dask initialization* can also be adapted to fit individual memory and processing limits. For exporting other available variables, see the [**D4.3: Product User Guide (PUG)**](https://climate.esa.int/media/documents/CCI-LAKES-0029-PUG_v1.1_signed_CA.pdf). To mask the lakes from the global dataset we use the lake mask "ESACCI-LAKES_mask_v1.nc", which is available as part of [**ESA CCI Lakes v1.0**](https://catalogue.ceda.ac.uk/uuid/3c324bb4ee394d0d876fe2e1db217378).
#### Define filepaths (✦ User inputs)

In [2]:
# Define lake to unpack
lakename = 'Garda'

# Define data directory path and filenames
path_data = pathlib.Path(r'D:\lakecrest\esa_cci_lp\v1.1') # Path to ESA CCI Lakes data folder
path_mask = pathlib.Path(r'D:\lakecrest\esa_cci_lp\mask\ESACCI-LAKES_mask_v1.nc') # Path to lake mask
path_dask = pathlib.Path(r'C:\Users\Micha\Desktop\dask') # Path to temporary dask worker space

Now we can search for the CCI_lakeid corresponding to the defined lake using the provided table.

In [3]:
# Load in table of lakes with lake coordinates and the CCI_lakeid
# based on D4.3 Product User Guide (PUG) - Annex B: List of lakes
try:
    df = pd.read_csv('lakelist_v1.1.csv', delimiter=';')
except FileNotFoundError:
    print('Error: Did not find the lakelist .csv file, check that it is in the same folder!')
    raise  # stop here, otherwise df would be undefined in the following lines

# Find lakeid for specified lake
lakeid = df.loc[df['name'] == lakename]['cci_lakeid'].values[0]
lake_idx = df[df['name']==lakename].index.values[0]

print(f'The lakeid for Lake {lakename} is {lakeid}.')

# Get a preview of table around idx
df[lake_idx-2:lake_idx+3]

The lakeid for Lake Garda is 505.


Unnamed: 0,name,latitude,longitude,cci_lakeid
63,Fort Peck,47.68,-107.29,201
64,Fureso,72.03,-26.29,185
65,Garda,45.61,10.63,505
66,Geist,39.94,-85.94,215311
67,General Carrera/Buenos Aires,-46.66,-72.5,94
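
The lookup above fails with an `IndexError` if the lake name is misspelled or missing from the table. As a minimal sketch, the same lookup could be wrapped in a helper that raises a readable error instead. The function name `find_lakeid` and the small stand-in table are hypothetical and only serve as illustration; the column names follow the preview above.

```python
import pandas as pd

# Toy stand-in for lakelist_v1.1.csv, using rows from the preview above
lakes = pd.DataFrame({
    'name': ['Fort Peck', 'Garda', 'Geist'],
    'cci_lakeid': [201, 505, 215311],
})

def find_lakeid(table, lakename):
    """Return the cci_lakeid for lakename, or raise a readable error."""
    match = table.loc[table['name'] == lakename, 'cci_lakeid']
    if match.empty:
        raise ValueError(f'Lake "{lakename}" not found in the lake list.')
    return int(match.iloc[0])

print(find_lakeid(lakes, 'Garda'))  # prints 505
```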


We can use [**hvplot**](https://hvplot.holoviz.org/) to plot the lakes from the pandas table on an overview map.

In [22]:
import hvplot.pandas

# Plot pandas table to map-view with hvplot
df.hvplot.points(x='longitude', y='latitude',
                 color='red', alpha=0.5,
                 geo=True, tiles='OSM', 
                 hover_cols='all',
                 coastline=True,
                 xlabel='Longitude', ylabel='Latitude')

### 1.3 Dask initialization
We initialize a local Dask client with the specified number of workers, threads per worker and memory limit (per worker). Displaying the client shows its address, so we can access the client through its web interface. A good starting point for the settings is to set *n_workers* to the number of physical cores and *threads_per_worker* to the number of virtual cores divided by *n_workers*.

In [None]:
# Define according to system specs
n_workers = 4            # (e.g. number of physical cores)
threads_per_worker = 4     # (e.g. virtual cores / n_workers)
memory_limit = '8GB'       # (e.g. max memory / n_workers)

local_directory = path_dask
cluster = LocalCluster(n_workers=n_workers, 
                       threads_per_worker=threads_per_worker, 
                       memory_limit=memory_limit,
                       local_directory=local_directory
                      )
client = Client(address=cluster.scheduler_address)
client
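
The comments in the cell above describe a rule of thumb for picking the settings. A minimal sketch of that rule as a plain function is shown here; the function name `suggest_dask_settings` and the example system specs are hypothetical, and the result is only a starting point that may need tuning (for instance, to leave some memory for the operating system).

```python
def suggest_dask_settings(physical_cores, logical_cores, total_memory_gb):
    """Apply the rule of thumb from this section to hypothetical system specs."""
    n_workers = physical_cores                                # one worker per physical core
    threads_per_worker = max(1, logical_cores // n_workers)  # virtual cores / n_workers
    memory_limit = f'{total_memory_gb // n_workers}GB'       # max memory / n_workers
    return n_workers, threads_per_worker, memory_limit

# Hypothetical machine: 4 physical cores, 8 virtual cores, 32 GB RAM
print(suggest_dask_settings(physical_cores=4, logical_cores=8, total_memory_gb=32))
# prints (4, 2, '8GB')
```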

### 1.4 Specify chunk size
We use xarray to load the large multi-file dataset. xarray allows us to initialize the entire dataset by only providing the necessary filepaths. Instead of loading the entire dataset (>350GB) into memory, we can make use of xarray's ability to lazy-load chunks of data. This means that only the necessary subset of each individual .nc file is loaded into memory at the time it is needed. For this we can either pre-define a chunk size or let dask choose one automatically with `chunks='auto'`.

In [None]:
# dask decides chunk size
chunks='auto'

# Alternatively set chunks to specified size
#chunks={'lat':1000,
#        'lon':1000,
#        'time':1
#        }
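
When choosing a manual chunk size, it helps to estimate how much memory a single chunk occupies. The following sketch does this arithmetic for the commented-out chunk specification above. The helper name `chunk_nbytes` is hypothetical, and the value size of 4 bytes (float32) is an assumption for illustration; the actual size depends on the data type of each variable.

```python
import math

def chunk_nbytes(chunks, itemsize):
    """Bytes held by one chunk: product of chunk lengths times bytes per value."""
    return math.prod(chunks.values()) * itemsize

chunks_example = {'lat': 1000, 'lon': 1000, 'time': 1}
nbytes = chunk_nbytes(chunks_example, itemsize=4)  # assumes 4-byte (float32) values
print(f'{nbytes / 1e6:.1f} MB per chunk')  # prints 4.0 MB per chunk
```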

### 1.5 Load an individual .nc file
To test out xarray and get a preview of the ESA CCI Lakes dataset we can load a single file from the dataset. For this we will use the [xarray.open_dataset](https://docs.xarray.dev/en/stable/generated/xarray.open_dataset.html) function. We can get a preview of the loaded data, its attributes and variables in the console view.

In [None]:
# Use pathlib.Path.rglob function to recursively find all .nc files within the data folder
paths_data = list(path_data.rglob('*fv1.1.nc'))

# Get the first filepath from the list of .nc files
path_fn_first = paths_data[0]

# Load the file with xarray
DS_preview = xr.open_dataset(filename_or_obj=path_fn_first,
                             engine='netcdf4',
                             decode_cf=False,
                             chunks=chunks)
DS_preview

### 1.6 Spatial subsetting and variable selection
Because we are only interested in a specific lake and a subset of the 50+ available variables, we subset the dataset. To do so, we set up a function **preprocess(ds)** that reduces each daily .nc file to the desired variables within a bounding box derived from the extent of the desired lake. The preprocess function is run on every daily .nc file when the multi-file dataset is initialized.

In [None]:
# List of the variables we want to load
variables = ['lake_surface_water_temperature',
              'lswt_quality_level',
              'lswt_uncertainty',
              'lake_ice_cover'
              ]

# Load CCI lakes mask
DS_mask = xr.open_dataset(filename_or_obj=path_mask,
                          engine='netcdf4',
                          decode_cf=True,
                          chunks=chunks
                          )

# Get logical True/False lake mask over full globe
mask_full = (DS_mask.CCI_lakeid == lakeid)

# Get logical True/False lake mask sliced over ROI only
mask_roi = mask_full.where(mask_full, drop=True)

# Get bounds coordinates
lat_min = mask_roi.lat[0].values
lat_max = mask_roi.lat[-1].values
lon_min = mask_roi.lon[0].values
lon_max = mask_roi.lon[-1].values

def preprocess(ds):
    '''Keeps only the necessary lat/lon slice and necessary variables when opening .nc files'''
    return ds[variables].sel(lat=slice(lat_min, lat_max), 
                             lon=slice(lon_min, lon_max))

print(f'Created preprocess(ds) function to subset with bbox ' \
      f'lat: ({lat_min:0.1f}, {lat_max:0.1f}), ' \
      f'lon: ({lon_min:0.1f}, {lon_max:0.1f}) for Lake {lakename}.')
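
To illustrate how the bounding box follows from the mask, here is a small NumPy-only sketch of the same idea: keep the rows and columns that contain at least one lake cell and read the first and last coordinate of what remains. The grid, coordinates and mask are hypothetical, and coordinates are assumed to be in ascending order; the notebook itself relies on xarray's `where(..., drop=True)` for this step.

```python
import numpy as np

# Hypothetical 4x5 grid with ascending coordinates
lat = np.array([45.0, 45.1, 45.2, 45.3])
lon = np.array([10.0, 10.1, 10.2, 10.3, 10.4])
mask = np.zeros((lat.size, lon.size), dtype=bool)
mask[1:3, 2:4] = True  # cells belonging to the lake

# Keep only the rows/columns that contain at least one lake cell
rows = np.any(mask, axis=1)
cols = np.any(mask, axis=0)

# Bounding box as (lat_min, lat_max, lon_min, lon_max)
bbox = tuple(float(v) for v in (lat[rows][0], lat[rows][-1],
                                lon[cols][0], lon[cols][-1]))
print(bbox)  # prints (45.1, 45.2, 10.2, 10.3)
```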

We can check the lake mask and the bounding box computed from the mask file on a map view using [**hvplot**](https://hvplot.holoviz.org/) and [**GeoViews**](https://geoviews.org/). Both libraries are based on [**HoloViews**](https://holoviews.org/), so their objects can be easily combined.

In [None]:
import hvplot.xarray
import geoviews as gv

# Create map-plot of xarray dataarray using hvplot
hv_map = mask_roi.hvplot(geo=True, tiles='CartoLight', colorbar=False, 
                         xlabel='Longitude', ylabel='Latitude')

# Create boundingbox using geoviews
gv_bbox = gv.Rectangles([(lon_min, lat_min, lon_max, lat_max)]).opts(color='none', line_width=2, line_color='red')

# Combine objects as overlay
hv_map * gv_bbox

### 1.7 Load full dataset as xarray.Dataset
Now, we can initialize the full dataset using xarray's [**xarray.open_mfdataset**](https://docs.xarray.dev/en/latest/generated/xarray.open_mfdataset.html) function. xarray will handle the decoding of the NetCDF data using the scaling and offset attributes found in the loaded files. During the loading process we can monitor the progress and the task stream of our workers in the dask web interface (output from *1.3 Dask initialization*). Once the dataset is loaded, we'll get a preview.

The xarray documentation has an extensive [user-guide](https://xarray.pydata.org/en/stable/user-guide/io.html) with explanations and best-practices to load large datasets.

In [None]:
# Setup timer to time the loading process
start_time = time.time()

DS = xr.open_mfdataset(paths=paths_data,
                       combine='nested',
                       parallel=True,
                       engine='netcdf4',
                       decode_cf=True,
                       preprocess=preprocess,
                       chunks=chunks,
                       concat_dim="time",
                       data_vars="minimal", 
                       coords="minimal", 
                       compat="override")

print(f'Xarray dataset with variables: {variables} initialized after ' \
      f'{(time.time()-start_time):0.1f} seconds')

DS # Load preview

Once the dataset has been loaded, we can get an overview of the xarray.Dataset object and its loaded variables and attributes.
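
The decoding that xarray performs with `decode_cf=True` roughly follows the CF convention: fill values are masked and the packed values are multiplied by `scale_factor` and shifted by `add_offset`. The sketch below shows this arithmetic with NumPy. The function name `decode_cf_values` and all numbers are hypothetical and are not taken from the ESA CCI Lakes files.

```python
import numpy as np

def decode_cf_values(packed, scale_factor, add_offset, fill_value):
    """Unpack integer values: mask the fill value, then scale and offset."""
    values = packed.astype('float64')
    values[packed == fill_value] = np.nan
    return values * scale_factor + add_offset

# Hypothetical packed values and attributes, for illustration only
packed = np.array([100, 250, -32768], dtype='int16')
decoded = decode_cf_values(packed, scale_factor=0.01,
                           add_offset=273.15, fill_value=-32768)
print(decoded)
```

The first two entries decode to approximately 274.15 and 275.65, while the fill value becomes NaN.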

### 1.8 Apply lake-mask
Next we apply the lake mask to mask cells of other lakes in the same bounding box. To make sure that the lake mask has identical cell coordinates we align it to the coordinates of our dataset.

In [None]:
# Reindex coordinates to make sure that our lake mask cells are aligned to data cells
da_mask = mask_roi.reindex_like(other=DS, method='nearest')

# Convert the lake mask to boolean and keep only lake cells with xarray.Dataset.where
DS_masked = DS.where(cond=(da_mask==True))
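
Conceptually, the masking step broadcasts the two-dimensional lake mask over the time dimension and replaces every cell outside the lake with NaN. A minimal NumPy sketch of this behaviour, using a hypothetical data cube and mask, could look like this:

```python
import numpy as np

# Hypothetical data cube with dimensions (time, lat, lon)
data = np.arange(2 * 2 * 3, dtype='float64').reshape(2, 2, 3)

# 2-D lake mask with dimensions (lat, lon); True marks cells of the target lake
lake_mask = np.array([[True, False, False],
                      [True, True, False]])

# The 2-D mask broadcasts over the time axis; non-lake cells become NaN
masked = np.where(lake_mask, data, np.nan)
print(int(np.isnan(masked).sum()))  # prints 6 (3 non-lake cells x 2 time steps)
```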

### 1.9 Export subset
The subset with the masked data can now be exported, either with [**xarray.Dataset.to_netcdf**](https://xarray.pydata.org/en/stable/generated/xarray.Dataset.to_netcdf.html) or, as done in this notebook, with **xarray.Dataset.to_zarr**. We can get the encoding settings (e.g. data type, fill value, scale factor, offset) for the export from the previously loaded preview dataset.

#### Define output path

In [None]:
# Define an output path for the file
path_dst = fr'subsets\ESACCI-LAKES-L3S-LK_PRODUCTS-MERGED-LSWT_LIC-{lakename}-fv1.1.zarr'
# Form pathlib objects
path_wrk = pathlib.Path().absolute()
path_dst = path_wrk.joinpath(path_dst)

# Create parent folder if it doesn't exist yet
path_dst.parent.mkdir(parents=True, exist_ok=True)

#### Rechunk
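The export performance depends on the specified chunk sizes. After subsetting, daily chunks are very small and create unnecessary overhead, so it can help to rechunk to larger chunks along time (e.g. roughly monthly). The cell below is left commented out; as written it uses one chunk over the full lat/lon extent with a single time step per chunk, so the `time` value would need to be raised (e.g. to about 30) to obtain monthly chunks.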

In [None]:
# lat_chunk = DS_masked.lat.size
# lon_chunk = DS_masked.lon.size
# DS_masked = DS_masked.chunk({'lat':lat_chunk, 'lon':lon_chunk, 'time':1})
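
To get a feel for the overhead of small chunks, the number of chunks along the time dimension can be estimated for different chunk lengths. The helper `n_time_chunks` and the one-year daily series are hypothetical and only illustrate the arithmetic.

```python
import math

def n_time_chunks(n_steps, time_chunk):
    """Number of chunks along time for a given chunk length."""
    return math.ceil(n_steps / time_chunk)

n_days = 365  # hypothetical one-year daily series
print(n_time_chunks(n_days, 1), n_time_chunks(n_days, 30))  # prints 365 13
```

Each chunk becomes at least one task and one stored object per variable, so fewer, larger chunks usually reduce scheduling and file-system overhead.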

#### Run Export
Now we can export the subset. For this we'll retrieve the encoding information (scale factor, offset etc.) from the original metadata. Exporting the files to the [zarr](https://zarr.readthedocs.io/en/stable/) format instead of NetCDF is more efficient for parallel writing and compression.

In [None]:
import zarr

# Get encoding settings from un-decoded DS_preview
DS_enc = {}
encoding_enc = ['dtype'] # ,'zlib', 'shuffle', 'complevel', 'fletcher32', 'contiguous']
attr_enc = ['_FillValue', 'scale_factor', 'add_offset']
for var in variables:
    DS_enc_encoding = DS_preview.get(var).encoding
    DS_enc_fromenc = {k:v for k, v in DS_enc_encoding.items() \
                      if k in encoding_enc}
    DS_enc_attrs = DS_preview.get(var).attrs
    DS_enc_fromattrs = {k:v for k, v in DS_enc_attrs.items() \
                        if (k in attr_enc and hasattr(DS_preview.get(var), k))}
    DS_enc[var] = {**DS_enc_fromenc, **DS_enc_fromattrs}

# Setup zarr-compressor using Blosc and zstd-compression
compressor = zarr.Blosc(cname="zstd", clevel=3, shuffle=2)

# Set compression settings in encoding
for enc in DS_enc.values():
    enc['compressor'] = compressor

print('DS encoding:')
for var, enc in DS_enc.items():
    print(f'{var}: {enc}')

# Set starting time for timer
start_time = time.time() 

# We can also slice the data in time before exporting
#timeslice = slice('2019-01-01', '2020-01-01')
#DS_masked = DS_masked.sel(time=timeslice)

print('Exporting started.. ', end='')

# DS_masked.to_netcdf(path=path_dst,
#                            mode='w',
#                            engine='h5netcdf',
#                            encoding=DS_enc
#                           )

# with zarr.ZipStore(path_dst) as store:
#     DS_masked.to_zarr(store=store)#, mode='w', encoding=DS_enc)

DS_masked.to_zarr(store=path_dst,
                  mode='w',
                  encoding=DS_enc)

print(f'Subset of Lake {lakename} ' \
      f'exported after {(time.time()-start_time):0.0f} seconds.')
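
The `scale_factor`, `add_offset` and `_FillValue` entries passed in the encoding tell the writer how to pack the decoded floating-point values back into compact integers. The following NumPy sketch shows roughly what this packing amounts to. The function name `encode_cf_values`, the `int16` target type and all numbers are hypothetical; xarray's own implementation may differ in detail.

```python
import numpy as np

def encode_cf_values(values, scale_factor, add_offset, fill_value, dtype='int16'):
    """Pack floats to integers: remove offset, divide by scale, round, fill NaN."""
    packed = np.round((values - add_offset) / scale_factor)
    packed = np.where(np.isnan(values), fill_value, packed)
    return packed.astype(dtype)

# Hypothetical decoded values and attributes, for illustration only
values = np.array([274.15, 275.65, np.nan])
packed = encode_cf_values(values, scale_factor=0.01,
                          add_offset=273.15, fill_value=-32768)
print(packed.tolist())  # prints [100, 250, -32768]
```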