In [45]:
from glob import glob
import re
import shutil

import numpy as np
import pandas as pd
import dask.dataframe as dpd
import dask
import zarr
import xarray as xr
from tqdm import tqdm

## Examine the files

In [108]:
ddir = '/rigel/ocp/users/csj2114/swot/agulhas/run_1week_fwd'
csv_glob = f'{ddir}/*.csv'
# every CSV in the run directory, in lexical (i.e. iteration, tile) order
fnames = sorted(glob(csv_glob))
# peek at the first few names
fnames[:4]

['/rigel/ocp/users/csj2114/swot/agulhas/run_1week_fwd/float_trajectories.0000000000.001.001.csv',
 '/rigel/ocp/users/csj2114/swot/agulhas/run_1week_fwd/float_trajectories.0000000000.001.002.csv',
 '/rigel/ocp/users/csj2114/swot/agulhas/run_1week_fwd/float_trajectories.0000000000.001.003.csv',
 '/rigel/ocp/users/csj2114/swot/agulhas/run_1week_fwd/float_trajectories.0000000000.001.004.csv']

In [109]:
# Parse (niter, ntile_x, ntile_y) out of names like
# float_trajectories.0000000000.001.002.csv.
# Raw string: '\.' and '\d' inside a plain string literal are invalid escape
# sequences (DeprecationWarning; SyntaxWarning since Python 3.12).
pattern = r'.*\.(\d{10})\.(\d{3})\.(\d{3})\.csv'
r = re.compile(pattern)
matches = [r.match(fname) for fname in fnames]
unmatched = [fname for fname, m in zip(fnames, matches) if m is None]
if unmatched:
    # without this check a stray file fails later with an opaque
    # AttributeError ('NoneType' object has no attribute 'group')
    raise ValueError(
        f'{len(unmatched)} file name(s) do not match {pattern!r}, '
        f'e.g. {unmatched[0]}'
    )
# reshape keeps a (n_files, 3) layout even when fnames is empty, so the
# three-way unpacking below still works
file_data = np.array(
    [[int(g) for g in m.groups()] for m in matches], dtype=int
).reshape(-1, 3)
niters, ntile_x, ntile_y = file_data.transpose()
niter_unique = np.unique(niters)
len(niter_unique)

169

In [110]:
# one row per CSV file: iteration, tile indices, and the path itself
file_df = pd.DataFrame(
    dict(niter=niters, ntile_x=ntile_x, ntile_y=ntile_y, fnames=fnames)
)
file_df

Unnamed: 0,niter,ntile_x,ntile_y,fnames
0,0,1,1,/rigel/ocp/users/csj2114/swot/agulhas/run_1wee...
1,0,1,2,/rigel/ocp/users/csj2114/swot/agulhas/run_1wee...
2,0,1,3,/rigel/ocp/users/csj2114/swot/agulhas/run_1wee...
3,0,1,4,/rigel/ocp/users/csj2114/swot/agulhas/run_1wee...
4,0,1,5,/rigel/ocp/users/csj2114/swot/agulhas/run_1wee...
...,...,...,...,...
24331,24192,12,8,/rigel/ocp/users/csj2114/swot/agulhas/run_1wee...
24332,24192,12,9,/rigel/ocp/users/csj2114/swot/agulhas/run_1wee...
24333,24192,12,10,/rigel/ocp/users/csj2114/swot/agulhas/run_1wee...
24334,24192,12,11,/rigel/ocp/users/csj2114/swot/agulhas/run_1wee...


In [113]:
# number of tile files found for each iteration
file_df.groupby('niter').fnames.count()

niter
0        144
144      144
288      144
432      144
576      144
        ... 
23616    144
23760    144
23904    144
24048    144
24192    144
Name: fnames, Length: 169, dtype: int64

## Load one timestep into an Xarray Dataset

We compare two approaches: a *lazy* one that keeps the data in dask, and an *eager* one that loads everything into memory right after reading.

In [99]:
def timestep_to_ds_lazy(ddir, niter, npartitions=1):
    """
    Read CSV files for one timestep and turn into an xarray dataset.

    This function is *lazy*, using dask throughout: only the ``npart`` index
    values and the per-partition row counts are computed eagerly (needed to
    build the coordinate and to give the dask arrays known chunk sizes).

    Parameters
    ----------
    ddir : str
        Directory containing ``float_trajectories.<niter>.*.csv`` files.
    niter : int
        Model iteration; formatted as a zero-padded 10-digit field to select
        the matching files.
    npartitions : int, optional
        Number of dask partitions to consolidate the rows into before
        setting the index (default 1).

    Returns
    -------
    xarray.Dataset
        One dask-backed variable per CSV column (excluding ``time`` and
        ``npart``), indexed by ``npart``, with a leading length-1 ``niter``
        dimension.

    Raises
    ------
    FileNotFoundError
        If no CSV files match ``niter`` in ``ddir``.
    """
    fnames = sorted(glob(f'{ddir}/float_trajectories.{niter:010d}.*.csv'))
    if not fnames:
        raise FileNotFoundError(
            f'no float_trajectories CSV files for niter={niter} in {ddir}'
        )
    df = dpd.read_csv(fnames)

    # don't need time, since all the files have the same time
    df = df.drop('time', axis=1)

    # this is like rechunking
    # it will consolidate all the rows into `npartitions` in-memory blocks
    df = df.repartition(npartitions=npartitions)

    # the more partitions, the slower this goes
    df = df.set_index('npart')

    # convert to xarray dataset
    dim = df.index.name

    # Takes time: needs to read the data. The index values and the row count
    # of every partition are computed in one pass so the CSVs are read once.
    # `to_dask_array` needs one length per partition; passing only the total
    # length was correct solely for a single partition.
    index_data, partition_lengths = dask.compute(
        df.index.values, df.map_partitions(len, enforce_metadata=False)
    )
    lengths = tuple(int(n) for n in partition_lengths)

    coords = {dim: ([dim], index_data)}
    data_vars = {v: ([dim], df[v].to_dask_array(lengths=lengths))
                 for v in df.columns}
    ds = xr.Dataset(data_vars, coords)

    # now add time as a dimension
    ds = ds.expand_dims('niter', axis=0)
    ds.coords['niter'] = ('niter', [niter])

    return ds

