Skip to content

Commit

Permalink
Parametrize requester of SDSClient
Browse files Browse the repository at this point in the history
  This commit makes configurable the requester of SDSClient so that
users can use synchronous / blocking clients even when EventMachine is
running.

  - Factored out the SynchronousMultipartUpload
  - Added a .request to AsyncHttpMultiPartUpload

  Test plan:
  - All unit tests passed
  - Deploy CCNG with this change with BOSH and pass lifecycle
API BVT's

Change-Id: If9c159901f3f401a1b4474d7eef02899cb44b802

Conflicts:

	spec/unit/sds_client_spec.rb
  • Loading branch information
d committed Oct 23, 2012
1 parent 3b11537 commit 175a93a
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 24 deletions.
43 changes: 43 additions & 0 deletions lib/services/api/async_requests.rb
Expand Up @@ -82,6 +82,49 @@ def fibered(url, timeout, multipart, head={})
req.errback {f.resume(req)}
Fiber.yield
end

# @param ["PUT", "POST"] method the HTTP verb for the request, currently
# ignored and an emulated PUT is always used
# @param [URI::Generic] url
# @param [String] file_path the data file to be imported to SDS
# @param [Hash] opts
# @option opts [Hash] :headers
# @option opts [Numeric] :timeout
# @return [[code, body]]
def request(method, url, file_path, opts = {})
mime_types = MIME::Types.type_for(file_path) || []
mime_types << "application/octet-stream" if mime_types.empty?

payload = {:_method => 'put', :data_file => EM::StreamUploadIO.new(file_path, mime_types[0])}
multipart = EM::Multipart.new(payload, opts[:headers])
# url = URI.parse(uri.to_s + path)
http = fibered(url, opts[:timeout], multipart)
raise UnexpectedResponse, "Error uploading #{file_path} to serialized_data_server #{@url}: #{http.error}" unless http.error.empty?
code = http.response_header.status.to_i
body = http.response
[code, body]
end
end
end

class SynchronousMultipartUpload
# @param ["PUT", "POST"] method the HTTP verb for the request, currently
# ignored and an emulated PUT is always used
# @param [URI::Generic] url
# @param [String] file_path the data file to be imported to SDS
# @param [Hash] opts
# @option opts [Hash] :headers
# @return [[code, body]]
def self.request(method, url, file_path, opts = {})
mime_types = MIME::Types.type_for(file_path) || []
mime_types << "application/octet-stream" if mime_types.empty?

payload = {:_method => 'put', :data_file => UploadIO.new(file_path, mime_types[0])}
req = Net::HTTP::Post::Multipart.new(url.request_uri, payload, opts[:headers])
resp = Net::HTTP.new(url.host, url.port).start do |http|
http.request(req)
end
[resp.code.to_i, resp.body]
end
end
end
45 changes: 23 additions & 22 deletions lib/services/api/clients/sds_client.rb
Expand Up @@ -21,12 +21,17 @@ class SDSClient
class SDSErrorResponse < StandardError; end
class UnexpectedResponse < StandardError; end

def initialize(url, upload_token, timeout=60, opts={})
# @return [#request] an object that has responds to #request and returns a
# 2-element array of [code, body]
attr_reader :requester

def initialize(url, upload_token, timeout=60,
opts={:requester => AsyncHttpMultiPartUpload})
@url = url
# the options hash can't be specified in Ruby if caller omits timeout...
raise ArgumentError unless timeout.respond_to?(:to_f)
@timeout = timeout
@hdrs = {
'Content-Type' => 'application/json',
}
@requester = opts[:requester] || AsyncHttpMultiPartUpload
@upload_hdrs = {
'Content-Type' => 'multipart/form-data',
SDS_UPLOAD_TOKEN_HEADER => upload_token
Expand All @@ -43,27 +48,23 @@ def import_from_data(args)
def perform_multipart_upload(path, file_path)
# upload file using multipart/form data
result = nil
uri = URI.parse(@url)

mime_types = MIME::Types.type_for(file_path) || []
mime_types << "application/octet-stream" if mime_types.empty?
url = URI.join(@url, path)

if EM.reactor_running?
payload = {:_method => 'put', :data_file => EM::StreamUploadIO.new(file_path, mime_types[0])}
multipart = EM::Multipart.new(payload, @upload_hdrs)
url = URI.parse(uri.to_s + path)
http = AsyncHttpMultiPartUpload.fibered(url, @timeout, multipart)
raise UnexpectedResponse, "Error uploading #{file_path} to serialized_data_server #{@url}: #{http.error}" unless http.error.empty?
code = http.response_header.status.to_i
body = http.response
code, body = requester.request(
"PUT",
url,
file_path,
:headers => @upload_hdrs,
:timeout => @timeout,
)
else
payload = {:_method => 'put', :data_file => UploadIO.new(file_path, mime_types[0])}
req = Net::HTTP::Post::Multipart.new(path, payload, @upload_hdrs)
resp = Net::HTTP.new(uri.host, uri.port).start do |http|
http.request(req)
end
code = resp.code.to_i
body = resp.body
code, body = SynchronousMultipartUpload.request(
"PUT",
url,
file_path,
:headers => @upload_hdrs,
)
end
case code
when 200
Expand Down
42 changes: 42 additions & 0 deletions spec/unit/sds_client_spec.rb
Expand Up @@ -53,6 +53,47 @@
MockServer.last_request.headers["HTTP_X_VCAP_SDS_UPLOAD_TOKEN"].should == "secret"
end

it "issues a PUT to serialization data server (with synchronous requester)" do
MockServer.stubbed_status = 200
MockServer.stubbed_body = "{\"url\": \"http://example.com/foo\"}"
port = VCAP::grab_ephemeral_port
server = Thin::Server.new("localhost", port, MockServer)
server.silent = true
Thread.new { server.start }

f = Tempfile.new("foo")
f.write("bar\n")
f.close

client = VCAP::Services::Api::SDSClient.new(
"http://localhost:#{port}",
"secret",
2,
:requester => VCAP::Services::Api::SynchronousMultipartUpload,
)
EM.error_handler do |e|
raise e
end
Timeout.timeout(0.5) do
sleep 0.02 until server.running?
end
server.should be_running

client.import_from_data(
:service => "redis",
:service_id => "deadbeef",
:msg => f.path,
)
server.stop
Timeout.timeout(0.5) do
EM.reactor_thread.join
end
f.unlink
MockServer.last_request.forms["data_file"].should_not be_nil
MockServer.last_request.forms["data_file"][:tempfile].read.should == "bar\n"
MockServer.last_request.headers["HTTP_X_VCAP_SDS_UPLOAD_TOKEN"].should == "secret"
end

it "issues a PUT to serialization data server (without EM)" do
MockServer.stubbed_status = 200
MockServer.stubbed_body = "{\"url\": \"http://example.com/foo\"}"
Expand All @@ -78,6 +119,7 @@
"http://localhost:#{port}",
"secret",
2,
:requester => VCAP::Services::Api::SynchronousMultipartUpload,
)
Timeout.timeout(0.5) do
sleep 0.02 until server && server.status == :Running
Expand Down
4 changes: 2 additions & 2 deletions vcap_common.gemspec
@@ -1,7 +1,7 @@
spec = Gem::Specification.new do |s|
s.name = 'vcap_common'
s.version = '2.0.9'
s.date = '2012-09-26'
s.version = '2.0.10'
s.date = '2012-10-24'
s.summary = 'vcap common'
s.homepage = "http://github.com/vmware-ac/core"
s.description = 'common vcap classes/methods'
Expand Down

0 comments on commit 175a93a

Please sign in to comment.