In [21]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client
import dask.dataframe as dd
import dask


In [2]:
# Each PBS job runs a single worker process with one core and 60 GB of
# memory; the number of jobs is requested in the `scale` cell below.
cluster = PBSCluster(
    project='pangeo',
    walltime='04:00:00',
    cores=1,
    processes=1,
    memory="60GB",
    # Passed as a literal string: presumably expanded by the shell inside the
    # generated PBS job script so workers spill to node-local storage — confirm.
    local_directory='$TMPDIR',
)
cluster

In [3]:
# Request 40 workers; with processes=1 per job this presumably means 40 PBS
# jobs — confirm against the dask_jobqueue `scale` semantics in use.
cluster.scale(40)

In [4]:
# Attach a distributed client to the PBS cluster. The repr may report 0
# workers until the queued PBS jobs actually start.
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://10.120.43.24:43948  Dashboard: http://10.120.43.24:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [15]:
import zarr
import netCDF4
import pytide
import xarray as xr
import os
import numpy as np

In [7]:
%%time
root = "/work/ALT/odatis/eNATL60/BLBT02/gridT-2D/"
# `combine='nested'` concatenates datasets in list order, while `os.listdir`
# returns entries in arbitrary order, so sort to get a monotonic time axis.
# NOTE(review): assumes the file names sort chronologically — TODO confirm,
# otherwise switch to combine='by_coords'.
files = sorted(
    os.path.join(root, item) for item in os.listdir(root) if item.endswith(".nc"))

# Variables that are never used below (only `sossheig` and `time_counter` are);
# dropping them at open time avoids loading them at all.
drop_vars = [
    'nav_lat',
    'nav_lon',
    'somxl010',
    'sosaline',
    'sosstsst']

# Coordinate-like variables that could also be dropped while running
# `open_mfdataset` and added back later (e.g. ['time_counter', 'y', 'x']);
# none are dropped at the moment.
extra_coord_vars = []

# One time step per chunk, as each file is opened.
chunks = dict(time_counter=1)

open_kwargs = dict(drop_variables=(drop_vars + extra_coord_vars),
                   chunks=chunks, decode_cf=True, concat_dim="time_counter")
ds = xr.open_mfdataset(files, combine='nested', parallel=True, **open_kwargs)


CPU times: user 4.14 s, sys: 193 ms, total: 4.33 s
Wall time: 7.63 s


In [8]:
# Handles on the two variables used in the rest of the notebook (both lazy).
ssh = ds.sossheig
time = ds.time_counter

In [9]:
# Materialise the time coordinate as a NumPy array (datetime64[ns] per the
# repr above); this reads every time chunk.
t=time.values

In [10]:
# Restrict the analysis to five constituents: M2, S2, N2, O1 and K1.
# (A default-constructed table is not needed: it would be discarded at once.)
wt = pytide.WaveTable(["M2", "S2", "N2", "O1", "K1"])

In [11]:
%%time
# Nodal modulation terms for every time step in `t`; the same pair is passed
# to every per-point harmonic analysis further down.
(f, vu) = wt.compute_nodal_modulations(t)

CPU times: user 17.4 ms, sys: 1.24 ms, total: 18.6 ms
Wall time: 15.4 ms


In [12]:
# Inspect the lazy array as opened: one chunk per time step.
ssh

Unnamed: 0,Array,Chunk
Bytes,1.85 TB,158.02 MB
Shape,"(11688, 4729, 8354)","(1, 4729, 8354)"
Count,23863 Tasks,11688 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.85 TB 158.02 MB Shape (11688, 4729, 8354) (1, 4729, 8354) Count 23863 Tasks 11688 Chunks Type float32 numpy.ndarray",8354  4729  11688,

Unnamed: 0,Array,Chunk
Bytes,1.85 TB,158.02 MB
Shape,"(11688, 4729, 8354)","(1, 4729, 8354)"
Count,23863 Tasks,11688 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,93.50 kB,8 B
Shape,"(11688,)","(1,)"
Count,23863 Tasks,11688 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 93.50 kB 8 B Shape (11688,) (1,) Count 23863 Tasks 11688 Chunks Type datetime64[ns] numpy.ndarray",11688  1,

