Skip to content
Browse files

Bump version

  • Loading branch information...
1 parent 2447338 commit 6e8ac20ffbd980575c40d100e85b79173ba685f3 @PlasticLizard committed May 5, 2010
View
5 CHANGELOG.rdoc
@@ -1,3 +1,8 @@
+==0.1.21
+ *Added metadata collections in the database for cubicle to manage aggregation info. This was necessary because previously
+ I was trying to overload the collection name with metadata, which was making the names longer than MongoDB could support
+ and causing errors. This change will enable richer monitoring, profiling and optimization in the near future.
+
==0.1.20
*Updated to work with mongo driver 1.0 (and therefore latest versions of MongoMapper)
View
10 cubicle.gemspec
@@ -5,11 +5,11 @@
Gem::Specification.new do |s|
s.name = %q{cubicle}
- s.version = "0.1.20"
+ s.version = "0.1.21"
s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Nathan Stults"]
- s.date = %q{2010-05-03}
+ s.date = %q{2010-05-05}
s.description = %q{Cubicle provides a dsl and aggregation caching framework for automating the generation, execution and caching of map reduce queries when using MongoDB in Ruby. Cubicle also includes a MongoMapper plugin for quickly performing ad-hoc, multi-level group-by queries against a MongoMapper model.}
s.email = %q{hereiam@sonic.net}
s.extra_rdoc_files = [
@@ -27,7 +27,9 @@ Gem::Specification.new do |s|
"lib/cubicle/aggregation.rb",
"lib/cubicle/aggregation/ad_hoc.rb",
"lib/cubicle/aggregation/aggregation_manager.rb",
+ "lib/cubicle/aggregation/aggregation_metadata.rb",
"lib/cubicle/aggregation/aggregation_view.rb",
+ "lib/cubicle/aggregation/cubicle_metadata.rb",
"lib/cubicle/aggregation/dsl.rb",
"lib/cubicle/aggregation/map_reduce_helper.rb",
"lib/cubicle/bucketized_dimension.rb",
@@ -55,6 +57,8 @@ Gem::Specification.new do |s|
"lib/cubicle/version.rb",
"test/config/database.yml",
"test/cubicle/aggregation/ad_hoc_test.rb",
+ "test/cubicle/aggregation/aggregation_metadata_test.rb",
+ "test/cubicle/aggregation/cubicle_metadata_test.rb",
"test/cubicle/bucketized_dimension_test.rb",
"test/cubicle/cubicle_aggregation_test.rb",
"test/cubicle/cubicle_query_test.rb",
@@ -77,6 +81,8 @@ Gem::Specification.new do |s|
s.summary = %q{Pseudo-Multi Dimensional analysis / simplified aggregation for MongoDB in Ruby (NOLAP ;))}
s.test_files = [
"test/cubicle/aggregation/ad_hoc_test.rb",
+ "test/cubicle/aggregation/aggregation_metadata_test.rb",
+ "test/cubicle/aggregation/cubicle_metadata_test.rb",
"test/cubicle/bucketized_dimension_test.rb",
"test/cubicle/cubicle_aggregation_test.rb",
"test/cubicle/cubicle_query_test.rb",
View
2 lib/cubicle.rb
@@ -25,6 +25,8 @@
"data/level",
"data/hierarchy",
"data/table",
+ "aggregation/aggregation_metadata",
+ "aggregation/cubicle_metadata",
"aggregation/aggregation_view",
"aggregation/aggregation_manager",
"aggregation/map_reduce_helper",
View
108 lib/cubicle/aggregation/aggregation_manager.rb
@@ -2,10 +2,11 @@ module Cubicle
module Aggregation
class AggregationManager
- attr_reader :aggregation
+ attr_reader :aggregation, :metadata
def initialize(aggregation)
@aggregation = aggregation
+ @metadata = Cubicle::Aggregation::CubicleMetadata.new(aggregation)
end
def database
@@ -32,26 +33,28 @@ def execute_query(query,options={})
find_options[:sort] = prepare_order_by(query)
filter = {}
+
if query == aggregation || query.transient?
- aggregation = aggregate(query,options)
+ reduction = aggregate(query,options)
else
process_if_required
- aggregation = aggregation_for(query)
+ agg_data = aggregation_for(query)
+ reduction = agg_data.collection
#if the query exactly matches the aggregation in terms of requested members, we can issue a simple find
#otherwise, a second map reduce is required to reduce the data set one last time
- if query.all_dimensions? || ((aggregation.name.split("_")[-1].split(".")) - query.member_names - [:all_measures]).blank?
+ if query.all_dimensions? || (agg_data.member_names - query.member_names - [:all_measures]).blank?
filter = prepare_filter(query,options[:where] || {})
else
- aggregation = aggregate(query,:source_collection=>aggregation.name)
+ reduction = aggregate(query,:source_collection=>agg_data.target_collection_name)
end
end
- if aggregation.blank?
- Cubicle::Data::Table.new(query,[],0) if aggregation == []
+ if reduction.blank?
+ Cubicle::Data::Table.new(query,[],0)
else
- count = aggregation.count
- results = aggregation.find(filter,find_options).to_a
- aggregation.drop if aggregation.name =~ /^tmp.mr.*/
+ count = reduction.count
+ results = reduction.find(filter,find_options).to_a
+ reduction.drop if reduction.name =~ /^tmp.mr.*/
Cubicle::Data::Table.new(query, results, count)
end
@@ -77,60 +80,50 @@ def process(options={})
def expire!
collection.drop
- expire_aggregations!
+ @metadata.expire!
end
- protected
+ def aggregate(query,options={})
+ view = AggregationView.new(aggregation,query)
- def aggregation_collection_names
- database.collection_names.select {|col_name|col_name=~/#{aggregation.target_collection_name}_aggregation_(.*)/}
- end
+ map, reduce = MapReduceHelper.generate_map_function(query), MapReduceHelper.generate_reduce_function
- def expire_aggregations!
- aggregation_collection_names.each{|agg_col|database[agg_col].drop}
- end
+ options[:finalize] = MapReduceHelper.generate_finalize_function(query)
+ options["query"] = expand_template(prepare_filter(query,options[:where] || {}),view)
+
+ query.source_collection_name = options.delete(:source_collection) || query.source_collection_name || aggregation.source_collection_name
- def find_best_source_collection(dimension_names, existing_aggregations=self.aggregation_collection_names)
- #format of aggregation collection names is source_cubicle_collection_aggregation_dim1.dim2.dim3.dimn
- #this next ugly bit of algebra will create 2d array containing a list of the dimension names in each existing aggregation
- existing = existing_aggregations.map do |agg_col_name|
- agg_col_name.gsub("#{target_collection_name}_aggregation_","").split(".")
+ target_collection = options.delete(:target_collection)
+ target_collection ||= query.target_collection_name if query.respond_to?(:target_collection_name)
+
+ options[:out] = target_collection unless target_collection.blank? || query.transient?
+
+ #This is defensive - some tests run without ever initializing any collections
+ unless database.collection_names.include?(query.source_collection_name)
+ Cubicle.logger.info "No collection was found in the database with a name of #{query.source_collection_name}"
+ return []
end
- #This will select all the aggregations that contain ALL of the desired dimension names
- #we are sorting by length because the aggregation with the least number of members
- #is likely to be the most efficient data source as it will likely contain the smallest number of rows.
- #this will not always be true, and situations may exist where it is rarely true, however the alternative
- #is to actually count rows of candidates, which seems a bit wasteful. Of course only the profiler knows,
- #but until there is some reason to believe the aggregation caching process needs be highly performant,
- #this should do for now.
- candidates = existing.select {|candidate|(dimension_names - candidate).blank?}.sort {|a,b|a.length <=> b.length}
+ result = database[query.source_collection_name].map_reduce(expand_template(map, view),reduce,options)
- #If no suitable aggregation exists to base this one off of,
- #we'll just use the base cubes aggregation collection
- return target_collection_name if candidates.blank?
- "#{target_collection_name}_aggregation_#{candidates[0].join('.')}"
+ ensure_indexes(target_collection,query.dimension_names) if target_collection
+ result
end
+ protected
+
+
def aggregation_for(query)
- return collection if query.all_dimensions?
+ #return collection if query.all_dimensions?
aggregation_query = query.clone
#If the query needs to filter on a field, it had better be in the aggregation...if it isn't a $where filter...
filter = (query.where if query.respond_to?(:where))
filter.keys.each {|filter_key|aggregation_query.select(filter_key) unless filter_key=~/\$where/} unless filter.blank?
dimension_names = aggregation_query.dimension_names.sort
- agg_col_name = "#{aggregation.target_collection_name}_aggregation_#{dimension_names.join('.')}"
-
- unless database.collection_names.include?(agg_col_name)
- source_col_name = find_best_source_collection(dimension_names)
- exec_query = aggregation.query(dimension_names + [:all_measures], :source_collection=>source_col_name, :defer=>true)
- aggregate(exec_query, :target_collection=>agg_col_name)
- end
-
- database[agg_col_name]
+ @metadata.aggregation_for(dimension_names)
end
def ensure_indexes(collection_name,dimension_names)
@@ -146,31 +139,6 @@ def ensure_indexes(collection_name,dimension_names)
#col.create_index(dimension_names.map{|dim|[dim,1]})
end
- def aggregate(query,options={})
- view = AggregationView.new(aggregation,query)
-
- map, reduce = MapReduceHelper.generate_map_function(query), MapReduceHelper.generate_reduce_function
-
- options[:finalize] = MapReduceHelper.generate_finalize_function(query)
- options["query"] = expand_template(prepare_filter(query,options[:where] || {}),view)
-
- query.source_collection_name = options.delete(:source_collection) || query.source_collection_name || aggregation.source_collection_name
-
- target_collection = options.delete(:target_collection)
- target_collection ||= query.target_collection_name if query.respond_to?(:target_collection_name)
-
- options[:out] = target_collection unless target_collection.blank? || query.transient?
-
- #This is defensive - some tests run without ever initializing any collections
- return [] unless database.collection_names.include?(query.source_collection_name)
-
- result = database[query.source_collection_name].map_reduce(expand_template(map, view),reduce,options)
-
- ensure_indexes(target_collection,query.dimension_names) if target_collection
-
- result
- end
-
def expand_template(template,view)
return "" unless template
return Mustache.render(template,view) if template.is_a?(String)
View
121 lib/cubicle/aggregation/aggregation_metadata.rb
@@ -0,0 +1,121 @@
+module Cubicle
+ module Aggregation
+ class AggregationMetadata
+ class << self
+
+ def collection
+ @@aggregations_collection_name ||= "#{Cubicle::Aggregation::CubicleMetadata.collection.name}.aggregations"
+ Cubicle.mongo.database[@@aggregations_collection_name]
+ end
+
+ def collection=(collection_name)
+ @@aggregations_collection_name = collection_name
+ end
+
+ def min_records_to_reduce
+ @min_records_to_reduce ||= 100
+ end
+
+ def min_records_to_reduce=(min)
+ @min_records_to_reduce = min
+ end
+
+ def expire(aggregation)
+ aggregation_name = case aggregation
+ when String then aggregation
+ when Symbol then aggregation.to_s
+ when Cubicle::Aggregation::CubicleMetadata then aggregation.aggregation.name
+ else aggregation.name
+ end
+ Cubicle.mongo.database.collection_names.each do |col|
+ Cubicle.mongo.database[col].drop if col =~ /cubicle.aggregation.#{aggregation_name}._*/i
+ collection.remove(:aggregation=>aggregation_name)
+ end
+ end
+ end
+
+ def initialize(cubicle_metadata,member_names_or_attribute_hash)
+ @cubicle_metadata = cubicle_metadata
+ if (member_names_or_attribute_hash.kind_of?(Hash))
+ @attributes = member_names_or_attribute_hash
+ else
+ member_names = member_names_or_attribute_hash
+ @candidate_aggregation = self.class.collection.find(
+ :aggregation=>@cubicle_metadata.aggregation.name,
+ :member_names=>{"$all"=>member_names}, :document_count=>{"$gte"=>0}).sort([:document_count, :asc]).limit(1).next_document
+
+
+ #since the operator used in the query was $all, having equal lengths in the original and returned
+ #member array means that they are identical, which means that regardless of the number of documents
+ #in the aggregation, it is the candidate we want. Otherwise, we'll check to see if we
+ #boil down the data further, or just make our soup with what we've got.
+ @attributes = @candidate_aggregation if @candidate_aggregation &&
+ (@candidate_aggregation["member_names"].length == member_names.length ||
+ @candidate_aggregation["document_count"] < self.class.min_records_to_reduce)
+
+ unless @attributes
+ @attributes = HashWithIndifferentAccess.new({:aggregation=>@cubicle_metadata.aggregation.name,
+ :member_names=>member_names,
+ :document_count=>-1})
+
+ #materialize the aggregation, and, if the operation was successful,
+ #register it as available for use by future queries
+ @attributes[:_id] = self.class.collection.insert(@attributes)
+ materialize!
+ end
+
+ end
+ end
+
+ def target_collection_name
+ "cubicle.aggregation.#{@cubicle_metadata.aggregation.name}._#{@attributes["_id"].to_s}"
+ end
+
+ def source_collection_name
+ if @candidate_aggregation
+ candidate = Cubicle::Aggregation::AggregationMetadata.new(@cubicle_metadata,@candidate_aggregation)
+ return candidate.target_collection_name
+ end
+ @cubicle_metadata.aggregation.target_collection_name
+ end
+
+ def member_names; @attributes["member_names"] || []; end
+
+ def materialized?
+ document_count >= 0 &&
+ (!@collection.blank? ||
+ Cubicle.mongo.database.collection_names.include?(target_collection_name))
+ end
+
+ def collection
+ @collection ||= Cubicle.mongo.database[target_collection_name] if materialized?
+ end
+
+ def collection=(collection)
+ @collection = collection
+ end
+
+ def document_count
+ @attributes["document_count"]
+ end
+
+ protected
+ def update_document_count!(new_doc_count)
+ self.class.collection.update({:_id=>@attributes[:_id]}, "$set"=>{:document_count=>new_doc_count})
+ @attributes["document_count"]=new_doc_count
+ end
+
+ def materialize!
+ unless materialized?
+ exec_query = @cubicle_metadata.aggregation.query(member_names + [:all_measures],
+ :source_collection=>source_collection_name,
+ :defer=>true)
+ self.collection = @cubicle_metadata.aggregation.aggregator.aggregate(exec_query,
+ :target_collection=>target_collection_name)
+ end
+ update_document_count!(@collection.count) unless @collection.blank?
+ end
+
+ end
+ end
+end
View
30 lib/cubicle/aggregation/cubicle_metadata.rb
@@ -0,0 +1,30 @@
+module Cubicle
+ module Aggregation
+ class CubicleMetadata
+
+ class << self
+
+ def collection
+ @@collection_name ||= "cubicle.metadata"
+ Cubicle.mongo.database[@@collection_name]
+ end
+ def collection=(collection_name)
+ @@collection_name = collection_name
+ end
+ end
+
+ attr_reader :aggregation
+ def initialize(aggregation)
+ @aggregation = aggregation
+ end
+
+ def aggregation_for(member_names = [])
+ AggregationMetadata.new(self,member_names)
+ end
+
+ def expire!
+ AggregationMetadata.expire(self)
+ end
+ end
+ end
+end
View
2 lib/cubicle/aggregation/dsl.rb
@@ -11,7 +11,7 @@ def source_collection_name(collection_name = nil)
def target_collection_name(collection_name = nil)
return nil if transient?
return @target_name = collection_name if collection_name
- @target_name ||= "#{name.blank? ? source_collection_name : name.underscore.pluralize}_cubicle"
+ @target_name ||= "cubicle.fact.#{name.blank? ? source_collection_name : name.underscore}"
end
alias target_collection_name= target_collection_name
View
89 test/cubicle/aggregation/aggregation_metadata_test.rb
@@ -0,0 +1,89 @@
+require "test_helper"
+
+class AggregationMetadataTest < ActiveSupport::TestCase
+ context "Class level collection names" do
+ should "use appropriate default values for the aggregations collection" do
+ assert_equal "cubicle.metadata.aggregations", Cubicle::Aggregation::AggregationMetadata.collection.name
+ end
+ end
+
+ context "AggregationMetadata.update_document_count" do
+ setup do
+ @cm = Cubicle::Aggregation::CubicleMetadata.new(DefectCubicle)
+ end
+ should "update the document count for a given aggregation instance" do
+ agg_info = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product])
+ agg_info.send(:update_document_count!,1024)
+ assert_equal 1024, agg_info.document_count
+ assert_equal false,agg_info.materialized?
+ end
+ end
+
+ context "AggregationMetadata#new" do
+ setup do
+ @cm = Cubicle::Aggregation::CubicleMetadata.new(DefectCubicle)
+ end
+ should "create initialize an instance of AggregationMetadata in the database" do
+ agg_info = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product,:region])
+ assert /cubicle.aggregation.DefectCubicle._+/ =~ agg_info.target_collection_name
+ assert_equal [:product,:region], agg_info.member_names
+ assert_equal false, agg_info.materialized?
+ assert_nil agg_info.collection
+ end
+ should "fetch an existing aggregation from the database" do
+ ag = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product,:region])
+ ag.send(:update_document_count!,1)
+ col_name = ag.target_collection_name
+ assert_equal col_name, Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product,:region]).target_collection_name
+ end
+ should "ignore an existing aggregation that does not satisfy all fields" do
+ ag = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product])
+ ag.send(:update_document_count!,1)
+ col_name = ag.target_collection_name
+ assert col_name != Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product,:region]).target_collection_name
+ end
+ should "select an existing aggregation with rows below the minimum threshold instead of creating a new one" do
+ agg_info = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product,:region,:operator])
+ agg_info.send(:update_document_count!,99)
+ assert_equal agg_info.target_collection_name, Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product]).target_collection_name
+ end
+
+ should "ignore an existing aggregation with too many rows, but store that aggregation as a candidate source for use when materializing the aggregation" do
+ agg_info = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product,:region,:operator])
+ agg_info.send(:update_document_count!,101)
+ new_agg_info = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product])
+ assert agg_info.target_collection_name != new_agg_info.target_collection_name
+ assert_equal agg_info.target_collection_name, new_agg_info.source_collection_name
+ end
+ end
+
+ context "AggregationMetadata#materialize!" do
+ should "run a map reduce and produce the resulting collection" do
+ Defect.create_test_data
+ DefectCubicle.process
+ @cm = Cubicle::Aggregation::CubicleMetadata.new(DefectCubicle)
+ agg_info = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product])
+ aggregation = agg_info.collection
+ assert_not_nil aggregation
+ assert aggregation.count > 0
+ assert_equal aggregation.count, agg_info.document_count
+ end
+ end
+
+ context "AggregationMetadata.expire" do
+ should "drop any aggregation columns and remove metadata rows from the database" do
+ Defect.create_test_data
+ DefectCubicle.process
+ @cm = Cubicle::Aggregation::CubicleMetadata.new(DefectCubicle)
+ agg_info = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product])
+
+ assert Cubicle.mongo.database.collection_names.include?(agg_info.target_collection_name)
+ assert Cubicle::Aggregation::AggregationMetadata.collection.find(:aggregation=>"DefectCubicle").count > 0
+
+ Cubicle::Aggregation::AggregationMetadata.expire(@cm)
+
+ assert !Cubicle.mongo.database.collection_names.include?(agg_info.target_collection_name)
+ assert_equal 0, Cubicle::Aggregation::AggregationMetadata.collection.find(:aggregation=>"DefectCubicle").count
+ end
+ end
+end
View
9 test/cubicle/aggregation/cubicle_metadata_test.rb
@@ -0,0 +1,9 @@
+require "test_helper"
+
+class CubicleMetadataTest < ActiveSupport::TestCase
+ context "Class level collection names" do
+ should "use appropriate default values for the metadata collection" do
+ assert_equal "cubicle.metadata", Cubicle::Aggregation::CubicleMetadata.collection.name
+ end
+ end
+end
View
24 test/cubicle/cubicle_aggregation_test.rb
@@ -16,6 +16,8 @@ class CubicleAggregationTest < ActiveSupport::TestCase
puts @results.inspect
assert_equal 4, @results.length
+ @results.sort!{|x,y|x.manufacture_date<=>y.manufacture_date}
+
assert_equal "2009-12-09", @results[0]["manufacture_date"]
assert_equal "2009-12", @results[0]["month"]
assert_equal "2009", @results[0]["year"]
@@ -94,17 +96,17 @@ class CubicleAggregationTest < ActiveSupport::TestCase
end
end
- context "Processing a cube" do
- setup do
- DefectCubicle.expire!
- DefectCubicle.process
- end
- should "should create the specified aggregations" do
- assert Cubicle.mongo.database.collection_names.include? "defect_cubicles_cubicle_aggregation_month.product.year"
- assert Cubicle.mongo.database.collection_names.include? "defect_cubicles_cubicle_aggregation_month.region"
- end
-
- end
+# context "Processing a cube" do
+# setup do
+# DefectCubicle.expire!
+# DefectCubicle.process
+# end
+# should "should create the specified aggregations" do
+# assert Cubicle.mongo.database.collection_names.include? "defect_cubicles_cubicle_aggregation_month.product.year"
+# assert Cubicle.mongo.database.collection_names.include? "defect_cubicles_cubicle_aggregation_month.region"
+# end
+#
+# end
end
end
View
2 test/cubicle/cubicle_query_test.rb
@@ -25,7 +25,7 @@ class CubicleQueryTest < ActiveSupport::TestCase
select :product, :total_defects
end
query.execute()
- assert_equal "defect_cubicles_cubicle", query.source_collection_name
+ assert query.source_collection_name =~ /cubicle.aggregation.DefectCubicle._+/
end
should "Select dimensions in the by clause" do
query_results = DefectCubicle.query do
View
8 test/cubicle/mongo_mapper/aggregate_plugin_test.rb
@@ -1,14 +1,10 @@
begin
require "rubygems"
require "mongo_mapper"
- begin
- require "test_helper"
- rescue LoadError
- require "../../test_helper"
- end
+ require "test_helper"
require File.join(File.dirname(__FILE__),"../../../lib/cubicle/mongo_mapper/aggregate_plugin")
-
+
class AggregatePluginTest < ActiveSupport::TestCase
context "A freshly minted MongoMapper model" do
setup do

0 comments on commit 6e8ac20

Please sign in to comment.
Something went wrong with that request. Please try again.