Permalink
Browse files

Merge branch 'master' into lattice-proto-v3

Conflicts:
	lib/bud/rewrite.rb
  • Loading branch information...
2 parents: d4ebe83 + e5d2b0a · commit ead1766999b9dbfdb48fcd0a863b5d40ab507ab5 · neilconway (@neilconway) committed May 20, 2012
Showing with 365 additions and 103 deletions.
  1. +11 −1 History.txt
  2. +1 −1 bud.gemspec
  3. +1 −1 docs/cheat.md
  4. +28 −0 lib/bud/aggs.rb
  5. +2 −4 lib/bud/collections.rb
  6. +13 −6 lib/bud/executor/elements.rb
  7. +5 −10 lib/bud/executor/group.rb
  8. +112 −79 lib/bud/executor/join.rb
  9. +25 −1 lib/bud/rewrite.rb
  10. +97 −0 test/tc_aggs.rb
  11. +34 −0 test/tc_module.rb
  12. +36 −0 test/tc_notin.rb
View History.txt
@@ -1,5 +1,15 @@
-== 0.9.2 / ???
+== 0.9.3 / ???
+== 0.9.2 / 2012-05-19
+
+* Add new aggregate functions: bool_and() and bool_or()
+* Fix bugs in notin() stratification and implementation (#271)
+* Fix a bug in processing multi-way joins defined inside modules
+* Fix two bugs in reduce() operator
+ * Incorrect default value was sometimes returned
+ * Didn't handle reduce() outputs that aren't tuples with two fields
+* Improve reduce() operator error reporting
+* Improve MRI 1.9 compatibility
== 0.9.1 / 2012-04-10
View bud.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = "bud"
- s.version = "0.9.2"
+ s.version = "0.9.3"
s.authors = ["Peter Alvaro", "Neil Conway", "Joseph M. Hellerstein", "William R. Marczak", "Sriram Srinivasan"]
s.email = ["bloomdevs@gmail.com"]
s.summary = "A prototype Bloom DSL for distributed programming."
View docs/cheat.md
@@ -237,7 +237,7 @@ Finally, we output every tuple of `bc` that does *not* appear in `t`.
## SQL-style grouping/aggregation (and then some) ##
* `bc.group([:col1, :col2], min(:col3))`. *akin to min(col3) GROUP BY col1,col2*
- * exemplary aggs: `min`, `max`, `choose`
+ * exemplary aggs: `min`, `max`, `bool_and`, `bool_or`, `choose`
* summary aggs: `sum`, `avg`, `count`
* structural aggs: `accum`
* `bc.argmax([:attr1], :attr2)`      *returns the bc items per attr1 that have highest attr2*
View lib/bud/aggs.rb
@@ -70,6 +70,34 @@ def max(x)
[Max.new, x]
end
+ class BooleanAnd < ArgExemplary #:nodoc: all
+ def trans(the_state, val)
+ if val == false
+ return val, :replace
+ else
+ return the_state, :ignore
+ end
+ end
+ end
+
+ def bool_and(x)
+ [BooleanAnd.new, x]
+ end
+
+ class BooleanOr < ArgExemplary #:nodoc: all
+ def trans(the_state, val)
+ if val == true
+ return val, :replace
+ else
+ return the_state, :ignore
+ end
+ end
+ end
+
+ def bool_or(x)
+ [BooleanOr.new, x]
+ end
+
class Choose < ArgExemplary #:nodoc: all
def trans(the_state, val)
if the_state.nil?
View lib/bud/collections.rb
@@ -723,16 +723,14 @@ def argmax(gbkey_cols, col)
public
def *(collection)
elem1 = to_push_elem
- j = elem1.join(collection)
- return j
+ return elem1.join(collection)
end
def group(key_cols, *aggpairs, &blk)
elem = to_push_elem
key_cols = key_cols.map{|k| canonicalize_col(k)} unless key_cols.nil?
aggpairs = aggpairs.map{|ap| [ap[0], canonicalize_col(ap[1])].compact} unless aggpairs.nil?
- g = elem.group(key_cols, *aggpairs, &blk)
- return g
+ return elem.group(key_cols, *aggpairs, &blk)
end
def notin(collection, *preds, &blk)
View lib/bud/executor/elements.rb
@@ -1,8 +1,6 @@
require 'set'
require 'bud/collections'
-ELEMENT_BUFSIZE = 1
-
module Bud
# Usage example:
# p = PushElement.new(:r) do |inp|
@@ -530,8 +528,9 @@ def scan(first_iter)
class PushReduce < PushStatefulElement
def initialize(elem_name, bud_instance, collection_name,
schema_in, initial, &blk)
- @memo = initial
+ @initial = initial
@blk = blk
+ reset_memo
super(elem_name, bud_instance, collection_name, schema)
end
@@ -540,13 +539,21 @@ def insert(i, source=nil)
end
def invalidate_cache
- @memo.clear
+ puts "#{self.class}/#{self.tabname} invalidated" if $BUD_DEBUG
+ reset_memo
+ end
+
+ def reset_memo
+ @memo = Marshal.load(Marshal.dump(@initial))
end
public
def flush
- @memo.each do |k,v|
- push_out([k,v], false)
+ unless @memo.kind_of? Enumerable
+ raise Bud::TypeError, "output of reduce must be Enumerable: #{@memo.inspect}"
+ end
+ @memo.each do |t|
+ push_out(t, false)
end
end
end
View lib/bud/executor/group.rb
@@ -21,7 +21,6 @@ def insert(item, source)
agg = (@groups[key].nil? or @groups[key][agg_ix].nil?) ? ap[0].send(:init, agg_input) : ap[0].send(:trans, @groups[key][agg_ix], agg_input)[0]
@groups[key] ||= Array.new(@aggpairs.length)
@groups[key][agg_ix] = agg
- push_out(nil)
end
end
@@ -40,7 +39,6 @@ def flush
(1..grp.length-1).each {|i| outval << grp[i]}
push_out(outval)
end
- #@groups = {}
end
end
@@ -82,23 +80,20 @@ def insert(item, source)
@winners[key].delete t unless @winners[key].empty?
end
else
- raise "strange result from argagg finalizer"
+ raise Bud::Error, "strange result from argagg finalizer"
end
end
@groups[key] ||= Array.new(@aggpairs.length)
@groups[key][agg_ix] = agg
- #push_out(nil)
end
end
def flush
- @groups.keys.each {|g|
- @winners[g].each{|t|
+ @groups.each_key do |g|
+ @winners[g].each do |t|
push_out(t, false)
- }
- }
- #@groups = {}
- #@winners = {}
+ end
+ end
end
end
end
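Oops, something went wrong. [The diffs for the remaining files in the list above (lib/bud/executor/join.rb, lib/bud/rewrite.rb, test/tc_aggs.rb, test/tc_module.rb, test/tc_notin.rb) were not loaded.]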

0 comments on commit ead1766

Please sign in to comment.