In [2]:
from typing import Iterable, List, Optional
#
import enum
import functools
import operator
import shutil
import os
import tempfile
#
import dask_jobqueue
import dask.distributed
import numpy as np
import pandas as pd
import xarray as xr

In [None]:
# Dask cluster whose workers are submitted as PBS jobs. The settings below
# are the resources requested for each job.
cluster = dask_jobqueue.PBSCluster(
    cores=1,
    memory='4GB',
    interface='ib0',
    # Left unexpanded on purpose: "$TMPDIR" is passed as a literal string.
    local_directory="$TMPDIR",
    walltime='12:00:00',
)
# Initial size of the cluster; later cells rescale it before each series of
# benchmarks.
cluster.scale(10)
cluster

In [None]:
# Connect a client to the PBS cluster created above.
# NOTE(review): the benchmark functions never reference `client` explicitly;
# presumably the computations reach the cluster because the client registers
# itself as the default Dask scheduler - confirm.
client = dask.distributed.Client(cluster)
client

In [3]:
# Root directory containing the datasets to be processed. Dataset names are
# resolved relative to it by ``get_dataset_path``.
ROOT = "/home/ad/briolf/odatis/briolf/bigdata4science"

# File describing the queries to be executed (read by ``load_request``).
REQUEST = "/home/ad/briolf/notebooks/bigdata4bigscience/requests.csv"

# File containing the results of the benchmarks. It is written by ``run`` and,
# when it already exists, read back by ``load_result`` instead of REQUEST.
RESULT = "/home/ad/briolf/notebooks/bigdata4bigscience/results.csv"

# Temporary directory in which the NetCDF test files are written.
TMPDIR = "/work/ALT/odatis/briolf/tmp"

# Temporary Zarr store, overwritten by each Zarr write test and removed
# afterwards.
TMPZARR = os.path.join(TMPDIR, "zarr")

In [4]:
class Writers(enum.Enum):
    """Output formats that a benchmark can write the selected data to."""

    # The selection is written to a temporary NetCDF file.
    NETCDF = 1
    # The selection is written to the temporary Zarr store.
    ZARR = 2

In [5]:
def get_dataset_path(name: str) -> str:
    """Returns the path of the dataset ``name`` inside the ``ROOT`` directory."""
    dataset_path = os.path.join(ROOT, name)
    return dataset_path

In [6]:
def load_request(path: str) -> pd.DataFrame:
    """Loads the file describing the queries to benchmark.

    The file is a semicolon-separated CSV. The two epoch columns are dropped
    (the time range is read from ``minDate``/``maxDate``), the date columns
    are converted to timestamps, and the columns that will receive the
    benchmark results are appended: the measurements are initialised to NaN
    and ``nbytes`` to 0.

    Args:
        path: Path to the CSV file describing the queries.

    Returns:
        One row per query, with the result columns still to be filled in.

    Raises:
        KeyError: If one of the epoch columns is missing from the file.
    """
    result = pd.read_csv(path,
                         sep=";",
                         dtype={
                             "Dataset": "str",
                             "variables": "str",
                             "minLongitude": "float64",
                             "maxLongitude": "float64",
                             "minLatitude": "float64",
                             "maxLatitude": "float64",
                             "minTime(epoch ms)": "str",
                             "maxTime(epoch(ms)": "str",
                             "minDate": "str",
                             "maxDate": "str",
                             "depth": "float64",
                             "Cores": "int",
                             "mem": "str"
                         })
    # The epoch columns duplicate the date columns and are not used.
    result = result.drop(columns=["minTime(epoch ms)", "maxTime(epoch(ms)"])

    # NaN marks a measurement that has not been made yet.
    pending = float("nan")
    return result.assign(
        # pd.to_datetime rather than astype("datetime64"): pandas >= 2.0
        # refuses to cast to a datetime64 dtype that has no unit.
        minDate=pd.to_datetime(result["minDate"]),
        maxDate=pd.to_datetime(result["maxDate"]),
        average=pending,
        stdev=pending,
        best=pending,
        worst=pending,
        loops=pending,
        repeat=pending,
        netcdf=pending,
        zarr=pending,
        nbytes=0)

