Permalink
Browse files

CCNG support for service gateways

 - Refactored cloud controller interactions to switch between cc/ccng via config
 - Send plan description (if specified) to CCNG
 - Bump Gem version

Change-Id: Ide9579ae49ebe68a15fcbe91c7cd612acb1fdaa1
  • Loading branch information...
1 parent 3619ef2 commit 0173709ec7a8d1f9561ffbd1373900d411c12bc1 Harshawardhan Gadgil committed Dec 10, 2012
View
@@ -10,6 +10,7 @@ group :test do
gem "simplecov"
gem "simplecov-rcov"
gem "dm-sqlite-adapter"
+ gem "cf-uaa-lib"
gem 'eventmachine', :git => 'git://github.com/cloudfoundry/eventmachine.git', :branch => 'release-0.12.11-cf'
gem 'vcap_common', :require => ['vcap/common', 'vcap/component'], :git => 'git://github.com/cloudfoundry/vcap-common.git', :ref => 'fd6b6d91'
gem 'vcap_logging', :require => ['vcap/logging'], :git => 'git://github.com/cloudfoundry/common.git', :ref => 'b96ec1192d'
View
@@ -43,7 +43,8 @@ GIT
PATH
remote: .
specs:
- vcap_services_base (0.1.16)
+ vcap_services_base (0.1.17)
+ cf-uaa-lib
curb (~> 0.7.16)
datamapper (~> 1.1.0)
do_sqlite3 (~> 0.10.3)
@@ -71,6 +72,8 @@ GEM
bcrypt-ruby (2.1.4)
beefcake (0.3.7)
builder (3.0.0)
+ cf-uaa-lib (1.3.1)
+ multi_json
ci_reporter (1.6.4)
builder (>= 2.1.2)
curb (0.7.18)
@@ -189,7 +192,7 @@ GEM
eventmachine (>= 0.12.6)
rack (>= 1.0.0)
tilt (1.2.2)
- uuid (2.3.5)
+ uuid (2.3.6)
macaddr (~> 1.0)
uuidtools (2.1.3)
vegas (0.1.11)
@@ -200,6 +203,7 @@ PLATFORMS
ruby
DEPENDENCIES
+ cf-uaa-lib
ci_reporter
dm-sqlite-adapter
eventmachine!
@@ -27,44 +27,25 @@ def setup(opts)
@provisioner = opts[:provisioner]
@hb_interval = opts[:heartbeat_interval] || 60
@node_timeout = opts[:node_timeout]
- @handles_uri = "#{@cld_ctrl_uri}/services/v1/offerings/#{@service[:label]}/handles"
@handle_fetch_interval = opts[:handle_fetch_interval] || 1
@check_orphan_interval = opts[:check_orphan_interval] || -1
@double_check_orphan_interval = opts[:double_check_orphan_interval] || 300
@handle_fetched = false
@fetching_handles = false
@version_aliases = @service[:version_aliases] || {}
- @svc_json = VCAP::Services::Api::ServiceOfferingRequest.new({
- :label => @service[:label],
- :url => @service[:url],
- :plans => @service[:plans],
- :cf_plan_id => @service[:cf_plan_id],
- :tags => @service[:tags],
- :active => true,
- :description => @service[:description],
- :plan_options => @service[:plan_options],
- :acls => @service[:acls],
- :timeout => @service[:timeout],
- :provider => @service[:provider] || 'core',
- :default_plan => @service[:default_plan],
- :supported_versions => @service[:supported_versions],
- :version_aliases => @service[:version_aliases],
- }).encode
-
- @deact_json = VCAP::Services::Api::ServiceOfferingRequest.new({
- :label => @service[:label],
- :url => @service[:url],
- :supported_versions => @service[:supported_versions],
- :version_aliases => @service[:version_aliases],
- :active => false,
- }).encode
-
- token_hdrs = VCAP::Services::Api::GATEWAY_TOKEN_HEADER
- @cc_req_hdrs = {
- 'Content-Type' => 'application/json',
- token_hdrs => @token,
- }
- @proxy_opts = opts[:proxy]
+
+ opts[:gateway_name] ||= "Service Gateway"
+
+ @cc_api_version = opts[:cc_api_version] || "v1"
+ if @cc_api_version == "v1"
+ require 'catalog_manager_v1'
+ @catalog_manager = VCAP::Services::CatalogManagerV1.new(opts)
+ elsif @cc_api_version == "v2"
+ require 'catalog_manager_v2'
+ @catalog_manager = VCAP::Services::CatalogManagerV2.new(opts)
+ else
+ raise "Unknown cc_api_version: #{@cc_api_version}"
+ end
# Setup heartbeats and exit handlers
EM.add_periodic_timer(@hb_interval) { send_heartbeat }
@@ -92,6 +73,7 @@ def setup(opts)
@logger.info("No current version alias is supplied, skip update version in CCDB.")
end
end
+
@fetch_handle_timer = EM.add_periodic_timer(@handle_fetch_interval) { fetch_handles(&update_callback) }
EM.next_tick { fetch_handles(&update_callback) }
@@ -108,6 +90,35 @@ def setup(opts)
@provisioner.register_update_handle_callback{|handle, &blk| update_service_handle(handle, &blk)}
end
+ def get_current_catalog
+ id, version = @service[:label].split(/-/)
+ version = @service[:version_aliases][:current] if @service[:version_aliases][:current]
+ provider = @service[:provider] || 'core'
+
+ catalog_key = @catalog_manager.create_key(id, version, provider)
+
+ catalog = {}
+ catalog[catalog_key] = {
+ "id" => id,
+ "version" => version,
+ "url" => @service[:url],
+ "plans" => @service[:plans],
+ "cf_plan_id" => @service[:cf_plan_id],
+ "tags" => @service[:tags],
+ "active" => true,
+ "description" => @service[:description],
+ "plan_options" => @service[:plan_options],
+ "acls" => @service[:acls],
+ "timeout" => @service[:timeout],
+ "provider" => provider,
+ "default_plan" => @service[:default_plan],
+ "supported_versions" => @service[:supported_versions],
+ "version_aliases" => @service[:version_aliases],
+ }
+
+ return catalog
+ end
+
def check_orphan(handles, callback, errback)
@provisioner.check_orphan(handles) do |msg|
if msg['success']
@@ -123,14 +134,17 @@ def check_orphan(handles, callback, errback)
def validate_incoming_request
unless request.media_type == Rack::Mime.mime_type('.json')
error_msg = ServiceError.new(ServiceError::INVALID_CONTENT).to_hash
+ @logger.error("Validation failure: #{error_msg.inspect}, request media type: #{request.media_type} is not json")
abort_request(error_msg)
end
unless auth_token && (auth_token == @token)
error_msg = ServiceError.new(ServiceError::NOT_AUTHORIZED).to_hash
+ @logger.error("Validation failure: #{error_msg.inspect}, expected token: #{@token}, specified token: #{auth_token}")
abort_request(error_msg)
end
unless @handle_fetched
error_msg = ServiceError.new(ServiceError::SERVICE_UNAVAILABLE).to_hash
+ @logger.error("Validation failure: #{error_msg.inspect}, handles not fetched")
abort_request(error_msg)
end
end
@@ -451,122 +465,48 @@ def validate_incoming_request
helpers do
+ # Fetches canonical state (handles) from the Cloud Controller
+ def fetch_handles(&cb)
+ f = Fiber.new do
+ @catalog_manager.fetch_handles_from_cc(@service[:label], cb)
+ end
+ f.resume
+ end
+
# Update a service handle using REST
def update_service_handle(handle, &cb)
- @logger.debug("Update service handle: #{handle.inspect}")
- if not handle
- cb.call(false) if cb
- return
- end
- id = handle["service_id"]
- uri = @handles_uri + "/#{id}"
- handle_json = Yajl::Encoder.encode(handle)
- req = {
- :head => @cc_req_hdrs,
- :body => handle_json,
- }
- http = EM::HttpRequest.new(uri).post(req)
- http.callback do
- if http.response_header.status == 200
- @logger.info("Successful update handle #{id}.")
- # Update local array in provisioner
- @provisioner.update_handles([handle])
- cb.call(true) if cb
- else
- @logger.error("Failed to update handle #{id}: http status #{http.response_header.status}, error: #{http.error}")
- cb.call(false) if cb
- end
- end
- http.errback do
- @logger.error("Failed to update handle #{id}: #{http.error}")
- cb.call(false) if cb
+ f = Fiber.new do
+ @catalog_manager.update_handle_in_cc(
+ @service[:label],
+ handle,
+ lambda {
+ # Update local array in provisioner
+ @provisioner.update_handles([handle])
+ cb.call(true) if cb
+ },
+ lambda { cb.call(false) if cb }
+ )
end
+ f.resume
end
# Lets the cloud controller know we're alive and where it can find us
def send_heartbeat
- @logger.info("Sending info to cloud controller: #{@offering_uri}")
-
- req = create_http_request(
- :head => @cc_req_hdrs,
- :body => @svc_json
+ @catalog_manager.update_catalog(
+ true,
+ lambda { return get_current_catalog },
+ nil
)
-
- http = EM::HttpRequest.new(@offering_uri).post(req)
-
- http.callback do
- if http.response_header.status == 200
- @logger.info("Successfully registered with cloud controller")
- else
- @logger.error("Failed registering with cloud controller, status=#{http.response_header.status}")
- end
end
- http.errback do
- @logger.error("Failed registering with cloud controller: #{http.error}")
- end
- end
-
- # Lets the cloud controller know that we're going away
- def send_deactivation_notice(stop_event_loop=true)
- @logger.info("Sending deactivation notice to cloud controller: #{@offering_uri}")
-
- req = create_http_request(
- :head => @cc_req_hdrs,
- :body => @deact_json
- )
-
- http = EM::HttpRequest.new(@offering_uri).post(req)
-
- http.callback do
- if http.response_header.status == 200
- @logger.info("Successfully deactivated with cloud controller")
- else
- @logger.error("Failed deactivation with cloud controller, status=#{http.response_header.status}")
- end
- EM.stop if stop_event_loop
- end
-
- http.errback do
- @logger.error("Failed deactivation with cloud controller: #{http.error}")
- EM.stop if stop_event_loop
- end
- end
-
- # Fetches canonical state (handles) from the Cloud Controller
- def fetch_handles(&cb)
- return if @fetching_handles
-
- @logger.info("Fetching handles from cloud controller @ #{@handles_uri}")
- @fetching_handles = true
-
- req = create_http_request :head => @cc_req_hdrs
- http = EM::HttpRequest.new(@handles_uri).get(req)
-
- http.callback do
- @fetching_handles = false
- if http.response_header.status == 200
- @logger.info("Successfully fetched handles")
- begin
- resp = VCAP::Services::Api::ListHandlesResponse.decode(http.response)
- rescue => e
- @logger.error("Error decoding reply from gateway:")
- @logger.error("#{e}")
- next
- end
- cb.call(resp)
- else
- @logger.error("Failed fetching handles, status=#{http.response_header.status}")
- end
- end
-
- http.errback do
- @fetching_handles = false
- @logger.error("Failed fetching handles: #{http.error}")
+ # Lets the cloud controller know that we're going away
+ def send_deactivation_notice(stop_event_loop=true)
+ @catalog_manager.update_catalog(
+ false,
+ lambda { return get_current_catalog },
+ lambda { EM.stop if stop_event_loop }
+ )
end
- end
end
-
-
end
@@ -0,0 +1,50 @@
+require 'abstract'
+
+module VCAP
+ module Services
+ class CatalogManagerBase
+
+ def initialize(opts)
+ @proxy_opts = opts[:proxy]
+ end
+
+ def create_http_request(args)
+ req = {
+ :head => args[:head],
+ :body => args[:body],
+ }
+ if (@proxy_opts)
+ req[:proxy] = @proxy_opts
+ # this is a workaround for em-http-requesr 0.3.0 so that headers are not lost
+ # more info: https://github.com/igrigorik/em-http-request/issues/130
+ req[:proxy][:head] = req[:head]
+ end
+
+ req
+ end
+
+ abstract :snapshot_and_reset_stats
+
+ # update_catalog(activate, load_catalog_callback, after_update_callback=nil)
+ abstract :update_catalog
+
+ # generate_cc_advertise_offering_request(svc, active = true)
+ abstract :generate_cc_advertise_offering_request
+
+ # advertise_service_to_cc(svc, active)
+ abstract :advertise_service_to_cc
+
+ # load_registered_services_from_cc
+ abstract :load_registered_services_from_cc
+
+ ##### Handles processing #####
+
+ # update_handle_in_cc(service_label, handle, on_success_callback, on_failure_callback)
+ abstract :update_handle_in_cc
+
+ # fetch_handles_from_cc(service_label, after_fetch_callback)
+ abstract :fetch_handles_from_cc
+
+ end
+ end
+end
Oops, something went wrong.

0 comments on commit 0173709

Please sign in to comment.