Permalink
Browse files

Handle marketplace removes an existing version of existing service

 - Fixed appdirect error handling
 - Specify complete uri in external uri config
 - Refactor acls in appdirect advertise request

Change-Id: If860cbbb80e614e8334d9536bfd3df0f25b56c0f
  • Loading branch information...
1 parent 77510c6 commit db90798a10b5015d4e79e5f26c81b26e6e11eb59 Harshawardhan Gadgil committed Sep 4, 2012
@@ -3,7 +3,7 @@ cloud_controller_uri: api.vcap.me
ip_route: localhost
index: 0
token: changemarketplacetoken
-mbus: nats://localhost:4222
+mbus: nats://localhost:4222/
logging:
level: debug
@@ -16,5 +16,8 @@ marketplace: appdirect
acls: [ "*@example.com" ]
-external_uri: appdirect-mpgw.vcap.me
+external_uri: "http://appdirect-mpgw.vcap.me"
refresh_interval: 300
+
+# Used for testing purposes only
+disabled_service_ids: []
@@ -1,13 +1,15 @@
# Copyright (c) 2009-2012 VMware, Inc.
require 'fiber'
require 'nats/client'
+require 'uri'
module VCAP
module Services
module Marketplace
class MarketplaceAsyncServiceGateway < VCAP::Services::AsynchronousServiceGateway
- REQ_OPTS = %w(mbus external_uri token cloud_controller_uri).map {|o| o.to_sym}
+ API_VERSION = "v1"
+ REQ_OPTS = %w(mbus external_uri token cloud_controller_uri).map {|o| o.to_sym}
set :raise_errors, Proc.new {false}
set :show_exceptions, false
@@ -39,13 +41,14 @@ def setup(opts)
@host = opts[:host]
@port = opts[:port]
- @external_uri = opts[:external_uri]
+ @router_register_uri = (URI.parse(opts[:external_uri])).host
@node_timeout = opts[:node_timeout]
@logger = opts[:logger] || make_logger()
@token = opts[:token]
@hb_interval = opts[:heartbeat_interval] || 60
@cld_ctrl_uri = http_uri(opts[:cloud_controller_uri] || "api.vcap.me")
- @offering_uri = "#{@cld_ctrl_uri}/services/v1/offerings/"
+ @offering_uri = "#{@cld_ctrl_uri}/services/#{API_VERSION}/offerings/"
+ @service_list_uri = "#{@cld_ctrl_uri}/proxied_services/#{API_VERSION}/offerings"
@proxy_opts = opts[:proxy]
@handle_fetched = true # set to true in order to compatible with base asycn gateway.
@@ -56,7 +59,7 @@ def setup(opts)
@router_register_json = {
:host => @host,
:port => @port,
- :uris => [ @external_uri ],
+ :uris => [ @router_register_uri ],
:tags => {:components => "#{@marketplace_client.name}MarketplaceGateway" }
}.to_json
@@ -101,10 +104,10 @@ def setup(opts)
def refresh_catalog_and_update_cc
f = Fiber.new do
begin
- # get all service offerings
refresh_catalog
- # active services in local database
+
advertise_services
+
# Ready to serve
@logger.info("#{@marketplace_client.name} Marketplace Gateway is ready to serve incoming request.")
rescue => e
@@ -115,12 +118,43 @@ def refresh_catalog_and_update_cc
end
def refresh_catalog
- @catalog = @marketplace_client.get_catalog
+ @catalog_in_ccdb = get_proxied_services_from_cc
+ @catalog_in_marketplace = @marketplace_client.get_catalog
+ end
+
+ def deactivate_disabled_services
+ disabled_count = 0
+ @catalog_in_ccdb.each do |label, svc|
+ if (!@catalog_in_marketplace.keys.include?(label))
+ service_name, version = label.split(/-/)
+ svc["version"] = version
+ req = {
+ :label => svc["label"],
+ :active => false,
+ :url => @external_uri,
+ :supported_versions => [ version ],
+ :version_aliases => { "current" => version },
+ }
+ @logger.warn("#{@marketplace_client.name} service offering: #{label} not found in latest offering. Deactivating...")
+ advertise_service_to_cc(req)
+
+ # TODO: Update varz
+ disabled_count += 1
+ else
+ @logger.debug("Offering #{label} still active in #{@marketplace_client.name} marketplace")
+ end
+ end
+
+ @logger.info("Found #{disabled_count} disabled service offerings")
end
def advertise_services(active=true)
- @catalog.each do |name, bsvc|
- req = @marketplace_client.generate_cc_advertise_request(name, bsvc, active)
+ # Set services missing from marketplace offerings to inactive
+ deactivate_disabled_services
+
+ # Process all services currently in marketplace
+ @catalog_in_marketplace.each do |label, bsvc|
+ req = @marketplace_client.generate_cc_advertise_request(bsvc["id"], bsvc, active)
advertise_service_to_cc(req)
end
end
@@ -146,6 +180,7 @@ def stop_nats()
@nats.unsubscribe(@router_start_channel) if @router_start_channel
@logger.debug("Unregister #{@marketplace_client.name} marketplace gateway: #{@router_register_json}")
@nats.publish("router.unregister", @router_register_json)
+ sleep 0.1 # Allow some time for actual de-registering before shutting down
@nats.close
end
@@ -163,7 +198,7 @@ def on_exit(stop_event_loop=true)
#################### Handlers ###################
get "/" do
- return {"marketplace" => @marketplace_client.name, "offerings" => @catalog}.to_json
+ return {"marketplace" => @marketplace_client.name, "offerings" => @catalog_in_marketplace}.to_json
end
# Provision a marketplace service
@@ -237,6 +272,34 @@ def on_exit(stop_event_loop=true)
#
helpers do
+ def get_proxied_services_from_cc
+ @logger.debug("Get proxied services from cloud_controller: #{@service_list_uri}")
+ services = {}
+ req = create_http_request( :head => @cc_req_hdrs )
+
+ f = Fiber.current
+ http = EM::HttpRequest.new(@service_list_uri).get(req)
+ http.callback { f.resume(http) }
+ http.errback { f.resume(http) }
+ Fiber.yield
+
+ if http.error.empty?
+ if http.response_header.status == 200
+ resp = VCAP::Services::Api::ListProxiedServicesResponse.decode(http.response)
+ resp.proxied_services.each {|bsvc|
+ @logger.info("Fetch #{@marketplace_client.name} service from CC: label=#{bsvc["label"]} - #{bsvc.inspect}")
+ services[bsvc["label"]] = bsvc
+ }
+ else
+ @logger.warn("Failed to fetch #{@marketplace_client.name} service from CC - status=#{http.response_header.status}")
+ end
+ else
+ @logger.warn("Failed to fetch #{@marketplace_client.name} service from CC: #{http.error}")
+ end
+
+ return services
+ end
+
def advertise_service_to_cc(offering)
@logger.debug("advertise service offering #{offering.inspect} to cloud_controller: #{@offering_uri}")
return false unless offering
@@ -3,14 +3,15 @@
require "json"
require "fiber"
+$:.unshift(File.dirname(__FILE__))
+require "appdirect_error"
+
module VCAP
module Services
module Marketplace
module Appdirect
class AppdirectHelper
- include VCAP::Services::Marketplace::Appdirect
-
OFFERINGS_PATH = "custom/cloudfoundry/v1/offerings"
SERVICES_PATH = "custom/cloudfoundry/v1/services"
@@ -28,6 +29,7 @@ def initialize(opts, logger)
raise ArgumentError, "Missing options: #{missing_opts.join(', ')}" unless missing_opts.empty?
@appdirect_endpoint = appdirect_config[:endpoint]
+ @disabled_service_ids = opts[:disabled_service_ids] || []
@consumer = OAuth::Consumer.new(appdirect_config[:key], appdirect_config[:secret])
@access_token = OAuth::AccessToken.new(@consumer)
@@ -44,15 +46,19 @@ def get_catalog
data.each do |service|
# Add checks for specific categories which determine whether the addon should be listed on cc
@logger.debug("Got service '#{service["id"]}' from AppDirect")
- catalog[service["id"]] = service
+ if (@disabled_service_ids.include?(service["id"]))
+ @logger.warn("Service Offering: #{service["id"]} disabled via config")
+ else
+ catalog["#{service["id"]}-#{service["version"]}"] = service
+ end
end
@logger.info("Got #{catalog.keys.count} services from AppDirect")
else
@logger.warn("Failed to get catalog #{http.response}")
end
else
@logger.warn("Failed to get catalog: #{http.error}")
- raise AppdirectError.new(AppDirectError::APPDIRECT_ERROR_GET_LISTING, http.response_header.status)
+ raise AppdirectError.new(AppdirectError::APPDIRECT_ERROR_GET_LISTING, http.response_header.status)
end
return catalog
end
@@ -72,11 +78,11 @@ def purchase_service(order)
# 500 if AppDirect has issues
# 503 if ISV is down
@logger.warn("Bad status code posting #{body} was #{http.response}")
- raise AppdirectError.new(AppDirectError::APPDIRECT_ERROR_PURCHASE, http.response_header.status)
+ raise AppdirectError.new(AppdirectError::APPDIRECT_ERROR_PURCHASE, http.response)
end
else
@logger.warn("Error raised: #{http.error}")
- raise AppdirectError.new(AppDirectError::APPDIRECT_ERROR_PURCHASE, http.response_header.status)
+ raise AppdirectError.new(AppdirectError::APPDIRECT_ERROR_PURCHASE, http.error.inspect)
end
else
@logger.error("Order is required to purchase a service")
@@ -96,11 +102,12 @@ def bind_service(order, order_id)
update_serv = JSON.parse(http.response)
@logger.debug("Bound service #{order_id}")
else
- raise AppdirectError.new(AppDirectError::APPDIRECT_ERROR_BIND, http.response_header.status)
+ @logger.warn("Bind request #{body} failed due to: #{http.response}")
+ raise AppdirectError.new(AppdirectError::APPDIRECT_ERROR_BIND, http.response)
end
else
@logger.warn("Error raised: #{http.error}")
- raise AppdirectError.new(AppDirectError::APPDIRECT_ERROR_BIND, http.response_header.status)
+ raise AppdirectError.new(AppdirectError::APPDIRECT_ERROR_BIND, http.error.inspect)
end
else
@logger.error("Order and Order Id are required to cancel a service")
@@ -117,12 +124,12 @@ def unbind_service(order_id, binding_id)
if http.response_header.status >= 200 and http.response_header.status < 300
update_binding = true
else
- @logger.warn("Invalid status code returned: #{http.response_header.status}")
- raise AppdirectError.new(AppDirectError::APPDIRECT_ERROR_UNBIND, http.response_header.status)
+ @logger.warn("Invalid status code returned: #{http.response}")
+ raise AppdirectError.new(AppdirectError::APPDIRECT_ERROR_UNBIND, http.response)
end
else
@logger.warn("Error raised: #{http.error}")
- raise AppdirectError.new(AppDirectError::APPDIRECT_ERROR_UNBIND, http.response_header.status)
+ raise AppdirectError.new(AppdirectError::APPDIRECT_ERROR_UNBIND, http.error.inspect)
end
else
@logger.error("Binding Id and Order Id are required to cancel a service")
@@ -138,8 +145,8 @@ def cancel_service(order_id)
@logger.debug("Deleted #{order_id}")
cancel_serv = true
else
- @logger.warn("Invalid status code returned: #{http.response_header.status}")
- raise AppdirectError.new(AppDirectError::APPDIRECT_ERROR_CANCEL, http.response_header.status)
+ @logger.warn("Invalid status code returned: #{http.response}")
+ raise AppdirectError.new(AppdirectError::APPDIRECT_ERROR_CANCEL, http.response)
end
else
@logger.error("Order Id is required to cancel a service")
@@ -18,9 +18,10 @@ def initialize(opts)
super(opts)
@logger = opts[:logger]
- @external_uri = "http://#{opts[:external_uri]}"
+ @external_uri = opts[:external_uri]
@node_timeout = opts[:node_timeout]
@acls = opts[:acls]
+ @users = opts[:users] || []
@helper = AppdirectHelper.new(opts, @logger)
end
@@ -41,15 +42,16 @@ def generate_cc_advertise_request(name, bsvc, active = true)
req[:supported_versions] = [ bsvc["version"] ]
req[:version_aliases] = { "current" => bsvc["version"] }
+ req[:acls] = {}
+ req[:acls][:wildcards] = @acls
+
+ users_acl = @users.dup
if bsvc["developers"] and bsvc["developers"].count > 0
- acls = []
bsvc["developers"].each do |dev|
- acls << dev["email"]
+ users_acl << dev["email"]
end
- req[:acls] = {}
- req[:acls][:wildcards] = @acls
- req[:acls][:users] = acls
end
+ req[:acls][:users] = users_acl
req[:url] = @external_uri
@@ -177,6 +179,11 @@ def unbind_service(service_id, binding_id)
end
end
end
+
+ def fmt_error(e)
+ "#{e} [#{e.backtrace.join("|")}]"
+ end
+
end
end
end

0 comments on commit db90798

Please sign in to comment.