Skip to content

Commit

Permalink
Merge branch 'bulkpush'
Browse files Browse the repository at this point in the history
  • Loading branch information
TelegramSam committed Aug 26, 2011
2 parents e85530b + a062bd6 commit 91a647f
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 50 deletions.
14 changes: 14 additions & 0 deletions README.rdoc
Expand Up @@ -45,6 +45,20 @@ queue items. There are no keys or reserved values in the body of an item. Pass i
body = ["a", "b", "c"]

queue.push(body,options)

=== Batch Pushing
When you need to push more than a few items, the delay of repeated network requests can start to add up. In these cases, you can
add more than one item with a single call, like so:


queue.batchpush("foo")
queue.batchpush("bar")

To perform the actual push, call the batchprocess method:

queue.batchprocess()



=== Popping Items
Pop items with the pop() method. Specifying a timeout (in seconds) will override the default timeout set in
Expand Down
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.4.1
0.5.0
173 changes: 126 additions & 47 deletions lib/mongo-dequeue.rb
Expand Up @@ -5,13 +5,13 @@
# heavily inspired by https://github.com/skiz/mongo_queue

class Mongo::Dequeue
attr_reader :collection, :config
attr_reader :collection, :config, :batch

DEFAULT_CONFIG = {
:timeout => 300,
:default_priority => 3
}.freeze

# Create a new instance of MongoDequeue with the provided mongodb connection and optional configuration.
# See +DEFAULT_CONFIG+ for default configuration and possible configuration options.
#
Expand All @@ -23,20 +23,21 @@ class Mongo::Dequeue
# Wire the queue to a MongoDB collection. Options are layered over
# DEFAULT_CONFIG; see that constant for the supported keys. The batch
# buffer (used by #batchpush) starts out empty.
def initialize(collection, opts={})
  @collection, @batch = collection, []
  @config = DEFAULT_CONFIG.merge(opts)
end

# Irreversibly empty the queue by dropping the backing collection.
def flush!
  @collection.drop
end

# Insert a new item into the queue.
#
# Example:
# queue.insert(:name => 'Billy', :email => 'billy@example.com', :message => 'Here is the thing you asked for')
def push(body, item_opts = {})
dup_key = item_opts[:duplicate_key] || Mongo::Dequeue.generate_duplicate_key(body)

selector = {
:duplicate_key => dup_key,
:complete => false,
Expand All @@ -55,37 +56,86 @@ def push(body, item_opts = {})
},
'$inc' => {:count => 1 }
}

id = collection.update(selector, item, :upsert => true)
end


# Buffer an item locally for a later bulk insert (see #batchprocess).
# Nothing touches the database until batchprocess is called.
# item_opts may carry :duplicate_key and :priority, mirroring #push.
def batchpush(body, item_opts = {})
  entry = {
    :body          => body,
    :duplicate_key => item_opts[:duplicate_key] || Mongo::Dequeue.generate_duplicate_key(body),
    :priority      => item_opts[:priority] || @config[:default_priority]
  }
  @batch.push(entry)
end

# Flush the locally buffered batch (see #batchpush) to the server in a
# single round trip, upserting each entry the same way #push does, then
# clear the buffer. Returns the (now empty) buffer.
#
# NOTE(review): this uses server-side $eval, which is deprecated/removed
# in modern MongoDB -- confirm the target server still supports it.
def batchprocess
  return @batch if @batch.empty? # nothing buffered; skip the server call

  js = %Q|
  function(batch) {
    var nowutc = new Date();
    for (i in batch) {
      var e = batch[i];
      // Upsert keyed on duplicate_key so pushing the same body again
      // bumps 'count' instead of inserting a second document.
      // NOTE(review): push() filters on locked_till; 'locked_at' here
      // looks like a typo -- confirm against the document schema.
      var query = {
        'duplicate_key': e.duplicate_key,
        'complete': false,
        'locked_at': null
      };
      var object = {
        '$set': {
          'body': e.body,
          'inserted_at': nowutc,
          'complete': false,
          'locked_till': null,
          'completed_at': null,
          'priority': e.priority,
          'duplicate_key': e.duplicate_key,
          'completecount': 0
        },
        '$inc': {'count': 1}
      };
      db.#{collection.name}.update(query, object, true);
    }
    return [];
  }
  |
  cmd = BSON::OrderedHash.new
  cmd['$eval'] = js
  cmd['args']  = [@batch]
  collection.db.command(cmd)
  @batch.clear
