In [1]:
#from glob import glob
from pathlib import Path
import os, time

import xarray as xr

import itidenatl.utils as ut

In [2]:
from dask.distributed import Client, LocalCluster, wait
# Start a local dask cluster with 8 workers and attach a client to it.
# Alternative options kept for reference:
#   n_workers=24, threads_per_worker=1, memory_limit=8e6, silence_logs=50
cluster = LocalCluster(n_workers=8)
client = Client(cluster)
client  # last expression of the cell: displays the client summary

0,1
Client  Scheduler: tcp://127.0.0.1:46241  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 56  Memory: 236.33 GiB


In [3]:
### define paths
# NOTE: os.getenv returns None when SCRATCHDIR is not set, and Path(None) raises TypeError
scratch = Path(os.getenv("SCRATCHDIR"))
workdir = Path("/work/CT1/ige2071/nlahaye")
# "SHARED" directory next to workdir, turned into an absolute path by resolve()
worksha = (workdir/"../SHARED").resolve()

# root directory under which the raw files are searched (see the glob on data_path)
data_path = Path("/work/CT1/hmg2840/lbrodeau/eNATL60")
# NOTE(review): data_subs is not used in the visible part of the notebook
data_subs = "eNATL60-BLBT02*-S"

### construct list of raw files, sorted by date (day)
# Glob pattern, relative to data_path, selecting the "gridS" files.
# NOTE(review): "?" matches exactly one character, so a directory named
# "eNATL60-BLBT02-S" (no suffix letter) is NOT matched, whereas data_subs uses "*"
# -- confirm this is intended.
subs = "eNATL60-BLBT02?-S/????????-????????/eNATL60-BLBT02?_1h_*_gridS_*.nc"
list_files = list(data_path.glob(subs))

# Map day -> path. The day is the last 8 characters of the file name without its
# extension. Path.stem is used rather than str.rstrip(".nc"), because rstrip removes
# any trailing run of the characters '.', 'n', 'c' and not the ".nc" suffix itself.
# If several files share the same day key, the last one listed wins.
dico_files = {k.stem[-8:]: k for k in list_files} # dico day:path
dates = sorted(dico_files) # list of dates (day), in increasing order

### utility mapping and function to get the file corresponding to one time index and one variable
# Invert ut.vmapping (its values become the keys): get_path looks a variable name up
# in map_varname and substitutes the result for "gridS" in the file name.
map_varname = dict((varname, tag) for tag, varname in ut.vmapping.items())
# presumably the file names spell this tag with a dash -- TODO confirm
if map_varname["sossheig"] == "gridT2D":
    map_varname.update(sossheig="gridT-2D")
    
def get_path(var,it):
    """Return the path of the file holding variable `var` for day number `it`.

    Parameters
    ----------
    var : key of `map_varname` (a variable name, e.g. "votemper")
    it : index into the sorted list `dates`

    Returns
    -------
    Path located in the same directory as the "gridS" file registered for that
    day, with "gridS" replaced in the file name by `map_varname[var]`.

    Raises
    ------
    IndexError if `it` is out of range for `dates`; KeyError if `var` is unknown.
    """
    grids_file = dico_files[dates[it]]
    tag = map_varname[var]
    renamed = grids_file.name.replace("gridS", tag)
    return grids_file.parent / renamed

In [8]:
# Base chunking shared by the tests: x kept in a single chunk (-1), one time step per chunk.
chunks = dict(x=-1, time_counter=1)
# Index of the day (position in `dates`) passed to get_path.
i_day = 10
# Index ranges applied with isel: 200 rows in y, every depth level.
iy = slice(200, 400)
iz = slice(0, None)

### first test: single chunk in z, keep the other ones


In [13]:
# y in chunks of 10 rows, all depth levels in a single chunk (-1)
chunks.update({"y":10, "deptht":-1})
ds = xr.open_dataset(get_path("votemper", i_day), chunks=chunks)
# Remove the "axis_nbounds" dimension and every coordinate of the freshly opened dataset.
# drop_vars replaces Dataset.drop, which is deprecated.
ds = ds.drop_dims("axis_nbounds").drop_vars(list(ds.coords))
ds

Unnamed: 0,Array,Chunk
Bytes,1.03 TiB,95.60 MiB
Shape,"(24, 300, 4729, 8354)","(1, 300, 10, 8354)"
Count,11353 Tasks,11352 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.03 TiB 95.60 MiB Shape (24, 300, 4729, 8354) (1, 300, 10, 8354) Count 11353 Tasks 11352 Chunks Type float32 numpy.ndarray",24  1  8354  4729  300,

Unnamed: 0,Array,Chunk
Bytes,1.03 TiB,95.60 MiB
Shape,"(24, 300, 4729, 8354)","(1, 300, 10, 8354)"
Count,11353 Tasks,11352 Chunks
Type,float32,numpy.ndarray


In [14]:
%%time
it = 0
# in this computation, open_dataset_getitem (20 instances for chunk y size 20) takes 10 to 30s
# limiting operation seems to be the open_dataset_getitem that takes the longer time
ds.votemper.isel(time_counter=it, y=iy, deptht=iz).sum().values

CPU times: user 2.88 s, sys: 512 ms, total: 3.4 s
Wall time: 36 s


array(1.3780913e+09, dtype=float32)

### second test: leave chunking unchanged

