Skip to content

Commit

Permalink
Added a profiler which can be enabled by calling Cubicle::Aggregation…
Browse files Browse the repository at this point in the history
…::Profiler.enabled = true

Profile results will then be sent to cubicle.profiler
  • Loading branch information
Nathan committed May 7, 2010
1 parent 11eb273 commit 226ac64
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 42 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rdoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
==0.1.22
*Added profiler, which is off by default, that will profile every operation performed by the aggregation engine
in a collection called cubicle.profiler. It uses a capped collection, sized at 250MB by default, but this can
be configured. Also added basic metadata table for overall aggregation definitions that will record the last processing
time, along with the duration of the processing job, in the collection cubicle.metadata

==0.1.21
*Added metadata tables in the database for cubicle to manage aggregation info. This was necessary because previously
I was trying to overload the collection name with metadata, which was making the names longer than MongoDB could support
Expand Down
5 changes: 3 additions & 2 deletions cubicle.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

Gem::Specification.new do |s|
s.name = %q{cubicle}
s.version = "0.1.21"
s.version = "0.1.22"

s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Nathan Stults"]
s.date = %q{2010-05-05}
s.date = %q{2010-05-07}
s.description = %q{Cubicle provides a dsl and aggregation caching framework for automating the generation, execution and caching of map reduce queries when using MongoDB in Ruby. Cubicle also includes a MongoMapper plugin for quickly performing ad-hoc, multi-level group-by queries against a MongoMapper model.}
s.email = %q{hereiam@sonic.net}
s.extra_rdoc_files = [
Expand All @@ -32,6 +32,7 @@ Gem::Specification.new do |s|
"lib/cubicle/aggregation/cubicle_metadata.rb",
"lib/cubicle/aggregation/dsl.rb",
"lib/cubicle/aggregation/map_reduce_helper.rb",
"lib/cubicle/aggregation/profiler.rb",
"lib/cubicle/bucketized_dimension.rb",
"lib/cubicle/calculated_measure.rb",
"lib/cubicle/data.rb",
Expand Down
1 change: 1 addition & 0 deletions lib/cubicle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"aggregation/aggregation_manager",
"aggregation/map_reduce_helper",
"aggregation/dsl",
"aggregation/profiler",
"aggregation",
"aggregation/ad_hoc",
"date_time",
Expand Down
91 changes: 65 additions & 26 deletions lib/cubicle/aggregation/aggregation_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ module Cubicle
module Aggregation
class AggregationManager

attr_reader :aggregation, :metadata
attr_reader :aggregation, :metadata, :profiler

# Creates a manager for the given aggregation definition, wiring up the
# two collaborators that instrument its operations: a CubicleMetadata
# tracker and a Profiler (which is a no-op unless profiling is enabled).
#
# aggregation - the Cubicle aggregation definition this manager executes.
def initialize(aggregation)
@aggregation = aggregation
@metadata = Cubicle::Aggregation::CubicleMetadata.new(aggregation)
@profiler = Cubicle::Aggregation::Profiler.new(aggregation)
end

def database
Expand Down Expand Up @@ -35,7 +36,7 @@ def execute_query(query,options={})
filter = {}

if query == aggregation || query.transient?
reduction = aggregate(query,options)
reduction = aggregate(query,options.merge(:reason=>"Transient query"))
else
process_if_required
agg_data = aggregation_for(query)
Expand All @@ -45,42 +46,46 @@ def execute_query(query,options={})
if query.all_dimensions? || (agg_data.member_names - query.member_names - [:all_measures]).blank?
filter = prepare_filter(query,options[:where] || {})
else
reduction = aggregate(query,:source_collection=>agg_data.target_collection_name)
reduction = aggregate(query,:source_collection=>agg_data.target_collection_name, :reason=>"Last mile reduction - source aggregation has too many members")
end
end

if reduction.blank?
Cubicle::Data::Table.new(query,[],0)
else
count = reduction.count
results = reduction.find(filter,find_options).to_a
reduction.drop if reduction.name =~ /^tmp.mr.*/
Cubicle::Data::Table.new(query, results, count)

@profiler.measure(:find, :source=>reduction.name, :reason=>"Fetch final query results") do
count = reduction.count
results = reduction.find(filter,find_options).to_a
reduction.drop if reduction.name =~ /^tmp.mr.*/
Cubicle::Data::Table.new(query, results, count)
end

end

end

def process(options={})
Cubicle.logger.info "Processing #{aggregation.name} @ #{Time.now}"
start = Time.now
expire!
aggregate(aggregation,options)
#Sort desc by length of array, so that largest
#aggregations are processed first, hopefully increasing efficiency
#of the processing step
aggregation.aggregations.sort!{|a,b|b.length<=>a.length}
aggregation.aggregations.each do |member_list|
agg_start = Time.now
aggregation_for(aggregation.query(:defer=>true){select member_list})
Cubicle.logger.info "#{aggregation.name} aggregation #{member_list.inspect} processed in #{Time.now-agg_start} seconds"
@metadata.update_processing_stats do
expire!
aggregate(aggregation,options.merge(:reason=>"Processing fact collection"))
#Sort desc by length of array, so that largest
#aggregations are processed first, hopefully increasing efficiency
#of the processing step
aggregation.aggregations.sort!{|a,b|b.length<=>a.length}
aggregation.aggregations.each do |member_list|
agg_start = Time.now
aggregation_for(aggregation.query(:defer=>true){select member_list})
Cubicle.logger.info "#{aggregation.name} aggregation #{member_list.inspect} processed in #{Time.now-agg_start} seconds"
end
end
duration = Time.now - start
Cubicle.logger.info "#{aggregation.name} processed @ #{Time.now}in #{duration} seconds."
end

def expire!
collection.drop
@metadata.expire!
@profiler.measure(:expire_aggregations, :reason=>"Expire aggregations") do
collection.drop
@metadata.expire!
end
end

def aggregate(query,options={})
Expand All @@ -104,11 +109,22 @@ def aggregate(query,options={})
return []
end

result = database[query.source_collection_name].map_reduce(expand_template(map, view),reduce,options)
reason = options.delete(:reason) || "Unknown"
agg_info= options.delete(:aggregation_info)

ensure_indexes(target_collection,query.dimension_names) if target_collection
result = map_reduce(query.source_collection_name,expand_template(map, view),reduce,options)

result
@profiler.record_map_reduce_result(query,options,result,reason,agg_info)

@profiler.measure(:create_indexes, :target_collection=>options[:out] || "transient", :reason=>:finalize_aggregation) do
ensure_indexes(target_collection,query.dimension_names)
end if target_collection && !query.transient?

#A bug, possibly in Mongo, does not produce a count on MR collections
#sometimes, so we'll just add it from the result.
output = database[result["result"]]
output.instance_eval "def count; #{result["counts"]["output"]}; end"
output
end

protected
Expand Down Expand Up @@ -217,6 +233,29 @@ def quote_if_required(filter_value)
(filter_value.is_a?(String) || filter_value.is_a?(Symbol)) ? "'#{filter_value}'" :filter_value
end

#this is just the Mongo driver's implementation of the MapReduce
#method, but instead of returning the resulting collection,
#I'm returning the full 'results' so that I can capture
#the delicious stats contained within its delicate hash shell
# Mirror of the Mongo driver's map_reduce implementation, except that the
# raw command response is returned instead of just the output collection,
# so the caller can capture the stats (counts, timeMillis) it contains.
def map_reduce(source_collection_name,map, reduce, opts={})
  command = OrderedHash.new
  command['mapreduce'] = source_collection_name
  command['map']       = map.is_a?(BSON::Code) ? map : BSON::Code.new(map)
  command['reduce']    = reduce.is_a?(BSON::Code) ? reduce : BSON::Code.new(reduce)
  command.merge! opts

  response = database.command(command)
  unless response["ok"] == 1
    raise Mongo::OperationFailure, "map-reduce failed: #{response['errmsg']}"
  end

  response
end

end
end
end
23 changes: 18 additions & 5 deletions lib/cubicle/aggregation/aggregation_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def initialize(cubicle_metadata,member_names_or_attribute_hash)
unless @attributes
@attributes = HashWithIndifferentAccess.new({:aggregation=>@cubicle_metadata.aggregation.name,
:member_names=>member_names,
:document_count=>-1})
:document_count=>-1,
:created_at=>Time.now.utc})

