# Working with data that doesn't fit in memory

In this notebook we will explore the steps needed to run an arbitrary computation on data from a dfs file without first reading the entire file into memory.

In [80]:

import numpy as np
import dask.array as da
import dask

import mikeio
from mikecore.DfsFileFactory import DfsFileFactory

# 1. Create a function to read a subset of the file
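def my_dfs_reader(filename, item, t):
    # Open read-only; edit access is not needed just to read a timestep
    dfs = DfsFileFactory.DfsGenericOpen(str(filename))
    # Item numbers in mikecore are 1-based, hence item + 1
    data = dfs.ReadItemTimeStep(itemNumber=item + 1, timestepIndex=t).Data
    dfs.Close()  # release the file handle after each read
    return data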

# 2. Turn it into a delayed computation, which is a pure function and can be cached
reader = dask.delayed(my_dfs_reader,pure=True)

# 3. Get the shape of the data
item = 0
testfile = "../tests/testdata/wind_north_sea.dfsu"
filename = testfile
dfs = mikeio.open(filename)
shape = (dfs.n_elements,)
n_timesteps = dfs.n_timesteps

In [81]:
# 4. Create a list of delayed functions
lazy_arrays = [reader(testfile,item,t) for t in range(n_timesteps)]
lazy_arrays[0]

Delayed('my_dfs_reader-6a51646b980d7aaa808a7d22413adb6c')

In [82]:
# 5. Create a list of dask arrays
arrays = [da.from_delayed(a, dtype=np.float32, shape=shape) for a in lazy_arrays]

In [83]:
# 6. Stack the arrays into a single array-like object
stack = da.stack(arrays, axis=0)
stack

|       | Array           | Chunk         |
|-------|-----------------|---------------|
| Bytes | 22.45 kiB       | 3.74 kiB      |
| Shape | (6, 958)        | (1, 958)      |
| Count | 13 Graph Layers | 6 Chunks      |
| Type  | float32         | numpy.ndarray |


In [84]:
stack.dtype, stack.ndim, stack.shape

(dtype('float32'), 2, (6, 958))
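The delayed-read, `from_delayed`, `stack` pattern from steps 1 to 6 does not depend on the dfs format. The following is a minimal sketch that can be run without any data file, assuming a hypothetical `fake_reader` in place of `my_dfs_reader`; the sizes are chosen only to mirror the example file.

```python
import numpy as np
import dask
import dask.array as da

N_TIMESTEPS = 6   # hypothetical values mirroring the example file
N_ELEMENTS = 958


def fake_reader(t):
    """Hypothetical stand-in for my_dfs_reader: one timestep as a 1D float32 array."""
    return np.full(N_ELEMENTS, t, dtype=np.float32)


# Wrap the reader so that calling it only records a task instead of running it
lazy_reader = dask.delayed(fake_reader, pure=True)

# One dask array per timestep; shape and dtype must be declared up front
# because dask cannot inspect data that has not been read yet
per_step = [
    da.from_delayed(lazy_reader(t), dtype=np.float32, shape=(N_ELEMENTS,))
    for t in range(N_TIMESTEPS)
]

# Stack along a new leading (time) axis; each timestep becomes one chunk
demo_stack = da.stack(per_step, axis=0)

# Reduce over the element axis and trigger the actual reads
step_means = demo_stack.mean(axis=1).compute()
print(demo_stack.shape, demo_stack.chunks, step_means)
```

Because every chunk holds exactly one timestep, reductions along the element axis can be evaluated one timestep at a time.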

Calling e.g. `.mean()` on `stack` only creates a computational graph; it doesn't execute it.

In [85]:
stack.mean()

|       | Array           | Chunk         |
|-------|-----------------|---------------|
| Bytes | 4 B             | 4 B           |
| Shape | ()              | ()            |
| Count | 17 Graph Layers | 1 Chunks      |
| Type  | float32         | numpy.ndarray |


In [86]:
stack.mean(axis=1).compute()

array([10.234554 , 10.264292 , 10.531686 , 10.794677 , 10.858319 ,
       10.9060335], dtype=float32)
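To make the difference between building a graph and executing it visible, the sketch below logs every call to a reader. It assumes a hypothetical `logging_reader` instead of a real dfs reader, and relies on dask discarding tasks that the requested result does not depend on.

```python
import numpy as np
import dask
import dask.array as da

N_ELEMENTS = 958  # hypothetical size, mirroring the example file
read_log = []     # records which timesteps were actually read


def logging_reader(t):
    """Hypothetical stand-in for a file reader that logs every call."""
    read_log.append(t)
    return np.full(N_ELEMENTS, float(t), dtype=np.float32)


lazy_reader = dask.delayed(logging_reader)
lazy_stack = da.stack(
    [
        da.from_delayed(lazy_reader(t), dtype=np.float32, shape=(N_ELEMENTS,))
        for t in range(6)
    ],
    axis=0,
)

# Selecting a timestep and reducing it only extends the graph
graph_only = lazy_stack[2].mean()
reads_before = list(read_log)

# compute() runs the graph; only tasks needed for the result are executed
value = float(graph_only.compute())
reads_after = list(read_log)
print(reads_before, reads_after, value)
```

In this sketch `reads_before` should stay empty, and `reads_after` should contain only timestep 2, since the other timesteps are not needed for the requested value.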

As an example of a computation, we choose the trimmed mean in each timestep, i.e. the mean after ignoring a fraction of the values at both tails (e.g. outliers caused by numerical instability).
<https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.trim_mean.html>

(Note that this still reads the entire dataset into memory: `stats.trim_mean` is not dask-aware, so it returns a plain NumPy array rather than a lazy dask array, as the output below shows. 🤔)

In [87]:
from scipy import stats

tm = stats.trim_mean(stack, 0.1, axis=1)
tm

array([10.331429, 10.351207, 10.670218, 10.930687, 11.019432, 11.058955],
      dtype=float32)
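One possible way to keep the trimmed mean lazy is to apply `stats.trim_mean` to each chunk with `map_blocks`. This is a sketch under the assumption that every chunk contains complete timesteps (as with the `(1, 958)` chunks above); the random data and the helper name `trim_mean_block` are illustrative only and not part of the original notebook.

```python
import numpy as np
import dask.array as da
from scipy import stats

# Hypothetical data with the same shape and chunking as the stack above
rng = np.random.default_rng(0)
values = rng.normal(10.0, 2.0, size=(6, 958)).astype(np.float32)
demo_stack = da.from_array(values, chunks=(1, 958))


def trim_mean_block(block, proportiontocut=0.1):
    # block has shape (1, n_elements); the result has shape (1,)
    return stats.trim_mean(block, proportiontocut, axis=1)


# drop_axis=1 tells dask that the element axis disappears in the output;
# meta is given explicitly so dask does not need to infer the output type
lazy_tm = demo_stack.map_blocks(
    trim_mean_block,
    drop_axis=1,
    dtype=np.float32,
    meta=np.array((), dtype=np.float32),
)

result = lazy_tm.compute()  # each chunk is trimmed and averaged separately
print(type(lazy_tm).__name__, result.shape)
```

This only works because the trimmed mean is taken along the axis that is not split across chunks; if the element axis were chunked, the per-chunk results could not simply be concatenated.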