From 175a93ace435376f673a4f180d7ef177770aed1c Mon Sep 17 00:00:00 2001 From: Jesse Zhang Date: Tue, 23 Oct 2012 14:25:09 -0700 Subject: [PATCH] Parametrize requester of SDSClient This commit makes configurable the requester of SDSClient so that users can use synchronous / blocking clients even when EventMachine is running. - Factored out the SynchronousMultipartUpload - Added a .request to AsyncHttpMultiPartUpload Test plan: - All unit tests passed - Deploy CCNG with this change with BOSH and pass lifecycle API BVT's Change-Id: If9c159901f3f401a1b4474d7eef02899cb44b802 Conflicts: spec/unit/sds_client_spec.rb --- lib/services/api/async_requests.rb | 43 ++++++++++++++++++++++++ lib/services/api/clients/sds_client.rb | 45 +++++++++++++------------- spec/unit/sds_client_spec.rb | 42 ++++++++++++++++++++++++ vcap_common.gemspec | 4 +-- 4 files changed, 110 insertions(+), 24 deletions(-) diff --git a/lib/services/api/async_requests.rb b/lib/services/api/async_requests.rb index b041f53..0c55be9 100644 --- a/lib/services/api/async_requests.rb +++ b/lib/services/api/async_requests.rb @@ -82,6 +82,49 @@ def fibered(url, timeout, multipart, head={}) req.errback {f.resume(req)} Fiber.yield end + + # @param ["PUT", "POST"] method the HTTP verb for the request, currently + # ignored and an emulated PUT is always used + # @param [URI::Generic] url + # @param [String] file_path the data file to be imported to SDS + # @param [Hash] opts + # @option opts [Hash] :headers + # @option opts [Numeric] :timeout + # @return [[code, body]] + def request(method, url, file_path, opts = {}) + mime_types = MIME::Types.type_for(file_path) || [] + mime_types << "application/octet-stream" if mime_types.empty? + + payload = {:_method => 'put', :data_file => EM::StreamUploadIO.new(file_path, mime_types[0])} + multipart = EM::Multipart.new(payload, opts[:headers]) + # url = URI.parse(uri.to_s + path) + http = fibered(url, opts[:timeout], multipart) + raise UnexpectedResponse, "Error uploading #{file_path} to serialized_data_server #{@url}: #{http.error}" unless http.error.empty? + code = http.response_header.status.to_i + body = http.response + [code, body] + end + end + end + + class SynchronousMultipartUpload + # @param ["PUT", "POST"] method the HTTP verb for the request, currently + # ignored and an emulated PUT is always used + # @param [URI::Generic] url + # @param [String] file_path the data file to be imported to SDS + # @param [Hash] opts + # @option opts [Hash] :headers + # @return [[code, body]] + def self.request(method, url, file_path, opts = {}) + mime_types = MIME::Types.type_for(file_path) || [] + mime_types << "application/octet-stream" if mime_types.empty? + + payload = {:_method => 'put', :data_file => UploadIO.new(file_path, mime_types[0])} + req = Net::HTTP::Post::Multipart.new(url.request_uri, payload, opts[:headers]) + resp = Net::HTTP.new(url.host, url.port).start do |http| + http.request(req) + end + [resp.code.to_i, resp.body] end end end diff --git a/lib/services/api/clients/sds_client.rb b/lib/services/api/clients/sds_client.rb index 41fa494..025fd27 100644 --- a/lib/services/api/clients/sds_client.rb +++ b/lib/services/api/clients/sds_client.rb @@ -21,12 +21,17 @@ class SDSClient class SDSErrorResponse < StandardError; end class UnexpectedResponse < StandardError; end - def initialize(url, upload_token, timeout=60, opts={}) + # @return [#request] an object that has responds to #request and returns a + # 2-element array of [code, body] + attr_reader :requester + + def initialize(url, upload_token, timeout=60, + opts={:requester => AsyncHttpMultiPartUpload}) @url = url + # the options hash can't be specified in Ruby if caller omits timeout... + raise ArgumentError unless timeout.respond_to?(:to_f) @timeout = timeout - @hdrs = { - 'Content-Type' => 'application/json', - } + @requester = opts[:requester] || AsyncHttpMultiPartUpload @upload_hdrs = { 'Content-Type' => 'multipart/form-data', SDS_UPLOAD_TOKEN_HEADER => upload_token @@ -43,27 +48,23 @@ def import_from_data(args) def perform_multipart_upload(path, file_path) # upload file using multipart/form data result = nil - uri = URI.parse(@url) - - mime_types = MIME::Types.type_for(file_path) || [] - mime_types << "application/octet-stream" if mime_types.empty? + url = URI.join(@url, path) if EM.reactor_running? - payload = {:_method => 'put', :data_file => EM::StreamUploadIO.new(file_path, mime_types[0])} - multipart = EM::Multipart.new(payload, @upload_hdrs) - url = URI.parse(uri.to_s + path) - http = AsyncHttpMultiPartUpload.fibered(url, @timeout, multipart) - raise UnexpectedResponse, "Error uploading #{file_path} to serialized_data_server #{@url}: #{http.error}" unless http.error.empty? - code = http.response_header.status.to_i - body = http.response + code, body = requester.request( + "PUT", + url, + file_path, + :headers => @upload_hdrs, + :timeout => @timeout, + ) else - payload = {:_method => 'put', :data_file => UploadIO.new(file_path, mime_types[0])} - req = Net::HTTP::Post::Multipart.new(path, payload, @upload_hdrs) - resp = Net::HTTP.new(uri.host, uri.port).start do |http| - http.request(req) - end - code = resp.code.to_i - body = resp.body + code, body = SynchronousMultipartUpload.request( + "PUT", + url, + file_path, + :headers => @upload_hdrs, + ) end case code when 200 diff --git a/spec/unit/sds_client_spec.rb b/spec/unit/sds_client_spec.rb index 1155e40..f51461c 100644 --- a/spec/unit/sds_client_spec.rb +++ b/spec/unit/sds_client_spec.rb @@ -53,6 +53,47 @@ MockServer.last_request.headers["HTTP_X_VCAP_SDS_UPLOAD_TOKEN"].should == "secret" end + it "issues a PUT to serialization data server (with synchronous requester)" do + MockServer.stubbed_status = 200 + MockServer.stubbed_body = "{\"url\": \"http://example.com/foo\"}" + port = VCAP::grab_ephemeral_port + server = Thin::Server.new("localhost", port, MockServer) + server.silent = true + Thread.new { server.start } + + f = Tempfile.new("foo") + f.write("bar\n") + f.close + + client = VCAP::Services::Api::SDSClient.new( + "http://localhost:#{port}", + "secret", + 2, + :requester => VCAP::Services::Api::SynchronousMultipartUpload, + ) + EM.error_handler do |e| + raise e + end + Timeout.timeout(0.5) do + sleep 0.02 until server.running? + end + server.should be_running + + client.import_from_data( + :service => "redis", + :service_id => "deadbeef", + :msg => f.path, + ) + server.stop + Timeout.timeout(0.5) do + EM.reactor_thread.join + end + f.unlink + MockServer.last_request.forms["data_file"].should_not be_nil + MockServer.last_request.forms["data_file"][:tempfile].read.should == "bar\n" + MockServer.last_request.headers["HTTP_X_VCAP_SDS_UPLOAD_TOKEN"].should == "secret" + end + it "issues a PUT to serialization data server (without EM)" do MockServer.stubbed_status = 200 MockServer.stubbed_body = "{\"url\": \"http://example.com/foo\"}" @@ -78,6 +119,7 @@ "http://localhost:#{port}", "secret", 2, + :requester => VCAP::Services::Api::SynchronousMultipartUpload, ) Timeout.timeout(0.5) do sleep 0.02 until server && server.status == :Running diff --git a/vcap_common.gemspec b/vcap_common.gemspec index b34a0c1..106bd42 100644 --- a/vcap_common.gemspec +++ b/vcap_common.gemspec @@ -1,7 +1,7 @@ spec = Gem::Specification.new do |s| s.name = 'vcap_common' - s.version = '2.0.9' - s.date = '2012-09-26' + s.version = '2.0.10' + s.date = '2012-10-24' s.summary = 'vcap common' s.homepage = "http://github.com/vmware-ac/core" s.description = 'common vcap classes/methods'