In [1]:
#!/usr/bin/env python
# coding: utf-8

import itertools
import os, sys
import tempfile

import numpy as np
import pandas as pd
import xarray as xr
import xrft

import dask.array as da
from dask.distributed import Client, LocalCluster
from dask.diagnostics import ProgressBar
#
# Start an in-process dask cluster: a single worker running 8 threads
# (processes=False keeps the worker threads inside this Python process).
# NOTE(review): the saved output below (4 workers, 12 threads, processes=True)
# comes from a different configuration than this cell — re-run to refresh it.
cluster = LocalCluster(processes=False, n_workers=1, threads_per_worker=8)
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://137.129.155.67:8787/status,

0,1
Dashboard: http://137.129.155.67:8787/status,Workers: 4
Total threads: 12,Total memory: 78.61 GiB
Status: running,Using processes: True

0,1
Comm: tcp://137.129.155.67:8788,Workers: 4
Dashboard: http://137.129.155.67:8787/status,Total threads: 12
Started: Just now,Total memory: 78.61 GiB

0,1
Comm: tcp://137.129.155.67:35761,Total threads: 3
Dashboard: http://137.129.155.67:41375/status,Memory: 19.65 GiB
Nanny: tcp://137.129.155.67:44645,
Local directory: /mnt/lfs/d50/tropics/commun/DATACOMMUN/WAVE/SCRIPTS/TOWEL/TF/dask-worker-space/worker-aamua_6h,Local directory: /mnt/lfs/d50/tropics/commun/DATACOMMUN/WAVE/SCRIPTS/TOWEL/TF/dask-worker-space/worker-aamua_6h

0,1
Comm: tcp://137.129.155.67:46069,Total threads: 3
Dashboard: http://137.129.155.67:42537/status,Memory: 19.65 GiB
Nanny: tcp://137.129.155.67:48331,
Local directory: /mnt/lfs/d50/tropics/commun/DATACOMMUN/WAVE/SCRIPTS/TOWEL/TF/dask-worker-space/worker-opnsog46,Local directory: /mnt/lfs/d50/tropics/commun/DATACOMMUN/WAVE/SCRIPTS/TOWEL/TF/dask-worker-space/worker-opnsog46

0,1
Comm: tcp://137.129.155.67:39315,Total threads: 3
Dashboard: http://137.129.155.67:41765/status,Memory: 19.65 GiB
Nanny: tcp://137.129.155.67:42307,
Local directory: /mnt/lfs/d50/tropics/commun/DATACOMMUN/WAVE/SCRIPTS/TOWEL/TF/dask-worker-space/worker-vb76zrd5,Local directory: /mnt/lfs/d50/tropics/commun/DATACOMMUN/WAVE/SCRIPTS/TOWEL/TF/dask-worker-space/worker-vb76zrd5

0,1
Comm: tcp://137.129.155.67:41347,Total threads: 3
Dashboard: http://137.129.155.67:45773/status,Memory: 19.65 GiB
Nanny: tcp://137.129.155.67:38331,
Local directory: /mnt/lfs/d50/tropics/commun/DATACOMMUN/WAVE/SCRIPTS/TOWEL/TF/dask-worker-space/worker-aiuzihcd,Local directory: /mnt/lfs/d50/tropics/commun/DATACOMMUN/WAVE/SCRIPTS/TOWEL/TF/dask-worker-space/worker-aiuzihcd


In [2]:
# Input anomaly files and output directory for the 2-D Fourier transforms.
indir_data = '/cnrm/tropics/commun/DATACOMMUN/WAVE/NO_SAVE/DATA/RAW_ANOMALY/TCWV/'
outdir_TF = '/cnrm/tropics/commun/DATACOMMUN/WAVE/NO_SAVE/DATA/TF2D/TCWV/'
var = 'tcwv'
prefix = 'TF2D'

# Scratch directory for the per-chunk netCDF files written by saveChunks.
# Uses $TMPDIR when set, otherwise the system temporary directory. The trailing
# separator is kept because downstream code concatenates file names onto it.
tempdir = os.path.join(os.environ.get('TMPDIR') or tempfile.gettempdir(), 'level', '')
# exist_ok=True keeps the cell idempotent (re-running must not raise FileExistsError).
os.makedirs(tempdir, exist_ok=True)

# Number of days borrowed from each neighbouring year by createArray.
addDay = 180

In [3]:
def split_by_chunks(dataset):
    """Yield one sub-dataset per combination of chunks of ``dataset``.

    For every dimension listed in ``dataset.chunks`` the chunk sizes are turned
    into consecutive index slices (stopping once the dimension size is reached).
    The cartesian product of those slices is then used to index ``dataset``,
    so each yielded object covers exactly one chunk along every chunked dimension.
    """
    per_dim = {}
    for dim_name, chunk_sizes in dataset.chunks.items():
        limit = dataset.sizes[dim_name]
        offset = 0
        bounds = []
        for size in chunk_sizes:
            if offset >= limit:
                break
            bounds.append(slice(offset, offset + size))
            offset += size
        per_dim[dim_name] = bounds

    dim_names = list(per_dim)
    for combo in itertools.product(*(per_dim[name] for name in dim_names)):
        yield dataset[dict(zip(dim_names, combo))]

