Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 2 commits
  • 5 files changed
  • 0 comments
  • 1 contributor
Oct 25, 2012
Jesse Zhang Add tests for SDSClient
  So that we have faster feedback and confidence for future
modification.

  - Tests are added to cover both code paths
  - gemspec is modified to have a stricter requirement on
    em-http-request because our monkey patch depends specifically on
    1.0.0.beta3 (nothing newer than beta4!)
  - sinatra is added as a development dependency

Change-Id: I91e67462263baa5beb32272fa0d2141ce80a6d22
8a970b7
Jesse Zhang Parametrize requester of SDSClient
  This commit makes configurable the requester of SDSClient so that
users can use synchronous / blocking clients even when EventMachine is
running.

  - Factors out the SynchronousMultipartUpload
  - Adds a .request to AsyncHttpMultiPartUpload
  - Bumps gem version to 2.0.10

  Test plan:
  - All unit tests passed
  - Deploy CCNG with this change with BOSH and pass lifecycle
API BVT's

Change-Id: If9c159901f3f401a1b4474d7eef02899cb44b802
78ff6ed
43  lib/services/api/async_requests.rb
@@ -82,6 +82,49 @@ def fibered(url, timeout, multipart, head={})
82 82
         req.errback {f.resume(req)}
83 83
         Fiber.yield
84 84
       end
  85
+
  86
+      # @param ["PUT", "POST"] method the HTTP verb for the request, currently
  87
+      #     ignored and an emulated PUT is always used
  88
+      # @param [URI::Generic] url
  89
+      # @param [String] file_path the data file to be imported to SDS
  90
+      # @param [Hash] opts
  91
+      # @option opts [Hash] :headers
  92
+      # @option opts [Numeric] :timeout
  93
+      # @return [[code, body]]
  94
+      def request(method, url, file_path, opts = {})
  95
+        mime_types = MIME::Types.type_for(file_path) || []
  96
+        mime_types << "application/octet-stream" if mime_types.empty?
  97
+
  98
+        payload = {:_method => 'put', :data_file => EM::StreamUploadIO.new(file_path, mime_types[0])}
  99
+        multipart = EM::Multipart.new(payload, opts[:headers])
  100
+        # url = URI.parse(uri.to_s + path)
  101
+        http = fibered(url, opts[:timeout], multipart)
  102
+        raise UnexpectedResponse, "Error uploading #{file_path} to serialized_data_server #{@url}: #{http.error}" unless http.error.empty?
  103
+        code = http.response_header.status.to_i
  104
+        body = http.response
  105
+        [code, body]
  106
+      end
  107
+    end
  108
+  end
  109
+
  110
+  class SynchronousMultipartUpload
  111
+      # @param ["PUT", "POST"] method the HTTP verb for the request, currently
  112
+      #     ignored and an emulated PUT is always used
  113
+      # @param [URI::Generic] url
  114
+      # @param [String] file_path the data file to be imported to SDS
  115
+      # @param [Hash] opts
  116
+      # @option opts [Hash] :headers
  117
+      # @return [[code, body]]
  118
+    def self.request(method, url, file_path, opts = {})
  119
+      mime_types = MIME::Types.type_for(file_path) || []
  120
+      mime_types << "application/octet-stream" if mime_types.empty?
  121
+
  122
+      payload = {:_method => 'put', :data_file => UploadIO.new(file_path, mime_types[0])}
  123
+      req = Net::HTTP::Post::Multipart.new(url.request_uri, payload, opts[:headers])
  124
+      resp = Net::HTTP.new(url.host, url.port).start do |http|
  125
+        http.request(req)
  126
+      end
  127
+      [resp.code.to_i, resp.body]
85 128
     end
86 129
   end
87 130
 end
39  lib/services/api/clients/sds_client.rb
@@ -4,6 +4,7 @@
4 4
 require 'mime/types'
5 5
 require 'uri'
6 6
 
  7
+require 'eventmachine'
7 8
 require 'services/api/const'
8 9
 require 'services/api/messages'
9 10
 require 'services/api/multipart'