#materialize the aggregation, and, if the operation was successful,
#register it as available for use by future queries
Expand Down Expand Up @@ -99,10 +100,20 @@ def document_count
@attributes["document_count"]
end

# Exposes stored attributes (e.g. created_at, updated_at) as dynamic
# reader methods by looking the name up in the attributes hash.
# Unknown names fall through to the default NoMethodError behavior.
def method_missing(sym,*args)
  if @attributes.key?(sym.to_s)
    @attributes[sym.to_s]
  else
    super(sym,*args)
  end
end

# Keep respond_to? consistent with the dynamic readers above.
def respond_to_missing?(sym, include_private = false)
  @attributes.key?(sym.to_s) || super
end

protected
def update_document_count!(new_doc_count)
self.class.collection.update({:_id=>@attributes[:_id]}, "$set"=>{:document_count=>new_doc_count})
# Persists a new document count (plus an updated_at stamp) to this
# aggregation's metadata record, then mirrors both values into the
# in-memory attributes hash. Returns the timestamp used.
def update_document_stats!(new_doc_count)
  now = Time.now.utc
  changes = {:document_count=>new_doc_count, :updated_at=>now}
  self.class.collection.update({:_id=>@attributes[:_id]}, {"$set"=>changes})
  @attributes["document_count"] = new_doc_count
  @attributes["updated_at"] = now
