In [1]:
from dask import delayed
from dask.diagnostics import ProgressBar
from dask.utils import SerializableLock
from datetime import datetime, timedelta
from floater.generators import FloatSet
import bcolz, os, re
import dask.array as dsa
import numpy as np
import pandas as pd
import xarray as xr

In [2]:
fname = '/data/scratch/cz2397/traj_0090-0180.bcolz'

In [3]:
fs = FloatSet(xlim=(0, 360), ylim=(-80, 70), dx=0.03125, dy=0.03125)
Npart = fs.Nx*fs.Ny

In [4]:
def bcolz_to_arrays(bc, time, npart_range, fields, lock=None, dtype='f8'):

    """Convert bcolz data from a certain timestep to a 2d numpy array.
    For some reason, this is fast.
    
    Parameters
    ----------
    bc : bcolz ctable
        where the data lives
    time : float
        the timestep value
    fields : list of strings
        which fields to extract
    fs : FloatSet
        needed to reshape the array properly
    dtype : numpy.dtype
        data type to which to coerce the output data
        
    Returns
    -------
    d
    """
    
    #print("Called bcolz_to_dataframe(%g) on %s" % (time, bc.rootdir))
    npart_min, npart_max = npart_range
    npart = np.arange(npart_min, npart_max)
    query = "(time==%g) & (npart>=%g) & (npart<%g)" % (time, npart_min, npart_max)
    
    # convert to pandas dataframe
    if lock is not None:
        lock.acquire()
    df = pd.DataFrame(bc[query])
    if lock is not None:
        lock.release()
        
    # reindex the dataframe
    df = df.set_index('npart', drop=True, verify_integrity=True)
    df = df.reindex(npart)
    
    data = df[fields].values
    return data

In [5]:
def bcolz_to_dataset(fname, num_floats, fields=['x', 'y', 'vort'],
                     delta_t=86400, date0=datetime(1993,1,1)):
    
    datadir = os.path.dirname(fname)
    basename = os.path.splitext(os.path.basename(fname))[0]

    # figure out time range from basename
    day0, day1 = [int(day) for day in re.search('traj_(\d+)-(\d+)', basename).groups()]
    days = np.arange(day1-day0)
    times = delta_t * days
    refdate = date0 + timedelta(days=day0)
    
    bc = bcolz.open(rootdir=fname, mode='r')

    nt = len(times)
    npart = np.arange(1,num_floats+1)
    # get the full particle range at each timestep
    npart_range = npart[0], npart[-1]+1
    lock = SerializableLock()
    dtype = 'f8'
    data = [dsa.from_delayed(
                delayed(bcolz_to_arrays)(
                    bc, time, npart_range, fields, lock=lock, dtype=dtype),
                (num_floats, len(fields)), dtype)
            for time in times]
    stacked_data = dsa.stack(data)
    
    # create xarray dataset
    data_variables = {field: (('time', 'npart'), stacked_data[...,n])
                      for n, field in enumerate(fields)}
    ds = xr.Dataset(data_variables, {'npart': npart, 'time': days})
    ds.time.attrs['units'] = 'days since %s' % date0.strftime('%Y-%m-%d')
    
    outfile = os.path.join(datadir, basename + '.nc')
    with ProgressBar():
        ds.to_netcdf(outfile, engine='netcdf4')

In [None]:
bcolz_to_dataset(fname=fname, num_floats=Npart)

[                                        ] | 0% Completed |  0.0s