end

# Lock and return the next queue item, or nil when none is available.
# The lock is a lease: the item is invisible to other callers until
# locked_till passes (opts[:timeout] seconds, falling back to the
# configured default). Be sure to review the README.rdoc regarding proper
# usage of the locking process identifier (locked_by).
#
# Example:
#   doc = queue.pop()
#   # => {:body=>"foo", :id=>"4e039c372b70275e345206e4"}
#
# Fixes: the pasted version contained the findandmodify sequence, the
# rescue return and the result hash TWICE (stale diff lines), so the
# command ran twice per pop; the duplicates are removed here.
def pop(opts = {})
  begin
    timeout = opts[:timeout] || @config[:timeout]
    cmd = BSON::OrderedHash.new
    cmd['findandmodify'] = collection.name
    cmd['update'] = {'$set' => {:locked_till => Time.now.utc + timeout}}
    cmd['query']  = {:complete => false, '$or' => [{:locked_till => nil}, {:locked_till => {'$lt' => Time.now.utc}}]}
    cmd['sort']   = {:priority => -1, :inserted_at => 1}
    cmd['limit']  = 1
    cmd['new']    = true
    result = collection.db.command(cmd)
  rescue Mongo::OperationFailure
    # findandmodify fails when no document matches: treat as "queue empty".
    return nil
  end
  {
    :body => result['value']['body'],
    :id   => result['value']['_id'].to_s
  }
end

# Remove the document from the queue. This should be called when the work is done and the document is no longer needed.
# You must provide the process identifier that the document was locked with to complete it.
def complete(id)
Expand All @@ -97,22 +147,22 @@ def complete(id)
cmd['limit'] = 1
collection.db.command(cmd)
rescue Mongo::OperationFailure => of
#opfailure happens when item has been already completed
return nil
#opfailure happens when item has been already completed
return nil
end
end

# Purge completed items, keeping the collection from growing unbounded.
def cleanup
  @collection.remove(:complete => true)
end

# Provides some information about what is in the queue. We are using an eval to ensure that a
# lock is obtained during the execution of this query so that the results are not skewed.
# please be aware that it will lock the database during the execution, so avoid using it too
# often, even though it it very tiny and should be relatively fast.
def stats
js = "function queue_stat(){
js = "function queue_stat(){
return db.eval(
function(){
var nowutc = new Date();
Expand All @@ -126,43 +176,72 @@ def stats
'$reduce': function(obj, prev){prev.count += (obj.completecount - 1);},
'initial': {count: 0}
});
return [a, c, t, l, rc[0] ? rc[0].count : 0];
var p = db.#{collection.name}.group({
'key': {'priority':1},
'cond': {},
'$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
'initial': {complete: 0, waiting:0}
});
var tasks = db.#{collection.name}.group({
'key': {'body.task':1},
'cond': {},
'$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
'initial': {complete: 0, waiting:0}
});
return [a, c, t, l, rc[0] ? rc[0].count : 0, p, tasks];
}
);
}"
available, complete, total, locked, redundant_completes = collection.db.eval(js)

#possible additions

#db.job_queue.group({
#'key': {'priority':1},
#'cond': {},
#'$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
#'initial': {complete: 0, waiting:0}
#});

#db.job_queue.group({
#'key': {'body.task':1},
#'cond': {},
#'$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
#'initial': {complete: 0, waiting:0}
#});

available, complete, total, locked, redundant_completes, priority, tasks = collection.db.eval(js)
{ :locked => locked.to_i,
:complete => complete.to_i,
:available => available.to_i,
:total => total.to_i,
:redundantcompletes => redundant_completes
:complete => complete.to_i,
:available => available.to_i,
:total => total.to_i,
:redundantcompletes => redundant_completes,
:priority => priority,
:tasks => tasks
}
end

