Skip to content

Commit

Permalink
==0.5.2
Browse files Browse the repository at this point in the history
  *Fixed a bug with the naming of expansion variables when expanding embedded collections or hashes
  *Refactored the cube processing logic a little to make processing an aggregation somewhat safe even
   while the aggregation is being queried, by not dropping cached aggregated data but simply overwriting it
   when map reduce is complete.
  • Loading branch information
Nathan committed Aug 11, 2010
1 parent 019f1fa commit f7a3ce8
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 41 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rdoc
@@ -1,3 +1,9 @@
==0.5.2
*Fixed a bug with the naming of expansion variables when expanding embedded collections or hashes
*Refactored the cube processing logic a little to make processing an aggregation somewhat safe even
while the aggregation is being queried, by not dropping cached aggregated data but simply overwriting it
when map reduce is complete.

==0.5.1
*Minor adjustment to prevent Cubicle from vomiting blood and pieces of chewed up cats when filtering a query
using MongoDB's new $or operator.
Expand Down
8 changes: 4 additions & 4 deletions cubicle.gemspec
Expand Up @@ -5,11 +5,11 @@

Gem::Specification.new do |s|
s.name = %q{cubicle}
s.version = "0.5.1"
s.version = "0.5.2"

s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Nathan Stults"]
s.date = %q{2010-08-09}
s.date = %q{2010-08-11}
s.description = %q{Cubicle provides a dsl and aggregation caching framework for automating the generation, execution and caching of map reduce queries when using MongoDB in Ruby. Cubicle also includes a MongoMapper plugin for quickly performing ad-hoc, multi-level group-by queries against a MongoMapper model.}
s.email = %q{hereiam@sonic.net}
s.extra_rdoc_files = [
Expand Down Expand Up @@ -83,7 +83,7 @@ Gem::Specification.new do |s|
s.homepage = %q{http://github.com/PlasticLizard/cubicle}
s.rdoc_options = ["--charset=UTF-8"]
s.require_paths = ["lib"]
s.rubygems_version = %q{1.3.6}
s.rubygems_version = %q{1.3.7}
s.summary = %q{Pseudo-Multi Dimensional analysis / simplified aggregation for MongoDB in Ruby (NOLAP ;))}
s.test_files = [
"test/cubicle/aggregation/ad_hoc_test.rb",
Expand Down Expand Up @@ -114,7 +114,7 @@ Gem::Specification.new do |s|
current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
s.specification_version = 3

if Gem::Version.new(Gem::RubyGemsVersion) >= Gem::Version.new('1.2.0') then
if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
s.add_runtime_dependency(%q<activesupport>, [">= 2.3"])
s.add_runtime_dependency(%q<mongo>, [">= 0.18.3"])
s.add_runtime_dependency(%q<mustache>, [">= 0.10.0"])
Expand Down
16 changes: 11 additions & 5 deletions lib/cubicle/aggregation/aggregation_manager.rb
Expand Up @@ -67,27 +67,33 @@ def execute_query(query,options={})

def process(options={})
@metadata.update_processing_stats do
expire!
aggregate(aggregation,options.merge(:reason=>"Processing fact collection"))

@metadata.unprotect_all #Mark all aggregations available for deletion.
#Sort desc by length of array, so that larger
#aggregations are processed first, hopefully increasing efficiency
#of the processing step
aggregation.aggregations.sort!{|a,b|b.length<=>a.length}
aggregation.aggregations.each do |member_list|
agg_start = Time.now
aggregation_for(aggregation.query(:defer=>true){select member_list})
aggregation_for(aggregation.query(:defer=>true){select member_list},true)
Cubicle.logger.info "#{aggregation.name} aggregation #{member_list.inspect} processed in #{Time.now-agg_start} seconds"
end
expire_metadata!
end
end

def expire!
@profiler.measure(:expire_aggregations, :reason=>"Expire aggregations") do
collection.drop
@metadata.expire!
expire_metadata! :force=>true
end
end

# Expires aggregation metadata by forwarding +opts+ unchanged to
# @metadata.expire!. Called with no arguments, an empty options hash is passed.
def expire_metadata!(opts={})
@metadata.expire!(opts)
end

def aggregate(query,options={})
view = AggregationView.new(aggregation,query)

Expand Down Expand Up @@ -133,7 +139,7 @@ def aggregate(query,options={})
protected


def aggregation_for(query)
def aggregation_for(query, protect=false)
#return collection if query.all_dimensions?

aggregation_query = query.clone
Expand All @@ -142,7 +148,7 @@ def aggregation_for(query)
filter.keys.each {|filter_key|aggregation_query.select(filter_key) unless filter_key.to_s.start_with?("$")} unless filter.blank?

dimension_names = aggregation_query.dimension_names.sort
@metadata.aggregation_for(dimension_names)
@metadata.aggregation_for(dimension_names,protect)
end

def ensure_indexes(collection_name,dimension_names)
Expand Down
71 changes: 47 additions & 24 deletions lib/cubicle/aggregation/aggregation_metadata.rb
Expand Up @@ -12,6 +12,10 @@ def collection=(collection_name)
@@aggregations_collection_name = collection_name
end

# Builds the collection name used for one stored aggregation of a cubicle.
#
# cubicle_name   - name of the cubicle/aggregation (converted with to_s).
# aggregation_id - identifier of the stored aggregation (converted with to_s).
#
# Returns a String of the form "cubicle.aggregation.<cubicle_name>._<aggregation_id>".
def aggregation_collection_name(cubicle_name,aggregation_id)
  format("cubicle.aggregation.%s._%s", cubicle_name, aggregation_id)
end

# Returns the configured minimum record count, installing the default of 100
# whenever no value (nil or false) is currently stored.
def min_records_to_reduce
  @min_records_to_reduce = 100 unless @min_records_to_reduce
  @min_records_to_reduce
end
Expand All @@ -20,44 +24,63 @@ def min_records_to_reduce=(min)
@min_records_to_reduce = min
end

def expire(aggregation)
# Sets :protect to false on stored metadata for the named aggregation.
# +aggregation+ may be a String, a Symbol, a Cubicle::Aggregation::CubicleMetadata
# or any object responding to +name+; it is resolved to a name first.
def unprotect_all(aggregation)
aggregation_name = case aggregation
when String then aggregation
when Symbol then aggregation.to_s
when Cubicle::Aggregation::CubicleMetadata then aggregation.aggregation.name
else aggregation.name
end
# NOTE(review): the next three lines look like the removed side of this diff
# hunk (the previous expire body) rather than part of unprotect_all - confirm.
Cubicle.mongo.database.collection_names.each do |col|
Cubicle.mongo.database[col].drop if col =~ /cubicle.aggregation.#{aggregation_name}._*/i
collection.remove(:aggregation=>aggregation_name)
when String then aggregation
when Symbol then aggregation.to_s
when Cubicle::Aggregation::CubicleMetadata then aggregation.aggregation.name
else aggregation.name
end
# NOTE(review): no :multi option is passed here. If the driver's update only
# modifies the first matching document by default, just one metadata row is
# unprotected instead of all of them - confirm, and consider :multi=>true.
collection.update({:aggregation=>aggregation_name},{"$set"=>{:protect=>false}})
end

# Drops stored aggregation collections and removes their metadata rows for
# the given aggregation.
#
# aggregation - a String or Symbol name, a Cubicle::Aggregation::CubicleMetadata,
#               or any other object responding to +name+.
# opts        - options hash; when opts[:force] is truthy, rows whose "protect"
#               value is truthy are expired too, otherwise they are skipped.
def expire(aggregation,opts={})
  aggregation_name =
    if String === aggregation
      aggregation
    elsif Symbol === aggregation
      aggregation.to_s
    elsif Cubicle::Aggregation::CubicleMetadata === aggregation
      aggregation.aggregation.name
    else
      aggregation.name
    end

  collection.find(:aggregation=>aggregation_name).each do |agg|
    # Protected rows survive unless the caller forces expiry.
    if !agg["protect"] || opts[:force]
      col_name = aggregation_collection_name(aggregation_name,agg["_id"].to_s)
      target = Cubicle.mongo.database[col_name]
      target.drop if target
      collection.remove(:_id=>agg["_id"])
    end
  end
end
end

def initialize(cubicle_metadata,member_names_or_attribute_hash)
def initialize(cubicle_metadata,member_names_or_attribute_hash,protect=false)
@cubicle_metadata = cubicle_metadata
@attributes = nil
if (member_names_or_attribute_hash.kind_of?(Hash))
@attributes = member_names_or_attribute_hash
else
member_names = member_names_or_attribute_hash
@candidate_aggregation = self.class.collection.find(
:aggregation=>@cubicle_metadata.aggregation.name,
:member_names=>{"$all"=>member_names}, :document_count=>{"$gte"=>0}).sort([:document_count, :asc]).limit(1).next_document

unless protect
@candidate_aggregation = self.class.collection.find(
:aggregation=>@cubicle_metadata.aggregation.name,
:member_names=>{"$all"=>member_names}, :document_count=>{"$gte"=>0}).sort([:document_count, :asc]).limit(1).next_document


#since the operator used in the query was $all, having equal lengths in the original and returned
#member array means that they are identical, which means that regardless of the number of documents
#in the aggregation, it is the candidate we want. Otherwise, we'll check to see if we
#boil down the data further, or just make our soup with what we've got.
@attributes = @candidate_aggregation if @candidate_aggregation &&
(@candidate_aggregation["member_names"].length == member_names.length ||
@candidate_aggregation["document_count"] < self.class.min_records_to_reduce)
#since the operator used in the query was $all, having equal lengths in the original and returned
#member array means that they are identical, which means that regardless of the number of documents
#in the aggregation, it is the candidate we want. Otherwise, we'll check to see if we
#boil down the data further, or just make our soup with what we've got.
@attributes = @candidate_aggregation if @candidate_aggregation &&
(@candidate_aggregation["member_names"].length == member_names.length ||
@candidate_aggregation["document_count"] < self.class.min_records_to_reduce)

end

unless @attributes
self.class.collection.remove(:aggregation=>@cubicle_metadata.aggregation.name,:member_names=>member_names)
@attributes = HashWithIndifferentAccess.new({:aggregation=>@cubicle_metadata.aggregation.name,
:member_names=>member_names,
:document_count=>-1,
:created_at=>Time.now.utc})
:created_at=>Time.now.utc,
:protect=>protect})

