# Convert to tabular data

This will load the trajectory data just as it is generated by [Parcels](https://oceanparcels.org) and transform it into a single large tabular dataset that is stored in a parquet store.

See <https://docs.dask.org/en/latest/dataframe.html> and <http://fastparquet.readthedocs.io/> for details.

_**Note** that you're not required to re-run this. But it might help understanding how to transform from the nd-array view the data originally come with to a tabular format._

## Set parameters

Let's make sure that this is the only place where users have to change contents.

In [1]:
# parameters
platform = "jsc_scratch"
tabular_data_dir = "/p/project/training2005/geomar_challenge/data/med_sea_connectivity_v2020.11.03.1/"

## Create Dask Cluster

We'll create a Dask cluster that has resources wherever the Jupyter Kernel is running and then submit a batch job that adds workers to this local cluster.

In [2]:
import dask
from dask.distributed import Client, wait

# Make sure the Dask dashboard is easy to reach
dask.config.set(
    {
        'distributed.dashboard.link':
        "{JUPYTERHUB_BASE_URL}user/{JUPYTERHUB_USER}/{JUPYTERHUB_SERVER_NAME}/proxy/{port}/status"
    }
)

# start a Dask cluster that spans a whole node
client = Client(n_workers=1, threads_per_worker=4, memory_limit=240e9, interface="ib0")
client

0,1
Client  Scheduler: tcp://10.80.118.146:41575  Dashboard: /user/wrath@geomar.de/jupyterlab_1/proxy/8787/status,Cluster  Workers: 1  Cores: 4  Memory: 240.00 GB


In [3]:
%%file add_workers.sh
#!/bin/bash -x
#SBATCH --account=training2005
#SBATCH --nodes=2
#SBATCH --ntasks=2
#SBATCH --ntasks-per-node=1
#SBATCH --output=mpi-out.%j
#SBATCH --error=mpi-err.%j
#SBATCH --time=00:30:00

module purge
module use "$OTHERSTAGES"
module load Stages/Devel-2019a
module load GCCcore/.8.3.0
module load netcdf4-python/1.5.3-serial-Python-3.6.8
module load TensorFlow/1.13.1-GPU-Python-3.6.8
module load Keras/2.2.4-GPU-Python-3.6.8
module load PyTorch/1.1.0-GPU-Python-3.6.8
module load SciPy-Stack/2019a.1-Python-3.6.8

# Activate your Python virtual environment
source /p/project/training2005/jupyter/kernels/datathon/bin/activate

# Ensure python packages installed in the virtual environment are always prefered
export PYTHONPATH=${VIRTUAL_ENV}/lib/python3.6/site-packages:${PYTHONPATH}

srun -n2 -N2 --cpus-per-task=256 --mem=240G dask-worker --nthreads=4 --memory-limit=240G ${SCHEDULER_ADDRESS} &>> worker.log
wait

Overwriting add_workers.sh


In [4]:
%env SCHEDULER_ADDRESS={client.scheduler.address}
!sbatch add_workers.sh

env: SCHEDULER_ADDRESS=tcp://10.80.118.146:41575
Submitted batch job 21265


In [5]:
!squeue -u $USER

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
             21265     batch add_work    rath1 PD       0:00     32 (Priority)
             21240     batch UNICORE_    rath1  R      51:29      1 jsfc106


## Make sure the cluster works

In [6]:
from dask import array as darr

In [7]:
%%time

x = darr.random.normal(size=(10_000_000_000, 10), chunks=(1_000_000, 10))
display(x)
print(x.mean().compute())

Unnamed: 0,Array,Chunk
Bytes,800.00 GB,80.00 MB
Shape,"(10000000000, 10)","(1000000, 10)"
Count,10000 Tasks,10000 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 800.00 GB 80.00 MB Shape (10000000000, 10) (1000000, 10) Count 10000 Tasks 10000 Chunks Type float64 numpy.ndarray",10  10000000000,

Unnamed: 0,Array,Chunk
Bytes,800.00 GB,80.00 MB
Shape,"(10000000000, 10)","(1000000, 10)"
Count,10000 Tasks,10000 Chunks
Type,float64,numpy.ndarray


-5.416824771966047e-06
CPU times: user 26.7 s, sys: 1.41 s, total: 28.1 s
Wall time: 1min 7s


## Open the data catalog

In [8]:
import intake  # data catalogs
import xarray as xr  # labeled nd arrays
from dask import array as darr

In [9]:
catalog = intake.open_catalog(f"../intake-catalogs/medseaconnectivity_{platform}.yaml")
print(list(catalog))

['medsea-trajectories-stokes', 'medsea-trajectories']


## Load the dataset

There's two sets of trajectories:
- `'medsea-trajectories-stokes'` contains trajectories at the sea surface that are subject to drift by waves and the Ocean circulation.
- `'medsea-trajectories'` contains trajectories at the sea surface and in deeper layers that are only subject to the Ocean circulation.

We'll open both and add a parameter `"stokes"` that indicates if stokes drift was present during the simulation.

In [10]:
# load smaller stokes ds
dataset_stokes = catalog['medsea-trajectories-stokes'](chunks={"traj": 10_000}).to_dask()
dataset_stokes["stokes"] = xr.full_like(dataset_stokes.lon, True, dtype=bool)

# load larger non-stokes ds
dataset_no_stokes = catalog['medsea-trajectories'](chunks={"traj": 10_000}).to_dask()
dataset_no_stokes["stokes"] = xr.full_like(dataset_no_stokes.lon, False, dtype=bool)

# combine
dataset = xr.concat((dataset_no_stokes, dataset_stokes), dim="traj")
display(dataset)
print(f"amounts to {dataset.nbytes / 1e9} GB")

  chunk_spec = get_chunk(name, var, chunks)
  chunk_spec = get_chunk(name, var, chunks)


Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,121.71 GB,76.96 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 121.71 GB 76.96 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type datetime64[ns] numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,121.71 GB,76.96 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,datetime64[ns],numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,15.21 GB,9.62 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3164 Tasks,1582 Chunks
Type,bool,numpy.ndarray
"Array Chunk Bytes 15.21 GB 9.62 MB Shape (15814080, 962) (10000, 962) Count 3164 Tasks 1582 Chunks Type bool numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,15.21 GB,9.62 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3164 Tasks,1582 Chunks
Type,bool,numpy.ndarray


amounts to 562.88636352 GB


## Additional fields

To go tabular, we'll also need an explicit trajectory ID, `"trajectory_id"`, and an explicit step, `"step"`.

In [11]:
dataset["trajectory_id"] = (
    ("traj", "obs"),
    darr.broadcast_to(
        darr.arange(dataset.dims["traj"], chunks=dataset.lon.chunks[0]).reshape(-1, 1),
        (dataset.dims["traj"], dataset.dims["obs"]),
        chunks=dataset.lon.chunks
    )
)

dataset["step"] = (
    ("traj", "obs"),
    darr.broadcast_to(
        darr.arange(dataset.dims["obs"], chunks=dataset.lon.chunks[1]),
        (dataset.dims["traj"], dataset.dims["obs"]),
        chunks=dataset.lon.chunks
    )
)

display(dataset)
print(f"amounts to {dataset.nbytes / 1e9} GB")

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,121.71 GB,76.96 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 121.71 GB 76.96 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type datetime64[ns] numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,121.71 GB,76.96 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,datetime64[ns],numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 60.85 GB 38.48 MB Shape (15814080, 962) (10000, 962) Count 3166 Tasks 1582 Chunks Type float32 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,60.85 GB,38.48 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3166 Tasks,1582 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,15.21 GB,9.62 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3164 Tasks,1582 Chunks
Type,bool,numpy.ndarray
"Array Chunk Bytes 15.21 GB 9.62 MB Shape (15814080, 962) (10000, 962) Count 3164 Tasks 1582 Chunks Type bool numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,15.21 GB,9.62 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,3164 Tasks,1582 Chunks
Type,bool,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,121.71 GB,76.96 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,4746 Tasks,1582 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 121.71 GB 76.96 MB Shape (15814080, 962) (10000, 962) Count 4746 Tasks 1582 Chunks Type int64 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,121.71 GB,76.96 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,4746 Tasks,1582 Chunks
Type,int64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,121.71 GB,76.96 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,1583 Tasks,1582 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 121.71 GB 76.96 MB Shape (15814080, 962) (10000, 962) Count 1583 Tasks 1582 Chunks Type int64 numpy.ndarray",962  15814080,

Unnamed: 0,Array,Chunk
Bytes,121.71 GB,76.96 MB
Shape,"(15814080, 962)","(10000, 962)"
Count,1583 Tasks,1582 Chunks
Type,int64,numpy.ndarray


amounts to 806.29668288 GB


## Build a dataframe

There is a method to transform the Xarray dataset directly (`.to_dask_dataframe()`). It chokes on stacking the huge nd arrays, though. So let's do this by hand.

In [12]:
from dask import dataframe as ddf

In [13]:
def datavar_to_series(dataset, varname):
    """Unpack and reshape the dask arrays and turn in to Dask series."""
    return ddf.from_array(dataset[varname].data.flatten()).rename(varname)

In [14]:
series = {varname: datavar_to_series(dataset, varname) for varname in dataset.data_vars.keys()}
colnames = list(series.keys())
display(colnames)

['MPA',
 'distance',
 'land',
 'lat',
 'lon',
 'temp',
 'time',
 'z',
 'stokes',
 'trajectory_id',
 'step']

In [15]:
df = series[colnames[0]].to_frame()
for col in colnames[1:]:
    df[col] = series[col]
df

Unnamed: 0_level_0,MPA,distance,land,lat,lon,temp,time,z,stokes,trajectory_id,step
npartitions=1582,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,float32,float32,float32,float32,float32,float32,datetime64[ns],float32,bool,int64,int64
9620000,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...
15207873200,...,...,...,...,...,...,...,...,...,...,...
15213144959,...,...,...,...,...,...,...,...,...,...,...


In [20]:
%%time
!mkdir -p {tabular_data_dir}
df.to_parquet(f"{tabular_data_dir}/medsea-trajectories.pq", compression="snappy")

CPU times: user 56.9 s, sys: 3.25 s, total: 1min
Wall time: 3min 54s


In [21]:
!du -sh {tabular_data_dir}

295G	/p/project/training2005/geomar_challenge/data/med_sea_connectivity_v2020.11.03.1/


## Test re-reading the data

In [23]:
dfr = ddf.read_parquet(f"{tabular_data_dir}/medsea-trajectories.pq")
display(dfr)
mean_dfr = dfr.mean()
display(mean_dfr)
%time display(mean_dfr.compute())

Unnamed: 0_level_0,MPA,distance,land,lat,lon,temp,time,z,stokes,trajectory_id,step
npartitions=1582,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,float32,float32,float32,float32,float32,float32,datetime64[ns],float32,bool,int64,int64
9620000,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...
15207873200,...,...,...,...,...,...,...,...,...,...,...
15213144959,...,...,...,...,...,...,...,...,...,...,...


  meta = self._meta_nonempty.mean(axis=axis, skipna=skipna)


Dask Series Structure:
npartitions=1
MPA    float64
z          ...
dtype: float64
Dask Name: dataframe-mean, 6331 tasks

MPA              4.141529e+00
distance         1.607461e+02
land             3.027008e-01
lat              3.955079e+01
lon              1.727004e+00
temp             1.417800e+01
z                4.856973e+00
stokes           1.660217e-01
trajectory_id    7.907040e+06
step             4.805000e+02
dtype: float64

CPU times: user 8.37 s, sys: 584 ms, total: 8.95 s
Wall time: 34.3 s


In [24]:
client.close()