end

def materialize!
Expand All @@ -111,9 +122,11 @@ def materialize!
:source_collection=>source_collection_name,
:defer=>true)
self.collection = @cubicle_metadata.aggregation.aggregator.aggregate(exec_query,
:target_collection=>target_collection_name)
:target_collection=>target_collection_name,
:reason=>"Materializing intermediate aggregation",
:aggregation_info=>self)
end
update_document_count!(@collection.count) unless @collection.blank?
update_document_stats!(@collection.count) unless @collection.blank?
end

end
Expand Down
36 changes: 35 additions & 1 deletion lib/cubicle/aggregation/cubicle_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def collection
end
def collection=(collection_name)
@@collection_name = collection_name
end
end
end

attr_reader :aggregation
Expand All @@ -25,6 +25,40 @@ def aggregation_for(member_names = [])
def expire!
AggregationMetadata.expire(self)
end

# Times a processing run (the supplied block) and records the outcome in
# this aggregation's metadata document: duration, result, and either the
# last successful processing time or the error message that occurred.
# Exceptions raised by the block are recorded and then re-raised.
def update_processing_stats
  Cubicle.logger.info "Processing #{aggregation.name} @ #{Time.now}"
  start = Time.now
  error = nil
  begin
    yield if block_given?
    Cubicle.logger.info "#{aggregation.name} processed @ #{Time.now}."
    result = :success
  rescue StandardError => ex
    # BUGFIX: was `rescue RuntimeError`, which let other StandardErrors
    # (e.g. Mongo::OperationFailure) bypass this handler; the ensure block
    # then recorded the failed run as a successful one (result was nil).
    error = ex
    result = :error
    raise
  ensure
    stop = Time.now
    duration = stop - start
    stats = {:timestamp=>Time.now.utc,
             :aggregation=>aggregation.name,
             :last_duration_in_seconds=>duration,
             :result=>result}

    #If an error occurred, we want to record the message, but
    #not overwrite the timestamp of the last successful process.
    #If all was well, we want to indicate that now is the last
    #successful processing of the aggregation.
    if result == :error
      # Store the message rather than the exception object itself,
      # which is not BSON-serializable.
      stats[:last_error] = error.message
    else
      stats[:last_processed] = stop
    end

    self.class.collection.update({:aggregation=>aggregation.name}, #criteria
                                 {"$set"=>stats},                  #data
                                 :upsert=>true)                    #upsert
  end
