# Memory Management

Useful links on dask/zarr/lazy loading:
* [pangeo discussion](https://discourse.pangeo.io/t/processing-large-too-large-for-memory-xarray-datasets-and-writing-to-netcdf/1724/2)

## Set Up

In [1]:
import numba
import numpy as np
import pandas as pd
import holoviews as hv
import xarray as xr
import line_profiler

import clearwater_riverine as cwr

In [2]:
root = './data/sumwere_test_cases/plan28_testTSM'
ras_filepath = f'{root}/clearWaterTestCases.p28.hdf'
initial_condition_path = f'{root}/cwr_initial_conditions_waterTemp_p28.csv'
boundary_condition_path = f'{root}/cwr_boundary_conditions_waterTemp_p28.csv'

In [3]:
%%time
transport_model = cwr.ClearwaterRiverine(ras_filepath, 0.001, verbose=True)

Populating Model Mesh...
Calculating Required Parameters...
CPU times: total: 31.1 s
Wall time: 2min 23s


## Look at the size of the empty array

In [11]:
transport_model.mesh.nbytes * 1e-9

6.6169479440000005

In [14]:
transport_model.mesh.dims

Frozen({'node': 549, 'time': 259201, 'nface': 444, 'nmax_face': 8, 'nedge': 915, '2': 2})

In [30]:
size_dict = {}
for var_name, var_data in transport_model.mesh.variables.items():
    size_dict[var_name] = var_data.nbytes * 1e-9

In [31]:
sorted_dict = dict(sorted(size_dict.items(), key=lambda item: item[1]))

print(sorted_dict)

{'mesh2d': 4e-09, 'faces_surface_area': 1.776e-06, 'face_x': 3.552e-06, 'face_y': 3.552e-06, 'edges_face1': 3.66e-06, 'edges_face2': 3.66e-06, 'edge_length': 3.66e-06, 'node_x': 4.3920000000000005e-06, 'node_y': 4.3920000000000005e-06, 'edge_nodes': 7.32e-06, 'edge_face_connectivity': 7.32e-06, 'face_to_face_dist': 7.32e-06, 'face_nodes': 1.4208e-05, 'time': 0.0020736080000000002, 'dt': 0.0020736080000000002, 'water_surface_elev': 0.460340976, 'volume': 0.460340976, 'edge_velocity': 0.94867566, 'face_flow': 0.94867566, 'advection_coeff': 0.94867566, 'edge_vertical_area': 0.94867566, 'coeff_to_diffusion': 1.89735132}


`edge_velocity`, `face_flow`, `advection_coeff`, `edge_vertical_area`, and `coeff_to_diffusion` are by far the largest variables, at roughly 0.95 to 1.9 GB each. All five are dimensioned `(time, nedge)`, the largest pair of dimensions in the mesh.

In [32]:
dims_dict = {}
for var_name, var_data in transport_model.mesh.variables.items():
    dims_dict[var_name] = var_data.dims

In [35]:
for key in sorted_dict.keys():
    print(key, dims_dict[key])

mesh2d ()
faces_surface_area ('nface',)
face_x ('nface',)
face_y ('nface',)
edges_face1 ('nedge',)
edges_face2 ('nedge',)
edge_length ('nedge',)
node_x ('node',)
node_y ('node',)
edge_nodes ('nedge', '2')
edge_face_connectivity ('nedge', '2')
face_to_face_dist ('nedge',)
face_nodes ('nface', 'nmax_face')
time ('time',)
dt ('time',)
water_surface_elev ('time', 'nface')
volume ('time', 'nface')
edge_velocity ('time', 'nedge')
face_flow ('time', 'nedge')
advection_coeff ('time', 'nedge')
edge_vertical_area ('time', 'nedge')
coeff_to_diffusion ('time', 'nedge')


It's interesting to note that the coeff_to_diffusion variable is twice as large as the other variables of the same dimensions. Why could this be?

In [36]:
dtype_dict = {}
for var_name, var_data in transport_model.mesh.variables.items():
    dtype_dict[var_name] = var_data.dtype

In [40]:
for key in sorted_dict.keys():
    print(key,  dtype_dict[key], dims_dict[key])

mesh2d int32 ()
faces_surface_area float32 ('nface',)
face_x float64 ('nface',)
face_y float64 ('nface',)
edges_face1 int32 ('nedge',)
edges_face2 int32 ('nedge',)
edge_length float32 ('nedge',)
node_x float64 ('node',)
node_y float64 ('node',)
edge_nodes int32 ('nedge', '2')
edge_face_connectivity int32 ('nedge', '2')
face_to_face_dist float64 ('nedge',)
face_nodes int32 ('nface', 'nmax_face')
time datetime64[ns] ('time',)
dt float64 ('time',)
water_surface_elev float32 ('time', 'nface')
volume float32 ('time', 'nface')
edge_velocity float32 ('time', 'nedge')
face_flow float32 ('time', 'nedge')
advection_coeff float32 ('time', 'nedge')
edge_vertical_area float32 ('time', 'nedge')
coeff_to_diffusion float64 ('time', 'nedge')


`coeff_to_diffusion` is stored as float64, whereas the other `(time, nedge)` variables are float32. At 8 bytes per value instead of 4, it takes twice the memory.
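
As a quick check on the sizes reported above, the arithmetic can be reproduced from the dimension sizes alone. This is a minimal sketch; `size_gb` is a helper name introduced here for illustration, and GB is taken as 1e9 bytes to match the `nbytes * 1e-9` convention used in this notebook.

```python
import math

import numpy as np

# Dimension sizes reported by transport_model.mesh.dims
n_time, n_edge = 259201, 915


def size_gb(shape, dtype):
    """Size in GB (1e9 bytes) of a dense array with this shape and dtype."""
    return math.prod(shape) * np.dtype(dtype).itemsize * 1e-9


size_f32 = size_gb((n_time, n_edge), np.float32)  # 4 bytes per value
size_f64 = size_gb((n_time, n_edge), np.float64)  # 8 bytes per value
print(f"float32: {size_f32:.8f} GB")
print(f"float64: {size_f64:.8f} GB")
```

The two results match the 0.94867566 and 1.89735132 entries in `sorted_dict`.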

We may want to investigate whether to convert this variable to float32 (how much would that impact mass balance?). But we'll explore lazy loading / writing to disk first.
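
A minimal sketch of what a float32 cast would look like, using a small synthetic array as a stand-in (the values below are random and not taken from the model). It only measures the rounding error of the cast itself; it says nothing about the downstream effect on mass balance, which would need a full model run.

```python
import numpy as np
import xarray as xr

# Synthetic stand-in for coeff_to_diffusion (assumption: not real model data)
rng = np.random.default_rng(0)
coeff64 = xr.DataArray(
    rng.uniform(0.1, 10.0, size=(100, 915)),
    dims=("time", "nedge"),
    name="coeff_to_diffusion",
)

# Cast to single precision; this halves the memory footprint
coeff32 = coeff64.astype(np.float32)

# Largest relative rounding error introduced by the cast
rel_err = float((abs(coeff32.astype(np.float64) - coeff64) / coeff64).max())
print(coeff64.nbytes, coeff32.nbytes, rel_err)
```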

## Save to Zarr 

In [17]:
for key in ['face_area_elevation_info',
    'face_area_elevation_values',
    'face_normalunitvector_and_length',
    'face_cell_indexes_df',
    'face_volume_elevation_info',
    'face_volume_elevation_values',
    'boundary_data']:
    del transport_model.mesh.attrs[key]

In [47]:
ds = transport_model.mesh.to_zarr(
    'temp/output_test_compute.zarr',
    compute=False
)

In [53]:
ds.coeff_to_diffusion.load()

Delayed('load-2512dafa-45ee-46db-b71e-cfcf96a44347')

In [76]:
transport_model.mesh.encoding = {'chunks': {'time':1000}}

In [85]:
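for var_name, var_data in transport_model.mesh.variables.items():
    if 'time' in var_data.dims:
        # one integer chunk length per dimension: 1000 along time, full length elsewhere
        chunks = tuple(1000 if d == 'time' else n for d, n in zip(var_data.dims, var_data.shape))
        transport_model.mesh[var_name].encoding = {'chunks': chunks}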

In [100]:
for var_name, var_data in transport_model.mesh.variables.items(): 
    if var_data.dims == ('time', 'nedge'):
        print(var_name)
        transport_model.mesh[var_name].encoding = {'chunks': {'time': 1000}}
    else:
        transport_model.mesh[var_name].encoding = {}

edge_velocity
face_flow
advection_coeff
edge_vertical_area
coeff_to_diffusion


In [95]:
ds = transport_model.mesh.to_zarr(
    'temp/output_chunks_vars3.zarr',
    compute=False
)

In [108]:
ds = transport_model.mesh.to_zarr(
    'temp/output_chunks_vars5.zarr',
    compute=True
)

In [113]:
# Assuming 'transport_model.mesh' is your xarray dataset
for var_name, var_data in transport_model.mesh.variables.items():
    if set(var_data.dims) == {'time', 'nedge'}:
        print(var_name)
        transport_model.mesh[var_name] = transport_model.mesh[var_name].chunk({'time': 1000})


edge_velocity
face_flow
advection_coeff
edge_vertical_area
coeff_to_diffusion


In [114]:
ds = transport_model.mesh.to_zarr(
    'temp/output_chunks_vars6.zarr',
    # compute=False
)

## Read Zarr

In [60]:
zarr_test = xr.open_zarr('temp/output_test_compute.zarr')

In [115]:
zarr_test_chunks_1 = xr.open_zarr('temp/output_chunks_vars6.zarr')

In [116]:
zarr_test_chunks_1.coeff_to_diffusion

| | Array | Chunk |
|---|---|---|
| Bytes | 1.77 GiB | 6.98 MiB |
| Shape | (259201, 915) | (1000, 915) |
| Dask graph | 260 chunks in 2 graph layers | |
| Data type | float64 numpy.ndarray | |
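
The numbers in the repr above follow directly from the chunking choice. A short worked calculation, assuming float64 (8 bytes per value) and a chunk length of 1000 along `time` with `nedge` left whole:

```python
import math

n_time, n_edge = 259201, 915
time_chunk = 1000
itemsize = 8  # bytes per float64 value

# Number of chunks along time; the last one holds the single leftover step
n_chunks = math.ceil(n_time / time_chunk)

# Size of one full chunk in MiB and of the whole array in GiB
chunk_mib = time_chunk * n_edge * itemsize / 2**20
array_gib = n_time * n_edge * itemsize / 2**30

print(n_chunks, round(chunk_mib, 2), round(array_gib, 2))
```

This gives 260 chunks of about 6.98 MiB each for a 1.77 GiB array, in agreement with the dask summary.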


## Time test comparison

In [118]:
# %%time
# for i in range(200):
#     a = ds.coeff_to_diffusion.isel(time=0, nedge=200)
# print(a)

Below, we compare the time to select one time step from `zarr_test` 200 times, first lazily (no data is read) and then actually computing the values with `.values`. The lazy selection is much faster (192 ms vs 14 seconds) because it only builds the dask graph. The computed case appears to be particularly slow because of the default chunking scheme, which also splits the `nedge` dimension (chunk size 58 in the output below); we don't actually want to chunk over space.

In [123]:
%%time
for i in range(200):
    test = zarr_test.coeff_to_diffusion.isel(time=0)
print(test)

<xarray.DataArray 'coeff_to_diffusion' (nedge: 915)>
dask.array<getitem, shape=(915,), dtype=float64, chunksize=(58,), chunktype=numpy.ndarray>
Coordinates:
    time     datetime64[ns] 2022-05-13
Dimensions without coordinates: nedge
CPU times: total: 78.1 ms
Wall time: 192 ms
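
The difference between the lazy and computed selections can be seen on a small dask-backed array. This is a sketch with synthetic data standing in for a variable opened from Zarr; the array size is an arbitrary choice for illustration.

```python
import dask.array as da
import numpy as np
import xarray as xr

# Synthetic dask-backed array (assumption: stands in for zarr_test.coeff_to_diffusion)
arr = xr.DataArray(
    da.zeros((2000, 915), chunks=(1000, 915), dtype="float64"),
    dims=("time", "nedge"),
)

lazy = arr.isel(time=0)  # only extends the task graph; nothing is computed
eager = lazy.values      # triggers the computation and returns a NumPy array

print(type(lazy.data).__name__, type(eager).__name__)
```

Timing the first line of the pair measures graph construction only, which is why the lazy loop finishes so quickly.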


In [120]:
%%time
for i in range(200):
    test = zarr_test.coeff_to_diffusion.isel(time=0).values
print(test)

[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.

Loading the values directly from the xarray dataset held in memory is much faster than computing them from the Zarr store (932 ms for the same 200 selections).

In [121]:
%%time
for i in range(200):
    t = transport_model.mesh.coeff_to_diffusion.isel(time=0).values
print(t)

[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.

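In this iteration, I chunked the data along the time dimension only, with one chunk per 1000 time steps. Running the loop of 200 selections takes just over a second. Note that the selection here is lazy (there is no `.values` call) and is applied to the whole dataset rather than a single variable.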

In [122]:
%%time
for i in range(200):
    t = zarr_test_chunks_1.isel(time=0)
print(t)

<xarray.Dataset>
Dimensions:                 (nedge: 915, 2: 2, nface: 444, nmax_face: 8,
                             node: 549)
Coordinates:
    face_x                  (nface) float64 dask.array<chunksize=(444,), meta=np.ndarray>
    face_y                  (nface) float64 dask.array<chunksize=(444,), meta=np.ndarray>
    node_x                  (node) float64 dask.array<chunksize=(549,), meta=np.ndarray>
    node_y                  (node) float64 dask.array<chunksize=(549,), meta=np.ndarray>
    time                    datetime64[ns] 2022-05-13
Dimensions without coordinates: nedge, 2, nface, nmax_face, node
Data variables: (12/17)
    advection_coeff         (nedge) float32 dask.array<chunksize=(915,), meta=np.ndarray>
    coeff_to_diffusion      (nedge) float64 dask.array<chunksize=(915,), meta=np.ndarray>
    dt                      float64 dask.array<chunksize=(), meta=np.ndarray>
    edge_face_connectivity  (nedge, 2) int32 dask.array<chunksize=(915, 2), meta=np.ndarray>
    e

It takes essentially the same time to grab a slice of 999 time steps, which falls within a single 1000-step chunk:

In [125]:
%%time
for i in range(200):
    t = zarr_test_chunks_1.isel(time=slice(0,999))
print(t)

<xarray.Dataset>
Dimensions:                 (time: 999, nedge: 915, 2: 2, nface: 444,
                             nmax_face: 8, node: 549)
Coordinates:
    face_x                  (nface) float64 dask.array<chunksize=(444,), meta=np.ndarray>
    face_y                  (nface) float64 dask.array<chunksize=(444,), meta=np.ndarray>
    node_x                  (node) float64 dask.array<chunksize=(549,), meta=np.ndarray>
    node_y                  (node) float64 dask.array<chunksize=(549,), meta=np.ndarray>
  * time                    (time) datetime64[ns] 2022-05-13 ... 2022-05-13T0...
Dimensions without coordinates: nedge, 2, nface, nmax_face, node
Data variables: (12/17)
    advection_coeff         (time, nedge) float32 dask.array<chunksize=(999, 915), meta=np.ndarray>
    coeff_to_diffusion      (time, nedge) float64 dask.array<chunksize=(999, 915), meta=np.ndarray>
    dt                      (time) float64 dask.array<chunksize=(999,), meta=np.ndarray>
    edge_face_connectivity  (