In [7]:
def load_result(path: str) -> pd.DataFrame:
    """Loads the file containing the intermediate results.

    The file is a semicolon-separated CSV holding the query columns plus the
    result columns. The date columns are converted to timestamps.

    Args:
        path: Path to the CSV file containing the results.

    Returns:
        One row per query, with the measurements recorded so far.
    """
    result = pd.read_csv(path,
                         sep=";",
                         dtype={
                             "Dataset": "str",
                             "variables": "str",
                             "minLongitude": "float64",
                             "maxLongitude": "float64",
                             "minLatitude": "float64",
                             "maxLatitude": "float64",
                             "minDate": "str",
                             "maxDate": "str",
                             "depth": "float64",
                             "Cores": "int",
                             "mem": "str",
                             "average": "float64",
                             "stdev": "float64",
                             "best": "float64",
                             "worst": "float64",
                             "loops": "float64",
                             "repeat": "float64",
                             "netcdf": "float64",
                             "zarr": "float64",
                             "nbytes": "uint64",
                         })
    # pd.to_datetime rather than astype("datetime64"): pandas >= 2.0 refuses
    # to cast to a datetime64 dtype that has no unit.
    return result.assign(minDate=pd.to_datetime(result["minDate"]),
                         maxDate=pd.to_datetime(result["maxDate"]))

In [None]:
def varname_from_standard_name(ds: xr.Dataset,
                               standard_names: Iterable[str]) -> List[str]:
    """Gets the names of the variables matching the given standard names.

    Args:
        ds: Dataset whose data variables are searched.
        standard_names: Accepted values of the ``standard_name`` attribute.

    Returns:
        Names of the data variables of ``ds`` whose ``standard_name``
        attribute is found in ``standard_names``, in the order of the
        dataset. Variables without this attribute are ignored.
    """
    result = []
    for name, data_array in ds.data_vars.items():
        # A variable without a "standard_name" attribute can never match:
        # skip it instead of failing with a KeyError.
        standard_name = data_array.attrs.get("standard_name")
        if standard_name is not None and standard_name in standard_names:
            result.append(name)
    return result

In [None]:
def write_netcdf(selected: xr.Dataset):
    """Writes the dataset to a temporary NetCDF file, then deletes it.

    Every variable is compressed with zlib (level 4) and stored with the
    chunk shape obtained by rechunking its array with a block size limit of
    512KB. The file is created in a scratch directory under ``TMPDIR``; the
    directory is removed on exit, whether or not the write succeeded.

    Args:
        selected: Dataset to write. The data of its variables must provide
            ``rechunk``.
    """
    encoding = {}
    for name in selected.data_vars:
        chunks = selected[name].data.rechunk(block_size_limit="512KB").chunks
        encoding[name] = {
            'zlib': True,
            'complevel': 4,
            # One chunk shape per variable: the extent of the first block
            # along each dimension.
            'chunksizes': tuple(blocks[0] for blocks in chunks),
        }

    # A private scratch directory replaces the name borrowed from a discarded
    # NamedTemporaryFile: the path cannot be taken by another process, and
    # the cleanup no longer raises FileNotFoundError (hiding the real error)
    # when the write fails before the file is created.
    with tempfile.TemporaryDirectory(dir=TMPDIR) as scratch:
        selected.to_netcdf(os.path.join(scratch, "benchmark.nc"),
                           mode="w",
                           encoding=encoding)

In [None]:
class TooBig(RuntimeError):
    """Raised when the selected data is too large to be written to NetCDF."""