#materialize the aggregation, and, if the operation was successful,
#register it as available for use by future queries
Expand All @@ -69,7 +92,7 @@ def initialize(cubicle_metadata,member_names_or_attribute_hash)
end

def target_collection_name
"cubicle.aggregation.#{@cubicle_metadata.aggregation.name}._#{@attributes["_id"].to_s}"
AggregationMetadata.aggregation_collection_name(@cubicle_metadata.aggregation.name,@attributes["_id"].to_s)
end

def source_collection_name
Expand All @@ -84,8 +107,8 @@ def member_names; @attributes["member_names"] || []; end

def materialized?
document_count >= 0 &&
(!@collection.blank? ||
Cubicle.mongo.database.collection_names.include?(target_collection_name))
(!@collection.blank? ||
Cubicle.mongo.database.collection_names.include?(target_collection_name))
end

def collection
Expand Down
12 changes: 8 additions & 4 deletions lib/cubicle/aggregation/cubicle_metadata.rb
Expand Up @@ -18,12 +18,16 @@ def initialize(aggregation)
@aggregation = aggregation
end

def aggregation_for(member_names = [])
AggregationMetadata.new(self,member_names)
def aggregation_for(member_names = [], force=false)
AggregationMetadata.new(self,member_names,force)
end

def expire!
AggregationMetadata.expire(self)
def expire!(opts={})
AggregationMetadata.expire(self,opts)
end

