In [None]:
import xarray as xr
import dask.dataframe as dd
from distributed import Client
import pandas as pd
import dask
import hvplot.xarray
import s3fs
import zarr
import os
from time import gmtime, strftime
import shutil
import glob
import holoviews as hv


# Dask Client

In [None]:
# Start a Dask distributed client. The value 40 is the numeric level of
# logging.ERROR and is passed as `silence_logs` to limit cluster log output.
ERROR_LOG_LEVEL = 40
client = Client(silence_logs=ERROR_LOG_LEVEL)
# Bare expression so the notebook displays the client summary.
client

# Configs

In [None]:
# Source of data: bucket prefix that is listed below for CSV files
bucket = 's3://prsim/hydrogrammes-stochastiques/09995/'

# Wasabi cloud storage configurations
client_kwargs = {'endpoint_url': 'https://s3.us-east-1.wasabisys.com'}
config_kwargs = {'max_pool_connections': 30}
# Options handed to the CSV reader (anonymous access, same endpoint)
storage_options = {'anon': True,
                   "client_kwargs": client_kwargs}

# List all files in bucket
s3 = s3fs.S3FileSystem(client_kwargs=client_kwargs,
                       config_kwargs=config_kwargs,
                       anon=True)  # public read
# Keep only the CSV entries and prefix each listed path with 's3://'
file_list = ['s3://{}'.format(each) for each in s3.ls(bucket) if each.endswith('.csv')]

# Data sink (store). Alternatively, the sink could be directly in the cloud.
# expanduser('~') is used instead of os.environ['HOME'] because HOME is not
# guaranteed to be defined (e.g. on Windows), which would raise KeyError.
store = os.path.expanduser('~') + '/Documents/store'

# Zarr storage

Conversion des fichiers CSV en datasets Xarray via le chargement paresseux (dask.delayed), puis mise en commun (xr.combine_nested) et dépôt dans un dossier Zarr.

In [None]:
def chunks(lst, n):
    """Lazily split ``lst`` into consecutive slices of at most ``n`` items.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
    """
    starts = range(0, len(lst), n)
    yield from (lst[start:start + n] for start in starts)
        
@dask.delayed
def load(filename):
    """Read one CSV file and return it fully loaded.

    The file is opened through ``dd.read_csv`` with the module-level
    ``storage_options`` and materialised right away with ``.compute()``,
    so the delayed task yields the computed dataframe, not a lazy one.
    """
    lazy_frame = dd.read_csv(filename, storage_options=storage_options)
    return lazy_frame.compute()

@dask.delayed
def process(df, sim_member):
    """Convert a dataframe to a Dataset labelled with its simulation member.

    The dataframe is converted with ``xr.Dataset.from_dataframe``. The
    ``sim_member`` value is stored on the dataset, expanded into a dimension
    of the same name and finally flagged as a coordinate.
    """
    member_ds = xr.Dataset.from_dataframe(df)
    member_ds['sim_member'] = sim_member
    with_member_dim = member_ds.expand_dims('sim_member')
    return with_member_dim.set_coords('sim_member')

@dask.delayed
def combine(results):
    """Combine a list of datasets along the ``sim_member`` dimension."""
    concat_dim = 'sim_member'
    return xr.combine_nested(results, concat_dim)

@dask.delayed
def save(ds, store):
    """Write ``ds`` to the Zarr store at ``store`` with consolidated metadata.

    ``mode='w'`` replaces any store already present at that location.
    Without it, ``to_zarr`` refuses to write into an existing store, so a
    retried task (the caller computes with ``retries``) or a re-run of the
    notebook would fail on the leftovers of the previous attempt.

    Returns whatever ``Dataset.to_zarr`` returns.
    """
    return ds.to_zarr(store, mode='w', consolidated=True)

# main function to load, process, combine and save multiple zarr datasets lazily
def f(filenames, index, chunk_size=10000):
    """Build and run the task graph that writes one partial Zarr store.

    Each file is loaded and processed into a dataset, the datasets are
    combined, and the result is saved under ``store + '/part/' + str(index)``
    (``store`` is the module-level sink path).

    Parameters
    ----------
    filenames : iterable
        Files that make up this part; each one is passed to ``load``.
    index : int
        Position of the part. It names the partial store and offsets the
        member ids.
    chunk_size : int, optional
        Stride between the member ids of consecutive parts: file ``k`` of
        part ``index`` gets id ``index * chunk_size + k``. It should be at
        least the number of files per part so ids do not collide. The
        default is the value that used to be hard-coded here.

    Returns
    -------
    tuple
        The result of ``dask.compute`` for the save task.
    """
    offset = int(index * chunk_size)
    members = [
        process(load(filename), offset + position)
        for position, filename in enumerate(filenames)
    ]
    part_store = store + '/part/' + str(index)
    write_task = save(combine(members), part_store)
    # Nothing runs until this point; the whole graph is executed here,
    # passing retries=10 to the scheduler.
    return dask.compute(write_task, retries=10)

