Permalink
Browse files

Fix capacity calculation for service migration

Change-Id: I2befe4eed16a190e2128a8744a44a9f72bff57dc
  • Loading branch information...
1 parent 11c38be commit 94685031def07ab094face438a602eb4d9bb1886 Tang Rui committed Apr 1, 2012
Showing with 919 additions and 328 deletions.
  1. +1 −1 atmos/Gemfile.lock
  2. BIN atmos/vendor/cache/vcap_services_base-0.1.8.gem
  3. BIN atmos/vendor/cache/vcap_services_base-0.1.9.gem
  4. +1 −1 base/Gemfile.lock
  5. +25 −0 base/lib/base/asynchronous_service_gateway.rb
  6. +108 −34 base/lib/base/node.rb
  7. +85 −0 base/lib/base/provisioner.rb
  8. +1 −1 base/lib/base/version.rb
  9. +48 −0 base/spec/async_gw_spec.rb
  10. +50 −2 base/spec/helper/async_gw_spec_helper.rb
  11. +129 −2 base/spec/helper/node_spec_helper.rb
  12. +48 −0 base/spec/helper/provision_spec_helper.rb
  13. +147 −0 base/spec/node_spec.rb
  14. +63 −0 base/spec/provision_spec.rb
  15. +1 −1 filesystem/Gemfile.lock
  16. BIN filesystem/vendor/cache/vcap_services_base-0.1.8.gem
  17. BIN filesystem/vendor/cache/vcap_services_base-0.1.9.gem
  18. +1 −1 mongodb/Gemfile.lock
  19. +16 −2 mongodb/lib/mongodb_service/mongodb_node.rb
  20. +16 −2 mongodb/spec/mongodb_rebalance_spec.rb
  21. BIN mongodb/vendor/cache/vcap_services_base-0.1.8.gem
  22. BIN mongodb/vendor/cache/vcap_services_base-0.1.9.gem
  23. +1 −1 mysql/Gemfile.lock
  24. +15 −1 mysql/lib/mysql_service/node.rb
  25. +22 −1 mysql/spec/mysql_node_spec.rb
  26. BIN mysql/vendor/cache/vcap_services_base-0.1.8.gem
  27. BIN mysql/vendor/cache/vcap_services_base-0.1.9.gem
  28. +1 −1 neo4j/Gemfile.lock
  29. BIN neo4j/vendor/cache/vcap_services_base-0.1.8.gem
  30. BIN neo4j/vendor/cache/vcap_services_base-0.1.9.gem
  31. +1 −1 postgresql/Gemfile.lock
  32. +16 −17 postgresql/lib/postgresql_service/node.rb
  33. +22 −2 postgresql/spec/postgresql_node_spec.rb
  34. BIN postgresql/vendor/cache/vcap_services_base-0.1.8.gem
  35. BIN postgresql/vendor/cache/vcap_services_base-0.1.9.gem
  36. +1 −1 rabbit/Gemfile.lock
  37. BIN rabbit/vendor/cache/vcap_services_base-0.1.8.gem
  38. BIN rabbit/vendor/cache/vcap_services_base-0.1.9.gem
  39. +1 −1 redis/Gemfile.lock
  40. +15 −16 redis/lib/redis_service/redis_node.rb
  41. +4 −4 redis/spec/node_spec.rb
  42. BIN redis/vendor/cache/vcap_services_base-0.1.8.gem
  43. BIN redis/vendor/cache/vcap_services_base-0.1.9.gem
  44. +1 −1 service_broker/Gemfile.lock
  45. BIN service_broker/vendor/cache/vcap_services_base-0.1.8.gem
  46. BIN service_broker/vendor/cache/vcap_services_base-0.1.9.gem
  47. +1 −1 tools/backup/manager/Gemfile.lock
  48. BIN tools/backup/manager/vendor/cache/vcap_services_base-0.1.8.gem
  49. BIN tools/backup/manager/vendor/cache/vcap_services_base-0.1.9.gem
  50. +1 −0 tools/rebalance/Gemfile
  51. +4 −0 tools/rebalance/Gemfile.lock
  52. +72 −232 tools/rebalance/bin/rebalance
  53. BIN tools/rebalance/vendor/cache/mime-types-1.17.2.gem
  54. BIN tools/rebalance/vendor/cache/rest-client-1.6.7.gem
  55. +1 −1 vblob/Gemfile.lock
  56. BIN vblob/vendor/cache/vcap_services_base-0.1.8.gem
  57. BIN vblob/vendor/cache/vcap_services_base-0.1.9.gem
