Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #17 from cbascom/each_job_reconnect

Handle various connection failures
  • Loading branch information...
commit 42419abad3c8584a44d938e2026c8b4e10db563e 2 parents 00cea91 + f7d0413
@dj2 dj2 authored
View
63 lib/em-jack/connection.rb
@@ -46,20 +46,32 @@ def initialize(opts = {})
end
unless @tube.nil?
- use(@tube)
- watch(@tube)
+ @use_on_connect = @tube
+ @watch_on_connect = [@tube]
+ initialize_tube_state
end
end
+ def initialize_tube_state
+ if @use_on_connect
+ @fiberized ? ause(@use_on_connect) : use(@use_on_connect)
+ end
+
+ [@watch_on_connect].flatten.compact.each do |tube|
+ @fiberized ? awatch(tube) : watch(tube)
+ end
+
+ ignore = [@watched_tubes].flatten.compact - [@watch_on_connect].flatten.compact
+ ignore.each { |tube| @fiberized ? aignore(tube) : ignore(tube) }
+ end
+
def reset_tube_state
- prev_used = @used_tube
- prev_watched = @watched_tubes.dup if @watched_tubes
+ @use_on_connect ||= @used_tube
+ @watch_on_connect ||= @watched_tubes.dup if @watched_tubes
@used_tube = 'default'
@watched_tubes = ['default']
@deferrables = []
-
- return [prev_used, prev_watched]
end
def fiber!
@@ -72,9 +84,12 @@ def fiber!
define_method(meth.to_sym) do |*args|
fib = Fiber.current
ameth = :"a#{meth}"
- proc = lambda { |*result| fib.resume(*result) }
- send(ameth, *args, &proc)
- Fiber.yield
+ df = send(ameth, *args)
+ if df
+ df.callback { |*result| fib.resume(*result) }
+ df.errback { fib.resume }
+ Fiber.yield
+ end
end
end
end
@@ -231,8 +246,8 @@ def each_job(timeout = nil, &blk)
if (@fiberized)
work = Proc.new do
job = reserve(timeout)
- blk.call(job)
- EM.next_tick { Fiber.new { work.call }.resume }
+ blk.call(job) if job
+ EM.next_tick { Fiber.new { work.call }.resume }
end
else
work = Proc.new do
@@ -245,7 +260,7 @@ def each_job(timeout = nil, &blk)
EM.next_tick { work.call }
end
end
- end
+ end
work.call
end
@@ -255,6 +270,8 @@ def connected
succeed
@connected = true
@connected_callback.call if @connected_callback
+ @use_on_connect = nil
+ @watch_on_connect = nil
end
def disconnected
@@ -275,9 +292,10 @@ def disconnected
end
end
- prev_used, prev_watched = reset_tube_state
+ reset_tube_state
+ initialize_tube_state
unless @reconnect_proc
- recon = Proc.new { reconnect(prev_used, prev_watched) }
+ recon = Proc.new { @conn.reconnect(@host, @port) }
if @fiberized
@reconnect_proc = Proc.new { Fiber.new { recon.call }.resume }
else
@@ -289,21 +307,14 @@ def disconnected
EM.add_timer(5) { @reconnect_proc.call }
end
- def reconnect(prev_used, prev_watched)
- @conn.reconnect(@host, @port)
-
- use(prev_used) if prev_used
-
- [prev_watched].flatten.compact.each do |tube|
- @fiberized ? awatch(tube) : watch(tube)
- end
- end
-
def reconnect!
@retries = 0
- prev_used, prev_watched = reset_tube_state
- EM.next_tick { reconnect(prev_used, prev_watched) }
+ reset_tube_state
+ EM.next_tick do
+ @conn.reconnect(@host, @port)
+ initialize_tube_state
+ end
end
def add_deferrable(&blk)
View
63 spec/em-jack/connection_spec.rb
@@ -28,6 +28,7 @@
it "should parse tube" do
connection_mock.should_receive(:send).once.with(:use, "leetube")
connection_mock.should_receive(:send).once.with(:watch, "leetube")
+ connection_mock.should_receive(:send).once.with(:ignore, "default")
conn.connected
end
@@ -45,19 +46,42 @@
conn.port.should == 11300
end
- it 'watch and use to provided tube on connect' do
+ it "use_on_connect and watch_on_connect to provided tube" do
+ conn = EMJack::Connection.new(:tube => 'mytube')
+ conn.instance_variable_get("@use_on_connect").should == 'mytube'
+ conn.instance_variable_get("@watch_on_connect").should == ['mytube']
+ end
+ end
+
+ describe "connected" do
+ it 'watch and use the provided tube and ignore the default tube on connect' do
connection_mock.should_receive(:send).once.with(:use, "mytube")
connection_mock.should_receive(:send).once.with(:watch, "mytube")
+ connection_mock.should_receive(:send).once.with(:ignore, "default")
conn = EMJack::Connection.new(:tube => "mytube")
conn.connected
end
+ it "should not ignore the default tube if no tube was provided" do
+ conn = EMJack::Connection.new
+ connection_mock.should_not_receive(:send)
+ conn.connected
+ end
+
+ it "should clear the use and watch on connect variables" do
+ connection_mock.as_null_object
+ conn = EMJack::Connection.new(:tube => 'mytube')
+
+ conn.connected
+ conn.instance_variable_get("@use_on_connect").should be_nil
+ conn.instance_variable_get("@watch_on_connect").should be_nil
+ end
end
describe 'sending commands' do
it "doesn't send the command until we've connected" do
conn = EMJack::Connection.new
- conn.should_not_receive(:send)
+ connection_mock.should_not_receive(:send)
conn.use("mytube")
end
@@ -76,6 +100,17 @@
conn.connected
end
+ it "uses and watches the provided tube when initial connection fails" do
+ EM.stub(:add_timer)
+ connection_mock.stub(:reconnect)
+ conn = EMJack::Connection.new(:tube => 'mytube')
+ connection_mock.should_receive(:send).once.with(:use, "mytube")
+ connection_mock.should_receive(:send).once.with(:watch, "mytube")
+ connection_mock.should_receive(:send).once.with(:ignore, "default")
+ conn.disconnected
+ conn.connected
+ end
+
it 'the "use" command' do
connection_mock.should_receive(:send).once.with(:use, "mytube")
conn.use("mytube")
@@ -295,20 +330,21 @@
EM.should_receive(:add_timer).exactly(1).times.and_yield
connection_mock.as_null_object
end
-
+
it 'reuses a used tube' do
conn.should_receive(:use).with('used')
conn.instance_variable_set(:@used_tube, 'used')
conn.disconnected
end
- it 'reuses a used tube' do
- conn.should_receive(:use).with('default')
+ it 'rewatches a watched tube' do
+ conn.should_receive(:watch).with('watched')
+ conn.instance_variable_set(:@watched_tubes, ['watched'])
conn.disconnected
end
- it 'rewatches a watched tube' do
- conn.should_receive(:watch).with('watched')
+ it "ignores the default tube if it is not on the watch list" do
+ conn.should_receive(:ignore).with('default')
conn.instance_variable_set(:@watched_tubes, ['watched'])
conn.disconnected
end
@@ -332,6 +368,19 @@
conn.instance_variable_set(:@watched_tubes, ['watched'])
conn.disconnected
end
+
+ it "rewatches and reuses previous tubes when first reconnect fails" do
+ EM.should_receive(:add_timer).exactly(1).times.and_yield
+ conn.instance_variable_set(:@used_tube, 'used')
+ conn.instance_variable_set(:@watched_tubes, ['watched'])
+ conn.disconnected
+
+ conn.should_receive(:use).with('used')
+ conn.should_receive(:watch).with('watched')
+ conn.should_receive(:ignore).with('default')
+ conn.disconnected
+ end
+
end
it 'watches and uses previous tubes on disconnect' do
View
65 spec/em-jack/fiber_spec.rb
@@ -9,7 +9,7 @@
EM.add_timer(10) { EM.stop }
Fiber.new do
- bean = EMJack::Connection.new
+ bean = EMJack::Connection.new(:tube => 'emjacktesttube')
bean.fiber!
bean.put("hello!")
@@ -21,36 +21,77 @@
end.resume
end
end
+
it "should process each job" do
EM.run do
EM.add_timer(10) { EM.stop }
-
+
job_body = ''
-
+
f = Fiber.new do
- bean = EMJack::Connection.new
+ bean = EMJack::Connection.new(:tube => 'emjacktesttube')
bean.fiber!
-
+
bean.put("hello!")
- bean.put("bonjour!")
-
+ bean.put("bonjour!")
+
mock = double()
mock.should_receive(:foo).with("hello!")
mock.should_receive(:foo).with("bonjour!")
-
+
bean.each_job(0) do |job|
mock.foo(job.body)
job_body = job.body
job.delete
end
-
+
end
-
+
f.resume
-
+
EM.add_timer(1) { EM.stop unless f.alive?; job_body.should eq "bonjour!" unless f.alive? }
-
+
end
end
+
+ it "should resume the fiber when disconnected" do
+ success = false
+ EM.run do
+ EM.add_timer(10) { EM.stop }
+
+ f = Fiber.new {
+ bean = EMJack::Connection.new(:tube => 'emjacktesttube')
+ bean.fiber!
+
+ EM.add_timer(1) { bean.disconnected }
+ bean.reserve
+
+ success = true
+ EM.stop
+ }.resume
+ end
+
+ success.should be_true
+ end
+
+ it "should not invoke the each_job block when disconnected" do
+ success = true
+ EM.run do
+ EM.add_timer(10) { EM.stop }
+
+ f = Fiber.new {
+ bean = EMJack::Connection.new(:tube => 'emjacktesttube')
+ bean.fiber!
+
+ EM.should_receive(:next_tick) { EM.stop }
+ EM.add_timer(1) { bean.disconnected }
+ bean.each_job do |job|
+ success = false
+ end
+ }.resume
+ end
+
+ success.should be_true
+ end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.