Support dask.bag reductions on no partitions #2324

Merged
mrocklin merged 3 commits into dask:master from mrocklin:bag-no-partitions
May 14, 2017

Conversation

@mrocklin
Member

No description provided.

@mrocklin mrocklin force-pushed the bag-no-partitions branch from e522bf5 to 67f3972 Compare May 10, 2017 19:21
return Item({b: task}, b)
else:
return Bag({(b, 0): task}, b, 1)

Member

I may be wrong, but I think the following patch is equivalent and avoids special-casing certain numbers of partitions:

--- a/dask/bag/core.py
+++ b/dask/bag/core.py
@@ -731,11 +731,10 @@ class Bag(Base):
         b = a
         fmt = '%s-aggregate-%s' % (name or funcname(aggregate), token)
         depth = 0
-        while k > 1:
+        while k > split_every:
             c = fmt + str(depth)
-            is_last = k <= split_every
             dsk2 = dict(((c, i), (empty_safe_aggregate, aggregate,
-                                  [(b, j) for j in inds], is_last))
+                                  [(b, j) for j in inds], False))
                         for i, inds in enumerate(partition_all(split_every,
                                                                range(k))))
             dsk.update(dsk2)
@@ -743,11 +742,14 @@ class Bag(Base):
             b = c
             depth += 1

+        dsk[(fmt, 0)] = (empty_safe_aggregate, aggregate,
+                         [(b, j) for j in range(k)], True)
+
         if out_type is Item:
-            dsk[b] = dsk.pop((b, 0))
-            return Item(merge(self.dask, dsk), b)
+            dsk[fmt] = dsk.pop((fmt, 0))
+            return Item(merge(self.dask, dsk), fmt)
         else:
-            return Bag(merge(self.dask, dsk), b, 1)
+            return Bag(merge(self.dask, dsk), fmt, 1)

     def sum(self, split_every=None):
         """ Sum all elements """

Suggestion by jcrist
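
To illustrate the control flow of the suggested patch outside of dask, here is a minimal sketch in plain Python. It is not the code in `dask/bag/core.py`: `tree_reduce` and the local `partition_all` are hypothetical stand-ins, the task graph is replaced by eager function calls, and the `empty_safe_aggregate` / `is_last` handling is left out. The point it shows is only that intermediate rounds run while more than `split_every` inputs remain, and that one final aggregation always runs, even for zero partitions.

```python
from itertools import islice


def partition_all(n, seq):
    """Yield tuples of at most n items (hypothetical stand-in for toolz.partition_all)."""
    it = iter(seq)
    while True:
        chunk = tuple(islice(it, n))
        if not chunk:
            return
        yield chunk


def tree_reduce(parts, aggregate, split_every=8):
    """Reduce per-partition results in a tree, then finalize exactly once.

    `parts` may be empty; the final aggregation still runs in that case.
    """
    if split_every < 2:
        # Assumption of this sketch: groups of one would never shrink the level.
        raise ValueError("split_every must be at least 2")

    level = list(parts)
    # Intermediate rounds: only while one final call could not take all inputs.
    while len(level) > split_every:
        level = [aggregate(list(group))
                 for group in partition_all(split_every, level)]

    # Final round: unconditional, so 0, 1, or up to split_every inputs
    # all go through the same code path.
    return aggregate(level)


if __name__ == "__main__":
    print(tree_reduce([], sum))                            # no partitions
    print(tree_reduce(list(range(100)), sum, split_every=4))
```

Because the last step is unconditional, the empty case needs no separate branch, which matches the intent stated in the comment above. How the real aggregate function treats empty input is outside this sketch.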
@mrocklin
Member Author

Merging this soon if there are no further comments

@jcrist
Member

jcrist commented May 12, 2017

Looks good to me.

@mrocklin mrocklin merged commit 3491831 into dask:master May 14, 2017
@mrocklin mrocklin deleted the bag-no-partitions branch May 14, 2017 17:02
@sinhrks sinhrks added this to the 0.15.0 milestone Aug 30, 2017