Permalink
Browse files

create new redis client in lock refresh thread

Change-Id: I73f4efff1a7fda8bdc05b29eb216031137622fa8
  • Loading branch information...
1 parent 1d5dafe commit 554be907a098a9695116c6f1ecbf4acf083b203d @andl andl committed Apr 10, 2012
View
BIN atmos/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
41 base/lib/base/job/lock.rb
@@ -20,6 +20,7 @@ def initialize(name, opts={})
config = Config.redis_config
raise "Can't find configuration of redis." unless config
@redis = ::Redis.new(config)
+ @released_thread = {}
end
def make_logger
@@ -36,7 +37,7 @@ def lock
existing_lock = @redis.get(@name)
if existing_lock.to_f < Time.now.to_f
@logger.debug("Lock #{@name} is expired, trying to acquire it.")
- break if watch_and_update(expiration)
+ break if watch_and_update(@redis, expiration)
end
raise ServiceError.new(ServiceError::JOB_QUEUE_TIMEOUT, @timeout)if Time.now.to_f - started > @timeout
@@ -53,15 +54,15 @@ def lock
begin
yield if block_given?
ensure
- refresh_thread.exit
+ release_thread(refresh_thread) if refresh_thread
delete
end
end
- def watch_and_update(expiration)
- @redis.watch(@name)
- res = @redis.multi do
- @redis.set(@name, expiration)
+ def watch_and_update(redis, expiration)
+ redis.watch(@name)
+ res = redis.multi do
+ redis.set(@name, expiration)
end
if res
@logger.debug("Lock #{@name} is renewed and acquired.")
@@ -71,27 +72,41 @@ def watch_and_update(expiration)
res
end
+ def release_thread t
+ @released_thread[t.object_id] = true
+ end
+
+ def released?
+ @released_thread[Thread.current.object_id]
+ end
+
def setup_refresh_thread
t = Thread.new do
+ redis = ::Redis.new(Config.redis_config)
sleep_interval = [1.0, @expiration/2].max
begin
loop do
+ break if released?
@logger.debug("Renewing lock #{@name}")
- @redis.watch(@name)
- existing_lock = @redis.get(@name)
+ redis.watch(@name)
+ existing_lock = redis.get(@name)
break if existing_lock.to_f > @lock_expiration # lock has been updated by others
expiration = Time.now.to_f + @expiration + 1
- break unless watch_and_update(expiration)
+ break unless watch_and_update(redis, expiration)
@lock_expiration = expiration
-
- sleep(sleep_interval)
+ sleep sleep_interval
end
rescue => e
@logger.error("Can't renew lock #{@name}, #{e}")
ensure
- @logger.debug("Lock renew thread for #{@name} exited.")
- @redis.quit
+ begin
+ @logger.debug("Lock renew thread for #{@name} exited.")
+ redis.quit
+ rescue => e
+ # just logging, ignore error
+ @logger.debug("Ignore error when quit: #{e}")
+ end
end
end
t
View
17 base/spec/job_spec.rb
@@ -85,4 +85,21 @@
ran_once.should be_true
(@stored_value == start - 1).should_not be_true
end
+
+ it "should not update expiration time after the lock is released" do
+ start = Time.now.to_f
+
+ @redis.should_receive(:watch).with(@name).any_number_of_times
+ @redis.should_receive(:multi).any_number_of_times.and_yield
+
+ expiration = 0.5
+ lock = VCAP::Services::Base::AsyncJob::Lock.new(@name,:timeout => @timeout, :expiration => expiration, :logger => @logger)
+
+ ran_once = false
+ lock.lock{ran_once = true; sleep expiration *2 }
+
+ current_value = @stored_value
+ sleep expiration * 2
+ current_value.should == @stored_value
+ end
end
View
BIN filesystem/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
BIN mongodb/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
BIN mysql/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
BIN neo4j/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
BIN postgresql/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
BIN rabbit/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
BIN redis/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
BIN service_broker/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
BIN tools/backup/manager/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.
View
BIN vblob/vendor/cache/vcap_services_base-0.1.9.gem
Binary file not shown.

0 comments on commit 554be90

Please sign in to comment.