@@ -23,10 +24,9 @@ class UnexpectedResponse < StandardError; end
23 24
 
24 25
     def initialize(url, upload_token, timeout=60, opts={})
25 26
       @url = url
  27
+      # the options hash can't be specified in Ruby if caller omits timeout...
  28
+      raise ArgumentError unless timeout.respond_to?(:to_f)
26 29
       @timeout = timeout
27  
-      @hdrs  = {
28  
-        'Content-Type' => 'application/json',
29  
-      }
30 30
       @upload_hdrs = {
31 31
         'Content-Type' => 'multipart/form-data',
32 32
         SDS_UPLOAD_TOKEN_HEADER => upload_token
@@ -43,28 +43,21 @@ def import_from_data(args)
43 43
     def perform_multipart_upload(path, file_path)
44 44
       # upload file using multipart/form data
45 45
       result = nil
46  
-      uri = URI.parse(@url)
  46
+      url = URI.join(@url, path)
47 47
 
48  
-      mime_types = MIME::Types.type_for(file_path) || []
49  
-      mime_types << "application/octet-stream" if mime_types.empty?
  48
+      requester = if EM.reactor_thread?
  49
+                    AsyncHttpMultiPartUpload
  50
+                  else
  51
+                    SynchronousMultipartUpload
  52
+                  end
50 53
 
51  
-      if EM.reactor_running?
52  
-        payload = {:_method => 'put', :data_file => EM::StreamUploadIO.new(file_path, mime_types[0])}
53  
-        multipart = EM::Multipart.new(payload, @upload_hdrs)
54  
-        url = URI.parse(uri.to_s + path)
55  
-        http = AsyncHttpMultiPartUpload.fibered(url, @timeout, multipart)
56  
-        raise UnexpectedResponse, "Error uploading #{file_path} to serialized_data_server #{@url}: #{http.error}" unless http.error.empty?
57  
-        code = http.response_header.status.to_i
58  
-        body = http.response
59  
-      else
60  
-        payload = {:_method => 'put', :data_file => UploadIO.new(file_path, mime_types[0])}
61  
-        req = Net::HTTP::Post::Multipart.new(path, payload, @upload_hdrs)
62  
-        resp = Net::HTTP.new(uri.host, uri.port).start do |http|
63  
-          http.request(req)
64  
-        end
65  
-        code = resp.code.to_i
66  
-        body = resp.body
67  
-      end
  54
+      code, body = requester.request(
  55
+        "PUT",
  56
+        url,
  57
+        file_path,
  58
+        :headers => @upload_hdrs,
  59
+        :timeout => @timeout,
  60
+      )
68 61
       case code
69 62
       when 200
70 63
         body
4  lib/services/api/multipart.rb
@@ -158,6 +158,10 @@ def send_body(conn)
158 158
 
159 159
 ## Support to streaming the file when sending body
160 160
 ## TODO FIXME this patch whether depends on specified version???
  161
+## FIXME: yes it depends on a very specific beta version, yuck
  162
+## FIXME: a less gross alternative is to stream out the request body to disk,
  163
+## and use the :file option to instruct em-http-request to stream the body
  164
+## from disk
161 165
 module EventMachine
162 166
   class HttpClient
163 167
     alias_method :original_send_request, :send_request
182  spec/unit/sds_client_spec.rb
... ...
@@ -0,0 +1,182 @@
  1
+# Copyright (c) 2009-2012 VMware, Inc.
  2
+
  3
+require "spec_helper"
  4
+require "tempfile"
  5
+require "timeout"
  6
+require "sinatra"
  7
+
  8
+require "services/api/clients/sds_client"
  9
+
  10
+describe VCAP::Services::Api::SDSClient do
  11
+  describe "#import_from_data" do
  12
+    it "issues a PUT to serialization data server" do
  13
+      # webmock would be great, but unfortunately it's a monkey-patch, and we
  14
+      # are (even more unfortunately) doing yet another monkey-patch on
  15
+      # em-http-request, so their monkey patch can't catch calls to our monkey
  16
+      # patch. So we have to roll out this hard-core real servers all over the
  17
+      # place. Yuck
  18
+      MockServer.stubbed_status = 200
  19
+      MockServer.stubbed_body = "{\"url\": \"http://example.com/foo\"}"
  20
+      port = VCAP::grab_ephemeral_port
  21
+      server = Thin::Server.new("localhost", port, MockServer)
  22
+      server.silent = true
  23
+      t = Thread.new { server.start }
  24
+
  25
+      f = Tempfile.new("foo")
  26
+      f.write("bar\n")
  27
+      f.close
  28
+
  29
+      client = VCAP::Services::Api::SDSClient.new(
  30
+        "http://localhost:#{port}",
  31
+        "secret",
  32
+      )
  33
+      EM.error_handler do |e|
  34
+        raise e
  35
+      end
  36
+      Timeout.timeout(0.5) do
  37
+        sleep 0.02 until server.running?
  38
+      end
  39
+      server.should be_running
  40
+
  41
+      EM.next_tick do
  42
+        fiber = Fiber.new do
  43
+          client.import_from_data(
  44
+            :service => "redis",
  45
+            :service_id => "deadbeef",
  46
+            :msg => f.path,
  47
+          )
  48
+          EM.stop
  49
+        end
  50
+        fiber.resume
  51
+      end
  52
+      Timeout.timeout(0.5) do
  53
+        EM.reactor_thread.join
  54
+      end
  55
+      f.unlink
  56
+      MockServer.last_request.forms["data_file"].should_not be_nil
  57
+      MockServer.last_request.forms["data_file"][:tempfile].read.should == "bar\n"
  58
+      MockServer.last_request.headers["HTTP_X_VCAP_SDS_UPLOAD_TOKEN"].should == "secret"
  59
+    end
  60
+
  61
+    it "issues a PUT to serialization data server (outside of reactor thread)" do
  62
+      MockServer.stubbed_status = 200
  63
+      MockServer.stubbed_body = "{\"url\": \"http://example.com/foo\"}"
  64
+      port = VCAP::grab_ephemeral_port
  65
+      server = Thin::Server.new("localhost", port, MockServer)
  66
+      server.silent = true
  67
+      Thread.new { server.start }
  68
+
  69
+      f = Tempfile.new("foo")
  70
+      f.write("bar\n")
  71
+      f.close
  72
+
  73
+      client = VCAP::Services::Api::SDSClient.new(
  74
+        "http://localhost:#{port}",
  75
+        "secret",
  76
+        2,
  77
+      )
  78
+      EM.error_handler do |e|
  79
+        raise e
  80
+      end
  81
+      Timeout.timeout(0.5) do
  82
+        sleep 0.02 until server.running?
  83
+      end
  84
+      server.should be_running
  85
+
  86
+      client.import_from_data(
  87
+        :service => "redis",
  88
+        :service_id => "deadbeef",
  89
+        :msg => f.path,
  90
+      )
  91
+      server.stop
  92
+      Timeout.timeout(0.5) do
  93
+        EM.reactor_thread.join
  94
+      end
  95
+      f.unlink
  96
+      MockServer.last_request.forms["data_file"].should_not be_nil
  97
+      MockServer.last_request.forms["data_file"][:tempfile].read.should == "bar\n"
  98
+      MockServer.last_request.headers["HTTP_X_VCAP_SDS_UPLOAD_TOKEN"].should == "secret"
  99
+    end
  100
+
  101
+    it "issues a PUT to serialization data server (without EM)" do
  102
+      MockServer.stubbed_status = 200
  103
+      MockServer.stubbed_body = "{\"url\": \"http://example.com/foo\"}"
  104
+      server = nil
  105
+      Thread.abort_on_exception = true
  106
+      port = VCAP::grab_ephemeral_port
  107
+      t = Thread.new do
  108
+        Rack::Handler::WEBrick.run(MockServer,
  109
+                                   :Port => port,
  110
+                                   :Host => "localhost",
  111
+                                   # shut up webrick
  112
+                                   :Logger => WEBrick::Log.new(nil, WEBrick::Log::WARN),
  113
+                                   :AccessLog => [],
  114
+                                  ) do |s|
  115
+          server = s
  116
+        end
  117
+      end
  118
+
  119
+      f = Tempfile.new("foo")
  120
+      f.write("bar\n")
  121
+      f.close
  122
+
  123
+      client = VCAP::Services::Api::SDSClient.new(
  124
+        "http://localhost:#{port}",
  125
+        "secret",
  126
+        2,
  127
+        :requester => VCAP::Services::Api::SynchronousMultipartUpload,
  128
+      )
  129
+      Timeout.timeout(0.5) do
  130
+        sleep 0.02 until server && server.status == :Running
  131
+      end
  132
+      server.status.should == :Running
  133
+
  134
+      client.import_from_data(
  135
+        :service => "redis",
  136
+        :service_id => "deadbeef",
  137
+        :msg => f.path,
  138
+      )
  139
+      # server#stop woul also do, but it takes 2 seconds...
  140
+      server.shutdown
  141
+      Timeout.timeout(0.5) do
  142
+        t.kill
  143
+        t.join
  144
+      end
  145
+      f.unlink
  146
+      MockServer.last_request.forms["data_file"].should_not be_nil
  147
+      MockServer.last_request.forms["data_file"][:tempfile].read.should == "bar\n"
  148
+      MockServer.last_request.headers["HTTP_X_VCAP_SDS_UPLOAD_TOKEN"].should == "secret"
  149
+    end
  150
+  end
  151
+end
  152
+
  153
+class MockServer < Sinatra::Base
  154
+  RequestSignature = Struct.new(
  155
+    :path,
  156
+    :headers,
  157
+    :forms,
  158
+  )
  159
+  class << self
  160
+    @stubbed_status = 204
  161
+    @stubbed_body = ""
  162
+    attr_accessor :last_request, :stubbed_status, :stubbed_body
  163
+  end
  164
+
  165
+  private
  166
+  def record_signature(request)
  167
+    r = RequestSignature.new
  168
+    r.path = request.path
  169
+    r.headers = request.env.select { |k,_| k.start_with?("HTTP_") }
  170
+    r.forms = request.POST.dup
  171
+    self.class.last_request = r
  172
+  end
  173
+
  174
+  # FIXME: drop this when we switch to true PUT
  175
+  enable :method_override
  176
+
  177
+  put "/*" do
  178
+    record_signature(request)
  179
+
  180
+    [self.class.stubbed_status, self.class.stubbed_body]
  181
+  end
  182
+end
7  vcap_common.gemspec
... ...
@@ -1,7 +1,7 @@
1 1
 spec = Gem::Specification.new do |s|
2 2
   s.name = 'vcap_common'
3  
-  s.version = '2.0.9'
4  
-  s.date = '2012-09-26'
  3
+  s.version = '2.0.10'
  4
+  s.date = '2012-10-25'
5 5
   s.summary = 'vcap common'
6 6
   s.homepage = "http://github.com/vmware-ac/core"
7 7
   s.description = 'common vcap classes/methods'
@@ -16,11 +16,12 @@ spec = Gem::Specification.new do |s|
16 16
   s.add_dependency('posix-spawn', '~> 0.3.6')
17 17
   s.add_dependency('membrane', '~> 0.0.2')
18 18
   s.add_dependency('httpclient')
19  
-  s.add_dependency('em-http-request', '~> 1.0.0.beta3')
  19
+  s.add_dependency('em-http-request', '~> 1.0.0.beta3', '< 1.0.0.beta4')
20 20
   s.add_dependency('multipart-post')
21 21
   s.add_dependency('mime-types')
22 22
   s.add_development_dependency('rake', '~> 0.9.2')
23 23
   s.add_development_dependency('rspec')
  24
+  s.add_development_dependency('sinatra')
24 25
 
25 26
   s.require_paths = ['lib']
26 27
 

No commit comments for this range

Something went wrong with that request. Please try again.