Permalink
Browse files

[cc] simplify service gateway client

- CC use unified interface to talk to gateway.
- gateway client returns none 200 response to CC.

Change-Id: I958998608b1475c90e929e590a8a267bf7a90164
  • Loading branch information...
1 parent 0492f3d commit 9e0cf5efdbdcedd7a355223d0312a4ce93bc1351 @andl andl committed Mar 31, 2012
View
20 cloud_controller/app/controllers/services_controller.rb
@@ -9,7 +9,7 @@ class ServicesController < ApplicationController
before_filter :validate_content_type
before_filter :require_service_auth_token, :only => [:create, :delete, :update_handle, :list_handles, :list_brokered_services]
before_filter :require_user, :only => [:provision, :bind, :bind_external, :unbind, :unprovision,
- :create_snapshot, :enum_snapshots, :snapshot_details,:rollback_snapshot,
+ :create_snapshot, :enum_snapshots, :snapshot_details,:rollback_snapshot, :delete_snapshot,
:serialized_url, :import_from_url, :import_from_data, :job_info]
before_filter :require_lifecycle_extension, :only => [:create_snapshot, :enum_snapshots, :snapshot_details,:rollback_snapshot, :delete_snapshot,
:serialized_url, :import_from_url, :import_from_data, :job_info]
@@ -192,7 +192,7 @@ def create_snapshot
result = cfg.create_snapshot
- render :json => result
+ render :json => result.extract
end
# Enumerate all snapshots of the given instance
@@ -204,7 +204,7 @@ def enum_snapshots
result = cfg.enum_snapshots
- render :json => result
+ render :json => result.extract
end
# Get snapshot detail information
@@ -216,7 +216,7 @@ def snapshot_details
result = cfg.snapshot_details params['sid']
- render :json => result
+ render :json => result.extract
end
# Rollback to a snapshot
@@ -228,7 +228,7 @@ def rollback_snapshot
result = cfg.rollback_snapshot params['sid']
- render :json => result
+ render :json => result.extract
end
# Delete a snapshot
@@ -240,7 +240,7 @@ def delete_snapshot
result = cfg.delete_snapshot params['sid']
- render :json => result
+ render :json => result.extract
end
# Get the url to download serialized data for an instance
@@ -252,7 +252,7 @@ def serialized_url
result = cfg.serialized_url
- render :json => result
+ render :json => result.extract
end
# import serialized data to an instance from url
@@ -266,7 +266,7 @@ def import_from_url
result = cfg.import_from_url req
- render :json => result
+ render :json => result.extract
end
# import serialized data to an instance from request data
@@ -284,7 +284,7 @@ def import_from_data
result = cfg.import_from_data req
- render :json => result
+ render :json => result.extract
end
# Get job information
@@ -296,7 +296,7 @@ def job_info
result = cfg.job_info params['job_id']
- render :json => result
+ render :json => result.extract
end
# Binds a provisioned instance to an app
View
20 cloud_controller/app/helpers/services_helper.rb
@@ -3,24 +3,4 @@ module ServicesHelper
def validate_content_type
raise CloudError.new(CloudError::BAD_REQUEST) unless request.env['CONTENT_TYPE'] == Mime::JSON
end
-
- def gateway_client(svc)
- uri = URI.parse(svc.url)
- VCAP::Services::Api::ServiceGatewayClient.new(uri.host, svc.token, uri.port)
- end
-
- def gateway_request(&blk)
- begin
- yield
- rescue VCAP::Services::Api::ServiceGatewayClient::UnexpectedResponse, \
- SocketError, \
- Errno::ECONNREFUSED, \
- Errno::ECONNRESET, \
- Errno::ETIMEDOUT => e
- CloudController.logger.error "Error talking to gateway: #{e.to_s}"
- CloudController.logger.error e
- raise CloudError.new(CloudError::SERVICE_GATEWAY_ERROR)
- end
- end
-
end
View
35 cloud_controller/app/models/app.rb
@@ -225,21 +225,8 @@ def bind_to_config(cfg, binding_options={})
:binding_options => binding_options
)
- if EM.reactor_running?
- # yields
- endpoint = "#{svc.url}/gateway/v1/configurations/#{req.service_id}/handles"
- http = VCAP::Services::Api::AsyncHttpRequest.fibered(endpoint, svc.token, :post, svc.timeout, req)
- if !http.error.empty?
- raise "Error sending bind request #{req.extract.inspect} to gateway #{svc.url}: #{http.error}"
- elsif http.response_header.status != 200
- raise "Error sending bind request #{req.extract.inspect}, non 200 response from gateway #{svc.url}: #{http.response_header.status} #{http.response}"
- end
- handle = VCAP::Services::Api::GatewayBindResponse.decode(http.response)
- else
- uri = URI.parse(svc.url)
- gw = VCAP::Services::Api::ServiceGatewayClient.new(uri.host, svc.token, uri.port)
- handle = gw.bind(req.extract)
- end
+ client = VCAP::Services::Api::ServiceGatewayClient.new(svc.url, svc.token, svc.timeout)
+ handle = client.bind(req.extract)
rescue => e
CloudController.logger.error("Exception talking to gateway: #{e}")
CloudController.logger.error(e)
@@ -300,22 +287,8 @@ def unbind_from_config(cfg)
binding.destroy
begin
- if EM.reactor_running?
- endpoint = "#{svc.url}/gateway/v1/configurations/#{req.service_id}/handles/#{req.handle_id}"
- http = VCAP::Services::Api::AsyncHttpRequest.new(endpoint, svc.token, :delete, svc.timeout, req)
- http.callback do
- if http.response_header.status != 200
- CloudController.logger.error("Error sending unbind request #{req.extract.to_json} non 200 response from gateway #{svc.url}: #{http.response_header.status} #{http.response}")
- end
- end
- http.errback do
- CloudController.logger.error("Error sending unbind request #{req.extract.to_json} to gateway #{svc.url}: #{http.error}")
- end
- else
- uri = URI.parse(svc.url)
- gw = VCAP::Services::Api::ServiceGatewayClient.new(uri.host, svc.token, uri.port)
- gw.unbind(req.extract)
- end
+ client = VCAP::Services::Api::ServiceGatewayClient.new(svc.url, svc.token, svc.timeout)
+ client.unbind(req.extract)
rescue => e
tok.destroy
CloudController.logger.error("Error talking to service gateway (svc.url): #{e.to_s}")
View
134 cloud_controller/app/models/service_config.rb
@@ -40,21 +40,8 @@ def self.provision(service, user, cfg_alias, plan, plan_option)
:plan_option => plan_option
)
- if EM.reactor_running?
- # yields
- endpoint = "#{service.url}/gateway/v1/configurations"
- http = VCAP::Services::Api::AsyncHttpRequest.fibered(endpoint, service.token, :post, service.timeout, req)
- if !http.error.empty?
- raise "Error sending provision request for #{req.extract.to_json} to gateway #{service.url}: #{http.error}"
- elsif http.response_header.status != 200
- raise "Error sending provision request for #{req.extract.to_json}: non 200 response from gateway #{service.url}: #{http.response_header.status} #{http.response}"
- end
- config = VCAP::Services::Api::GatewayProvisionResponse.decode(http.response)
- else
- uri = URI.parse(service.url)
- gw = VCAP::Services::Api::ServiceGatewayClient.new(uri.host, service.token, uri.port)
- config = gw.provision(req.extract)
- end
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ config = client.provision req.extract
rescue => e
CloudController.logger.error("Error talking to gateway: #{e}")
CloudController.logger.error(e)
@@ -93,109 +80,86 @@ def unprovision
destroy
begin
- if EM.reactor_running?
- endpoint = "#{svc.url}/gateway/v1/configurations/#{cfg_name}"
- http = VCAP::Services::Api::AsyncHttpRequest.new(endpoint, service.token, :delete, service.timeout)
- http.callback do
- if http.response_header.status != 200
- CloudController.logger.error("Error unprovisioning #{cfg_name}, non 200 response from gateway #{svc.url}: #{http.response_header.status} #{http.response}")
- end
- end
- http.errback do
- CloudController.logger.error("Error unprovisioning #{cfg_name} at gateway #{svc.url}: #{http.error}")
- end
- else
- uri = URI.parse(svc.url)
- gw = VCAP::Services::Api::ServiceGatewayClient.new(uri.host, svc.token, uri.port)
- gw.unprovision(:service_id => cfg_name)
- end
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.unprovision(:service_id => cfg_name)
rescue => e
CloudController.logger.error("Error talking to gateway: #{e}")
CloudController.logger.error(e)
raise CloudError.new(CloudError::SERVICE_GATEWAY_ERROR)
end
end
+ def handle_lifecycle_error(e)
+ CloudController.logger.error("Error talking to gateway: #{e}")
+ CloudController.logger.error(e)
+ if e.is_a? VCAP::Services::Api::ServiceGatewayClient::NotFoundResponse
+ raise CloudError.new([e.error.code, CloudError::HTTP_NOT_FOUND, e.error.description])
+ else
+ raise CloudError.new(CloudError::SERVICE_GATEWAY_ERROR)
+ end
+ end
+
def create_snapshot
- endpoint = "#{service.url}/gateway/v1/configurations/#{name}/snapshots"
- result = perform_gateway_request(:create_snapshot, endpoint, service.token, :post, service.timeout, VCAP::Services::Api::Job, empty_msg_class, :service_id => name)
- result
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.create_snapshot(:service_id => name)
+ rescue => e
+ handle_lifecycle_error(e)
end
def enum_snapshots
- endpoint = "#{service.url}/gateway/v1/configurations/#{name}/snapshots"
- result = perform_gateway_request(:enum_snapshots, endpoint, service.token, :get, service.timeout, VCAP::Services::Api::SnapshotList, empty_msg_class, :service_id => name)
- result
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.enum_snapshots(:service_id => name)
+ rescue => e
+ handle_lifecycle_error(e)
end
def snapshot_details(sid)
- endpoint = "#{service.url}/gateway/v1/configurations/#{name}/snapshots/#{sid}"
- result = perform_gateway_request(:snapshot_details, endpoint, service.token, :get, service.timeout, VCAP::Services::Api::Snapshot, empty_msg_class, :service_id => name, :snapshot_id => sid)
- result
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.snapshot_details(:service_id => name, :snapshot_id => sid)
+ rescue => e
+ handle_lifecycle_error(e)
end
def rollback_snapshot(sid)
- endpoint = "#{service.url}/gateway/v1/configurations/#{name}/snapshots/#{sid}"
- result = perform_gateway_request(:rollback_snapshot, endpoint, service.token, :put, service.timeout, VCAP::Services::Api::Job, empty_msg_class, :service_id => name, :snapshot_id => sid)
- result
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.rollback_snapshot(:service_id => name, :snapshot_id => sid)
+ rescue => e
+ handle_lifecycle_error(e)
end
def delete_snapshot(sid)
- endpoint = "#{service.url}/gateway/v1/configurations/#{name}/snapshots/#{sid}"
- result = perform_gateway_request(:delete_snapshot, endpoint, service.token, :delete, service.timeout, VCAP::Services::Api::Job, empty_msg_class, :service_id => name, :snapshot_id => sid)
- result
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.delete_snapshot(:service_id => name, :snapshot_id => sid)
+ rescue => e
+ handle_lifecycle_error(e)
end
def serialized_url
- endpoint = "#{service.url}/gateway/v1/configurations/#{name}/serialized/url"
- result = perform_gateway_request(:serialized_url, endpoint, service.token, :get, service.timeout, VCAP::Services::Api::Job, empty_msg_class, :service_id => name)
- result
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.serialized_url(:service_id => name)
+ rescue => e
+ handle_lifecycle_error(e)
end
def import_from_url req
- endpoint = "#{service.url}/gateway/v1/configurations/#{name}/serialized/url"
- result = perform_gateway_request(:import_from_url, endpoint, service.token, :put, service.timeout, VCAP::Services::Api::Job, req, :service_id => name, :msg => req)
- result
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.import_from_url(:service_id => name, :msg => req)
+ rescue => e
+ handle_lifecycle_error(e)
end
def import_from_data req
- endpoint = "#{service.url}/gateway/v1/configurations/#{name}/serialized/data"
- result = perform_gateway_request(:import_from_data, endpoint, service.token, :put, service.timeout, VCAP::Services::Api::Job, req, :service_id => name, :msg => req)
- result
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.import_from_data(:service_id => name, :msg => req)
+ rescue => e
+ handle_lifecycle_error(e)
end
def job_info job_id
- endpoint = "#{service.url}/gateway/v1/configurations/#{name}/jobs/#{job_id}"
- result = perform_gateway_request(:job_info, endpoint, service.token, :get, service.timeout, VCAP::Services::Api::Job, empty_msg_class, :service_id => name, :job_id => job_id)
- result
- end
-
- def empty_msg_class
- VCAP::Services::Api::EMPTY_REQUEST
- end
-
- # Perform gateway request and decode request to object
- #
- def perform_gateway_request(action, endpoint, token, http_method, timeout, decoder_class, msg, opts={})
- result = nil
- if EM.reactor_running?
- http = VCAP::Services::Api::AsyncHttpRequest.fibered(endpoint, token, http_method, timeout, msg)
- if !http.error.empty?
- raise "Error sending #{action} request for #{name} to gateway #{service.url}: #{http.error}"
- elsif http.response_header.status != 200
- raise "Error sending #{action} request for #{name}: non 200 response from gateway #{service.url}: #{http.response_header.status} #{http.response}"
- end
- result = decoder_class.decode(http.response)
- else
- uri = URI.parse(endpoint)
- gw = VCAP::Services::Api::ServiceGatewayClient.new(uri.host, token, uri.port)
- result = gw.send(action, opts)
- end
- result.extract
+ client = VCAP::Services::Api::ServiceGatewayClient.new(service.url, service.token, service.timeout)
+ client.job_info(:service_id => name, :job_id => job_id)
rescue => e
- CloudController.logger.error("Error talking to gateway: #{e}")
- CloudController.logger.error(e)
- raise CloudError.new(CloudError::SERVICE_GATEWAY_ERROR)
+ handle_lifecycle_error(e)
end
def provisioned_by?(user)
View
281 cloud_controller/spec/controllers/services_controller_spec.rb
@@ -661,6 +661,7 @@ def unbind_instance(service_id, handle_id, binding_options)
end
describe "#lifecycle_extension" do
+
it 'should return not implemented error when lifecycle is disabled' do
begin
origin = AppConfig.delete :service_lifecycle
@@ -671,7 +672,7 @@ def unbind_instance(service_id, handle_id, binding_options)
resp['description'].include?("not implemented").should == true
end
- %w(snapshot_details rollback_snapshot).each do |api|
+ %w(snapshot_details rollback_snapshot delete_snapshot).each do |api|
post api.to_sym, :id => 'xxx', :sid => '1'
response.status.should == 501
resp = Yajl::Parser.parse(response.body)
@@ -689,6 +690,12 @@ def unbind_instance(service_id, handle_id, binding_options)
end
describe "#create_snapshot" do
+ before :each do
+ cfg = ServiceConfig.new(:name => 'lifecycle', :alias => 'bar', :service => @svc, :user => @user)
+ cfg.save
+ cfg.should be_valid
+ @cfg = cfg
+ end
it 'should return not authorized for unknown users' do
request.env['HTTP_AUTHORIZATION'] = UserToken.create('bar@foo.com').encode
@@ -700,9 +707,32 @@ def unbind_instance(service_id, handle_id, binding_options)
post :create_snapshot, :id => 'xxx'
response.status.should == 404
end
+
+ it 'should create a snapshot job' do
+ job = VCAP::Services::Api::Job.decode(
+ {
+ :job_id => "abc",
+ :status => "queued",
+ :start_time => "1"
+ }.to_json
+ )
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:create_snapshot).with(:service_id => @cfg.name).returns job
+
+ post :create_snapshot, :id => @cfg.name
+ response.status.should == 200
+ resp = Yajl::Parser.parse(response.body)
+ resp["job_id"].should == "abc"
+ end
+
end
describe "#enum_snapshots" do
+ before :each do
+ cfg = ServiceConfig.new(:name => 'lifecycle', :alias => 'bar', :service => @svc, :user => @user)
+ cfg.save
+ cfg.should be_valid
+ @cfg = cfg
+ end
it 'should return not authorized for unknown users' do
request.env['HTTP_AUTHORIZATION'] = UserToken.create('bar@foo.com').encode
@@ -714,9 +744,30 @@ def unbind_instance(service_id, handle_id, binding_options)
get :enum_snapshots, :id => 'xxx'
response.status.should == 404
end
+
+ it 'should enum snapshots' do
+ snapshots = VCAP::Services::Api::SnapshotList.decode(
+ {
+ :snapshots => [{:snapshot_id => "abc"}]
+ }.to_json
+ )
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:enum_snapshots).with(:service_id => @cfg.name).returns snapshots
+
+ post :enum_snapshots, :id => @cfg.name
+ response.status.should == 200
+ resp = Yajl::Parser.parse(response.body)
+ resp["snapshots"].size.should == 1
+ resp["snapshots"][0]["snapshot_id"].should == "abc"
+ end
end
describe "#snapshot_details" do
+ before :each do
+ cfg = ServiceConfig.new(:name => 'lifecycle', :alias => 'bar', :service => @svc, :user => @user)
+ cfg.save
+ cfg.should be_valid
+ @cfg = cfg
+ end
it 'should return not authorized for unknown users' do
request.env['HTTP_AUTHORIZATION'] = UserToken.create('bar@foo.com').encode
@@ -728,9 +779,44 @@ def unbind_instance(service_id, handle_id, binding_options)
get :snapshot_details, :id => 'xxx', :sid => 'yyy'
response.status.should == 404
end
+
+ it 'should get snapshot_details' do
+ snapshot = VCAP::Services::Api::Snapshot.decode(
+ {
+ :snapshot_id => "abc",
+ :date => "1",
+ :size => 123
+ }.to_json
+ )
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:snapshot_details).with(:service_id => @cfg.name, :snapshot_id => snapshot.snapshot_id).returns snapshot
+
+ get :snapshot_details, :id => @cfg.name, :sid => snapshot.snapshot_id
+ response.status.should == 200
+ resp = Yajl::Parser.parse(response.body)
+ resp["snapshot_id"].should == "abc"
+ end
+
+ it "should handle not found error in snapshot details" do
+ err = VCAP::Services::Api::ServiceGatewayClient::NotFoundResponse.new(
+ VCAP::Services::Api::ServiceErrorResponse.decode(
+ {:code => 10000, :description => "not found"}.to_json
+ )
+ )
+ snapshot_id = "abc"
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:snapshot_details).with(:service_id => @cfg.name, :snapshot_id => snapshot_id).raises err
+
+ get :snapshot_details, :id => @cfg.name, :sid => snapshot_id
+ response.status.should == 404
+ end
end
describe "#rollback_snapshot" do
+ before :each do
+ cfg = ServiceConfig.new(:name => 'lifecycle', :alias => 'bar', :service => @svc, :user => @user)
+ cfg.save
+ cfg.should be_valid
+ @cfg = cfg
+ end
it 'should return not authorized for unknown users' do
request.env['HTTP_AUTHORIZATION'] = UserToken.create('bar@foo.com').encode
@@ -739,12 +825,99 @@ def unbind_instance(service_id, handle_id, binding_options)
end
it 'should return not found for unknown ids' do
- put :snapshot_details, :id => 'xxx' , :sid => 'yyy'
+ put :rollback_snapshot, :id => 'xxx' , :sid => 'yyy'
+ response.status.should == 404
+ end
+
+ it 'should rollback a snapshot' do
+ job = VCAP::Services::Api::Job.decode(
+ {
+ :job_id => "abc",
+ :status => "queued",
+ :start_time => "1"
+ }.to_json
+ )
+ snapshot_id = "abc"
+
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:rollback_snapshot).with(:service_id => @cfg.name, :snapshot_id => snapshot_id).returns job
+
+ put :rollback_snapshot, :id => @cfg.name, :sid => snapshot_id
+ response.status.should == 200
+ resp = Yajl::Parser.parse(response.body)
+ resp["job_id"].should == "abc"
+ end
+
+ it "should handle not found error in rollback snapshot" do
+ err = VCAP::Services::Api::ServiceGatewayClient::NotFoundResponse.new(
+ VCAP::Services::Api::ServiceErrorResponse.decode(
+ {:code => 10000, :description => "not found"}.to_json
+ )
+ )
+ snapshot_id = "abc"
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:rollback_snapshot).with(:service_id => @cfg.name, :snapshot_id => snapshot_id).raises err
+
+ put :rollback_snapshot, :id => @cfg.name, :sid => snapshot_id
+ response.status.should == 404
+ end
+ end
+
+ describe "#delete_snapshot" do
+ before :each do
+ cfg = ServiceConfig.new(:name => 'lifecycle', :alias => 'bar', :service => @svc, :user => @user)
+ cfg.save
+ cfg.should be_valid
+ @cfg = cfg
+ end
+
+ it 'should return not authorized for unknown users' do
+ request.env['HTTP_AUTHORIZATION'] = UserToken.create('bar@foo.com').encode
+ delete :delete_snapshot, :id => 'xxx', :sid => 'yyy'
+ response.status.should == 403
+ end
+
+ it 'should return not found for unknown ids' do
+ delete :delete_snapshot, :id => 'xxx' , :sid => 'yyy'
+ response.status.should == 404
+ end
+
+ it 'should delete a snapshot' do
+ job = VCAP::Services::Api::Job.decode(
+ {
+ :job_id => "abc",
+ :status => "queued",
+ :start_time => "1"
+ }.to_json
+ )
+ snapshot_id = "abc"
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:delete_snapshot).with(:service_id => @cfg.name, :snapshot_id => snapshot_id).returns job
+
+ delete :delete_snapshot, :id => @cfg.name, :sid => snapshot_id
+ response.status.should == 200
+ resp = Yajl::Parser.parse(response.body)
+ resp["job_id"].should == "abc"
+ end
+
+ it "should handle not found error in delete snapshot" do
+ err = VCAP::Services::Api::ServiceGatewayClient::NotFoundResponse.new(
+ VCAP::Services::Api::ServiceErrorResponse.decode(
+ {:code => 10000, :description => "not found"}.to_json
+ )
+ )
+ snapshot_id = "abc"
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:delete_snapshot).with(:service_id => @cfg.name, :snapshot_id => snapshot_id).raises err
+
+ delete :delete_snapshot, :id => @cfg.name, :sid => snapshot_id
response.status.should == 404
end
end
describe "#serialized_url" do
+ before :each do
+ cfg = ServiceConfig.new(:name => 'lifecycle', :alias => 'bar', :service => @svc, :user => @user)
+ cfg.save
+ cfg.should be_valid
+ @cfg = cfg
+ end
it 'should return not authorized for unknown users' do
request.env['HTTP_AUTHORIZATION'] = UserToken.create('bar@foo.com').encode
@@ -756,9 +929,31 @@ def unbind_instance(service_id, handle_id, binding_options)
get :serialized_url, :id => 'xxx'
response.status.should == 404
end
+
+ it 'should create serialized url job' do
+ job = VCAP::Services::Api::Job.decode(
+ {
+ :job_id => "abc",
+ :status => "queued",
+ :start_time => "1"
+ }.to_json
+ )
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:serialized_url).with(:service_id => @cfg.name).returns job
+
+ get :serialized_url, :id => @cfg.name
+ response.status.should == 200
+ resp = Yajl::Parser.parse(response.body)
+ resp["job_id"].should == "abc"
+ end
end
describe "#import_from_url" do
+ before :each do
+ cfg = ServiceConfig.new(:name => 'lifecycle', :alias => 'bar', :service => @svc, :user => @user)
+ cfg.save
+ cfg.should be_valid
+ @cfg = cfg
+ end
it 'should return not authorized for unknown users' do
request.env['HTTP_AUTHORIZATION'] = UserToken.create('bar@foo.com').encode
@@ -780,9 +975,36 @@ def unbind_instance(service_id, handle_id, binding_options)
end
response.status.should == 400
end
+
+ it 'should create import from url job' do
+ job = VCAP::Services::Api::Job.decode(
+ {
+ :job_id => "abc",
+ :status => "queued",
+ :start_time => "1"
+ }.to_json
+ )
+ url = "http://api.cloudfoundry.com"
+
+ req = VCAP::Services::Api::SerializedURL.new(:url => url)
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:import_from_url).with(anything).returns job
+
+ put_msg :import_from_url, :id => @cfg.name do
+ req
+ end
+ response.status.should == 200
+ resp = Yajl::Parser.parse(response.body)
+ resp["job_id"].should == "abc"
+ end
end
describe "#import_from_data" do
+ before :each do
+ cfg = ServiceConfig.new(:name => 'lifecycle', :alias => 'bar', :service => @svc, :user => @user)
+ cfg.save
+ cfg.should be_valid
+ @cfg = cfg
+ end
it 'should return not authorized for unknown users' do
request.env['HTTP_AUTHORIZATION'] = UserToken.create('bar@foo.com').encode
@@ -796,9 +1018,34 @@ def unbind_instance(service_id, handle_id, binding_options)
end
response.status.should == 404
end
+
+ it 'should create import from data job' do
+ job = VCAP::Services::Api::Job.decode(
+ {
+ :job_id => "abc",
+ :status => "queued",
+ :start_time => "1"
+ }.to_json
+ )
+ req = VCAP::Services::Api::SerializedData.new(:data => 'raw_data' )
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:import_from_data).with(anything).returns job
+
+ put_msg :import_from_data, :id => @cfg.name do
+ req
+ end
+ response.status.should == 200
+ resp = Yajl::Parser.parse(response.body)
+ resp["job_id"].should == "abc"
+ end
end
describe "#job_info" do
+ before :each do
+ cfg = ServiceConfig.new(:name => 'lifecycle', :alias => 'bar', :service => @svc, :user => @user)
+ cfg.save
+ cfg.should be_valid
+ @cfg = cfg
+ end
it 'should return not authorized for unknown users' do
request.env['HTTP_AUTHORIZATION'] = UserToken.create('bar@foo.com').encode
@@ -810,6 +1057,36 @@ def unbind_instance(service_id, handle_id, binding_options)
get :job_info, :id => 'xxx' , :job_id => 'yyy'
response.status.should == 404
end
+
+ it 'should return job_info' do
+ job_id = "job1"
+ job = VCAP::Services::Api::Job.decode(
+ {
+ :job_id => job_id,
+ :status => "queued",
+ :start_time => "1"
+ }.to_json
+ )
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:job_info).with(:service_id => @cfg.name, :job_id => job_id).returns job
+
+ get :job_info, :id => @cfg.name, :job_id => job_id
+ response.status.should == 200
+ resp = Yajl::Parser.parse(response.body)
+ resp["job_id"].should == job_id
+ end
+
+ it "should handle not found error in get job_info" do
+ err = VCAP::Services::Api::ServiceGatewayClient::NotFoundResponse.new(
+ VCAP::Services::Api::ServiceErrorResponse.decode(
+ {:code => 10000, :description => "job not found"}.to_json
+ )
+ )
+ job_id = "job1"
+ VCAP::Services::Api::ServiceGatewayClient.any_instance.stubs(:job_info).with(:service_id => @cfg.name, :job_id => job_id).raises err
+
+ get :job_info, :id => @cfg.name, :job_id => job_id
+ response.status.should == 404
+ end
end
end
View
BIN cloud_controller/vendor/cache/vcap_common-1.0.10.gem
Binary file not shown.
View
221 common/lib/services/api/clients/service_gateway_client.rb
@@ -1,5 +1,6 @@
# Copyright (c) 2009-2011 VMware, Inc.
require 'net/http'
+require 'uri'
require 'services/api/const'
require 'services/api/messages'
@@ -11,102 +12,166 @@ module Api
end
end
-class VCAP::Services::Api::ServiceGatewayClient
-
- class UnexpectedResponse < StandardError
- attr_reader :response
+module VCAP::Services::Api
+ class ServiceGatewayClient
+ METHODS_MAP = {
+ :get => Net::HTTP::Get,
+ :post=> Net::HTTP::Post,
+ :put => Net::HTTP::Put,
+ :delete => Net::HTTP::Delete,
+ }
- def initialize(resp)
- @response = resp
+ # Public: Indicate gateway client encounter an unexpcted error,
+ # such as can't connect to gateway or can't decode response.
+ #
+ class UnexpectedResponse < StandardError; end
+
+ # Pubilc: Indicate an error response from gateway
+ #
+ class ErrorResponse < StandardError
+ attr_reader :status, :error
+
+ # status - the http status
+ # error - a ServiceErrorResponse object
+ #
+ def initialize(status, error)
+ @status = status
+ @error = error
+ end
+
+ def to_s
+ "Reponse status:#{status},error:[#{error.extract}]"
+ end
end
- end
- attr_reader :host, :port, :token
+ class NotFoundResponse < ErrorResponse
+ def initialize(error)
+ super(404, error)
+ end
+ end
- def initialize(host, token, port=80)
- @host = host
- @port = port
- @token = token
- @hdrs = {
- 'Content-Type' => 'application/json',
- VCAP::Services::Api::GATEWAY_TOKEN_HEADER => @token
- }
- end
+ class GatewayInternalResponse < ErrorResponse
+ def initialize(error)
+ super(503, error)
+ end
+ end
- def provision(args)
- msg = VCAP::Services::Api::GatewayProvisionRequest.new(args)
- resp = perform_request(Net::HTTP::Post, '/gateway/v1/configurations', msg)
- VCAP::Services::Api::GatewayProvisionResponse.decode(resp.body)
- end
+ attr_reader :host, :port, :token
+ def initialize(url, token, timeout, opts={})
+ @url = url
+ @timeout = timeout
+ @token = token
+ @hdrs = {
+ 'Content-Type' => 'application/json',
+ GATEWAY_TOKEN_HEADER => @token
+ }
+ end
- def unprovision(args)
- perform_request(Net::HTTP::Delete, "/gateway/v1/configurations/#{args[:service_id]}")
- end
+ def provision(args)
+ msg = GatewayProvisionRequest.new(args)
+ resp = perform_request(:post, '/gateway/v1/configurations', msg)
+ GatewayProvisionResponse.decode(resp)
+ end
- def create_snapshot(args)
- resp = perform_request(Net::HTTP::Post, "/gateway/v1/configurations/#{args[:service_id]}/snapshots")
- VCAP::Services::Api::Job.decode(resp.body)
- end
+ def unprovision(args)
+ resp = perform_request(:delete, "/gateway/v1/configurations/#{args[:service_id]}")
+ EMPTY_REQUEST
+ end
- def enum_snapshots(args)
- resp = perform_request(Net::HTTP::Get, "/gateway/v1/configurations/#{args[:service_id]}/snapshots")
- VCAP::Services::Api::SnapshotList.decode(resp.body)
- end
+ def create_snapshot(args)
+ resp = perform_request(:post, "/gateway/v1/configurations/#{args[:service_id]}/snapshots")
+ Job.decode(resp)
+ end
- def snapshot_details(args)
- resp = perform_request(Net::HTTP::Get, "/gateway/v1/configurations/#{args[:service_id]}/snapshots/#{args[:snapshot_id]}")
- VCAP::Services::Api::Snapshot.decode(resp.body)
- end
+ def enum_snapshots(args)
+ resp = perform_request(:get, "/gateway/v1/configurations/#{args[:service_id]}/snapshots")
+ SnapshotList.decode(resp)
+ end
- def rollback_snapshot(args)
- resp = perform_request(Net::HTTP::Put, "/gateway/v1/configurations/#{args[:service_id]}/snapshots/#{args[:snapshot_id]}")
- VCAP::Services::Api::Job.decode(resp.body)
- end
+ def snapshot_details(args)
+ resp = perform_request(:get, "/gateway/v1/configurations/#{args[:service_id]}/snapshots/#{args[:snapshot_id]}")
+ Snapshot.decode(resp)
+ end
- def delete_snapshot(args)
- resp = perform_request(Net::HTTP::Delete, "/gateway/v1/configurations/#{args[:service_id]}/snapshots/#{args[:snapshot_id]}")
- VCAP::Services::Api::Job.decode(resp.body)
- end
+ def rollback_snapshot(args)
+ resp = perform_request(:put, "/gateway/v1/configurations/#{args[:service_id]}/snapshots/#{args[:snapshot_id]}")
+ Job.decode(resp)
+ end
- def serialized_url(args)
- resp = perform_request(Net::HTTP::Get, "/gateway/v1/configurations/#{args[:service_id]}/serialized/url")
- VCAP::Services::Api::Job.decode(resp.body)
- end
+ def delete_snapshot(args)
+ resp = perform_request(:delete, "/gateway/v1/configurations/#{args[:service_id]}/snapshots/#{args[:snapshot_id]}")
+ Job.decode(resp)
+ end
- def import_from_url(args)
- resp = perform_request(Net::HTTP::Put, "/gateway/v1/configurations/#{args[:service_id]}/serialized/url", args[:msg])
- VCAP::Services::Api::Job.decode(resp.body)
- end
+ def serialized_url(args)
+ resp = perform_request(:get, "/gateway/v1/configurations/#{args[:service_id]}/serialized/url")
+ Job.decode(resp)
+ end
- def import_from_data(args)
- resp = perform_request(Net::HTTP::Put, "/gateway/v1/configurations/#{args[:service_id]}/serialized/data", args[:msg])
- VCAP::Services::Api::Job.decode(resp.body)
- end
+ def import_from_url(args)
+ resp = perform_request(:put, "/gateway/v1/configurations/#{args[:service_id]}/serialized/url", args[:msg])
+ Job.decode(resp)
+ end
- def job_info(args)
- resp = perform_request(Net::HTTP::Get, "/gateway/v1/configurations/#{args[:service_id]}/jobs/#{args[:job_id]}")
- VCAP::Services::Api::Job.decode(resp.body)
- end
+ def import_from_data(args)
+ resp = perform_request(:put, "/gateway/v1/configurations/#{args[:service_id]}/serialized/data", args[:msg])
+ Job.decode(resp)
+ end
- def bind(args)
- msg = VCAP::Services::Api::GatewayBindRequest.new(args)
- resp = perform_request(Net::HTTP::Post, "/gateway/v1/configurations/#{msg.service_id}/handles", msg)
- VCAP::Services::Api::GatewayBindResponse.decode(resp.body)
- end
+ def job_info(args)
+ resp = perform_request(:get, "/gateway/v1/configurations/#{args[:service_id]}/jobs/#{args[:job_id]}")
+ Job.decode(resp)
+ end
- def unbind(args)
- msg = VCAP::Services::Api::GatewayUnbindRequest.new(args)
- perform_request(Net::HTTP::Delete, "/gateway/v1/configurations/#{msg.service_id}/handles/#{msg.handle_id}", msg)
- end
+ def bind(args)
+ msg = GatewayBindRequest.new(args)
+ resp = perform_request(:post, "/gateway/v1/configurations/#{msg.service_id}/handles", msg)
+ GatewayBindResponse.decode(resp)
+ end
- protected
+ def unbind(args)
+ msg = GatewayUnbindRequest.new(args)
+ perform_request(:delete, "/gateway/v1/configurations/#{msg.service_id}/handles/#{msg.handle_id}", msg)
+ EMPTY_REQUEST
+ end
- def perform_request(klass, path, msg=VCAP::Services::Api::EMPTY_REQUEST)
- req = klass.new(path, initheader=@hdrs)
- req.body = msg.encode
- resp = Net::HTTP.new(@host, @port).start {|http| http.request(req)}
- raise UnexpectedResponse, resp unless resp.is_a? Net::HTTPOK
- resp
+ protected
+
+ def perform_request(http_method, path, msg=VCAP::Services::Api::EMPTY_REQUEST)
+ result = nil
+ uri = URI.parse(@url)
+ if EM.reactor_running?
+ url = uri.merge!(path)
+ http = AsyncHttpRequest.fibered(url, @token, http_method, @timeout, msg)
+ raise UnexpectedResponse, "Error sending request #{msg.extract.to_json} to gateway #{@url}: #{http.error}" unless http.error.empty?
+ code = http.response_header.status.to_i
+ body = http.response
+ else
+ klass = METHODS_MAP[http_method]
+ req = klass.new(path, initheader=@hdrs)
+ req.body = msg.encode
+ resp = Net::HTTP.new(uri.host, uri.port).start {|http| http.request(req)}
+ code = resp.code.to_i
+ body = resp.body
+ end
+ case code
+ when 200
+ body
+ when 404
+ err = ServiceErrorResponse.decode(body)
+ raise NotFoundResponse.new(err)
+ when 503
+ err = ServiceErrorResponse.decode(body)
+ raise GatewayInternalResponse.new(err)
+ else
+ begin
+ # try to decode the response
+ err = ServiceErrorResponse.decode(body)
+ raise ErrorResponse.new(code, err)
+ rescue => e
+ raise UnexpectedResponse, "Can't decode gateway response. status code:#{code}, response body:#{body}"
+ end
+ end
+ end
end
-
end
View
5 common/lib/services/api/messages.rb
@@ -148,6 +148,11 @@ class SerializedURL < JsonMessage
class SerializedData < JsonMessage
required :data, String
end
+
+ class ServiceErrorResponse < JsonMessage
+ required :code, Integer
+ required :description, String
+ end
end
end
end
View
2 common/spec/spec_helper.rb
@@ -13,6 +13,8 @@
require "vcap/config"
require "vcap/priority_queue"
require 'vcap/quota'
+require 'services/api/clients/service_gateway_client'
+require 'services/api/async_requests'
require 'benchmark'
RSpec::Matchers.define :take_less_than do |n|
View
81 common/spec/unit/service_gateway_client_spec.rb
@@ -0,0 +1,81 @@
+# Copyright (c) 2009-2012 VMware, Inc.
+require 'spec_helper'
+
+module VCAP::Services::Api
+ class ServiceGatewayClient
+ public :perform_request
+ end
+end
+describe VCAP::Services::Api::ServiceGatewayClient do
+ describe '#perform_request' do
+ before :all do
+ @url = "http://localhost"
+ @token = "mytoken"
+ @timeout = 10
+ end
+
+ it "should use async http client when EM is running" do
+ client = VCAP::Services::Api::ServiceGatewayClient.new(@url, @token, @timeout)
+ EM.should_receive(:reactor_running?).and_return true
+
+ path = "/path1"
+ resp = mock("resq")
+ message = "data"
+ resp.should_receive(:response).and_return(message)
+ resp.should_receive(:error).and_return []
+
+ resp_header = mock("resq_header")
+ resp_header.should_receive(:status).and_return(200)
+ resp.should_receive(:response_header).and_return resp_header
+ http_method = :get
+
+ VCAP::Services::Api::AsyncHttpRequest.should_receive(:fibered).with(anything, @token, http_method, @timeout, anything).and_return resp
+
+ result = client.perform_request(http_method, path)
+ result.should == message
+ end
+
+ it "should use net/http client when EM is not running" do
+ client = VCAP::Services::Api::ServiceGatewayClient.new(@url, @token, @timeout)
+ EM.should_receive(:reactor_running?).and_return nil
+
+ path = "/path1"
+ resp = mock("resq")
+ message = "data"
+ resp.should_receive(:body).and_return(message)
+ resp.should_receive(:code).and_return 200
+ resp.should_receive(:start).and_return resp
+
+ http_method = :get
+
+ Net::HTTP.should_receive(:new).with("localhost", 80).and_return resp
+
+ result = client.perform_request(http_method, path)
+ result.should == message
+ end
+
+
+ it "should should raise error with none 200 response" do
+ client = VCAP::Services::Api::ServiceGatewayClient.new(@url, @token, @timeout)
+ EM.should_receive(:reactor_running?).any_number_of_times.and_return nil
+
+ path = "/path1"
+ resp = mock("resq")
+ resp.should_receive(:body).and_return(
+ {:code => 40400, :description=> "not found"}.to_json,
+ {:code => 50300, :description=> "internal"}.to_json,
+ {:code => 50100, :description=> "not done yet"}.to_json,
+ )
+ resp.should_receive(:code).and_return(404, 503, 500)
+ resp.should_receive(:start).any_number_of_times.and_return resp
+
+ http_method = :get
+
+ Net::HTTP.should_receive(:new).with("localhost", 80).any_number_of_times.and_return resp
+
+ expect {client.perform_request(http_method, path)}.should raise_error(VCAP::Services::Api::ServiceGatewayClient::NotFoundResponse)
+ expect {client.perform_request(http_method, path)}.should raise_error(VCAP::Services::Api::ServiceGatewayClient::GatewayInternalResponse)
+ expect {client.perform_request(http_method, path)}.should raise_error(VCAP::Services::Api::ServiceGatewayClient::UnexpectedResponse)
+ end
+ end
+end

0 comments on commit 9e0cf5e

Please sign in to comment.