Unnamed: 0,Array,Chunk
Bytes,93.50 kB,8 B
Shape,"(11688,)","(1,)"
Count,23863 Tasks,11688 Chunks
Type,datetime64[ns],numpy.ndarray


In [13]:
def dask_array_rechunk(da, axis=0, min_size=100, max_size=150):
    """Search for a chunk shape that keeps ``axis`` whole and targets a size.

    Dimension ``axis`` is kept in a single chunk so that series along it can be
    accessed contiguously. Every other dimension is divided by the same integer
    ``div`` (tried as 1, 2, 3, ...) until one chunk occupies strictly between
    ``min_size`` and ``max_size`` megabytes (1 MB = 1000**2 bytes).

    Parameters
    ----------
    da : object with ``.chunks`` and ``.dtype``
        A chunked array such as a dask array or a dask-backed
        ``xarray.DataArray``; ``.chunks`` must hold one tuple of block
        lengths per dimension.
    axis : int, default 0
        Dimension to keep unchunked. Negative values count from the end.
    min_size, max_size : float, default 100 and 150
        Exclusive lower/upper bounds on the chunk size, in MB.

    Returns
    -------
    tuple of int
        Chunk shape, usable with ``DataArray.chunk`` or ``Array.rechunk``.

    Raises
    ------
    ValueError
        If ``axis`` is out of range, or if no divisor gives a chunk inside the
        size window (e.g. the whole array is smaller than ``min_size``, or the
        ``axis`` dimension alone already exceeds ``max_size``).
    """
    ndim = len(da.chunks)
    if not -ndim <= axis < ndim:
        raise ValueError(
            "axis {} is out of bounds for an array of dimension {}".format(axis, ndim))
    axis %= ndim

    itemsize = da.dtype.itemsize
    # Full length of each dimension, computed once rather than on every try.
    shape = [int(sum(blocks)) for blocks in da.chunks]
    # Beyond the largest non-axis dimension every split dimension is 0, so
    # larger divisors cannot help.
    max_div = max([n for i, n in enumerate(shape) if i != axis], default=1)

    for div in range(1, max_div + 1):
        chunks = tuple(n if i == axis else n // div for i, n in enumerate(shape))
        nbytes = itemsize
        for n in chunks:
            nbytes *= n
        chunk_size = nbytes / 1000**2
        if min_size < chunk_size < max_size:
            return chunks
        if chunk_size <= min_size:
            # The chunk size never grows as ``div`` increases, so no larger
            # divisor can land inside the window either.
            break

    raise ValueError(
        "no chunk shape keeping axis {} whole gives a chunk between {} and {} MB"
        .format(axis, min_size, max_size))

In [16]:
# Preview the chunk shape that keeps time_counter (axis 0) in a single chunk.
dask_array_rechunk(ssh)

(11688, 42, 75)

In [19]:
# Keep time_counter in one chunk and split y/x so each chunk is 100-150 MB.
# NOTE(review): this is an all-to-all rechunk of the one-step-per-chunk input
# (see the task count in the repr below); a dedicated tool such as
# `rechunker` writing to zarr may scale better — consider.
target_chunks = dask_array_rechunk(ssh)
ssh_rechunk = ssh.chunk(target_chunks)

In [20]:
# Inspect the rechunked array: time_counter now spans a single chunk.
ssh_rechunk

Unnamed: 0,Array,Chunk
Bytes,1.85 TB,147.27 MB
Shape,"(11688, 4729, 8354)","(11688, 42, 75)"
Count,811171 Tasks,12656 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.85 TB 147.27 MB Shape (11688, 4729, 8354) (11688, 42, 75) Count 811171 Tasks 12656 Chunks Type float32 numpy.ndarray",8354  4729  11688,

Unnamed: 0,Array,Chunk
Bytes,1.85 TB,147.27 MB
Shape,"(11688, 4729, 8354)","(11688, 42, 75)"
Count,811171 Tasks,12656 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,93.50 kB,93.50 kB
Shape,"(11688,)","(11688,)"
Count,23864 Tasks,1 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 93.50 kB 93.50 kB Shape (11688,) (11688,) Count 23864 Tasks 1 Chunks Type datetime64[ns] numpy.ndarray",11688  1,

Unnamed: 0,Array,Chunk
Bytes,93.50 kB,93.50 kB
Shape,"(11688,)","(11688,)"
Count,23864 Tasks,1 Chunks
Type,datetime64[ns],numpy.ndarray


In [22]:
def _apply_along_axis(arr, func1d, func1d_axis, func1d_args, func1d_kwargs):
    """Wrap apply_along_axis"""
    return np.apply_along_axis(func1d, func1d_axis, arr, *func1d_args,
                                  **func1d_kwargs)


def apply_along_axis(func1d, axis, arr, *args, **kwargs):
    """Lazily apply ``func1d`` to every 1-D slice of ``arr`` along ``axis``.

    Adapted from ``dask.array.apply_along_axis``. ``arr`` is rechunked so that
    ``axis`` lies in a single chunk, then every block is processed with
    ``np.apply_along_axis`` (via ``_apply_along_axis``).

    Parameters
    ----------
    func1d : callable
        Called as ``func1d(slice_1d, *args, **kwargs)``; in this notebook,
        ``pytide.WaveTable.harmonic_analysis`` with ``args=(f, vu)``.
    axis : int
        Axis along which slices are taken; negative values count from the end.
    arr : array-like
        Anything accepted by ``dask.array.asarray``.
    *args, **kwargs
        Extra arguments forwarded to every ``func1d`` call.

    Returns
    -------
    dask.array.Array
        Lazy result in which ``axis`` is replaced by the axes of
        ``func1d``'s output.

    Raises
    ------
    IndexError
        If ``axis`` is out of range for ``arr``.
    """
    # Local imports: do not rely on another cell having loaded dask.array.
    import dask.array as dsa
    from dask.utils import funcname

    arr = dsa.asarray(arr)

    # Validate (IndexError when out of range) and normalize a negative axis.
    arr.shape[axis]
    axis = len(arr.shape[:axis])

    # Rechunk so that each block holds complete series along `axis`.
    arr = arr.rechunk(arr.chunks[:axis] + (arr.shape[axis:axis + 1], ) +
                      arr.chunks[axis + 1:])

    # Probe func1d once to learn the output dtype and shape. The probe must
    # be as long as the real slices; hard-coding `args[0].shape[1]` assumed a
    # particular layout of the first extra argument and failed without args.
    test_data = np.ones(arr.shape[axis], dtype=arr.dtype)
    test_result = np.array(func1d(test_data, *args, **kwargs))

    # Map func1d over the blocks; `axis` is dropped and replaced by the
    # axes of the probe result.
    # NOTE(review): `args`/`kwargs` (here `f`, `vu`) are passed as literal
    # values to map_blocks; check how they are serialized across the ~12k
    # tasks — scattering them first may be cheaper.
    result = arr.map_blocks(
        _apply_along_axis,
        name=funcname(func1d) + '-along-axis',
        dtype=test_result.dtype,
        chunks=(arr.chunks[:axis] + test_result.shape + arr.chunks[axis + 1:]),
        drop_axis=axis,
        new_axis=list(range(axis, axis + test_result.ndim, 1)),
        func1d=func1d,
        func1d_axis=axis,
        func1d_args=args,
        func1d_kwargs=kwargs,
    )

    return result

In [23]:
# Harmonic analysis of every (y, x) time series along axis 0 (time_counter).
# `future` is a lazy dask array, not a distributed Future.
future = apply_along_axis(
    pytide.WaveTable.harmonic_analysis, 0, ssh_rechunk, f, vu)

In [None]:
# Runs the whole graph on the cluster and gathers the result into this
# kernel's memory as a NumPy array.
analysis = future.compute()

In [None]:
%%time
# Reconstruct the tide from the harmonic-analysis result computed above
# (`w` is not defined anywhere in this notebook).
# NOTE(review): `analysis` holds constituents for every grid point; confirm
# that `tide_from_tide_series` accepts this layout, otherwise loop per point.
hp = wt.tide_from_tide_series(t, analysis)