end
end
end
end
80 changes: 80 additions & 0 deletions lib/cubicle/aggregation/profiler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
module Cubicle
  module Aggregation
    # Optional instrumentation for the aggregation engine. When enabled
    # (Cubicle::Aggregation::Profiler.enabled = true), every map/reduce,
    # find and housekeeping operation is timed and written to a capped
    # MongoDB collection ("cubicle.profiler" by default).
    class Profiler

      class << self
        # Profiling is off unless explicitly switched on.
        def enabled?
          @@enabled = false unless defined?(@@enabled)
          @@enabled
        end
        def enabled=(val)
          @@enabled = val
        end

        # Maximum size of the capped profiler collection, in megabytes.
        # BUGFIX: was `@@max_size_in_mb = 250`, which reassigned the
        # default on every read and clobbered any configured value.
        def max_size_in_mb
          @@max_size_in_mb = 250 unless defined?(@@max_size_in_mb)
          @@max_size_in_mb
        end
        def max_size_in_mb=(max)
          @@max_size_in_mb = max
        end

        # Lazily creates (capped) or fetches the profiler collection.
        def collection
          @@collection_name ||= "cubicle.profiler"
          unless Cubicle.mongo.database.collection_names.include?(@@collection_name)
            # :size is expressed in bytes.
            # BUGFIX: was max_size_in_mb * 1000, i.e. ~250KB, not the 250MB
            # documented in the changelog.
            @@collection = Cubicle.mongo.database.create_collection(@@collection_name, :capped=>true, :size=>max_size_in_mb * 1024 * 1024)
          else
            @@collection ||= Cubicle.mongo.database[@@collection_name]
          end
        end
        def collection=(collection_name)
          @@collection_name = collection_name
          # Reset the memoized handle so the next access honors the new name.
          @@collection = nil if defined?(@@collection)
        end

        def clear!
          collection.drop
        end
      end

      #Instance methods

      # The aggregation (or ad hoc query) whose operations are profiled.
      attr_reader :aggregation
      def initialize(aggregation)
        @aggregation = aggregation
      end

      # Records the stats hash returned by a completed map/reduce command,
      # annotated with the query's shape and the reason it was run.
      def record_map_reduce_result(query,mr_options,result,reason,aggregation_info=nil)
        record_stats(result.merge({
          :action=>:map_reduce,
          :source=>query.source_collection_name,
          :dimensions=>query.dimensions.map{|m|m.name},
          # BUGFIX: previously mapped query.dimensions here too (copy-paste),
          # so the recorded measures were actually the dimension names.
          :measures=>query.measures.map{|m|m.name},
          :query=>mr_options["query"].inspect,
          :reason=>reason,
          :aggregation_info_id=>aggregation_info ? aggregation_info._id : nil
        }))
      end

      # Times the given block, records the elapsed milliseconds merged with
      # the supplied stats, and returns the block's result unchanged.
      def measure(action,stats)
        start = Time.now
        result = yield
        stop = Time.now
        record_stats(stats.merge({
          :action=>action,
          :timeMillis=>(stop-start)*1000
        }))
        result
      end

      protected
      # Inserts a stats document (stamped with time + aggregation name) into
      # the profiler collection; a no-op unless profiling is enabled.
      def record_stats(stats)
        return unless self.class.enabled?
        #Sometimes, an instance of Cubicle::AdHoc is used in lieu of a pre-defined
        #aggregation, and AdHoc doesn't have a 'name' property
        name = aggregation.respond_to?(:name) ? aggregation.name : aggregation.class.name
        stats.merge!({:timestamp=>Time.now.utc,
                      :aggregation=>name})
        self.class.collection.insert(stats)
        Cubicle.logger.info "Profiler trace:#{stats.inspect}"
      end

    end
  end
end
Loading

0 comments on commit 226ac64

Please sign in to comment.