In [100]:
def timestep_to_ds_eager(ddir, niter):
    """
    Load every CSV tile for iteration ``niter`` and return an xarray Dataset.

    This is the *eager* counterpart of ``timestep_to_ds_lazy``: the files are
    read with dask but materialized with ``.compute()`` straight away, so all
    later steps operate on an in-memory pandas DataFrame.

    Parameters
    ----------
    ddir : str
        Directory holding the ``float_trajectories.<niter>.*.csv`` files.
    niter : int
        Model iteration, zero-padded to 10 digits in the file pattern.

    Returns
    -------
    xarray.Dataset
        Variables indexed by ``npart``, with a leading length-1 ``niter``
        dimension.
    """
    tile_files = sorted(glob(f'{ddir}/float_trajectories.{niter:010d}.*.csv'))

    # read, then pull everything into memory immediately
    frame = dpd.read_csv(tile_files).compute()

    # every file carries the same time, so that column is dropped; pandas
    # set_index is much faster than dask's for frames this small
    frame = frame.drop('time', axis=1).set_index('npart')

    ds = frame.to_xarray().expand_dims('niter', axis=0)
    ds.coords['niter'] = ('niter', [niter])
    return ds
    

In [121]:
# time the lazy reader on the first iteration
%time dsl = timestep_to_ds_lazy(ddir, niter_unique[0])
dsl

CPU times: user 22.9 s, sys: 5.18 s, total: 28.1 s
Wall time: 9.35 s


In [122]:
# time the eager reader on the same iteration for comparison
%time dse = timestep_to_ds_eager(ddir, niter_unique[0])
dse

CPU times: user 10.9 s, sys: 1.73 s, total: 12.6 s
Wall time: 3.9 s


**Result**: The eager version was 2-3x faster (3.9 s vs. 9.35 s wall time).
I suspect this is because there is so little data and the filesystem is slow.

### Questions about the data

Why is `max(npart)` not equal to 2332800?

In [116]:
# largest `npart` value present in the first timestep
dse['npart'].max()

Why is 2332800 not equal to `Nx * Ny`?

In [105]:
# 12 tiles per side (ntile_x / ntile_y above); 180 points per tile side is
# presumably the model's tile size -- TODO confirm against the run setup
points_per_tile = 180
tiles_per_side = 12
nx = ny = points_per_tile * tiles_per_side
nx * ny

4665600

## Test Writing

Which dataset writes to zarr faster?

In [120]:
# zarr store used as the output of every write benchmark below
target = '/rigel/ocp/projects/swot/agulhas/run_1week_fwd.zarr'

In [123]:
# write the lazy (dask-backed) dataset; mode='w' overwrites any existing store
%time dsl.to_zarr(target, mode='w')

CPU times: user 11.8 s, sys: 2.64 s, total: 14.5 s
Wall time: 4.97 s


<xarray.backends.zarr.ZarrStore at 0x2aacd08db530>

In [None]:
# delete the store written above so the next write starts from scratch
shutil.rmtree(target)

In [125]:
# .chunk() wraps the in-memory arrays as dask arrays before writing;
# mode='w' overwrites any existing store
%time dse.chunk().to_zarr(target, mode='w')

CPU times: user 500 ms, sys: 84.9 ms, total: 585 ms
Wall time: 375 ms


<xarray.backends.zarr.ZarrStore at 0x2aacd081d410>

**Result:** Writing the eagerly loaded data to zarr is obviously much faster because it is already in memory!

That wasn't really fair. What if we do 10 timesteps in serial?

In [128]:
# clear the store before the serial multi-timestep benchmark
shutil.rmtree(target)

In [130]:
%%time 
for n in niter_unique[:10]:
    ds = timestep_to_ds_eager(ddir, n)
    ds.chunk().to_zarr(target, append_dim='niter')

CPU times: user 1min 50s, sys: 17.4 s, total: 2min 8s
Wall time: 52.5 s


In [135]:
# clear the store before the lazy multi-timestep benchmark
shutil.rmtree(target)

Compare this to preparing a lazy version of the array in advance.

In [133]:
%%time
# build one lazy dataset per iteration and concatenate along niter;
# each call still reads its CSVs once to compute the npart index
dsets = [timestep_to_ds_lazy(ddir, n) for n in niter_unique[:10]]
ds_all = xr.concat(dsets, dim='niter')
ds_all

CPU times: user 3min 52s, sys: 41.6 s, total: 4min 34s
Wall time: 1min 37s


In [136]:
%%time
ds_all.to_zarr(target)

CPU times: user 2min 2s, sys: 22.6 s, total: 2min 24s
Wall time: 45.8 s


<xarray.backends.zarr.ZarrStore at 0x2aacd1550470>

So the eager, serial approach took 52 s, while the lazy approach took about 2 min 23 s in total (1 min 37 s to build the dataset plus 46 s to write it).

**Conclusion:** on habanero, there is no benefit to trying to parallelize this with dask.
I believe this is mostly a consequence of using CSV, which doesn't allow us to easily extract individual columns one at a time.
We end up reading the files over and over. This is inefficient.