Skip to content

Commit

Permalink
add EM.synchrony wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
igrigorik committed Mar 20, 2010
1 parent a148882 commit ffd7084
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 82 deletions.
16 changes: 14 additions & 2 deletions lib/em-synchrony.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,19 @@
require "em-synchrony/em-multi"
require "em-synchrony/em-http"
require "em-synchrony/em-mysql"
# require "em-synchrony/em-jack"
require "em-synchrony/em-remcached"

require "em-synchrony/connection_pool"
require "em-synchrony/connection_pool"

module EventMachine

# A convenience method for wrapping EM.run body within
# a Ruby Fiber such that async operations can be transparently
# paused and resumed based on IO scheduling.
def self.synchrony(blk=nil, tail=nil, &block)
Fiber.new {
self.run(blk, tail, &block)
}.resume
end

end
9 changes: 6 additions & 3 deletions spec/beanstalk_spec.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
require 'spec/helper'
require 'spec/helper/all'

DELAY = 0.25

__END__
describe EMJack do

it "should fire sequential Beanstalk requests" do
pending

EventMachine.run do

Fiber.new {
jack = EMJack::Connection.new

Expand All @@ -19,8 +21,9 @@
end

it "should fire multiple requests in parallel" do
pending

EventMachine.run do

Fiber.new {
jack = EMJack::Connection.new

Expand Down
8 changes: 6 additions & 2 deletions spec/http_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
describe EventMachine::HttpRequest do
it "should fire sequential requests" do
EventMachine.run do
@s = StubServer.new("HTTP/1.0 200 OK\r\nConnection: close\r\n\r\nFoo", DELAY)

Fiber.new {
@s = StubServer.new("HTTP/1.0 200 OK\r\nConnection: close\r\n\r\nFoo", DELAY)

start = now
order = []
order.push :get if EventMachine::HttpRequest.new(URL).get
Expand All @@ -20,16 +21,18 @@
(now - start.to_f).should be_within(DELAY * order.size * 0.15).of(DELAY * order.size)
order.should == [:get, :post, :head, :post, :put]

@s.stop
EventMachine.stop
}.resume
end
end

it "should fire simultaneous requests via Multi interface" do
EventMachine.run do
@s = StubServer.new("HTTP/1.0 200 OK\r\nConnection: close\r\n\r\nFoo", DELAY)

Fiber.new {
@s = StubServer.new("HTTP/1.0 200 OK\r\nConnection: close\r\n\r\nFoo", DELAY)

start = now

multi = EventMachine::Synchrony::Multi.new
Expand All @@ -44,6 +47,7 @@
res.responses[:callback].size.should == 5
res.responses[:errback].size.should == 0

@s.stop
EventMachine.stop
}.resume
end
Expand Down
78 changes: 36 additions & 42 deletions spec/mysql_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,21 @@
describe EventMachine::MySQL do

it "should fire sequential, synchronous requests" do
EventMachine.run do
Fiber.new {
db = EventMachine::MySQL.new(host: "localhost")
start = now
res = []
EventMachine.synchrony do
db = EventMachine::MySQL.new(host: "localhost")
start = now
res = []

res.push db.query(QUERY)
res.push db.query(QUERY)
(now - start.to_f).should be_within(DELAY * res.size * 0.15).of(DELAY * res.size)
res.push db.query(QUERY)
res.push db.query(QUERY)
(now - start.to_f).should be_within(DELAY * res.size * 0.15).of(DELAY * res.size)

EventMachine.stop
}.resume
EventMachine.stop
end
end

it "should have accept a callback, errback on async queries" do
EventMachine.run do
EventMachine.synchrony do
db = EventMachine::MySQL.new(host: "localhost")

res = db.aquery(QUERY)
Expand All @@ -36,58 +34,54 @@
end

it "should fire simultaneous requests via Multi interface" do
EventMachine.run do
EventMachine.synchrony do

db = EventMachine::Synchrony::ConnectionPool.new(size: 2) do
EventMachine::MySQL.new(host: "localhost")
end

Fiber.new {
start = now
start = now

multi = EventMachine::Synchrony::Multi.new
multi.add :a, db.aquery(QUERY)
multi.add :b, db.aquery(QUERY)
res = multi.perform
multi = EventMachine::Synchrony::Multi.new
multi.add :a, db.aquery(QUERY)
multi.add :b, db.aquery(QUERY)
res = multi.perform

(now - start.to_f).should be_within(DELAY * 0.15).of(DELAY)
res.responses[:callback].size.should == 2
res.responses[:errback].size.should == 0
(now - start.to_f).should be_within(DELAY * 0.15).of(DELAY)
res.responses[:callback].size.should == 2
res.responses[:errback].size.should == 0

EventMachine.stop
}.resume
EventMachine.stop
end
end

it "should fire sequential and simultaneous MySQL requests" do
EventMachine.run do
EventMachine.synchrony do
db = EventMachine::Synchrony::ConnectionPool.new(size: 3) do
EventMachine::MySQL.new(host: "localhost")
end

Fiber.new {
start = now
res = []
start = now
res = []

res.push db.query(QUERY)
res.push db.query(QUERY)
(now - start.to_f).should be_within(DELAY * res.size * 0.15).of(DELAY * res.size)
res.push db.query(QUERY)
res.push db.query(QUERY)
(now - start.to_f).should be_within(DELAY * res.size * 0.15).of(DELAY * res.size)

start = now
start = now

multi = EventMachine::Synchrony::Multi.new
multi.add :a, db.aquery(QUERY)
multi.add :b, db.aquery(QUERY)
multi.add :c, db.aquery(QUERY)
res = multi.perform
multi = EventMachine::Synchrony::Multi.new
multi.add :a, db.aquery(QUERY)
multi.add :b, db.aquery(QUERY)
multi.add :c, db.aquery(QUERY)
res = multi.perform

(now - start.to_f).should be_within(DELAY * 0.15).of(DELAY)
res.responses[:callback].size.should == 3
res.responses[:errback].size.should == 0
(now - start.to_f).should be_within(DELAY * 0.15).of(DELAY)
res.responses[:callback].size.should == 3
res.responses[:errback].size.should == 0

EventMachine.stop
}.resume
EventMachine.stop
end
end

end
end
57 changes: 24 additions & 33 deletions spec/remcached_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,38 @@
describe Memcached do

it "should yield until connection is ready" do
EventMachine.run do
Fiber.new {
Memcached.connect %w(localhost)
Memcached.usable?.should be_true
EventMachine.stop
}.resume
EventMachine.synchrony do
Memcached.connect %w(localhost)
Memcached.usable?.should be_true
EventMachine.stop
end
end

it "should fire sequential memcached requests" do
EventMachine.run do
Fiber.new {

Memcached.connect %w(localhost)
Memcached.get(key: 'hai') do |res|
res[:value].should match('Not found')
end

Memcached.set(key: 'hai', value: 'win')
Memcached.add(key: 'count')
Memcached.delete(key: 'hai')

EventMachine.stop
}.resume
EventMachine.synchrony do

Memcached.connect %w(localhost)
Memcached.get(key: 'hai') do |res|
res[:value].should match('Not found')
end

Memcached.set(key: 'hai', value: 'win')
Memcached.add(key: 'count')
Memcached.delete(key: 'hai')

EventMachine.stop
end
end

it "should fire multi memcached requests" do
pending "remcached borked? opened a ticket"

EventMachine.run do
Fiber.new {

Memcached.connect %w(localhost)

Memcached.multi_get([{:key => 'foo'},{:key => 'bar'}, {:key => 'test'}]) do |res|
p res
end

EventMachine.synchrony do
pending "patch mult-get"

Memcached.connect %w(localhost)
Memcached.multi_get([{:key => 'foo'},{:key => 'bar'}, {:key => 'test'}]) do |res|
# TODO
EventMachine.stop
}.resume
end
end
end

Expand Down

0 comments on commit ffd7084

Please sign in to comment.