Permalink
Browse files

added async way to make requests

  • Loading branch information...
1 parent 34500c1 commit 884a1ba5ff9d763cd691d86118f06983ca63824d Roman Kononov committed May 3, 2011
Showing with 168 additions and 4 deletions.
  1. +10 −1 lib/simple_record/active_sdb.rb
  2. +154 −0 test/faraday_em_adapter.rb
  3. +3 −2 test/test_base.rb
  4. +1 −1 test/test_simple_record.rb
@@ -408,7 +408,12 @@ def sql_select(options) # :nodoc:
select_expression = build_select(options)
logger.debug 'SELECT=' + select_expression
# request items
- query_result = self.connection.select(select_expression, options)
+ executor = Concur::Executor.new_eventmachine_executor
+ future = executor.execute do
+ self.connection.select(select_expression, options)
+ end
+ query_result = future.get
+ executor.shutdown
# puts 'QR=' + query_result.inspect
@next_token = query_result[:next_token]
ret = {}
@@ -874,7 +879,11 @@ def save2(options={})
end
dom = options[:domain] || domain
#puts 'atts_to_save2=' + atts_to_save.inspect
+ executor = Concur::Executor.new_eventmachine_executor
+ future = executor.execute do
connection.put_attributes(dom, id, atts_to_save, :replace, options)
+ end
+ #executor.shutdown
apres_save2
@attributes
end
@@ -0,0 +1,154 @@
+module Faraday
+ class Adapter
+ class EventMachine < Faraday::Adapter
+ dependency do
+ require 'eventmachine'
+ require 'em-http'
+ end
+
+ def call(env)
+ super
+
+
+ ret = Faraday::AsyncResponse.new(@app, env)
+
+ http = ::EventMachine::HttpRequest.new(env[:url])
+ method = env[:method].to_s.downcase
+ if method == 'post'
+ http = http.post :body => env[:body]
+ else
+ http = http.get
+ end
+ puts 'http=' + http.inspect
+
+ ret.em_request = http
+
+ http.errback {
+ ret.call_errback
+ }
+ http.callback {
+ p http.response_header.status
+ p http.response_header
+ p http.response
+ ret.call_callback(http)
+ env.update :status => http.response_header.status, :body=>http.response
+ @app.call env
+ }
+
+ ret
+
+ rescue Errno::ECONNREFUSED
+ raise Error::ConnectionFailed, $!
+ end
+ end
+
+ class EventMachineFutureAdapter < Faraday::Adapter
+ dependency do
+ require 'eventmachine'
+ require 'em-http'
+ require 'concur'
+ end
+
+ def call(env)
+ super
+
+# uri = URI::parse(env[:url].to_s)
+# port = env[:ssl] || 80
+
+
+# conn = EM::Protocols::HttpClient2.connect(:host=>uri.host, :port=>80, :ssl=>env[:ssl])
+
+
+ http = ::EventMachine::HttpRequest.new(env[:url])
+ method = env[:method].to_s.downcase
+ if method == 'post'
+ http = http.post :body => env[:body]
+ else
+ http = http.get
+ end
+ puts 'http=' + http.inspect
+
+ resp = Faraday::AsyncResponse.new(@app, env)
+ resp.em_request = http
+
+ ret = Concur::EventMachineFutureCallback.new(http) do |http|
+ puts 'futurecallback called ' + http.inspect
+ p http.response_header.status
+ p http.response_header
+ p http.response
+ resp.call_callback(http)
+ env.update :status => http.response_header.status, :body=>http.response
+ @app.call env
+ end
+ ret
+
+# http
+
+
+ rescue Errno::ECONNREFUSED
+ raise Error::ConnectionFailed, $!
+ end
+ end
+
+ end
+
+ class Response
+ def async?
+ false
+ end
+ end
+
+ class AsyncResponse < Faraday::Response
+ attr_accessor :em_request, :app, :env, :errblk, :callblk
+
+ def initialize(app, env)
+ @app = app
+ @env = env
+ end
+
+ def async?
+ true
+ end
+
+ def errback &blk
+ @errblk = blk
+ end
+
+ def callback &blk
+ @callblk = blk
+ end
+
+ def call_callback(response)
+ puts 'call_callback=' + response.inspect
+ return unless callblk
+ callblk.call(ResponseWrapper.new(response))
+ end
+
+ def call_errback(response)
+ return unless errblk
+ errblk.call(ResponseWrapper.new(response))
+ end
+ end
+
+
+ class ResponseWrapper
+ attr_accessor :http
+
+ def initialize(http)
+ @http = http
+ end
+
+ def body
+ http.response
+ end
+
+ def status
+ http.response_header.status
+ end
+
+ def headers
+ http.response_header
+ end
+
+ end
+end
View
@@ -6,6 +6,7 @@
require_relative 'my_model'
require_relative 'my_child_model'
require 'active_support'
+require_relative 'faraday_em_adapter'
class TestBase < Test::Unit::TestCase
@@ -39,12 +40,12 @@ def reset_connection(options={})
SimpleRecord::Base.set_domain_prefix("simplerecord_tests_")
SimpleRecord.establish_connection(@config['amazon']['access_key'], @config['amazon']['secret_key'],
- {:connection_mode=>:per_thread}.merge(options))
+ {:connection_mode=>:per_thread,:adapter=>Faraday::Adapter::EventMachineFutureAdapter}.merge(options))
# Establish AWS connection directly
@@sdb = Aws::SdbInterface.new(@config['amazon']['access_key'], @config['amazon']['secret_key'],
- {:connection_mode => :per_thread}.merge(options))
+ {:connection_mode => :per_thread,:adapter=>Faraday::Adapter::EventMachineFutureAdapter}.merge(options))
end
@@ -565,7 +565,7 @@ def test_update_attributes
assert mm.name == "name2", "Name is #{mm.name}"
assert mm.age == 21
# assert mm.date2.to_time.utc == now.utc, "#{mm.date2.class.name} #{mm.date2.to_time.inspect} != #{now.inspect}"
- sleep 1
+ sleep 10
mm = MyModel.find(mm.id)
assert mm.name == "name2", "Name is #{mm.name}"

0 comments on commit 884a1ba

Please sign in to comment.