Skip to content

Commit

Permalink
Create new partd for every bag groupby (#1867)
Browse files Browse the repository at this point in the history
Fixes #1851
  • Loading branch information
mrocklin committed Dec 26, 2016
1 parent b2b3444 commit 76fffb2
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
4 changes: 2 additions & 2 deletions dask/bag/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1590,9 +1590,9 @@ def groupby_disk(b, grouper, npartitions=None, blocksize=2**20):
import partd
p = ('partd-' + token,)
try:
dsk1 = {p: (partd.Python, (partd.Snappy, partd.File()))}
dsk1 = {p: (partd.Python, (partd.Snappy, (partd.File,)))}
except AttributeError:
dsk1 = {p: (partd.Python, partd.File())}
dsk1 = {p: (partd.Python, (partd.File,))}

# Partition data on disk
name = 'groupby-part-{0}-{1}'.format(funcname(grouper), token)
Expand Down
6 changes: 6 additions & 0 deletions dask/bag/tests/test_bag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,3 +1100,9 @@ def func(part):
res = b.reduction(func, sum)

assert res.compute(get=dask.get) == sum(range(10))


def test_repeated_groupby():
b = db.range(10, npartitions=4)
c = b.groupby(lambda x: x % 3)
assert valmap(len, dict(c)) == valmap(len, dict(c))

0 comments on commit 76fffb2

Please sign in to comment.