Permalink
Browse files

create new redis client in lock refresh thread

Change-Id: I73f4efff1a7fda8bdc05b29eb216031137622fa8
  • Loading branch information...
1 parent 1d5dafe commit 554be907a098a9695116c6f1ecbf4acf083b203d @andl andl committed Apr 10, 2012
Binary file not shown.
@@ -20,6 +20,7 @@ def initialize(name, opts={})
config = Config.redis_config
raise "Can't find configuration of redis." unless config
@redis = ::Redis.new(config)
+ @released_thread = {}
end
def make_logger
@@ -36,7 +37,7 @@ def lock
existing_lock = @redis.get(@name)
if existing_lock.to_f < Time.now.to_f
@logger.debug("Lock #{@name} is expired, trying to acquire it.")
- break if watch_and_update(expiration)
+ break if watch_and_update(@redis, expiration)
end
raise ServiceError.new(ServiceError::JOB_QUEUE_TIMEOUT, @timeout)if Time.now.to_f - started > @timeout
@@ -53,15 +54,15 @@ def lock
begin
yield if block_given?
ensure
- refresh_thread.exit
+ release_thread(refresh_thread) if refresh_thread
delete
end
end
- def watch_and_update(expiration)
- @redis.watch(@name)
- res = @redis.multi do
- @redis.set(@name, expiration)
+ def watch_and_update(redis, expiration)
+ redis.watch(@name)
+ res = redis.multi do
+ redis.set(@name, expiration)
end
if res
@logger.debug("Lock #{@name} is renewed and acquired.")
@@ -71,27 +72,41 @@ def watch_and_update(expiration)
res
end
+ def release_thread t
+ @released_thread[t.object_id] = true
+ end
+
+ def released?
+ @released_thread[Thread.current.object_id]
+ end
+
def setup_refresh_thread
t = Thread.new do
+ redis = ::Redis.new(Config.redis_config)
sleep_interval = [1.0, @expiration/2].max
begin
loop do
+ break if released?
@logger.debug("Renewing lock #{@name}")
- @redis.watch(@name)
- existing_lock = @redis.get(@name)
+ redis.watch(@name)
+ existing_lock = redis.get(@name)
break if existing_lock.to_f > @lock_expiration # lock has been updated by others
expiration = Time.now.to_f + @expiration + 1
- break unless watch_and_update(expiration)
+ break unless watch_and_update(redis, expiration)
@lock_expiration = expiration
-
- sleep(sleep_interval)
+ sleep sleep_interval
end
rescue => e
@logger.error("Can't renew lock #{@name}, #{e}")
ensure
- @logger.debug("Lock renew thread for #{@name} exited.")
- @redis.quit
+ begin
+ @logger.debug("Lock renew thread for #{@name} exited.")
+ redis.quit
+ rescue => e
+ # just logging, ignore error
+ @logger.debug("Ignore error when quit: #{e}")
+ end
end
end
t
View
@@ -85,4 +85,21 @@
ran_once.should be_true
(@stored_value == start - 1).should_not be_true
end
+
+ it "should not update expiration time after the lock is released" do
+ start = Time.now.to_f
+
+ @redis.should_receive(:watch).with(@name).any_number_of_times
+ @redis.should_receive(:multi).any_number_of_times.and_yield
+
+ expiration = 0.5
+ lock = VCAP::Services::Base::AsyncJob::Lock.new(@name,:timeout => @timeout, :expiration => expiration, :logger => @logger)
+
+ ran_once = false
+ lock.lock{ran_once = true; sleep expiration *2 }
+
+ current_value = @stored_value
+ sleep expiration * 2
+ current_value.should == @stored_value
+ end
end
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 554be90

Please sign in to comment.