From 27dc971a9c6e27ed31ed5c7c629e5d27df649f1c Mon Sep 17 00:00:00 2001
From: Philipp Rudiger
Date: Mon, 5 Feb 2018 14:33:59 +0000
Subject: [PATCH] Added support for dask arrays in GridInterface
---
holoviews/core/data/grid.py | 26 ++++--
tests/core/data/testdataset.py | 161 +++++++++++++++++++++++++++++++++
2 files changed, 181 insertions(+), 6 deletions(-)
diff --git a/holoviews/core/data/grid.py b/holoviews/core/data/grid.py
index de5a1a1eff..03262c965c 100644
--- a/holoviews/core/data/grid.py
+++ b/holoviews/core/data/grid.py
@@ -5,7 +5,16 @@
except ImportError:
pass
+
import numpy as np
+array_types = (np.ndarray,)
+
+try:
+ import dask.array as da
+ array_types += (da.Array,)
+except ImportError:
+ da = None
+
from .dictionary import DictInterface
from .interface import Interface, DataError
@@ -78,7 +87,7 @@ def init(cls, eltype, data, kdims, vdims):
name = dim.name if isinstance(dim, Dimension) else dim
if name not in data:
raise ValueError("Values for dimension %s not found" % dim)
- if not isinstance(data[name], np.ndarray):
+ if not isinstance(data[name], array_types):
data[name] = np.array(data[name])
kdim_names = [d.name if isinstance(d, Dimension) else d for d in kdims]
@@ -226,18 +235,18 @@ def canonicalize(cls, dataset, data, data_coords=None, virtual_coords=[]):
invert = True
else:
slices.append(slice(None))
- data = data[slices] if invert else data
+ data = data[tuple(slices)] if invert else data
# Transpose data
dims = [name for name in data_coords
- if isinstance(cls.coords(dataset, name), np.ndarray)]
+ if isinstance(cls.coords(dataset, name), array_types)]
dropped = [dims.index(d) for d in dims
if d not in dataset.kdims+virtual_coords]
if dropped:
data = data.squeeze(axis=tuple(dropped))
if not any(cls.irregular(dataset, d) for d in dataset.kdims):
- inds = [dims.index(kd.name)for kd in dataset.kdims]
+ inds = [dims.index(kd.name) for kd in dataset.kdims]
inds = [i - sum([1 for d in dropped if i>=d]) for i in inds]
if inds:
data = data.transpose(inds[::-1])
@@ -301,6 +310,8 @@ def values(cls, dataset, dim, expanded=True, flat=True):
if dim in dataset.vdims or dataset.data[dim.name].ndim > 1:
data = dataset.data[dim.name]
data = cls.canonicalize(dataset, data)
+ if da and isinstance(data, da.Array):
+ data = data.compute()
return data.T.flatten() if flat else data
elif expanded:
data = cls.coords(dataset, dim.name, expanded=True)
@@ -364,7 +375,7 @@ def groupby(cls, dataset, dim_names, container_type, group_type, **kwargs):
def key_select_mask(cls, dataset, values, ind):
if isinstance(ind, tuple):
ind = slice(*ind)
- if isinstance(ind, np.ndarray):
+ if isinstance(ind, array_types):
mask = ind
elif isinstance(ind, slice):
mask = True
@@ -491,7 +502,10 @@ def sample(cls, dataset, samples=[]):
data[d].append(arr)
for vdim, array in zip(dataset.vdims, arrays):
flat_index = np.ravel_multi_index(tuple(int_inds)[::-1], array.shape)
- data[vdim.name].append(array.flat[flat_index])
+ if da and isinstance(array, da.Array):
+ data[vdim.name].append(array.flatten()[tuple(flat_index)])
+ else:
+ data[vdim.name].append(array.flat[flat_index])
concatenated = {d: np.concatenate(arrays).flatten() for d, arrays in data.items()}
return concatenated
diff --git a/tests/core/data/testdataset.py b/tests/core/data/testdataset.py
index 1d35fa4626..294a816188 100644
--- a/tests/core/data/testdataset.py
+++ b/tests/core/data/testdataset.py
@@ -7,6 +7,11 @@
from itertools import product
import numpy as np
+try:
+ import dask.array as da
+except ImportError:
+ da = None
+
from holoviews import Dataset, HoloMap, Dimension, Image
from holoviews.element import Distribution, Points, Scatter
from holoviews.element.comparison import ComparisonTestCase
@@ -1465,6 +1470,162 @@ def test_dataset_groupby_drop_dims_dynamic_with_vdim(self):
self.assertEqual(partial[19]['Val'], array[:, -1, :].T.flatten())
+class DaskGridDatasetTest(GridDatasetTest):
+    """
+    Grid interface tests that use dask arrays for the gridded data.
+
+    Subclasses GridDatasetTest, rebuilding the fixtures with dask arrays
+    and redefining the tests that construct their own value arrays so
+    that those arrays are dask arrays as well.
+    """
+
+    def setUp(self):
+        """Skip when dask is unavailable, otherwise build the fixtures."""
+        # da is None when the optional module-level dask import failed
+        if da is None:
+            raise SkipTest('Requires dask')
+        # Remember the active datatype list before forcing 'grid'
+        # (presumably restored by an inherited tearDown -- not visible here)
+        self.restore_datatype = Dataset.datatype
+        Dataset.datatype = ['grid']
+        self.eltype = Dataset
+        self.data_instance_type = dict
+        self.init_grid_data()
+        self.init_column_data()
+
+    def init_column_data(self):
+        """Build 1D fixtures with NumPy x-values and dask-backed y-values."""
+        self.xs = np.arange(11)
+        self.xs_2 = self.xs**2
+        self.y_ints = da.from_array(self.xs*2, 3)
+        self.dataset_hm = Dataset((self.xs, self.y_ints),
+                                  kdims=['x'], vdims=['y'])
+        self.dataset_hm_alias = Dataset((self.xs, self.y_ints),
+                                        kdims=[('x', 'X')], vdims=[('y', 'Y')])
+
+    def init_grid_data(self):
+        """Build 2D fixtures with NumPy coordinates and a dask value array."""
+        # Uses the module-level ``da``; setUp has already skipped the
+        # test if dask could not be imported.
+        self.grid_xs = np.array([0, 1])
+        self.grid_ys = np.array([0.1, 0.2, 0.3])
+        self.grid_zs = da.from_array(np.array([[0, 1], [2, 3], [4, 5]]), 3)
+        self.dataset_grid = self.eltype((self.grid_xs, self.grid_ys,
+                                         self.grid_zs), kdims=['x', 'y'],
+                                        vdims=['z'])
+        self.dataset_grid_alias = self.eltype((self.grid_xs, self.grid_ys,
+                                               self.grid_zs), kdims=[('x', 'X'), ('y', 'Y')],
+                                              vdims=[('z', 'Z')])
+        self.dataset_grid_inv = self.eltype((self.grid_xs[::-1], self.grid_ys[::-1],
+                                             self.grid_zs), kdims=['x', 'y'],
+                                            vdims=['z'])
+
+    def test_dataset_add_dimensions_values_hm(self):
+        """A dask array added as a value dimension yields the expected values."""
+        arr = da.from_array(np.arange(1, 12), 3)
+        table = self.dataset_hm.add_dimension('z', 1, arr, vdim=True)
+        self.assertEqual(table.vdims[1], 'z')
+        self.compare_arrays(table.dimension_values('z'), np.arange(1,12))
+
+    def test_dataset_add_dimensions_values_hm_alias(self):
+        """Same as above, with the new dimension declared as a (name, label) pair."""
+        arr = da.from_array(np.arange(1, 12), 3)
+        table = self.dataset_hm.add_dimension(('z', 'Z'), 1, arr, vdim=True)
+        self.assertEqual(table.vdims[1], 'Z')
+        self.compare_arrays(table.dimension_values('Z'), np.arange(1,12))
+
+    def test_dataset_2D_columnar_shape(self):
+        """The columnar shape of an 11x11 grid is (121 rows, 3 dimensions)."""
+        array = da.from_array(np.random.rand(11, 11), 3)
+        dataset = Dataset({'x':self.xs, 'y':self.y_ints, 'z': array},
+                          kdims=['x', 'y'], vdims=['z'])
+        self.assertEqual(dataset.shape, (11*11, 3))
+
+    def test_dataset_2D_gridded_shape(self):
+        """The gridded shape matches the shape of the dask value array."""
+        array = da.from_array(np.random.rand(12, 11), 3)
+        dataset = Dataset({'x':self.xs, 'y': range(12), 'z': array},
+                          kdims=['x', 'y'], vdims=['z'])
+        self.assertEqual(dataset.interface.shape(dataset, gridded=True),
+                         (12, 11))
+
+    def test_dataset_2D_aggregate_partial_hm(self):
+        """Aggregating onto 'x' with np.mean equals the mean along axis 0."""
+        array = da.from_array(np.random.rand(11, 11), 3)
+        dataset = Dataset({'x':self.xs, 'y':self.y_ints, 'z': array},
+                          kdims=['x', 'y'], vdims=['z'])
+        self.assertEqual(dataset.aggregate(['x'], np.mean),
+                         Dataset({'x':self.xs, 'z': np.mean(array, axis=0).compute()},
+                                 kdims=['x'], vdims=['z']))
+
+    def test_dataset_2D_aggregate_partial_hm_alias(self):
+        """Aggregating by the label 'X' equals the mean along axis 0."""
+        array = da.from_array(np.random.rand(11, 11), 3)
+        dataset = Dataset({'x':self.xs, 'y':self.y_ints, 'z': array},
+                          kdims=[('x', 'X'), ('y', 'Y')], vdims=[('z', 'Z')])
+        self.assertEqual(dataset.aggregate(['X'], np.mean),
+                         Dataset({'x':self.xs, 'z': np.mean(array, axis=0).compute()},
+                                 kdims=[('x', 'X')], vdims=[('z', 'Z')]))
+
+    def test_dataset_2D_reduce_hm(self):
+        """Reducing over both key dimensions equals the mean of the array."""
+        array = da.from_array(np.random.rand(11, 11), 3)
+        dataset = Dataset({'x':self.xs, 'y':self.y_ints, 'z': array},
+                          kdims=['x', 'y'], vdims=['z'])
+        self.assertEqual(np.array(dataset.reduce(['x', 'y'], np.mean)),
+                         np.mean(array))
+
+    def test_dataset_2D_reduce_hm_alias(self):
+        """Reducing by dimension name or by label equals the mean of the array."""
+        # Wrapped in a dask array, like test_dataset_2D_reduce_hm, so that
+        # this suite exercises dask-backed value data rather than NumPy.
+        array = da.from_array(np.random.rand(11, 11), 3)
+        dataset = Dataset({'x':self.xs, 'y':self.y_ints, 'z': array},
+                          kdims=[('x', 'X'), ('y', 'Y')], vdims=[('z', 'Z')])
+        self.assertEqual(np.array(dataset.reduce(['x', 'y'], np.mean)),
+                         np.mean(array))
+        self.assertEqual(np.array(dataset.reduce(['X', 'Y'], np.mean)),
+                         np.mean(array))
+
+    def test_dataset_groupby_dynamic(self):
+        """A dynamic groupby on 'x' returns the first column for key 0."""
+        array = da.from_array(np.random.rand(11, 11), 3)
+        dataset = Dataset({'x':self.xs, 'y':self.y_ints, 'z': array},
+                          kdims=['x', 'y'], vdims=['z'])
+        with DatatypeContext([self.datatype, 'dictionary' , 'dataframe'], dataset):
+            grouped = dataset.groupby('x', dynamic=True)
+        first = Dataset({'y': self.y_ints, 'z': array[:, 0]},
+                        kdims=['y'], vdims=['z'])
+        self.assertEqual(grouped[0], first)
+
+    def test_dataset_groupby_dynamic_alias(self):
+        """A dynamic groupby on the label 'X' returns the first column for key 0."""
+        array = da.from_array(np.random.rand(11, 11), 3)
+        dataset = Dataset({'x':self.xs, 'y':self.y_ints, 'z': array},
+                          kdims=[('x', 'X'), ('y', 'Y')], vdims=[('z', 'Z')])
+        with DatatypeContext([self.datatype, 'dictionary' , 'dataframe'], dataset):
+            grouped = dataset.groupby('X', dynamic=True)
+        first = Dataset({'y': self.y_ints, 'z': array[:, 0].compute()},
+                        kdims=[('y', 'Y')], vdims=[('z', 'Z')])
+        self.assertEqual(grouped[0], first)
+
+    def test_dataset_groupby_multiple_dims(self):
+        """Grouping a 4D grid by two dimensions matches select plus reindex."""
+        dataset = Dataset((range(8), range(8), range(8), range(8),
+                           da.from_array(np.random.rand(8, 8, 8, 8), 4)),
+                          kdims=['a', 'b', 'c', 'd'], vdims=['Value'])
+        grouped = dataset.groupby(['c', 'd'])
+        keys = list(product(range(8), range(8)))
+        self.assertEqual(list(grouped.keys()), keys)
+        for c, d in keys:
+            self.assertEqual(grouped[c, d], dataset.select(c=c, d=d).reindex(['a', 'b']))
+
+    def test_dataset_groupby_drop_dims(self):
+        """Grouping by 'y' while keeping only 'x' retains the dropped 'z' samples."""
+        array = da.from_array(np.random.rand(3, 20, 10), 3)
+        ds = Dataset({'x': range(10), 'y': range(20), 'z': range(3), 'Val': array},
+                     kdims=['x', 'y', 'z'], vdims=['Val'])
+        with DatatypeContext([self.datatype, 'dictionary' , 'dataframe'], (ds, Dataset)):
+            partial = ds.to(Dataset, kdims=['x'], vdims=['Val'], groupby='y')
+        self.assertEqual(partial.last['Val'], array[:, -1, :].T.flatten().compute())
+
+    def test_dataset_groupby_drop_dims_dynamic(self):
+        """Dynamic variant of test_dataset_groupby_drop_dims, indexed at y=19."""
+        array = da.from_array(np.random.rand(3, 20, 10), 3)
+        ds = Dataset({'x': range(10), 'y': range(20), 'z': range(3), 'Val': array},
+                     kdims=['x', 'y', 'z'], vdims=['Val'])
+        with DatatypeContext([self.datatype, 'dictionary' , 'dataframe'], (ds, Dataset)):
+            partial = ds.to(Dataset, kdims=['x'], vdims=['Val'], groupby='y', dynamic=True)
+            self.assertEqual(partial[19]['Val'], array[:, -1, :].T.flatten().compute())
+
+    def test_dataset_groupby_drop_dims_with_vdim(self):
+        """Grouping by 'y' with the value dimension 'Val' used as the key dimension."""
+        array = da.from_array(np.random.rand(3, 20, 10), 3)
+        ds = Dataset({'x': range(10), 'y': range(20), 'z': range(3), 'Val': array, 'Val2': array*2},
+                     kdims=['x', 'y', 'z'], vdims=['Val', 'Val2'])
+        with DatatypeContext([self.datatype, 'dictionary' , 'dataframe'], (ds, Dataset)):
+            partial = ds.to(Dataset, kdims=['Val'], vdims=['Val2'], groupby='y')
+        self.assertEqual(partial.last['Val'], array[:, -1, :].T.flatten().compute())
+
+    def test_dataset_groupby_drop_dims_dynamic_with_vdim(self):
+        """Dynamic variant of test_dataset_groupby_drop_dims_with_vdim, indexed at y=19."""
+        array = da.from_array(np.random.rand(3, 20, 10), 3)
+        ds = Dataset({'x': range(10), 'y': range(20), 'z': range(3), 'Val': array, 'Val2': array*2},
+                     kdims=['x', 'y', 'z'], vdims=['Val', 'Val2'])
+        with DatatypeContext([self.datatype, 'dictionary' , 'dataframe'], (ds, Dataset)):
+            partial = ds.to(Dataset, kdims=['Val'], vdims=['Val2'], groupby='y', dynamic=True)
+            self.assertEqual(partial[19]['Val'], array[:, -1, :].T.flatten().compute())
+
+
@attr(optional=1)
class IrisDatasetTest(GridDatasetTest):
"""