Permalink
Browse files

use thread local variable for redis connection so we can block and av…

…oid polling
  • Loading branch information...
1 parent 6d97652 commit 5898ee0e85c4c7c5dd57e767f73f52c38a4ea889 Scott Reis committed Oct 28, 2011
Showing with 23 additions and 28 deletions.
  1. +16 −16 lib/anemone/queue/redis.rb
  2. +7 −12 spec/queue_spec.rb
@@ -10,43 +10,43 @@ module Queue
class Redis
def initialize(opts = {})
- @redis = ::Redis.new(opts)
- @list = "#{opts[:key_prefix] || 'anemone'}:#{self.hash.abs}"
+ @opts = opts
+ @list = "#{@opts[:key_prefix] || 'anemone'}:#{self.hash.abs}"
@waiting = "#{@list}:waiting"
- @timeout = opts[:timeout] || 10
clear
end
def <<(job)
- @redis.lpush(@list,job.to_json)
+ redis.lpush(@list,job.to_json)
end
def deq
- json = @redis.rpop(@list)
- if json.nil?
- @redis.incr(@waiting)
- until json = @redis.rpop(@list)
- sleep(@timeout)
- end
- @redis.decr(@waiting)
- end
- JSON.parse(json) rescue nil
+ redis.incr(@waiting)
+ job = redis.brpop(@list, @opts[:timeout] || 0)
+ redis.decr(@waiting)
+ JSON.parse(job.last) rescue nil
end
def empty?
size == 0
end
def size
- @redis.llen(@list)
+ redis.llen(@list)
end
def num_waiting
- @redis.get(@waiting).to_i
+ redis.get(@waiting).to_i
end
def clear
- @redis.del(@list, @waiting)
+ redis.del(@list, @waiting)
+ end
+
+ private
+
+ def redis
+ Thread.current[:redis] ||= ::Redis.new(@opts)
end
end
View
@@ -56,18 +56,13 @@ module Queue
@queue.num_waiting.should == 0
- threads = []
- 2.times { threads << Thread.new { @queue.deq } }
- sleep(0.2)
- @queue.num_waiting.should == 2
+ 3.times { Thread.new { @queue.deq } }
- @queue << test_data
- sleep(0.2)
- @queue.num_waiting.should == 1
-
- @queue << test_data
- sleep(0.2)
- @queue.num_waiting.should == 0
+ [3,2,1,0].each do |n|
+ sleep(0.2)
+ @queue.num_waiting.should == n
+ @queue << test_data
+ end
end
it 'should implement clear' do
@@ -91,7 +86,7 @@ module Queue
describe Redis do
it_should_behave_like :an_adapter
- before(:all) { @queue = Queue.Redis(:timeout => 0.1) }
+ before(:all) { @queue = Queue.Redis }
after(:each) { @queue.clear }
after(:all) { @queue = nil }

0 comments on commit 5898ee0

Please sign in to comment.