Skip to content

Commit

Permalink
Merge pull request #4 from ContinuumIO/explicit_dask
Browse files Browse the repository at this point in the history
Add and test to_dask method
  • Loading branch information
martindurant committed Jul 25, 2018
2 parents cafef8f + be78a4a commit 3b033cd
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
11 changes: 2 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,9 @@ install:
- |
echo "Configuring conda."
source $HOME/miniconda3/bin/activate root
conda install -y conda-build anaconda-client
conda install -y conda-build anaconda-client conda-verify
script:
- |
flake8 .
if [ "$TRAVIS_OS_NAME" = "linux" ]; then
conda build -c intake -c defaults -c conda-forge ./conda
else
# Workaround for Travis-CI bug #2: https://github.com/travis-ci/travis-ci/issues/7773
conda build -c intake -c defaults -c conda-forge --no-test ./conda
fi
- conda build -c intake -c defaults -c conda-forge ./conda
- |
if [ -n "$TRAVIS_TAG" ]; then
# If tagged git version, upload package to main channel
Expand Down
17 changes: 15 additions & 2 deletions intake_hbase/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
class HBaseSource(base.DataSource):
"""Execute a query on HBASE
The data are returned as tuples of (ID, data) where the data is a dict
of field-value pairs.
Parameters
----------
table: str
Expand Down Expand Up @@ -62,6 +65,16 @@ def _get_partition(self, i):
else:
return self._do_query(None, None)

def to_dask(self):
    """Build a dask bag holding one delayed partition per partition index.

    Every index in ``range(self.npartitions)`` is wrapped as a delayed call
    to ``self._get_partition`` and the resulting delayed objects are handed
    to ``dask.bag.from_delayed``.
    """
    import dask.bag as db
    import dask.delayed
    # Wrap the partition loader once, then call the wrapper per index.
    lazy_fetch = dask.delayed(self._get_partition)
    pieces = [lazy_fetch(idx) for idx in range(self.npartitions)]
    return db.from_delayed(pieces)

def read(self):
# NOTE(review): this span appears to interleave two versions of the body; the
# diff markers were lost in extraction. The file header reports 2 deletions,
# which matches the two-line `return sum(...)` statement below — confirm
# against the commit before treating either part as live code.
# Presumably the pre-change body: concatenates the list returned by every
# partition into a single list.
return sum([self._get_partition(i)
for i in range(self.npartitions)], [])
# Presumably the post-change body: when divisions are set, gather all
# partitions through the dask bag; otherwise issue one _get_partition(None) call.
"""Return all results"""
if self.divisions:
return self.to_dask().compute()
else:
return self._get_partition(None)
14 changes: 14 additions & 0 deletions tests/test_intake_hbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,17 @@ def test_read_part(engine):
assert out[0][1][b'field:name'] == b'Charlie'
out = source.read()
assert len(out) == 4


def test_read_dask(engine):
    """Check that to_dask yields a four-partition bag with the expected names."""
    boundaries = list('01234')
    source = HBaseSource(TEST, CONNECT, divisions=boundaries)
    source.discover()
    assert source.npartitions == 4

    bag = source.to_dask()

    # The first element taken from the bag is an (ID, data) pair for Alice.
    (first,) = bag.take(1)
    assert first[1][b'field:name'] == b'Alice'

    # Computing the whole bag starts with that same element and has 4 entries.
    everything = bag.compute()
    assert everything[0] == first
    assert len(everything) == 4

    # Bag operations can be chained to pull out and decode the name field.
    names = bag.pluck(1).pluck(b'field:name').map(bytes.decode).compute()
    assert names == ['Alice', 'Bob', 'Charlie', 'Eve']

0 comments on commit 3b033cd

Please sign in to comment.