Permalink
Browse files

refactor import_from_data API

 1. service gateway will not need access NFS
 2. Add support multipart uploading to eventmachine/em-http-request
 3. Add serialization_data_server client to upload file using multipart post

Change-Id: I8f340ddfd89e157d50d391fe36a5c178240fa2f3
  • Loading branch information...
1 parent b0cc19d commit 1dca468ec017ea5d296aeac1aa2fb7a70affba63 Frank Lu committed Jul 6, 2012
View
@@ -4,5 +4,4 @@ gemspec
group :spec do
gem 'rspec'
- gem 'em-http-request' #, '~> 1.0.0.beta3'
end
View
@@ -3,4 +3,5 @@
require 'services/api/const'
require 'services/api/messages'
require 'services/api/clients/service_gateway_client'
+require 'services/api/clients/sds_client'
require 'services/api/util'
@@ -58,4 +58,30 @@ def self.request(url, token, verb, timeout, msg=VCAP::Services::Api::EMPTY_REQUE
[msg.code, msg.body]
end
end
+
+ class AsyncHttpMultiPartUpload
+ class << self
+ def new(url, timeout, multipart, head={})
+ req = {
+ :head => head,
+ :body => "",
+ :multipart => multipart
+ }
+
+ if timeout
+ EM::HttpRequest.new(url, :inactivity_timeout => timeout).post req
+ else
+ EM::HttpRequest.new(url).post req
+ end
+ end
+
+ def fibered(url, timeout, multipart, head={})
+ req = new(url, timeout, multipart, head)
+ f = Fiber.current
+ req.callback { f.resume(req) }
+ req.errback {f.resume(req)}
+ Fiber.yield
+ end
+ end
+ end
end
@@ -0,0 +1,84 @@
+# Copyright (c) 2009-2012 VMware, Inc.
+require 'net/http'
+require 'net/http/post/multipart'
+require 'mime/types'
+require 'uri'
+
+require 'services/api/const'
+require 'services/api/messages'
+require 'services/api/multipart'
+
+module VCAP
+ module Services
+ module Api
+ end
+ end
+end
+
+module VCAP::Services::Api
+ class SDSClient
+
+ class SDSErrorResponse < StandardError; end
+ class UnexpectedResponse < StandardError; end
+
+ def initialize(url, upload_token, timeout=60, opts={})
+ @url = url
+ @timeout = timeout
+ @hdrs = {
+ 'Content-Type' => 'application/json',
+ }
+ @upload_hdrs = {
+ 'Content-Type' => 'multipart/form-data',
+ SDS_UPLOAD_TOKEN_HEADER => upload_token
+ }
+ end
+
+ def import_from_data(args)
+ resp = perform_multipart_upload("/serialized/#{args[:service]}/#{args[:service_id]}/serialized/data", args[:msg])
+ SerializedURL.decode(resp)
+ end
+
+ protected
+
+ def perform_multipart_upload(path, file_path)
+ # upload file using multipart/form data
+ result = nil
+ uri = URI.parse(@url)
+
+ mime_types = MIME::Types.type_for(file_path) || []
+ mime_types << "application/octet-stream" if mime_types.empty?
+
+ if EM.reactor_running?
+ payload = {:_method => 'put', :data_file => EM::StreamUploadIO.new(file_path, mime_types[0])}
+ multipart = EM::Multipart.new(payload, @upload_hdrs)
+ url = URI.parse(uri.to_s + path)
+ http = AsyncHttpMultiPartUpload.fibered(url, @timeout, multipart)
+ raise UnexpectedResponse, "Error uploading #{file_path} to serialized_data_server #{@url}: #{http.error}" unless http.error.empty?
+ code = http.response_header.status.to_i
+ body = http.response
+ else
+ payload = {:_method => 'put', :data_file => UploadIO.new(file_path, mime_types[0])}
+ req = Net::HTTP::Post::Multipart.new(path, payload, @upload_hdrs)
+ resp = Net::HTTP.new(uri.host, uri.port).start do |http|
+ http.request(req)
+ end
+ code = resp.code.to_i
+ body = resp.body
+ end
+ case code
+ when 200
+ body
+ when 400
+ raise SDSErrorResponse, "Fail to upload the file to serialization_data_server."
+ when 403
+ raise SDSErrorResponse, "You are forbidden to access serialization_data_server."
+ when 404
+ raise SDSErrorResponse, "Not found in serialization_data_server."
+ when 501
+ raise SDSErrorResponse, "Serialized data file is recognized, but file not found in serialization_data_server."
+ else
+ raise UnexpectedResponse, "Unexpected exception in serialization_data_server: #{(uri.to_s + path)} #{code} #{body}"
+ end
+ end
+ end
+end
@@ -22,12 +22,12 @@ class ServiceGatewayClient
:delete => Net::HTTP::Delete,
}
- # Public: Indicate gateway client encounter an unexpcted error,
+ # Public: Indicate gateway client encounter an unexpected error,
# such as can't connect to gateway or can't decode response.
#
class UnexpectedResponse < StandardError; end
- # Pubilc: Indicate an error response from gateway
+ # Public: Indicate an error response from gateway
#
class ErrorResponse < StandardError
attr_reader :status, :error
@@ -123,11 +123,6 @@ def import_from_url(args)
Job.decode(resp)
end
- def import_from_data(args)
- resp = perform_request(:put, "/gateway/v1/configurations/#{args[:service_id]}/serialized/data", args[:msg])
- Job.decode(resp)
- end
-
def job_info(args)
resp = perform_request(:get, "/gateway/v1/configurations/#{args[:service_id]}/jobs/#{args[:job_id]}")
Job.decode(resp)
@@ -2,6 +2,7 @@
module VCAP
module Services
module Api
+ SDS_UPLOAD_TOKEN_HEADER = 'X-VCAP-SDS-Upload-Token'
GATEWAY_TOKEN_HEADER = 'X-VCAP-Service-Token'
SERVICE_LABEL_REGEX = /^\S+-\S+$/
end
@@ -0,0 +1,191 @@
+# Copyright (c) 2009-2011 VMware, Inc.
+require 'eventmachine'
+require 'em-http-request'
+
+# monkey-patch for em-http-request to support multipart file upload
+
+module EventMachine
+ class StreamUploadIO
+ attr_reader :args, :filename, :basename, :size, :content_type
+ def initialize(filename, content_type, args={})
+ # disable http chunking
+ @args = args.merge({:http_chunks => false})
+ @filename = filename
+ # FIXME how to catch exception and log it
+ begin
+ @basename = File.basename(filename)
+ @size = File.size(filename)
+ rescue => e
+ # size == 0, the part will be injected
+ @size = 0
+ end
+ @content_type = content_type
+ end
+
+ def add_extra_size(extra_size)
+ @size += extra_size
+ end
+
+ def length
+ @size
+ end
+
+ def stream_file_data
+ true
+ end
+ end
+
+ module Part
+ def self.create(boundary, k, v)
+ if v.respond_to?(:stream_file_data)
+ FilePart.new(boundary, k, v)
+ else
+ ParamPart.new(boundary, k, v)
+ end
+ end
+
+ def to_io
+ @io
+ end
+
+ def length
+ @io.size
+ end
+
+ def send_part(conn, parts, idx)
+ end
+
+ def get_next_part(parts, idx)
+ next_idx = idx.to_i + 1
+ if parts && next_idx < parts.size && next_idx >=0
+ next_part = parts[next_idx]
+ else
+ nil
+ end
+ next_part
+ end
+
+ def send_next_part(conn, parts, idx)
+ next_part = get_next_part(parts, idx)
+ next_part.send_part(conn, parts, idx+1) if next_part
+ end
+
+ end
+
+ class ParamPart
+ include Part
+ def initialize(boundary, name, value)
+ @boundary = boundary
+ @name = name
+ part = ''
+ part << "--#{@boundary}\r\n"
+ part << "Content-Disposition: form-data; name=\"#{@name.to_s}\"\r\n"
+ part << "\r\n"
+ part << "#{value.to_s}\r\n"
+ @io = StringIO.new(part)
+ end
+
+ def send_part(conn, parts, idx)
+ conn.send_data @io.string if conn
+ send_next_part(conn, parts, idx)
+ end
+ end
+
+ class EpiloguePart
+ include Part
+ def initialize(boundary)
+ @io = StringIO.new("--#{boundary}--\r\n") #\r\n or \r\n\r\n
+ end
+
+ def send_part(conn, parts, idx)
+ conn.send_data @io.string if conn
+ # this part should be the last part
+ end
+ end
+
+ class FilePart
+ include Part
+ def initialize(boundary, name, upload_io)
+ @boundary = boundary
+ @name = name
+ @io = upload_io
+ @part = ''
+ @part << "--#{boundary}\r\n"
+ @part << "Content-Disposition: form-data; name=\"#{name.to_s}\"; filename=\"#{@io.filename}\"\r\n"
+ @part << "Content-Length: #{@io.size}\r\n"
+ @part << "Content-Type: #{@io.content_type}\r\n"
+ @part << "Content-Transfer-Encoding: binary\r\n"
+ @part << "\r\n"
+ @end_part ="\r\n"
+ @io.add_extra_size(@part.size + @end_part.size)
+ end
+
+ def send_part(conn, parts, idx)
+ conn.send_data @part
+ streamer = EM::FileStreamer.new(conn, @io.filename, @io.args)
+ streamer.callback {
+ conn.send_data @end_part
+ send_next_part(conn, parts, idx)
+ }
+ end
+ end
+
+ class Multipart
+ DEFAULT_BOUNDARY = "-----------RubyEMMultiPartPost"
+ attr_reader :parts, :ps, :content_type, :content_length, :boundary, :headers
+ def initialize(params, headers={}, boundary=DEFAULT_BOUNDARY)
+ @parts = params.map{ |k,v| Part.create(boundary, k, v) }
+ @parts << EpiloguePart.new(boundary)
+ # inject the part with length = 0
+ @ps = @parts.select{ |part| part.length > 0 }
+ @content_type = "multipart/form-data; boundary=#{boundary}"
+ @content_length = 0
+ @parts.each do |part|
+ @content_length += part.length
+ end
+ @boundary = boundary
+ @headers = headers
+ end
+
+ def send_body(conn)
+ if conn && conn.error.nil? && @parts.size > 0
+ part = @parts.first
+ part.send_part(conn, @parts, 0)
+ end
+ end
+ end
+end
+
+## Support to streaming the file when sending body
+## TODO FIXME this patch whether depends on specified version???
+module EventMachine
+ class HttpClient
+ alias_method :original_send_request, :send_request
+ def multipart_request?
+ (@req.method == 'POST' or @req.method == 'PUT') and @options[:multipart]
+ end
+
+ def send_request(head, body)
+ unless multipart_request?
+ original_send_request(head, body)
+ else
+ body = normalize_body(body)
+ multipart = @options[:multipart]
+ query = @options[:query]
+
+ head['content-length'] = multipart.content_length
+ head['content-type'] = multipart.content_type
+ extra_headers = {}
+ extra_headers = multipart.headers.reject { |k, v| %w(content-length content-type).include?(k.to_s.downcase) }
+ head.merge! extra_headers
+
+ request_header ||= encode_request(@req.method, @req.uri, query, @conn.opts.proxy)
+ request_header << encode_headers(head)
+ request_header << CRLF
+ @conn.send_data request_header
+
+ multipart.send_body(@conn)
+ end
+ end
+ end
+end
View
@@ -1,7 +1,7 @@
spec = Gem::Specification.new do |s|
s.name = 'vcap_common'
s.version = '2.0.4'
- s.date = '2012-07-05'
+ s.date = '2012-08-05'
s.summary = 'vcap common'
s.homepage = "http://github.com/vmware-ac/core"
s.description = 'common vcap classes/methods'
@@ -16,6 +16,9 @@ spec = Gem::Specification.new do |s|
s.add_dependency('posix-spawn', '~> 0.3.6')
s.add_dependency('membrane', '~> 0.0.2')
s.add_dependency('httpclient')
+ s.add_dependency('em-http-request', '~> 1.0.0.beta3')
+ s.add_dependency('multipart-post')
+ s.add_dependency('mime-types')
s.add_development_dependency('rake', '~> 0.9.2')
s.require_paths = ['lib']

0 comments on commit 1dca468

Please sign in to comment.