# Derive a stable de-duplication key for a queue item body. Identical
# bodies hash to the same key, letting push/batchpush collapse duplicates
# into a single document with an incremented count.
#
# Fixes: the original compared body.class (a Class object) to the STRING
# "String"/"Fixnum", which is never true, so every body silently fell
# through to the to_json branch.
def self.generate_duplicate_key(body)
  return Digest::MD5.hexdigest(body) if body.is_a?(String)
  return Digest::MD5.hexdigest(body.to_s) if body.is_a?(Integer)
  # Hashes/arrays: serialize first. Key order can vary, so logically equal
  # hashes may still yield different keys -- a known limitation.
  Digest::MD5.hexdigest(body.to_json)
end

# Non-destructive look at the head of the queue: returns a cursor over up
# to 10 items that a pop() could currently claim (not complete, not
# currently locked), ordered the same way pop orders them (priority
# descending, then insertion time ascending).
#
# Fixes: the pasted version contained two interleaved copies of the find
# arguments (stale diff lines), which is not valid Ruby; the duplicates
# are removed here.
def peek
  collection.find(
    {
      :complete => false,
      '$or' => [{:locked_till => nil}, {:locked_till => {'$lt' => Time.now.utc}}]
    },
    :sort  => [[:priority, :descending], [:inserted_at, :ascending]],
    :limit => 10
  )
end



protected

# Extract the payload from a raw command result hash, or nil on failure.
# NOTE(review): this tests result['okay'], but Mongo command replies
# conventionally carry the key 'ok' -- confirm which key the driver
# actually returns here.
def value_of(result) #:nodoc:
  return nil if result['okay'] == 0
  result['value']
end



end
Binary file added pkg/mongo-dequeue-0.4.1.gem
Binary file not shown.
35 changes: 33 additions & 2 deletions spec/mongo_dequeue_spec.rb
Expand Up @@ -37,7 +37,7 @@ def insert_and_inspect(body, options={})
end

describe "Inserting a standard Job" do
before(:each) do
before(:all) do
@item = insert_and_inspect({:message => 'MongoQueueSpec', :foo => 5})
end

Expand Down Expand Up @@ -70,6 +70,26 @@ def insert_and_inspect(body, options={})
@item['body']['foo'].should be(5)
end
end

# Exercises the batchpush/batchprocess pair end to end.
describe "bulk inserting multiple jobs" do
# Buffer three items locally; nothing hits the collection until
# batchprocess runs in the spec below.
before(:all) do
@queue.batchpush({:message => 'MongoQueueSpec1', :foo => 5})
@queue.batchpush({:message => 'MongoQueueSpec2', :foo => 5})
@queue.batchpush({:message => 'MongoQueueSpec3', :foo => 5})
end

it "should correctly count items in batch" do
@queue.batch.length.should be(3)
end

# Flushing the batch should insert all three documents and empty the
# local buffer. (Depends on before(:all) state and spec ordering.)
it "should correctly add items on process" do
@queue.batchprocess()
@queue.send(:collection).count.should == 3
@queue.batch.length.should == 0
end

end


describe "Inserting different body types" do
before(:each) do
Expand Down Expand Up @@ -261,7 +281,7 @@ def insert_and_inspect(body, options={})
@b = insert_and_inspect("b")
@c = insert_and_inspect("c")
@d = insert_and_inspect("d")
@e = insert_and_inspect("e")
@e = insert_and_inspect({:task => "foo"})

@ap = @queue.pop(:timeout => 1)
@bp = @queue.pop
Expand Down Expand Up @@ -289,6 +309,17 @@ def insert_and_inspect(body, options={})
it "should count redundant completes" do
@stats[:redundantcompletes].should == 0
end
# One completed item and four still waiting, all at the default
# priority (3) -- presumably matching the pops/completes in the setup
# above; confirm against the describe's before block.
it "should count priorities" do
#pp @stats[:priority]
@stats[:priority].should == [{"priority"=>3.0, "complete"=>1.0, "waiting"=>4.0}]
end
# Items grouped by body.task: only @e (inserted as {:task => "foo"})
# carries a task; the plain-string bodies group under nil.
it "should count tasks" do
#pp @stats[:tasks]
@stats[:tasks].should == [
{"body.task"=>nil, "complete"=>1.0, "waiting"=>3.0},
{"body.task"=>"foo", "complete"=>0.0, "waiting"=>1.0}
]
end


end
Expand Down

0 comments on commit 91a647f

Please sign in to comment.