# Testing Memory Usage of Dask

## ESGF Data

* http://esgf3.dkrz.de/thredds/fileServer/cmip6/CMIP/MPI-M/MPI-ESM1-2-HR/historical/r1i1p1f1/day/tas/gn/v20190710/tas_day_MPI-ESM1-2-HR_historical_r1i1p1f1_gn_19000101-19041231.nc

## xarray

* http://xarray.pydata.org/en/stable/user-guide/dask.html
* http://stephanhoyer.com/2015/06/11/xray-dask-out-of-core-labeled-arrays/

## Dask

* https://blog.dask.org/2021/03/11/dask_memory_usage
* https://docs.dask.org/en/latest/array-chunks.html
* https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead
* https://coiled.io/tackling-unmanaged-memory-with-dask/
* https://docs.dask.org/en/latest/diagnostics-distributed.html

## Bugs

* https://github.com/pydata/xarray/issues/3781
* https://github.com/pydata/xarray/issues/3401

## Memory profiler

* https://pypi.org/project/dask_memusage/
* https://pypi.org/project/filprofiler/
    * https://pythonspeed.com/products/filmemoryprofiler/#profiling-in-jupyter
* https://pypi.org/project/memory-profiler/

In [None]:
import os 
import humanize
import math
import xarray as xr
import dask

from bokeh.plotting import show
from bokeh.io import output_notebook

output_notebook()

In [None]:
# https://www.geeksforgeeks.org/how-to-get-current-cpu-and-ram-usage-in-python/
import psutil

print(f"CPUs: {os.cpu_count()}")
print(f"Total Memory: {humanize.naturalsize(psutil.virtual_memory()[0])}")
print(f"Available Memory: {humanize.naturalsize(psutil.virtual_memory()[1])}")
print(f"Used Memory: {humanize.naturalsize(psutil.virtual_memory()[3])}")


In [None]:
# https://docs.dask.org/en/latest/array-best-practices.html#avoid-oversubscribing-threads
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'

In [None]:
# http://xarray.pydata.org/en/stable/user-guide/dask.html#
ds = xr.open_dataset(
    # "data/tas_day_MPI-ESM1-2-HR_historical_r1i1p1f1_gn_19000101-19041231.nc",
    "data/ta_day_MPI-ESM1-2-HR_historical_r1i1p1f1_gn_19050101-19091231.nc",
)
ds

In [None]:
da = ds.ta.sel(time='1905').squeeze()


In [None]:
# https://github.com/roocs/clisops/blob/1280f278b5017d1e1d8c938b95b1dc25508398b1/clisops/utils/output_utils.py#L167

def get_chunk_length(da, mem_limit):
    """
    Calculate the chunk length to use when chunking xarray datasets.
    Based on memory limit provided in config and the size of th dataset.
    """
    size = da.nbytes
    n_times = len(da.time.values)
    # mem_limit = parse_size(chunk_memory_limit)

    if size > 0:
        n_chunks = math.ceil(size / mem_limit)
    else:
        n_chunks = 1
        
    print(f"num chunks: {n_chunks}")

    chunk_length = math.ceil(n_times / n_chunks)

    return chunk_length

def chunk(da, mem_limit):
    # https://docs.dask.org/en/latest/array-best-practices.html#select-a-good-chunk-size
    # dask.config.set({"array.chunk-size": chunk_memory_limit})
    chunk_length = get_chunk_length(da, mem_limit)
    print(f"chunk length: {chunk_length}")
    da = da.chunk({
        # "time": "auto"
        "time": chunk_length
    })
    da = da.unify_chunks()
    print(f"chunks: {da.chunks}")
    return da

def chunk_size(max_workers=None, max_memory=None):
    max_workers = max_workers or os.cpu_count()
    print(f"max workers: {max_workers}")
    max_memory = max_memory or psutil.virtual_memory()[0]
    print(f"max memory: {humanize.naturalsize(max_memory, binary=True)}")
    max_chunks = 2
    base_mem = 400 * 1024**2
    max_threads = max_workers * max_chunks
    chunk_size_ = int((max_memory - max_workers * base_mem) / max_threads)
    print(f"chunk size: {humanize.naturalsize(chunk_size_, binary=True)}")
    max_memory_per_worker = chunk_size_ * max_chunks + base_mem
    print(f"max memory per worker: {humanize.naturalsize(max_memory_per_worker, binary=True)}")
    return chunk_size_

In [None]:
# https://docs.dask.org/en/latest/caching.html?highlight=cache
#from dask.cache import Cache
#cache = Cache(2e9)  # Leverage two gigabytes of memory
#cache.register()  

In [None]:
# https://docs.dask.org/en/latest/diagnostics-local.html
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, CacheProfiler
from cachey import nbytes
from dask.diagnostics import visualize

# https://docs.dask.org/en/latest/setup/single-machine.html
# https://docs.dask.org/en/latest/scheduling.html
dask.config.set({
    "scheduler": "synchronous", 
    # "scheduler": "threads", 
    # "scheduler": "processes", 
    # "array.chunk-size": chunk_size(),
})
da = chunk(da, chunk_size(max_workers=2, max_memory=1000*1024**2))
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof, CacheProfiler(metric=nbytes) as cprof:
    da.to_netcdf("out.nc", compute=True)


In [None]:
show(visualize([prof, rprof, cprof]))