# Delegates to AggregationMetadata.unprotect_all, passing this metadata object.
def unprotect_all
AggregationMetadata.unprotect_all(self)
end

def update_processing_stats
Expand Down
2 changes: 1 addition & 1 deletion lib/cubicle/version.rb
@@ -1,3 +1,3 @@
module Cubicle
VERSION = '0.5.1'
VERSION = '0.5.2'
end
8 changes: 6 additions & 2 deletions test/cubicle/aggregation/aggregation_metadata_test.rb
Expand Up @@ -77,16 +77,20 @@ class AggregationMetadataTest < ActiveSupport::TestCase
should "drop any aggregation columns and remove metadata rows from the database" do
Defect.create_test_data
DefectCubicle.process
Cubicle::Aggregation::AggregationMetadata.min_records_to_reduce = 1
@cm = Cubicle::Aggregation::CubicleMetadata.new(DefectCubicle)
agg_info = Cubicle::Aggregation::AggregationMetadata.new(@cm,[:product])

assert Cubicle.mongo.database.collection_names.include?(agg_info.target_collection_name)
assert Cubicle::Aggregation::AggregationMetadata.collection.find(:aggregation=>"DefectCubicle").count > 0
#two standard aggregations and the ad hoc one just created
assert_equal 3, Cubicle::Aggregation::AggregationMetadata.collection.find(:aggregation=>"DefectCubicle").count

Cubicle::Aggregation::AggregationMetadata.expire(@cm)

#the two standard ('protected') aggregations should remain, the ad hoc one should be gone
assert !Cubicle.mongo.database.collection_names.include?(agg_info.target_collection_name)
assert_equal 0, Cubicle::Aggregation::AggregationMetadata.collection.find(:aggregation=>"DefectCubicle").count
assert_equal 2, Cubicle::Aggregation::AggregationMetadata.collection.find(:aggregation=>"DefectCubicle").count
Cubicle::Aggregation::AggregationMetadata.min_records_to_reduce = nil
end
end
end
2 changes: 1 addition & 1 deletion test/cubicles/hash_pipe_cubicle.rb
Expand Up @@ -4,7 +4,7 @@ class HashPipeCubicle
source_collection "defects"
expand :hash_pipes

define :score, '(hash_pipe_value.vote == "no" ? -1 : 1) * hash_pipe_value.weight'
define :score, '(hash_pipe.vote == "no" ? -1 : 1) * hash_pipe.weight'

dimension :product, :field_name=>'product.name'

Expand Down

0 comments on commit f7a3ce8

Please sign in to comment.