In [15]:
# y in chunks of 10 rows, one depth level per chunk
chunks.update({"y":10, "deptht":1})
ds = xr.open_dataset(get_path("votemper", i_day), chunks=chunks)
# Remove the "axis_nbounds" dimension and every coordinate of the freshly opened dataset.
# drop_vars replaces Dataset.drop, which is deprecated.
ds = ds.drop_dims("axis_nbounds").drop_vars(list(ds.coords))
ds

Unnamed: 0,Array,Chunk
Bytes,1.03 TiB,326.33 kiB
Shape,"(24, 300, 4729, 8354)","(1, 1, 10, 8354)"
Count,3405601 Tasks,3405600 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.03 TiB 326.33 kiB Shape (24, 300, 4729, 8354) (1, 1, 10, 8354) Count 3405601 Tasks 3405600 Chunks Type float32 numpy.ndarray",24  1  8354  4729  300,

Unnamed: 0,Array,Chunk
Bytes,1.03 TiB,326.33 kiB
Shape,"(24, 300, 4729, 8354)","(1, 1, 10, 8354)"
Count,3405601 Tasks,3405600 Chunks
Type,float32,numpy.ndarray


In [17]:
%%time
it = 1
## with this chunking, there are 6000 open_dataset_getitem 
# that run during the whole computation, each being around 100 ms
ds.votemper.isel(time_counter=it, y=iy, deptht=iz).sum().values

CPU times: user 14.3 s, sys: 793 ms, total: 15.1 s
Wall time: 16.4 s


array(1.3778604e+09, dtype=float32)

### third test: take larger chunks in y

In [18]:
# y in chunks of 200 rows (the size of the selected range), one depth level per chunk
chunks.update({"y":200, "deptht":1})
ds = xr.open_dataset(get_path("votemper", i_day), chunks=chunks)
# Remove the "axis_nbounds" dimension and every coordinate of the freshly opened dataset.
# drop_vars replaces Dataset.drop, which is deprecated.
ds = ds.drop_dims("axis_nbounds").drop_vars(list(ds.coords))
ds

Unnamed: 0,Array,Chunk
Bytes,1.03 TiB,6.37 MiB
Shape,"(24, 300, 4729, 8354)","(1, 1, 200, 8354)"
Count,172801 Tasks,172800 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.03 TiB 6.37 MiB Shape (24, 300, 4729, 8354) (1, 1, 200, 8354) Count 172801 Tasks 172800 Chunks Type float32 numpy.ndarray",24  1  8354  4729  300,

Unnamed: 0,Array,Chunk
Bytes,1.03 TiB,6.37 MiB
Shape,"(24, 300, 4729, 8354)","(1, 1, 200, 8354)"
Count,172801 Tasks,172800 Chunks
Type,float32,numpy.ndarray


In [20]:
%%time
it = 3
## with this chunking, there are 300 open_dataset_getitem 
# that run during the whole computation, each being sligthly below 1s
ds.votemper.isel(time_counter=it, y=iy, deptht=iz).sum().values

CPU times: user 1.77 s, sys: 165 ms, total: 1.93 s
Wall time: 5.21 s


array(1.377712e+09, dtype=float32)

## second phase: take a larger selection in y (400 rows instead of 200) and see how it scales

In [21]:
# Reset the base chunking: x kept in a single chunk (-1), one time step per chunk.
chunks = dict(x=-1, time_counter=1)
# Index of the day (position in `dates`) passed to get_path.
i_day = 11
# Index ranges applied with isel: 400 rows in y, every depth level.
iy = slice(400, 800)
iz = slice(0, None)

In [22]:
%%time
it = 0
### first test:
chunks.update({"y":10, "deptht":-1})
ds = xr.open_dataset(get_path("votemper", i_day), chunks=chunks)
ds = ds.drop_dims("axis_nbounds").drop([c for c in ds.coords])
ds.votemper.isel(time_counter=it, y=iy, deptht=iz).sum().values

CPU times: user 3.74 s, sys: 652 ms, total: 4.39 s
Wall time: 44.5 s


array(3.4235474e+09, dtype=float32)

In [23]:
%%time
it = 1
### second test: scaling is bad, probably because of too small chunks
chunks.update({"y":10, "deptht":1})
ds = xr.open_dataset(get_path("votemper", i_day), chunks=chunks)
ds = ds.drop_dims("axis_nbounds").drop([c for c in ds.coords])
ds.votemper.isel(time_counter=it, y=iy, deptht=iz).sum().values

CPU times: user 38.7 s, sys: 2.03 s, total: 40.7 s
Wall time: 42.3 s


array(3.423357e+09, dtype=float32)

In [24]:
%%time
it = 2
### third test: scaling is bad, probably because of too small chunks
chunks.update({"y":400, "deptht":1})
ds = xr.open_dataset(get_path("votemper", i_day), chunks=chunks)
ds = ds.drop_dims("axis_nbounds").drop([c for c in ds.coords])
ds.votemper.isel(time_counter=it, y=iy, deptht=iz).sum().values

CPU times: user 3.07 s, sys: 398 ms, total: 3.47 s
Wall time: 14.5 s


array(3.4232067e+09, dtype=float32)

In [25]:
%%time
it = 3
### intermediate case: keep chunks in y of size 200 (best performances)
chunks.update({"y":200, "deptht":1})
ds = xr.open_dataset(get_path("votemper", i_day), chunks=chunks)
ds = ds.drop_dims("axis_nbounds").drop([c for c in ds.coords])
ds.votemper.isel(time_counter=it, y=iy, deptht=iz).sum().values

CPU times: user 3.36 s, sys: 300 ms, total: 3.66 s
Wall time: 11.2 s


array(3.423091e+09, dtype=float32)