Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

move lifecycle framework into base

including async job, snapshot and serilized

Change-Id: I35fa7726c4f237b42edf346716c5c29e46dd440c
  • Loading branch information...
commit 31105b71cab17feabeca4cc8a706bccd442a10b1 1 parent 44823ca
@andl andl authored
View
32 Gemfile.lock
@@ -1,7 +1,8 @@
PATH
remote: .
specs:
- vcap_services_base (0.1.0)
+ vcap_services_base (0.1.1)
+ curb (~> 0.7.16)
datamapper (~> 1.1.0)
do_sqlite3 (~> 0.10.3)
em-http-request (~> 0.3.0)
@@ -9,6 +10,7 @@ PATH
eventmachine_httpserver (~> 0.2.1)
json (~> 1.4.6)
nats (~> 0.4.10)
+ resque-status (~> 0.2.4)
ruby-hmac (~> 0.4.0)
sinatra (~> 1.2.3)
thin (~> 1.2.11)
@@ -24,7 +26,8 @@ GEM
builder (3.0.0)
ci_reporter (1.6.4)
builder (>= 2.1.2)
- daemons (1.1.4)
+ curb (0.7.16)
+ daemons (1.1.5)
data_objects (0.10.7)
addressable (~> 2.1)
datamapper (1.1.0)
@@ -78,7 +81,10 @@ GEM
little-plugger (1.1.3)
logging (1.6.1)
little-plugger (>= 1.1.2)
- nats (0.4.12)
+ macaddr (1.5.0)
+ systemu (>= 2.4.0)
+ multi_json (1.0.4)
+ nats (0.4.10)
daemons (>= 1.1.0)
eventmachine (>= 0.12.10)
json_pure (>= 1.5.1)
@@ -86,6 +92,21 @@ GEM
rack (1.2.2)
rake (0.8.7)
rcov (0.9.9)
+ redis (2.2.2)
+ redis-namespace (1.0.3)
+ redis (< 3.0.0)
+ redisk (0.2.2)
+ redis (>= 0.1.1)
+ redis-namespace (>= 0.1.0)
+ resque (1.19.0)
+ multi_json (~> 1.0)
+ redis-namespace (~> 1.0.2)
+ sinatra (>= 0.9.2)
+ vegas (~> 0.1.2)
+ resque-status (0.2.4)
+ redisk (>= 0.2.1)
+ resque (>= 1.3.1)
+ uuid (>= 2.0.2)
rspec (2.5.0)
rspec-core (~> 2.5.0)
rspec-expectations (~> 2.5.0)
@@ -99,11 +120,14 @@ GEM
rack (~> 1.1)
tilt (>= 1.2.2, < 2.0)
stringex (1.2.2)
+ systemu (2.4.2)
thin (1.2.11)
daemons (>= 1.0.9)
eventmachine (>= 0.12.6)
rack (>= 1.0.0)
tilt (1.2.2)
+ uuid (2.3.4)
+ macaddr (~> 1.0)
uuidtools (2.1.2)
vcap_common (1.0.2)
eventmachine (~> 0.12.11.cloudfoundry.3)
@@ -113,6 +137,8 @@ GEM
thin (~> 1.2.11)
yajl-ruby (~> 0.8.3)
vcap_logging (0.1.3)
+ vegas (0.1.8)
+ rack (>= 1.0.0)
yajl-ruby (0.8.3)
PLATFORMS
View
87 lib/base/job/async_job.rb
@@ -0,0 +1,87 @@
+# Copyright (c) 2009-2011 VMware, Inc.
+require "resque/job_with_status"
+
+$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..')
+require "service_error"
+
+module Resque
+ extend self
+ # Patch Resque so we can determine queue by input args.
+ # Job class can define select_queue method and the result will be the queue name.
+ def enqueue(klass, *args)
+ queue = (klass.respond_to?(:select_queue) && klass.select_queue(*args)) || queue_from_class(klass)
+ enqueue_to(queue, klass, *args)
+ end
+
+ # Backport from resque master branch, we can remove this method when gem is updated.
+ def enqueue_to(queue, klass, *args)
+ # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false
+ before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook|
+ klass.send(hook, *args)
+ end
+ return nil if before_hooks.any? { |result| result == false }
+
+ Job.create(queue, klass, *args)
+
+ Plugin.after_enqueue_hooks(klass).each do |hook|
+ klass.send(hook, *args)
+ end
+
+ return true
+ end
+
+ class Status
+ # new attributes
+ hash_accessor :complete_time
+ end
+end
+
+module VCAP
+ module Services
+ end
+end
+
+# A thin layer wraps resque-status
+module VCAP::Services::AsyncJob
+ include VCAP::Services::Base::Error
+
+ def self.logger=(logger)
+ @logger = logger
+ end
+
+ def job_repo_setup(options={})
+ raise "AsyncJob requires redis configuration." unless options[:redis]
+ @logger.debug("Initialize Resque using #{options}")
+ Resque.redis = options[:redis]
+ Resque::Status.expire_in = options[:expire] if options[:expire]
+ end
+
+ def get_job(jobid)
+ res = Resque::Status.get(jobid)
+ job_to_json(res)
+ end
+
+ def get_all_jobs()
+ Resque::Status.status_ids
+ end
+
+ def job_to_json(job)
+ return nil unless job
+ res = {
+ :job_id => job.uuid,
+ :status => job.status,
+ :start_time => job.time.to_s,
+ :description => job.options[:description] || "None"
+ }
+ res[:complete_time] = job.complete_time if job.complete_time
+ res[:result] = validate_message(job.message) if job.message
+ res
+ end
+
+ def validate_message(msg)
+ Yajl::Parser.parse(msg)
+ rescue => e
+ # generate internal error if we can't parse err msg
+ ServiceError.new(ServiceError::INTERNAL_ERROR).to_hash
+ end
+end
View
112 lib/base/job/serialization.rb
@@ -0,0 +1,112 @@
+# Copyright (c) 2009-2011 VMware, Inc.
+require "resque/job_with_status"
+require "fileutils"
+require "curb"
+require "vcap/logging"
+
+$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..')
+require "base/service_error"
+
+module VCAP
+ module Services
+ end
+end
+
+module VCAP::Services::Serialization
+ SERIALIZATION_KEY_PREFIX = "vcap:serialization".freeze
+
+ class << self
+ attr_reader :redis, :logger
+
+ # Config the redis using options
+ def redis_connect(opts)
+ resque = %w(host port password).inject({}){|res, o| res[o.to_sym] = opts[o]; res}
+ @redis = Redis.new(resque)
+ end
+
+ def redis=(redis)
+ raise "Serialization requires redis configuration." unless redis
+ @redis = redis
+ end
+
+ def logger=(logger)
+ @logger = logger
+ end
+ end
+
+ def redis_key(key)
+ "#{SERIALIZATION_KEY_PREFIX}:#{key}"
+ end
+
+ class SerializationJob < Resque::JobWithStatus
+ include VCAP::Services::Serialization
+ include VCAP::Services::Base::Error
+
+ class << self
+ attr_reader :logger
+
+ def queue_lookup_key
+ :node_id
+ end
+
+ def logger=(logger)
+ @logger = logger
+ end
+
+ def select_queue(*args)
+ result = nil
+ args.each do |arg|
+ result = arg[queue_lookup_key]if (arg.is_a? Hash )&& (arg.has_key?(queue_lookup_key))
+ end
+ @logger.info("Select queue #{result} for job #{self.class} with args:#{args.inspect}") if @logger
+ result
+ end
+ end
+
+ def initialize(*args)
+ super(*args)
+ parse_config
+ init_worker_logger()
+ end
+
+ def client
+ VCAP::Services::Serialization.redis
+ end
+
+ def init_worker_logger
+ VCAP::Logging.setup_from_config(@config["logging"])
+ @logger = VCAP::Logging.logger("#{@config["service_name"]}_worker")
+ end
+
+ # the serialize path structure looks like <base-dir>\serialize\<service-name>\<aa>\<bb>\<cc>\
+ # <aabbcc-rest-of-instance-guid>\<serialization data>
+ def get_serialized_data_path(name)
+ File.join(@config["serialization_base_dir"], "serialize", @config["service_name"] , name[0,2],name[2,2], name[4,2], name)
+ end
+
+ # Update the download token and save it in redis
+ def update_download_token(service, name, token)
+ client.set(redis_key("#{service}:#{name}:token"), token)
+ end
+
+ def parse_config
+ @config = Yajl::Parser.parse(ENV['WORKER_CONFIG'])
+ raise "Need environment variable: WORKER_CONFIG" unless @config
+ end
+
+ def cleanup(name)
+ return unless name
+ FileUtils.rm_rf(get_serialized_data_path(name))
+ end
+
+ # Fetch remote uri and stream content to file.
+ def fetch_url(url, file_path)
+ # TODO check the file size before download?
+ File.open(file_path, "wb+") do |f|
+ c = Curl::Easy.new(url)
+ c.on_body{|data| f.write(data)}
+ c.perform
+ end
+ end
+ end
+end
View
144 lib/base/job/snapshot.rb
@@ -0,0 +1,144 @@
+# Copyright (c) 2009-2011 VMware, Inc.
+require "resque/job_with_status"
+require "fileutils"
+require "vcap/logging"
+
+$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..')
+require "service_error"
+
+module VCAP
+ module Services
+ end
+end
+
+module VCAP::Services::Snapshot
+
+ SNAPSHOT_KEY_PREFIX = "vcap:snapshot".freeze
+ SNAPSHOT_ID = "maxid".freeze
+
+ class << self
+ attr_reader :redis, :logger
+
+ # Config the redis using options
+ def redis_connect(opts)
+ resque = %w(host port password).inject({}){|res, o| res[o.to_sym] = opts[o]; res}
+ @redis = Redis.new(resque)
+
+ redis_init
+ end
+
+ # Supply a redis instance
+ def redis=(redis)
+ raise "Snapshot requires redis configuration." unless redis
+ @redis = redis
+
+ redis_init
+ end
+
+ # initialize necessary keys
+ def redis_init
+ @redis.setnx("#{SNAPSHOT_KEY_PREFIX}:#{SNAPSHOT_ID}", 1)
+ end
+
+ def logger=(logger)
+ @logger = logger
+ end
+ end
+
+ def client
+ VCAP::Services::Snapshot.redis
+ end
+
+ # Get all snapshots related to a service instance
+ #
+ def service_snapshots(service_id)
+ return unless service_id
+ res = client.hgetall(redis_key(service_id))
+ res.values.map{|v| Yajl::Parser.parse(v)}
+ end
+
+ # Get detail information for a single snapshot
+ #
+ def snapshot_details(service_id, snapshot_id)
+ return unless service_id && snapshot_id
+ res = client.hget(redis_key(service_id), snapshot_id)
+ Yajl::Parser.parse(res) if res
+ end
+
+ # Generate unique id for a snapshot
+ def get_snapshot_id
+ client.incr(redis_key(SNAPSHOT_ID)).to_s
+ end
+
+ def save_snapshot(service_id , snapshot)
+ return unless service_id && snapshot
+ msg = Yajl::Encoder.encode(snapshot)
+ client.hset(redis_key(service_id), snapshot[:snapshot_id], msg)
+ end
+
+ def delete_snapshot(service_id , snapshot_id)
+ return unless service_id && snapshot_id
+ client.hdel(redis_key(name), snapshot_id)
+ end
+
+ protected
+
+ def redis_key(key)
+ "#{SNAPSHOT_KEY_PREFIX}:#{key}"
+ end
+
+ # common utils for snapshot job
+ class SnapshotJob < Resque::JobWithStatus
+ include VCAP::Services::Snapshot
+ include VCAP::Services::Base::Error
+
+ class << self
+ attr_reader :logger
+
+ def queue_lookup_key
+ :node_id
+ end
+
+ def logger=(logger)
+ @logger = logger
+ end
+
+ def select_queue(*args)
+ result = nil
+ args.each do |arg|
+ result = arg[queue_lookup_key]if (arg.is_a? Hash)&& (arg.has_key?(queue_lookup_key))
+ end
+ @logger.info("Select queue #{result} for job #{self.class} with args:#{args.inspect}") if @logger
+ result
+ end
+ end
+
+ def initialize(*args)
+ super(*args)
+ parse_config
+ init_worker_logger()
+ end
+
+ def init_worker_logger
+ VCAP::Logging.setup_from_config(@config["logging"])
+ @logger = VCAP::Logging.logger("#{@config["service_name"]}_worker")
+ end
+
+ # the snapshot path structure looks like <base-dir>\snapshots\<service-name>\<aa>\<bb>\<cc>\
+ # <aabbcc-rest-of-instance-guid>\snapshot_id\<service specific data>
+ def get_dump_path(name, snapshot_id)
+ File.join(@config["snapshots_base_dir"], "snapshots", @config["service_name"] , name[0,2],name[2,2], name[4,2], name, snapshot_id.to_s)
+ end
+
+ def parse_config
+ @config = Yajl::Parser.parse(ENV['WORKER_CONFIG'])
+ raise "Need environment variable: WORKER_CONFIG" unless @config
+ end
+
+ def cleanup(name, snapshot_id)
+ return unless name && snapshot_id
+ delete_snapshot(name, snapshot_id)
+ FileUtils.rm_rf(get_dump_path(name, snapshot_id))
+ end
+ end
+end
View
144 lib/base/provisioner.rb
@@ -6,11 +6,16 @@
$LOAD_PATH.unshift File.dirname(__FILE__)
require 'base/base'
+require 'base/job/async_job'
+require 'base/job/snapshot'
+require 'base/job/serialization'
require 'barrier'
require 'service_message'
class VCAP::Services::Base::Provisioner < VCAP::Services::Base::Base
include VCAP::Services::Internal
+ include VCAP::Services::AsyncJob
+ include VCAP::Services::Snapshot
BARRIER_TIMEOUT = 2
MASKED_PASSWORD = '********'
@@ -37,6 +42,13 @@ def initialize(options)
end
EM.add_periodic_timer(60) { process_nodes }
+
+ end
+
+ def create_redis(opt)
+ redis_client = Redis.new(opt)
+ raise "Can't connect to redis:#{opt.inspect}" unless redis_client
+ redis_client
end
def flavor
@@ -548,6 +560,115 @@ def recover(instance_id, backup_path, handles, &blk)
blk.call(internal_fail)
end
+ # Create a create_snapshot job and return the job object.
+ #
+ def create_snapshot(service_id, &blk)
+ @logger.debug("Create snapshot job for service_id=#{service_id}")
+ svc = @prov_svcs[service_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, service_id) unless svc
+ job_id = create_snapshot_job.create(:service_id => service_id, :node_id =>find_node(service_id))
+ job = get_job(job_id)
+ @logger.info("CreateSnapshotJob created: #{job.inspect}")
+ blk.call(success(job))
+ rescue => e
+ handle_error(e, &blk)
+ end
+
+ # Get detail job information by job id.
+ #
+ def job_details(service_id, job_id, &blk)
+ @logger.debug("Get job_id=#{job_id} for service_id=#{service_id}")
+ svc = @prov_svcs[service_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, service_id) unless svc
+ job = get_job(job_id)
+ raise ServiceError.new(ServiceError::NOT_FOUND, job_id) unless job
+ blk.call(success(job))
+ rescue => e
+ handle_error(e, &blk)
+ end
+
+ # Get detail snapshot information
+ #
+ def get_snapshot(service_id, snapshot_id, &blk)
+ @logger.debug("Get snapshot_id=#{snapshot_id} for service_id=#{service_id}")
+ svc = @prov_svcs[service_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, service_id) unless svc
+ snapshot = snapshot_details(service_id, snapshot_id)
+ raise ServiceError.new(ServiceError::NOT_FOUND, snapshot_id) unless snapshot
+ blk.call(success(snapshot))
+ rescue => e
+ handle_error(e, &blk)
+ end
+
+ # Get all snapshots related to an instance
+ #
+ def enumerate_snapshots(service_id, &blk)
+ @logger.debug("Get snapshots for service_id=#{service_id}")
+ svc = @prov_svcs[service_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, service_id) unless svc
+ snapshots = service_snapshots(service_id)
+ blk.call(success({:snapshots => snapshots}))
+ rescue => e
+ handle_error(e, &blk)
+ end
+
+ def rollback_snapshot(service_id, snapshot_id, &blk)
+ @logger.debug("Rollback snapshot=#{snapshot_id} for service_id=#{service_id}")
+ svc = @prov_svcs[service_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, service_id) unless svc
+ snapshot = snapshot_details(service_id, snapshot_id)
+ raise ServiceError.new(ServiceError::NOT_FOUND, snapshot_id) unless snapshot
+ job_id = rollback_snapshot_job.create(:service_id => service_id, :snapshot_id => snapshot_id,
+ :node_id => find_node(service_id))
+ job = get_job(job_id)
+ @logger.info("RoallbackSnapshotJob created: #{job.inspect}")
+ blk.call(success(job))
+ rescue => e
+ handle_error(e, &blk)
+ end
+
+ # Generate a url for user to download serialized data.
+ def get_serialized_url(service_id, &blk)
+ @logger.debug("get serialized url for service_id=#{service_id}")
+ svc = @prov_svcs[service_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, service_id) unless svc
+ job_id = create_serialized_url_job.create(:service_id => service_id, :node_id => find_node(service_id))
+ job = get_job(job_id)
+ blk.call(success(job))
+ rescue => e
+ handle_error(e, &blk)
+ end
+
+ #
+ def import_from_url(service_id, url, &blk)
+ @logger.debug("import serialized data from url:#{url} for service_id=#{service_id}")
+ svc = @prov_svcs[service_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, service_id) unless svc
+ job_id = import_from_url_job.create(:service_id => service_id, :url => url, :node_id => find_node(service_id))
+ job = get_job(job_id)
+ blk.call(success(job))
+ rescue => e
+ wrap_error(e, &blk)
+ end
+
+ def import_from_data(service_id, req, &blk)
+ @logger.debug("import serialized data from request for service_id=#{service_id}")
+ svc = @prov_svcs[service_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, service_id) unless svc
+ temp_path = File.join(@upload_temp_dir, "#{service_id}.gz")
+ # clean up previous upload
+ FileUtils.rm_rf(temp_path)
+
+ File.open(temp_path, "wb+") do |f|
+ f.write(Base64.decode64(req.data))
+ end
+ job_id = import_from_data_job.create(:service_id => service_id, :temp_file_path => temp_path, :node_id => find_node(service_id))
+ job = get_job(job_id)
+ blk.call(success(job))
+ rescue => e
+ handle_error(e, &blk)
+ end
+
# convert symbol key to string key
def hash_sym_key_to_str(hash)
new_hash = {}
@@ -642,6 +763,25 @@ def wrap_error(service_msg)
}
end
+ # handle request exception
+ def handle_error(e, &blk)
+ @logger.warn(e)
+ if e.instance_of? ServiceError
+ blk.call(failure(e))
+ else
+ blk.call(internal_fail)
+ end
+ end
+
+ # Find which node the service instance is running on.
+ def find_node(instance_id)
+ svc = @prov_svcs[instance_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, "instance_id #{instance_id}") if svc.nil?
+ node_id = svc[:credentials]["node_id"]
+ raise "Cannot find node_id for #{instance_id}" if node_id.nil?
+ node_id
+ end
+
# Service Provisioner subclasses must implement the following
# methods
@@ -653,5 +793,9 @@ def wrap_error(service_msg)
# service_name() --> string
# (inhereted from VCAP::Services::Base::Base)
+ #
+
+ # various lifecycle jobs class
+ abstract :create_snapshot_job, :rollback_snapshot_job, :create_serialized_url_job, :import_from_url_job, :import_from_data_job
end
View
2  lib/base/version.rb
@@ -1,7 +1,7 @@
module VCAP
module Services
module Base
- VERSION = "0.1.0"
+ VERSION = "0.1.1"
end
end
end
View
3  lib/vcap_services_base.rb
@@ -6,3 +6,6 @@
require 'base/service_error'
require 'base/asynchronous_service_gateway'
require 'base/datamapper_l'
+require 'base/job/async_job'
+require 'base/job/snapshot'
+require 'base/job/serialization'
View
2  vcap_services_base.gemspec
@@ -25,4 +25,6 @@ Gem::Specification.new do |s|
s.add_dependency "thin", "~> 1.2.11"
s.add_dependency "vcap_common", "~> 1.0.2"
s.add_dependency "vcap_logging", ">=0.1.3"
+ s.add_dependency "resque-status", "~> 0.2.4"
+ s.add_dependency "curb", "~> 0.7.16"
end
Please sign in to comment.
Something went wrong with that request. Please try again.