# Number of CSV files per partial store. It matches the 10000 stride that
# f() uses to offset the sim_member ids of each part.
FILES_PER_PART = 10000

# calling the main function with a generator
for index, files in enumerate(chunks(file_list, FILES_PER_PART)):
    print(strftime("%Y-%m-%d %H:%M:%S", gmtime()) + ' : ' + str(index))
    f(files, index)

# Partial stores are named after their integer index, so they are ordered
# numerically: a plain string sort would place 'part/10' before 'part/2'
# and the merged sim_member axis would no longer be in increasing order.
part_stores = sorted(glob.glob(store + '/part/*'),
                     key=lambda path: int(os.path.basename(path)))

# Merging to a final dataset
ds_out = dask.delayed(xr.combine_nested)(
    [dask.delayed(xr.open_zarr)(part_store) for part_store in part_stores],
    'sim_member',
).compute()

# intermediate step to allow rechunking: forget the chunk layout stored in
# each variable's encoding. pop() tolerates variables without that entry,
# where `del` would raise KeyError.
for name in list(ds_out.keys()):
    ds_out[name].encoding.pop('chunks', None)

# Save to final zarr store
ds_out.chunk({'index': 365, 'sim_member': 1000})\
    .to_zarr(store + '/final', consolidated=True)

# Clean up partial files: removing the parent directory recursively also
# removes every partial store inside it.
shutil.rmtree(store + '/part/')



# Validation

In [None]:
# local: open the final Zarr store under the configured sink path
final_store = store + '/final'
ds = xr.open_zarr(final_store)

## cloud: alternative source, left disabled
# store_wasabi = s3fs.S3Map(root='s3://prsim/hydrogrammes-stochastiques/zarr/09995',
#                           s3=s3,
#                           check=False)
#ds = xr.open_zarr(store_wasabi)

# Bare expression so the notebook displays the dataset summary.
ds

In [None]:
%%time
variables = ["Dozois","Lac Victoria et lac Granet",
             "Rapide-7","Rapide-2","Riviere Kinojevis","Lac des Quinze"]

da = ds[variables].to_array().sum('variable').load()

da.max('sim_member').hvplot(ylabel='Débit', grid=True)*\
da.mean('sim_member').hvplot()*\
da.min('sim_member').hvplot()

In [None]:
%%time

dict_bassins = {'Angliers': ["Dozois","Lac Victoria et lac Granet",
                             "Rapide-7","Rapide-2","Riviere Kinojevis","Lac des Quinze"],
                'Rapide-2': ["Dozois","Lac Victoria et lac Granet",
                             "Rapide-7","Rapide-2",],
                'Rapide-7': ["Dozois","Lac Victoria et lac Granet",
                             "Rapide-7"],
                'Lac Victoria et lac Granet': ["Dozois","Lac Victoria et lac Granet"],
                'Dozois': ["Dozois"]}
graphs=[]

for key, value in dict_bassins.items():
    print(key)
    da = ds[value].to_array()\
                 .sum('variable')
    sim_member = da.where(da==da.max(), drop=True).sim_member
    graphs.append(da.sel(sim_member=sim_member)[0,:]\
                    .hvplot(x='index', grid=True, ylabel='Débit', 
                            xlabel='sim member : ' + str(sim_member.values[0]),
                            title=key, legend=False).opts(active_tools=['wheel_zoom','pan']))
    graphs.append(ds[value].sel(sim_member=sim_member)\
    .drop('sim_member')\
    .hvplot(x='index', grid=True, ylabel='Débit', 
            xlabel='sim member : ' + str(sim_member.values[0]),
            title=key, legend=True).opts(active_tools=['wheel_zoom','pan']))
hv.Layout(graphs).opts(title=title).cols(2)

title='Hydrogrammes cumulatifs maximal des séries stochastiques à chaque site'
graph_layout = hv.Layout(graphs).opts(title=title).cols(2)
hv.save(graph_layout, 'graph_layout.html', backend='bokeh')

graph_layout