In [None]:
def run_benchmark(row: pd.Series,
                  ds: xr.Dataset,
                  nbytes: np.ndarray,
                  writer: Optional[Writers] = None,
                  depth: str = "depth",
                  lng: str = "longitude",
                  lat: str = "latitude",
                  time: str = "time",
                  block_size_limit: str = "256MB") -> None:
    """Executes one benchmark: selects the data of a query and processes it.

    The selection is restricted to the longitude, latitude and time ranges
    of the query (and to its depth, when one is given) and to the requested
    variables. It is then either written to NetCDF, written to Zarr, or
    reduced with ``mean`` to measure the reading time.

    Args:
        row: Query to execute (one row of the request table).
        ds: Dataset to query.
        nbytes: Output parameter; element 0 receives the size in bytes of
            the selected data. The function is timed by the caller, so the
            size cannot be passed back as a return value.
        writer: Format to write the selection to; None to only read it.
        depth: Name of the depth coordinate.
        lng: Name of the longitude coordinate.
        lat: Name of the latitude coordinate.
        time: Name of the time coordinate.
        block_size_limit: Maximum size of the chunks of the selection.

    Raises:
        RuntimeError: If the query selects no variable or no data.
        TooBig: If ``writer`` is NETCDF and the selection exceeds 50 GB.
    """
    x0, x1 = row["minLongitude"], row["maxLongitude"]
    y0, y1 = row["minLatitude"], row["maxLatitude"]
    t0, t1 = row["minDate"].to_datetime64(), row["maxDate"].to_datetime64()
    z0 = row["depth"]

    # NOTE(review): the "variables" column is loaded as a string, so the
    # membership test of varname_from_standard_name is a substring match
    # here - confirm that this is intended.
    variables = varname_from_standard_name(ds, row["variables"])

    # Building the query: one boolean mask per coordinate.
    isel = {
        lng: (ds[lng] >= x0) & (ds[lng] <= x1),
        lat: (ds[lat] >= y0) & (ds[lat] <= y1),
        time: (ds[time] >= t0) & (ds[time] <= t1)
    }
    # A NaN depth means that the query has no depth constraint.
    if not np.isnan(z0):
        isel[depth] = ds[depth] == z0

    # Creation of the calculation graph performing the query
    selected = ds.isel(isel)
    selected = selected.drop_vars(set(ds.data_vars) - set(variables))
    # Reject a selection without variables or with a zero-length dimension.
    if not selected or not functools.reduce(operator.mul,
                                            selected.dims.values()):
        raise RuntimeError(f"invalid query: {row}")

    # stores the size of the selected data
    nbytes[0] = selected.nbytes

    # Reorganization of the selected zarr chunks
    for item in selected.data_vars:
        da = selected[item].data.rechunk(block_size_limit=block_size_limit)
        chunk = dict(zip(selected[item].dims, da.chunks))
        selected[item] = selected[item].chunk(chunk)
        # Forget the chunking recorded in the encoding, presumably so that
        # the writers use the new chunks. pop() tolerates variables that
        # carry no "chunks" entry, where ``del`` raised a KeyError.
        selected[item].encoding.pop('chunks', None)

    if writer is Writers.NETCDF:
        # Selections above 50 GB (decimal) are not written to NetCDF.
        if selected.nbytes > 50 * 1000**3:
            raise TooBig
        # Write a temporary file in netCDF format.
        write_netcdf(selected)
    elif writer is Writers.ZARR:
        # Write a temporary file in Zarr format.
        selected.to_zarr(TMPZARR, mode="w")
    else:
        # Measurement of data reading time
        _ = selected.mean().compute()

In [None]:
def benchmark(
        request: pd.DataFrame,
        selected: pd.DataFrame,
        dataset: str,
        depth: str = "depth",
        lng: str = "longitude",
        lat: str = "latitude",
        time: str = "time",
        block_size_limit: str = "256MB",
        write: bool = True
) -> None:
    """Runs the pending benchmarks on a given dataset.

    For each row of ``selected``, up to three measurements are made with
    ``run_benchmark``: writing the selection to NetCDF, writing it to Zarr,
    and reading it. Each one is timed with ``%timeit -r 1 -n 1``, i.e. a
    single execution. A measurement is only made when its column in
    ``request`` is still NaN, so results loaded from a previous session are
    not measured again.

    Args:
        request: Table of all the queries. It is updated in place: the
            results are stored at the index of each benchmarked row.
        selected: Rows of ``request`` to benchmark.
        dataset: Name of the dataset, resolved with ``get_dataset_path``.
        depth: Name of the depth coordinate, forwarded to ``run_benchmark``.
        lng: Name of the longitude coordinate, forwarded likewise.
        lat: Name of the latitude coordinate, forwarded likewise.
        time: Name of the time coordinate, forwarded likewise.
        block_size_limit: Chunk size limit, forwarded likewise.
        write: If False, the NetCDF and Zarr write tests are skipped.
    """
    ds = xr.open_zarr(get_dataset_path(dataset), mask_and_scale=False)
    # One-element array through which run_benchmark reports the size of the
    # selection: the %timeit magic does not give access to its return value.
    nbytes = np.array([0], dtype="uint64")
    
    for index, row in selected.iterrows():
        # NetCDF write test.
        if write and np.isnan(request.loc[index, 'netcdf']):
            try:
                # The magic must stay in this function: the timed statement
                # uses its local variables.
                timer = %timeit -r 1 -n 1 -o run_benchmark(row, ds, nbytes, Writers.NETCDF, depth, lng, lat, time, block_size_limit)
                request.loc[index, 'netcdf'] = timer.average
            except TooBig:
                # The selection is too large for the NetCDF test: the column
                # is left at NaN and the other tests still run.
                pass
        # Zarr write test.
        if write and np.isnan(request.loc[index, 'zarr']):
            try:
                timer = %timeit -r 1 -n 1 -o run_benchmark(row, ds, nbytes, Writers.ZARR, depth, lng, lat, time, block_size_limit)
                request.loc[index, 'zarr'] = timer.average
            finally:
                # Always remove the temporary store, even if the write failed.
                shutil.rmtree(TMPZARR, ignore_errors=True)
        # Read test; the size of the selection is only recorded here.
        if np.isnan(request.loc[index, 'average']):
            timer = %timeit -r 1 -n 1 -o run_benchmark(row, ds, nbytes, None, depth, lng, lat, time, block_size_limit)
            request.loc[index, [
                'average', 'best', 'stdev', 'worst', 'loops', 'repeat',
                "nbytes"
            ]] = (timer.average, timer.best, timer.stdev, timer.worst,
                  timer.loops, timer.repeat, nbytes[0])

