Permalink
Browse files

add snapshot quota and delete snapshot job

also update async job framwork:
1. Unify job related namespace to VCAP::Services::Base::AsyncJob
2. Update resque and resque-status gem
3. Move common job logic into base.

Change-Id: I2dac7d2d2b5ed20003926a607f4fdaf96096e33e
  • Loading branch information...
1 parent 1751d84 commit 64faf5e3ffe847d9c6c795f4591b3f932491f83d @andl andl committed Mar 15, 2012
View
@@ -10,7 +10,8 @@ PATH
eventmachine_httpserver (~> 0.2.1)
json (~> 1.4.6)
nats (~> 0.4.22.beta.8)
- resque-status (~> 0.2.4)
+ resque (~> 1.20)
+ resque-status (~> 0.3.2)
ruby-hmac (~> 0.4.0)
sinatra (~> 1.2.3)
thin (~> 1.3.1)
@@ -21,14 +22,14 @@ PATH
GEM
remote: http://rubygems.org/
specs:
- addressable (2.2.6)
+ addressable (2.2.7)
bcrypt-ruby (2.1.4)
builder (3.0.0)
ci_reporter (1.6.4)
builder (>= 2.1.2)
curb (0.7.16)
daemons (1.1.8)
- data_objects (0.10.7)
+ data_objects (0.10.8)
addressable (~> 2.1)
datamapper (1.1.0)
dm-aggregates (= 1.1.0)
@@ -66,8 +67,8 @@ GEM
uuidtools (~> 2.1.2)
dm-validations (1.1.0)
dm-core (~> 1.1.0)
- do_sqlite3 (0.10.7)
- data_objects (= 0.10.7)
+ do_sqlite3 (0.10.8)
+ data_objects (= 0.10.8)
em-http-request (0.3.0)
addressable (>= 2.0.0)
escape_utils
@@ -81,7 +82,7 @@ GEM
macaddr (1.5.0)
systemu (>= 2.4.0)
multi_json (1.0.4)
- nats (0.4.22.beta.8)
+ nats (0.4.22)
daemons (>= 1.1.4)
eventmachine (>= 0.12.10)
json_pure (>= 1.6.1)
@@ -96,15 +97,15 @@ GEM
redisk (0.2.2)
redis (>= 0.1.1)
redis-namespace (>= 0.1.0)
- resque (1.19.0)
+ resque (1.20.0)
multi_json (~> 1.0)
redis-namespace (~> 1.0.2)
sinatra (>= 0.9.2)
vegas (~> 0.1.2)
- resque-status (0.2.4)
+ resque-status (0.3.2)
redisk (>= 0.2.1)
- resque (>= 1.3.1)
- uuid (>= 2.0.2)
+ resque (~> 1.19)
+ uuid (~> 2.3)
rspec (2.5.0)
rspec-core (~> 2.5.0)
rspec-expectations (~> 2.5.0)
@@ -124,13 +125,13 @@ GEM
rack (~> 1.1)
tilt (>= 1.2.2, < 2.0)
stringex (1.2.2)
- systemu (2.4.2)
+ systemu (2.5.0)
thin (1.3.1)
daemons (>= 1.0.9)
eventmachine (>= 0.12.6)
rack (>= 1.0.0)
tilt (1.2.2)
- uuid (2.3.4)
+ uuid (2.3.5)
macaddr (~> 1.0)
uuidtools (2.1.2)
vcap_common (1.0.10)
@@ -141,7 +142,7 @@ GEM
yajl-ruby (~> 0.8.3)
vcap_logging (1.0.0)
rake
- vegas (0.1.8)
+ vegas (0.1.11)
rack (>= 1.0.0)
yajl-ruby (0.8.3)
@@ -287,6 +287,22 @@ def check_orphan(handles, callback, errback)
async_mode
end
+ # Delete a snapshot
+ delete "/gateway/v1/configurations/:service_id/snapshots/:snapshot_id" do
+ not_impl unless @api_extensions.include? "snapshots"
+ service_id = params["service_id"]
+ snapshot_id = params["snapshot_id"]
+ @logger.info("Delete service_id=#{service_id} to snapshot_id=#{snapshot_id}")
+ @provisioner.delete_snapshot(service_id, snapshot_id) do |msg|
+ if msg['success']
+ async_reply(VCAP::Services::Api::Job.new(msg['response']).encode)
+ else
+ async_reply_error(msg['response'])
+ end
+ end
+ async_mode
+ end
+
# Get serialized url
get "/gateway/v1/configurations/:service_id/serialized/url" do
not_impl unless @api_extensions.include? "serialization"
@@ -0,0 +1,5 @@
+# Copyright (c) 2009-2011 VMware, Inc.
+require 'job/config'
+require 'job/snapshot'
+require 'job/serialization'
+require 'job/lock'
View
@@ -13,6 +13,7 @@
$LOAD_PATH.unshift File.dirname(__FILE__)
require 'asynchronous_service_gateway'
+require 'job/config'
require 'abstract'
module VCAP
@@ -57,6 +58,15 @@ def setup_vcap_logging
@config[:logger] = logger
end
+ def setup_async_job_config
+ resque = @config[:resque]
+ if resque
+ resque = VCAP.symbolize_keys(resque)
+ VCAP::Services::Base::AsyncJob::Config.redis_config = resque
+ VCAP::Services::Base::AsyncJob::Config.logger = @config[:logger]
+ end
+ end
+
def setup_pid
if @config[:pid]
pf = VCAP::PidFile.new(@config[:pid])
@@ -71,6 +81,8 @@ def start
setup_pid
+ setup_async_job_config
+
@config[:host] = VCAP.local_ip(@config[:ip_route])
@config[:port] ||= VCAP.grab_ephemeral_port
@config[:service][:label] = "#{@config[:service][:name]}-#{@config[:service][:version]}"
@@ -1,6 +1,8 @@
# Copyright (c) 2009-2011 VMware, Inc.
-require "resque/job_with_status"
+require "resque-status"
+$LOAD_PATH.unshift File.dirname(__FILE__)
+require "config"
$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..')
require "service_error"
@@ -13,56 +15,34 @@ def enqueue(klass, *args)
enqueue_to(queue, klass, *args)
end
- # Backport from resque master branch, we can remove this method when gem is updated.
- def enqueue_to(queue, klass, *args)
- # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false
- before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook|
- klass.send(hook, *args)
- end
- return nil if before_hooks.any? { |result| result == false }
-
- Job.create(queue, klass, *args)
-
- Plugin.after_enqueue_hooks(klass).each do |hook|
- klass.send(hook, *args)
- end
-
- return true
- end
-
- class Status
- # new attributes
- hash_accessor :complete_time
- end
end
-module VCAP
- module Services
- end
+module Resque::Plugins::Status
+ class Hash
+ # new attributes
+ hash_accessor :complete_time
+ end
end
# A thin layer wraps resque-status
-module VCAP::Services::AsyncJob
+module VCAP::Services::Base::AsyncJob
include VCAP::Services::Base::Error
- def self.logger=(logger)
- @logger = logger
- end
-
- def job_repo_setup(options={})
- raise "AsyncJob requires redis configuration." unless options[:redis]
- @logger.debug("Initialize Resque using #{options}")
- Resque.redis = options[:redis]
- Resque::Status.expire_in = options[:expire] if options[:expire]
+ def job_repo_setup
+ redis = Config.redis
+ @logger = Config.logger
+ raise "AsyncJob requires redis configuration." unless redis
+ @logger.debug("Initialize Resque using #{redis}") if @logger
+ ::Resque.redis = redis
end
def get_job(jobid)
- res = Resque::Status.get(jobid)
+ res = Resque::Plugins::Status::Hash.get(jobid)
job_to_json(res)
end
def get_all_jobs()
- Resque::Status.status_ids
+ Resque::Plugins::Status::Hash.keys
end
def job_to_json(job)
@@ -0,0 +1,27 @@
+# Copyright (c) 2009-2012 VMware, Inc.
+require "redis"
+require "resque"
+
+module VCAP
+ module Services
+ module Base
+ module AsyncJob
+ end
+ end
+ end
+end
+
+class VCAP::Services::Base::AsyncJob::Config
+ class << self
+ attr_reader :redis_config, :redis, :logger
+ def redis_config=(config)
+ @redis_config = config
+ @redis = ::Redis.new config
+ Resque.redis = @redis
+ end
+
+ def logger=(logger)
+ @logger = logger
+ end
+ end
+end
View
@@ -0,0 +1,116 @@
+# Copyright (c) 2009-2011 VMware, Inc.
+require "logger"
+require "redis"
+
+$LOAD_PATH.unshift File.dirname(__FILE__)
+require "config"
+
+# redis locking primitive using setnx.
+# http://redis.io/commands/setnx
+module VCAP::Services::Base::AsyncJob
+ class Lock
+ attr_reader :expiration, :timeout, :name
+ include VCAP::Services::Base::Error
+
+ def initialize(name, opts={})
+ @name = name
+ @timeout = opts[:timeout] || 10 #seconds
+ @expiration = opts[:expiration] || 10 # seconds
+ @logger = opts[:logger] || make_logger
+ config = Config.redis_config
+ raise "Can't find configuration of redis." unless config
+ @redis = ::Redis.new(config)
+ end
+
+ def make_logger
+ logger = Logger.new(STDOUT)
+ logger.level = Logger::ERROR
+ logger
+ end
+
+ def lock
+ @logger.debug("Acquiring lock: #{@name}")
+ started = Time.now.to_f
+ expiration = started.to_f + @expiration + 1
+ until @redis.setnx(@name, expiration)
+ existing_lock = @redis.get(@name)
+ if existing_lock.to_f < Time.now.to_f
+ @logger.debug("Lock #{@name} is expired, trying to acquire it.")
+ break if watch_and_update(expiration)
+ end
+
+ raise ServiceError.new(ServiceError::JOB_QUEUE_TIMEOUT, @timeout)if Time.now.to_f - started > @timeout
+
+ sleep(1)
+
+ expiration = Time.now.to_f + @expiration + 1
+ end
+
+ @lock_expiration = expiration
+ refresh_thread = setup_refresh_thread
+ @logger.debug("Lock #{@name} is acquired, will expire at #{@lock_expiration}")
+
+ begin
+ yield if block_given?
+ ensure
+ refresh_thread.exit
+ delete
+ end
+ end
+
+ def watch_and_update(expiration)
+ @redis.watch(@name)
+ res = @redis.multi do
+ @redis.set(@name, expiration)
+ end
+ if res
+ @logger.debug("Lock #{@name} is renewed and acquired.")
+ else
+ @logger.debug("Lock #{@name} was updated by others.")
+ end
+ res
+ end
+
+ def setup_refresh_thread
+ t = Thread.new do
+ sleep_interval = [1.0, @expiration/2].max
+ begin
+ loop do
+ @logger.debug("Renewing lock #{@name}")
+ @redis.watch(@name)
+ existing_lock = @redis.get(@name)
+
+ break if existing_lock.to_f > @lock_expiration # lock has been updated by others
+ expiration = Time.now.to_f + @expiration + 1
+ break unless watch_and_update(expiration)
+ @lock_expiration = expiration
+
+ sleep(sleep_interval)
+ end
+ rescue => e
+ @logger.error("Can't renew lock #{@name}, #{e}")
+ ensure
+ @logger.debug("Lock renew thread for #{@name} exited.")
+ @redis.quit
+ end
+ end
+ t
+ end
+
+ def delete
+ @logger.debug("Deleting lock: #{@name}")
+ existing_lock = @redis.get(@name)
+ @logger.debug("Lock #{@name} is acquired by others.")if existing_lock.to_f > @lock_expiration
+ @redis.watch(@name)
+ res = @redis.multi do
+ @redis.del(@name)
+ end
+ if res
+ @logger.debug("Lock #{@name} is deleted.")
+ else
+ @logger.debug("Lock #{@name} is acquired by others.")
+ end
+ true
+ end
+ end
+end
Oops, something went wrong.

0 comments on commit 64faf5e

Please sign in to comment.