def create_filepath(ds, prefix='filename', root_path="/cnrm/tropics/commun/DATACOMMUN/WAVE/NO_SAVE/DATA/RAW_CLIM/temp/test/"):
    """
    Build the netCDF path used to store one chunk of a dataset.

    The file name is ``{prefix}_{first latitude}_ERA5.nc``: pieces are told apart
    only by the first value of their ``latitude`` coordinate, so each piece passed
    in must start at a different latitude or the paths will collide.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset (or chunk of one) with a non-empty ``latitude`` coordinate.
    prefix : str
        File-name prefix.
    root_path : str
        Directory for the file; a trailing separator is optional.

    Returns
    -------
    str
        ``root_path`` joined with the generated file name.
    """
    start = ds.latitude.data[0]
    # os.path.join avoids the doubled '/' that f'{root_path}/...' produced when
    # root_path already ends with a separator (as the default and tempdir do).
    return os.path.join(root_path, f'{prefix}_{start}_ERA5.nc')


def createArray(year, steps_per_day=8):
    """Open one year of anomaly files padded with ``addDay`` days from its neighbours.

    Files in ``indir_data`` matching ``*{var}*{yr}*.nc`` are opened for
    ``year - 1``, ``year`` and ``year + 1`` (one latitude per chunk) and
    concatenated along ``time``: the last ``addDay`` days of the previous year,
    the whole target year, then the first ``addDay`` days of the next year.

    Parameters
    ----------
    year : int
        Target year.
    steps_per_day : int, optional
        Time steps per day in the input files. Defaults to 8, the value that was
        previously hard-coded as ``int(24/3)`` (presumably 3-hourly data).

    Returns
    -------
    xarray.Dataset
        Lazily opened, time-concatenated dataset.
    """
    pad = addDay * steps_per_day

    def _open_year(yr):
        # Open every file of one year, chunked one latitude at a time.
        pattern = indir_data + '*' + var + '*' + str(yr) + '*.nc'
        return xr.open_mfdataset(pattern, chunks={'latitude': 1}, parallel=True)

    ds_prev = _open_year(year - 1)
    # Explicit start index rather than slice(-pad, None): with pad == 0 the latter
    # becomes slice(0, None) and would keep the entire previous year.
    n_prev = ds_prev.sizes['time']
    ds_prev = ds_prev.isel(time=slice(max(n_prev - pad, 0), None))
    ds_curr = _open_year(year)
    ds_next = _open_year(year + 1).isel(time=slice(None, pad))

    return xr.concat([ds_prev, ds_curr, ds_next], dim='time', coords='minimal', compat='override')

def saveChunks(ds, prefix = 'XXX', root_path=None):
    """Write each chunk of ``ds`` to its own netCDF file, then reopen them lazily.

    Parameters
    ----------
    ds : xarray.Dataset
        Chunked dataset; it is split with ``split_by_chunks``.
    prefix : str
        File-name prefix passed to ``create_filepath``.
    root_path : str, optional
        Directory for the chunk files; defaults to the notebook-level ``tempdir``.

    Returns
    -------
    xarray.Dataset
        The chunk files reopened with ``open_mfdataset`` (combined by coordinates).

    Raises
    ------
    ValueError
        If two chunks would be written to the same file (same first latitude).
    """
    if root_path is None:
        root_path = tempdir

    pieces = list(split_by_chunks(ds))
    paths = [create_filepath(piece, prefix=prefix, root_path=root_path) for piece in pieces]
    # File names only encode the first latitude; duplicates would silently overwrite data.
    if len(set(paths)) != len(paths):
        raise ValueError(
            'saveChunks: several chunks share the same first latitude, so their '
            'file names would collide; chunk only along latitude.')

    xr.save_mfdataset(datasets=pieces, paths=paths, engine="h5netcdf", mode = 'w')
    # Reopen exactly the files just written: a '*.nc' glob over the directory
    # would also pick up stale files from earlier runs or other prefixes.
    return xr.open_mfdataset(paths, chunks = 'auto', parallel=True, combine='by_coords', engine="h5netcdf")

In [6]:

# Years to process. createArray pads each year with data from year-1 and year+1,
# so files for the neighbouring years must exist too.
year = np.arange(1991, 1992)

for y in year:
    ds = createArray(y)
    # Whole time axis in a single chunk, one latitude per chunk
    # (presumably required because xrft transforms along 'time').
    ds = ds.chunk({"time": -1, "latitude": 1})

    # 2-D Fourier transform of the anomaly along (time, longitude).
    tcwvhat = xrft.fft(ds['tcwv_ano'],
                       dim=['time', 'longitude'], true_phase=False, true_amplitude=True)
    tcwvhat = tcwvhat.to_dataset(name='TF_tcwv')
    tcwvhat['time'] = ds.time

    # NOTE(review): `tcwvhat` is never written. The TF2D file below receives the
    # chunk-wise copy of `ds` (the input anomaly), not the FFT — confirm intent.
    _ds = saveChunks(ds, prefix=prefix)

    filepath = f'{prefix}_ERA5_3H_{var}_{y}_V2.nc'
    _ds.to_netcdf(outdir_TF + filepath,
                  engine='h5netcdf', invalid_netcdf=True, mode='w')

Unnamed: 0,Array,Chunk
Bytes,37.44 GiB,53.17 MiB
Shape,"(4840, 721, 1440)","(4840, 1, 1440)"
Count,6492 Tasks,721 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 37.44 GiB 53.17 MiB Shape (4840, 721, 1440) (4840, 1, 1440) Count 6492 Tasks 721 Chunks Type float64 numpy.ndarray",1440  721  4840,

Unnamed: 0,Array,Chunk
Bytes,37.44 GiB,53.17 MiB
Shape,"(4840, 721, 1440)","(4840, 1, 1440)"
Count,6492 Tasks,721 Chunks
Type,float64,numpy.ndarray
