Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

base fork: TelegramSam/Dequeue
base: e85530bd57
...
head fork: TelegramSam/Dequeue
compare: b188f2ee81
  • 5 commits
  • 6 files changed
  • 0 commit comments
  • 1 contributor
README.rdoc (14 lines changed)
@@ -45,6 +45,20 @@ queue items. There are no keys or reserved values in the body of an item. Pass i
body = ["a", "b", "c"]
queue.push(body,options)
+
+=== Batch Pushing
+When you need to push more than a few items, the delay of network requests can start to add up. In these cases, you can
+add more than a single item with a single call, like so:
+
+
+ queue.batchpush("foo")
+ queue.batchpush("bar")
+
+To perform the actual push, call the batchprocess method:
+
+ queue.batchprocess()
+
+
=== Popping Items
Pop items with the pop() method. Specifying a timeout (in seconds) will override the default timeout set in
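To make the new section concrete, here is a minimal end-to-end sketch of the batch API as added in this change (the connection, database, and collection names are assumptions, not from the repo):

    require 'mongo'
    require 'mongo-dequeue'

    db    = Mongo::Connection.new.db('example_db')          # assumed database name
    queue = Mongo::Dequeue.new(db.collection('example_q'))  # assumed collection name

    queue.batchpush("foo")            # accumulated locally; no network traffic yet
    queue.batchpush("bar")
    queue.batchprocess()              # one server-side call pushes both items

    item = queue.pop(:timeout => 60)  # overrides the 300s default lock timeout
    queue.complete(item[:id]) if item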
VERSION (2 lines changed)
@@ -1 +1 @@
-0.4.1
+0.5.1
lib/mongo-dequeue.rb (173 lines changed)
@@ -5,13 +5,13 @@
# heavily inspired by https://github.com/skiz/mongo_queue
class Mongo::Dequeue
- attr_reader :collection, :config
-
+ attr_reader :collection, :config, :batch
+
DEFAULT_CONFIG = {
:timeout => 300,
:default_priority => 3
}.freeze
-
+
# Create a new instance of MongoDequeue with the provided mongodb connection and optional configuration.
# See +DEFAULT_CONFIG+ for default configuration and possible configuration options.
#
@@ -23,20 +23,21 @@ class Mongo::Dequeue
def initialize(collection, opts={})
@collection = collection
@config = DEFAULT_CONFIG.merge(opts)
+ @batch = []
end
-
+
# Remove all items from the queue. Use with caution!
def flush!
collection.drop
end
-
+
# Insert a new item into the queue.
#
# Example:
# queue.push(:name => 'Billy', :email => 'billy@example.com', :message => 'Here is the thing you asked for')
def push(body, item_opts = {})
dup_key = item_opts[:duplicate_key] || Mongo::Dequeue.generate_duplicate_key(body)
-
+
selector = {
:duplicate_key => dup_key,
:complete => false,
@@ -55,37 +56,86 @@ def push(body, item_opts = {})
},
'$inc' => {:count => 1 }
}
-
+
id = collection.update(selector, item, :upsert => true)
end
-
+
+ # Add a new item to the delayed batch; nothing is sent until batchprocess is called.
+ def batchpush(body, item_opts = {})
+ @batch << {
+ :body => body,
+ :duplicate_key => item_opts[:duplicate_key] || Mongo::Dequeue.generate_duplicate_key(body),
+ :priority => item_opts[:priority] || @config[:default_priority]
+ }
+ end
+
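+ # Push everything accumulated with batchpush to MongoDB in a single
+ # server-side call, then clear the local batch.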
+ def batchprocess()
+ js = %Q|
+ function(batch) {
+ var nowutc = new Date();
+ var ret = [];
+ for(i in batch){
+ e = batch[i];
+ //ret.push(e);
+ var query = {
+ 'duplicate_key': e.duplicate_key,
+ 'complete': false,
+ 'locked_at': null
+ };
+ var object = {
+ '$set': {
+ 'body': e.body,
+ 'inserted_at': nowutc,
+ 'complete': false,
+ 'locked_till': null,
+ 'completed_at': null,
+ 'priority': e.priority,
+ 'duplicate_key': e.duplicate_key,
+ 'completecount': 0
+ },
+ '$inc': {'count': 1}
+ };
+
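+ // The third argument (true) makes this an upsert: new items are
+ // inserted, while an incomplete duplicate only gets its count bumped.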
+ db.#{collection.name}.update(query, object, true);
+ }
+ return ret;
+ }
+ |
+ cmd = BSON::OrderedHash.new
+ cmd['$eval'] = js
+ cmd['args'] = [@batch]
+ result = collection.db.command(cmd)
+ @batch.clear
+ #pp result
+ end
+
# Lock and return the next queue message if one is available. Returns nil if none are available. Be sure to
# review the README.rdoc regarding proper usage of the locking process identifier (locked_by).
# Example:
# doc = queue.pop()
-
+
# {:body=>"foo", :id=>"4e039c372b70275e345206e4"}
def pop(opts = {})
begin
timeout = opts[:timeout] || @config[:timeout]
cmd = BSON::OrderedHash.new
- cmd['findandmodify'] = collection.name
- cmd['update'] = {'$set' => {:locked_till => Time.now.utc+timeout}}
- cmd['query'] = {:complete => false, '$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}] }
- cmd['sort'] = {:priority=>-1,:inserted_at=>1}
- cmd['limit'] = 1
- cmd['new'] = true
- result = collection.db.command(cmd)
+ cmd['findandmodify'] = collection.name
+ cmd['update'] = {'$set' => {:locked_till => Time.now.utc+timeout}}
+ cmd['query'] = {:complete => false, '$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}] }
+ cmd['sort'] = {:priority=>-1,:inserted_at=>1}
+ cmd['limit'] = 1
+ cmd['new'] = true
+ result = collection.db.command(cmd)
rescue Mongo::OperationFailure => of
- return nil
+ return nil
end
- return {
- :body => result['value']['body'],
- :id => result['value']['_id'].to_s
- }
+ return {
+ :body => result['value']['body'],
+ :id => result['value']['_id'].to_s
+ }
end
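A note on the locking contract implemented by the findandmodify above: an item popped with a timeout stays invisible to other consumers until locked_till passes, so work must be completed before then. A minimal worker sketch, assuming a queue built as in the README (process() is a hypothetical application method):

    loop do
      doc = queue.pop(:timeout => 120)  # hides the item from other workers for 120s
      break if doc.nil?                 # nothing available (or the operation failed)
      process(doc[:body])               # hypothetical work method
      queue.complete(doc[:id])          # must be called before the lock expires
    end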
-
+
# Remove the document from the queue. This should be called when the work is done and the document is no longer needed.
# You must provide the id returned by pop() to complete it.
def complete(id)
@@ -97,22 +147,22 @@ def complete(id)
cmd['limit'] = 1
collection.db.command(cmd)
rescue Mongo::OperationFailure => of
- #opfailure happens when item has been already completed
- return nil
+ #opfailure happens when item has been already completed
+ return nil
end
end
-
+
# Removes completed job history
def cleanup()
collection.remove({:complete=>true});
end
-
+
# Provides some information about what is in the queue. We are using an eval to ensure that a
# lock is obtained during the execution of this query so that the results are not skewed.
# please be aware that it will lock the database during the execution, so avoid using it too
often, even though it is very tiny and should be relatively fast.
def stats
- js = "function queue_stat(){
+ js = "function queue_stat(){
return db.eval(
function(){
var nowutc = new Date();
@@ -126,43 +176,72 @@ def stats
'$reduce': function(obj, prev){prev.count += (obj.completecount - 1);},
'initial': {count: 0}
});
-
- return [a, c, t, l, rc[0] ? rc[0].count : 0];
+ var p = db.#{collection.name}.group({
+ 'key': {'priority':1},
+ 'cond': {},
+ '$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
+ 'initial': {complete: 0, waiting:0}
+ });
+ var tasks = db.#{collection.name}.group({
+ 'key': {'body.task':1},
+ 'cond': {},
+ '$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
+ 'initial': {complete: 0, waiting:0}
+ });
+
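+ // Array order must match the Ruby destructuring of the eval result:
+ // available, complete, total, locked, redundant completes, priority, tasks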
+ return [a, c, t, l, rc[0] ? rc[0].count : 0, p, tasks];
}
);
}"
- available, complete, total, locked, redundant_completes = collection.db.eval(js)
+
+ #possible additions
+
+ #db.job_queue.group({
+ #'key': {'priority':1},
+ #'cond': {},
+ #'$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
+ #'initial': {complete: 0, waiting:0}
+ #});
+
+ #db.job_queue.group({
+ #'key': {'body.task':1},
+ #'cond': {},
+ #'$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
+ #'initial': {complete: 0, waiting:0}
+ #});
+
+ available, complete, total, locked, redundant_completes, priority, tasks = collection.db.eval(js)
{ :locked => locked.to_i,
- :complete => complete.to_i,
- :available => available.to_i,
- :total => total.to_i,
- :redundantcompletes => redundant_completes
+ :complete => complete.to_i,
+ :available => available.to_i,
+ :total => total.to_i,
+ :redundantcompletes => redundant_completes,
+ :priority => priority,
+ :tasks => tasks
}
end
-
+
def self.generate_duplicate_key(body)
return Digest::MD5.hexdigest(body) if body.is_a?(String)
return Digest::MD5.hexdigest(body.to_s) if body.is_a?(Fixnum) # hexdigest needs a String, so digest the integer's string form
- #else
+ #else
return Digest::MD5.hexdigest(body.to_json) #won't ever match a duplicate. Need a better way to handle hashes and arrays.
end
-
+
def peek
firstfew = collection.find({
- :complete => false,
- '$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}]
- },
- :sort => [[:priority, :descending],[:inserted_at, :ascending]],
- :limit => 10)
+ :complete => false,
+ '$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}]
+ },
+ :sort => [[:priority, :descending],[:inserted_at, :ascending]],
+ :limit => 10)
return firstfew
end
-
-
+
protected
-
+
def value_of(result) #:nodoc:
result['ok'] == 0 ? nil : result['value'] # Mongo command results report success under 'ok'
end
-
-
+
end
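For reference, a short sketch of reading the two stats keys added above; the shapes mirror the spec expectations further down, and the numbers are illustrative:

    s = queue.stats
    s[:priority]  # e.g. [{"priority"=>3.0, "complete"=>1.0, "waiting"=>4.0}]
    s[:tasks]     # e.g. [{"body.task"=>"foo", "complete"=>0.0, "waiting"=>1.0}, ...]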
BIN  pkg/mongo-dequeue-0.4.1.gem
Binary file not shown
BIN  pkg/mongo-dequeue-0.5.0.gem
Binary file not shown
spec/mongo_dequeue_spec.rb (35 lines changed)
@@ -37,7 +37,7 @@ def insert_and_inspect(body, options={})
end
describe "Inserting a standard Job" do
- before(:each) do
+ before(:all) do
@item = insert_and_inspect({:message => 'MongoQueueSpec', :foo => 5})
end
@@ -70,6 +70,26 @@ def insert_and_inspect(body, options={})
@item['body']['foo'].should be(5)
end
end
+
+ describe "bulk inserting multiple jobs" do
+ before(:all) do
+ @queue.batchpush({:message => 'MongoQueueSpec1', :foo => 5})
+ @queue.batchpush({:message => 'MongoQueueSpec2', :foo => 5})
+ @queue.batchpush({:message => 'MongoQueueSpec3', :foo => 5})
+ end
+
+ it "should correctly count items in batch" do
+ @queue.batch.length.should be(3)
+ end
+
+ it "should correctly add items on process" do
+ @queue.batchprocess()
+ @queue.send(:collection).count.should == 3
+ @queue.batch.length.should == 0
+ end
+
+ end
+
describe "Inserting different body types" do
before(:each) do
@@ -261,7 +281,7 @@ def insert_and_inspect(body, options={})
@b = insert_and_inspect("b")
@c = insert_and_inspect("c")
@d = insert_and_inspect("d")
- @e = insert_and_inspect("e")
+ @e = insert_and_inspect({:task => "foo"})
@ap = @queue.pop(:timeout => 1)
@bp = @queue.pop
@@ -289,6 +309,17 @@ def insert_and_inspect(body, options={})
it "should count redundant completes" do
@stats[:redundantcompletes].should == 0
end
+ it "should count priorities" do
+ #pp @stats[:priority]
+ @stats[:priority].should == [{"priority"=>3.0, "complete"=>1.0, "waiting"=>4.0}]
+ end
+ it "should count tasks" do
+ #pp @stats[:tasks]
+ @stats[:tasks].should == [
+ {"body.task"=>nil, "complete"=>1.0, "waiting"=>3.0},
+ {"body.task"=>"foo", "complete"=>0.0, "waiting"=>1.0}
+ ]
+ end
end

No commit comments for this range