In [None]:
def run(dataset: str,
        cores: int,
        request: pd.DataFrame,
        depth: str = "depth",
        lng: str = "longitude",
        lat: str = "latitude",
        time: str = "time",
        block_size_limit: str = "256MB",
        write: bool = True) -> None:
    """Runs the benchmarks of a dataset for a given number of cores.

    The queries of ``request`` whose ``Dataset`` and ``Cores`` columns match
    the arguments are benchmarked, then the whole table is saved to
    ``RESULT``. The table is also saved when a benchmark fails, so the
    measurements already made are kept.

    Args:
        dataset: Name of the dataset to benchmark.
        cores: Value of the ``Cores`` column selecting the queries to run.
        request: Table of all the queries; updated in place with the results.
        depth: Name of the depth coordinate.
        lng: Name of the longitude coordinate.
        lat: Name of the latitude coordinate.
        time: Name of the time coordinate.
        block_size_limit: Maximum size of the chunks of the selected data.
        write: If False, the NetCDF and Zarr write tests are skipped.
    """
    pending = request[(request["Dataset"] == dataset)
                      & (request["Cores"] == cores)]
    try:
        benchmark(request,
                  pending,
                  dataset,
                  depth=depth,
                  lng=lng,
                  lat=lat,
                  time=time,
                  block_size_limit=block_size_limit,
                  write=write)
    finally:
        # index=False: load_result reads this file back without an index
        # column, so writing the index would add an extra "Unnamed: 0" column
        # at every save/reload cycle.
        request.to_csv(RESULT,
                       sep=";",
                       date_format="%Y-%m-%d %H:%M:%S.000",
                       index=False)

In [None]:
# Resume from the intermediate results when a previous session saved some;
# otherwise start from the description of the queries.
if os.path.exists(RESULT):
    request = load_result(RESULT)
else:
    request = load_request(REQUEST)

In [None]:
# Benchmarks to run, in order: (dataset, number of workers, options forwarded
# to ``run``). This table replaces a series of copy-pasted scale/run cells.
BENCHMARK_PLAN = [
    ("global-analysis-forecast-phy-001-024", 10, {}),
    ("global-analysis-forecast-phy-001-024", 20, {}),
    ("global-analysis-forecast-phy-001-024", 30, {}),
    ("global-analysis-forecast-phy-001-024", 40, {}),
    ("global-analysis-forecast-phy-001-024", 50, {}),
    ("global-analysis-forecast-phy-001-024-hourly-t-u-v-ssh", 10, {}),
    ("LWQ100m", 10, dict(lng='lon', lat='lat', block_size_limit="32MB")),
    ("LWQ100m", 50,
     dict(lng='lon', lat='lat', block_size_limit="100MB", write=False)),
]

for dataset, workers, options in BENCHMARK_PLAN:
    cluster.scale(workers)
    # scale() does not wait for the new workers: block until at least
    # `workers` of them have joined, so that the timings are not taken on a
    # smaller cluster than the one recorded in the "Cores" column. When
    # scaling down this returns immediately.
    client.wait_for_workers(workers)
    run(dataset, workers, request, **options)

In [None]:
# Shut down the client and the cluster with 4GB per worker; the next cell
# creates a new cluster with 16GB per worker.
client.close()
cluster.close()

In [None]:
# Second cluster, identical to the first one except for the memory requested
# for each job (16GB instead of 4GB) and its size.
cluster = dask_jobqueue.PBSCluster(
    cores=1,
    memory='16GB',
    interface='ib0',
    # Left unexpanded on purpose: "$TMPDIR" is passed as a literal string.
    local_directory="$TMPDIR",
    walltime='12:00:00',
)
cluster.scale(100)
cluster

In [None]:
# Connect a new client to the cluster created in the previous cell; the
# previous client was closed together with the first cluster.
client = dask.distributed.Client(cluster)
client

In [None]:
# The cluster was only asked to scale to 100 workers: block until they have
# joined, so that the timings are not taken on a smaller cluster than the one
# recorded in the "Cores" column.
client.wait_for_workers(100)
run("LWQ100m", 100, request, lng='lon', lat='lat', block_size_limit="100MB", write=False)

In [None]:
# Final save of the results, without the index column, in the format read
# back by ``load_result``.
request.to_csv(RESULT, sep=";", date_format="%Y-%m-%d %H:%M:%S.000", index=False)

In [None]:
# All the benchmarks are done: shut down the client and the cluster.
client.close()
cluster.close()