View
@@ -126,7 +126,7 @@ GEM
thin (~> 1.3.1)
yajl-ruby (~> 0.8.3)
vcap_logging (0.1.3)
- vcap_services_base (0.1.8)
+ vcap_services_base (0.1.9)
curb (~> 0.7.16)
datamapper (~> 1.1.0)
do_sqlite3 (~> 0.10.3)
Binary file not shown.
Binary file not shown.
View
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
- vcap_services_base (0.1.8)
+ vcap_services_base (0.1.9)
curb (~> 0.7.16)
datamapper (~> 1.1.0)
do_sqlite3 (~> 0.10.3)
@@ -428,6 +428,31 @@ def check_orphan(handles, callback, errback)
async_mode
end
+ # Service migration API
+ post "/service/internal/v1/migration/:node_id/:instance_id/:action" do
+ @logger.info("Migration: #{params["action"]} instance #{params["instance_id"]} in #{params["node_id"]}")
+ @provisioner.migrate_instance(params["node_id"], params["instance_id"], params["action"]) do |msg|
+ if msg["success"]
+ async_reply(msg["response"].to_json)
+ else
+ async_reply_error(msg["response"])
+ end
+ end
+ async_mode
+ end
+
+ get "/service/internal/v1/migration/:node_id/instances" do
+ @logger.info("Migration: get instance id list of node #{params["node_id"]}")
+ @provisioner.get_instance_id_list(params["node_id"]) do |msg|
+ if msg["success"]
+ async_reply(msg["response"].to_json)
+ else
+ async_reply_error(msg["response"])
+ end
+ end
+ async_mode
+ end
+
#################### Helpers ####################
View
@@ -40,7 +40,7 @@ def on_connect_node
@logger.debug("#{service_description}: Connected to node mbus")
%w[provision unprovision bind unbind restore disable_instance
- enable_instance import_instance cleanup_nfs purge_orphan
+ enable_instance import_instance update_instance cleanupnfs_instance purge_orphan
].each do |op|
eval %[@node_nats.subscribe("#{service_name}.#{op}.#{@node_id}") { |msg, reply| EM.defer{ on_#{op}(msg, reply) } }]
end
@@ -134,46 +134,111 @@ def on_restore(msg, reply)
publish(reply, encode_failure(response, e))
end
- # disable and dump instance
+ # Disable and dump instance
def on_disable_instance(msg, reply)
@logger.debug("#{service_description}: Disable instance #{msg} request from #{reply}")
- credentials = Yajl::Parser.parse(msg)
- prov_cred, binding_creds = credentials
- instance = prov_cred['name']
- file_path = get_migration_folder(instance)
+ response = SimpleResponse.new
+ request = Yajl::Parser.parse(msg)
+ prov_handle, binding_handles = request
+ prov_cred = prov_handle["credentials"]
+ binding_creds = get_all_bindings(binding_handles)
+ file_path = get_migration_folder(prov_handle["service_id"])
FileUtils.mkdir_p(file_path)
result = disable_instance(prov_cred, binding_creds)
- result = dump_instance(prov_cred, binding_creds, file_path) if result
- publish(reply, Yajl::Encoder.encode(result))
+ if result
+ # Do dump together with disable for simpler migration logic
+ result = dump_instance(prov_cred, binding_creds, file_path)
+ if result
+ publish(reply, encode_success(response))
+ else
+ publish(reply, encode_failure(response))
+ end
+ else
+ publish(reply, encode_failure(response))
+ end
rescue => e
@logger.warn("Exception at on_disable_instance #{e}")
+ publish(reply, encode_failure(response, e))
end
- # enable instance and send updated credentials back
+ # Enable instance, the opposite operation of disable
def on_enable_instance(msg, reply)
@logger.debug("#{service_description}: enable instance #{msg} request from #{reply}")
- credentials = Yajl::Parser.parse(msg)
- prov_cred, binding_creds_hash = credentials
+ response = SimpleResponse.new
+ request = Yajl::Parser.parse(msg)
+ prov_handle, binding_handles = request
+ prov_cred = prov_handle["credentials"]
+ binding_creds_hash = get_all_bindings_with_option(binding_handles)
result = enable_instance(prov_cred, binding_creds_hash)
- # Update node_id in provision credentials..
- prov_cred, binding_creds_hash = result
- prov_cred['node_id'] = @node_id
- result = [prov_cred, binding_creds_hash]
- publish(reply, Yajl::Encoder.encode(result))
+ if result
+ publish(reply, encode_success(response))
+ else
+ publish(reply, encode_failure(response))
+ end
rescue => e
@logger.warn("Exception at on_enable_instance #{e}")
+ publish(reply, encode_failure(response, e))
+ end
+
+ # Import the generated data
+ def on_import_instance(msg, reply)
+ @logger.debug("#{service_description}: import instance #{msg} request from #{reply}")
+ response = SimpleResponse.new
+ request = Yajl::Parser.parse(msg)
+ prov_handle, binding_handles = request
+ prov_cred = prov_handle["credentials"]
+ binding_creds_hash = get_all_bindings_with_option(binding_handles)
+ plan = prov_handle["configuration"]["plan"]
+ file_path = get_migration_folder(prov_handle["service_id"])
+ result = import_instance(prov_cred, binding_creds_hash, file_path, plan)
+ if result
+ publish(reply, encode_success(response))
+ else
+ publish(reply, encode_failure(response))
+ end
+ rescue => e
+ @logger.warn("Exception at on_import_instance #{e}")
+ publish(reply, encode_failure(response, e))
+ end
+
+ # Update credentials in destination node of migration
+ def on_update_instance(msg, reply)
+ @logger.debug("#{service_description}: update instance #{msg} request from #{reply}")
+ request = Yajl::Parser.parse(msg)
+ prov_handle, binding_handles = request
+ prov_cred = prov_handle["credentials"]
+ binding_creds_hash = get_all_bindings_with_option(binding_handles)
+ result = update_instance(prov_cred, binding_creds_hash)
+ # Need decrease the capacity in destination node when finish migration
+ @capacity_lock.synchronize{ @capacity -= capacity_unit }
+ prov_cred, binding_creds_hash = result
+ # Update node_id in provision credentials
+ prov_cred["node_id"] = @node_id
+ handles = []
+ prov_handle["credentials"] = prov_cred
+ handles << prov_handle
+ binding_handles.each do |handle|
+ handle["credentials"] = binding_creds_hash[handle["service_id"]]["credentials"]
+ handles << handle
+ end
+ publish(reply, Yajl::Encoder.encode(handles))
+ rescue => e
+ @logger.warn("Exception at on_update_instance #{e}")
+ response = SimpleResponse.new
+ publish(reply, encode_failure(response, e))
end
# Cleanup nfs folder which contains migration data
- def on_cleanup_nfs(msg, reply)
+ def on_cleanupnfs_instance(msg, reply)
@logger.debug("#{service_description}: cleanup nfs request #{msg} from #{reply}")
+ response = SimpleResponse.new
request = Yajl::Parser.parse(msg)
- prov_cred, binding_creds = request
- instance = prov_cred['name']
- FileUtils.rm_rf(get_migration_folder(instance))
- publish(reply, Yajl::Encoder.encode(true))
+ prov_handle, _ = request
+ FileUtils.rm_rf(get_migration_folder(prov_handle["service_id"]))
+ publish(reply, encode_success(response))
rescue => e
- @logger.warn("Exception at on_cleanup_nfs #{e}")
+ @logger.warn("Exception at on_cleanupnfs_instance #{e}")
+ publish(reply, encode_failure(response, e))
end
# Send all handles to gateway to check orphan
@@ -206,7 +271,7 @@ def purge_orphan(oi_list,ob_list)
oi_list.each do |ins|
begin
@logger.debug("Unprovision orphan instance #{ins}")
- unprovision(ins,[])
+ @capacity_lock.synchronize{ @capacity += capacity_unit } if unprovision(ins,[])
rescue => e
@logger.debug("Error on purge orphan instance #{ins}: #{e}")
end
@@ -244,16 +309,25 @@ def get_migration_folder(instance)
File.join(@migration_nfs, 'migration', service_name, instance)
end
- def on_import_instance(msg, reply)
- @logger.debug("#{service_description}: import instance #{msg} request from #{reply}")
- credentials = Yajl::Parser.parse(msg)
- plan, prov_cred, binding_creds_hash = credentials
- instance = prov_cred['name']
- file_path = get_migration_folder(instance)
- result = import_instance(prov_cred, binding_creds_hash, file_path, plan)
- publish(reply, Yajl::Encoder.encode(result))
- rescue => e
- @logger.warn("Exception at on_import_instance #{e}")
+ def get_all_bindings(handles)
+ binding_creds = []
+ handles.each do |handle|
+ binding_creds << handle["credentials"]
+ end
+ binding_creds
+ end
+
+ def get_all_bindings_with_option(handles)
+ binding_creds_hash = {}
+ handles.each do |handle|
+ value = {
+ "credentials" => handle["credentials"],
+ "binding_options" => nil
+ }
+ value["binding_options"] = handle["configuration"]["data"]["binding_options"] if handle["configuration"].has_key?("data")
+ binding_creds_hash[handle["service_id"]] = value
+ end
+ binding_creds_hash
end
def on_discover(msg, reply)
@@ -341,6 +415,6 @@ def get_host
# (inhereted from VCAP::Services::Base::Base)
# <action>_instance(prov_credential, binding_credentials) --> true for success and nil for fail
- abstract :disable_instance, :dump_instance, :import_instance, :enable_instance
+ abstract :disable_instance, :dump_instance, :import_instance, :enable_instance, :update_instance
end
@@ -576,6 +576,91 @@ def recover(instance_id, backup_path, handles, &blk)
blk.call(internal_fail)
end
+ def migrate_instance(node_id, instance_id, action, &blk)
+ @logger.debug("[#{service_description}] Attempting to #{action} instance #{instance_id} in node #{node_id}")
+
+ begin
+ svc = @prov_svcs[instance_id]
+ raise ServiceError.new(ServiceError::NOT_FOUND, instance_id) if svc.nil?
+
+ binding_handles = []
+ @prov_svcs.each do |_, handle|
+ if handle[:service_id] != instance_id
+ binding_handles << handle if handle[:credentials]["name"] == instance_id
+ end
+ end
+ subscription = nil
+ message = nil
+ channel = nil
+ if action == "disable" || action == "enable" || action == "import" || action == "update" || action == "cleanupnfs"
+ channel = "#{service_name}.#{action}_instance.#{node_id}"
+ message = Yajl::Encoder.encode([svc, binding_handles])
+ elsif action == "unprovision"
+ channel = "#{service_name}.unprovision.#{node_id}"
+ bindings = find_all_bindings(instance_id)
+ request = UnprovisionRequest.new
+ request.name = instance_id
+ request.bindings = bindings
+ message = request.encode
+ elsif action == "check"
+ if node_id == svc[:credentials]["node_id"]
+ blk.call(success())
+ return
+ else
+ raise ServiceError.new(ServiceError::NOT_FOUND, instance_id)
+ end
+ else
+ raise ServiceError.new(ServiceError::NOT_FOUND, action)
+ end
+ timer = EM.add_timer(@node_timeout) {
+ @node_nats.unsubscribe(subscription)
+ blk.call(timeout_fail)
+ }
+ subscription = @node_nats.request(channel, message) do |msg|
+ EM.cancel_timer(timer)
+ @node_nats.unsubscribe(subscription)
+ if action != "update"
+ response = SimpleResponse.decode(msg)
+ if response.success
+ blk.call(success())
+ else
+ blk.call(wrap_error(response))
+ end
+ else
+ handles = Yajl::Parser.parse(msg)
+ handles.each do |handle|
+ @update_handle_callback.call(handle) do |update_res|
+ if update_res
+ @logger.info("Migration: success to update handle: #{handle}")
+ else
+ @logger.error("Migration: failed to update handle: #{handle}")
+ blk.call(wrap_error(response))
+ end
+ end
+ blk.call(success())
+ end
+ end
+ end
+ rescue => e
+ if e.instance_of? ServiceError
+ blk.call(failure(e))
+ else
+ @logger.warn("Exception at migrate_instance #{e}")
+ blk.call(internal_fail)
+ end
+ end
+ end
+
+ def get_instance_id_list(node_id, &blk)
+ @logger.debug("Get instance id list for migration")
+
+ id_list = []
+ @prov_svcs.each do |k, v|
+ id_list << k if (k == v[:credentials]["name"] && node_id == v[:credentials]["node_id"])
+ end
+ blk.call(success(id_list))
+ end
+
# Create a create_snapshot job and return the job object.
#
def create_snapshot(service_id, &blk)
View
@@ -1,7 +1,7 @@
module VCAP
module Services
module Base
- VERSION = "0.1.8"
+ VERSION = "0.1.9"
end
end
end
Oops, something went wrong.

0 comments on commit 9468503

Please sign in to comment.