Permalink
Browse files

Handle various connection failures

Handle initial connection failure to make sure that the provided tube
is used and watched properly despite the initial connection failing.

Handle reconnect scenarios where the first reconnect attempt fails
to make sure the use and watch states are correctly restored for all
tubes.

When performing the reconnect, make sure that the use, watch, and ignore
commands get on the callback queue prior to the reserve command when
each_job is being used.

When in fiberized mode, make sure to register an errback on the deferrable
so that the fiber can be resumed when an error occurs.  Also make sure to
not invoke the each_job callback in this case since there is no job to
pass back to the caller.
  • Loading branch information...
1 parent 00cea91 commit 716a1e7f78aab3796926967231b6eac896a9d8a4 @cbascom cbascom committed Jun 27, 2012
Showing with 144 additions and 45 deletions.
  1. +35 −26 lib/em-jack/connection.rb
  2. +56 −7 spec/em-jack/connection_spec.rb
  3. +53 −12 spec/em-jack/fiber_spec.rb
View
61 lib/em-jack/connection.rb
@@ -46,20 +46,30 @@ def initialize(opts = {})
end
unless @tube.nil?
- use(@tube)
- watch(@tube)
+ @use_on_connect = @tube
+ @watch_on_connect = [@tube]
+ initialize_tube_state
end
end
+ def initialize_tube_state
+ @fiberized ? ause(@use_on_connect) : use(@use_on_connect) if @use_on_connect
+
+ [@watch_on_connect].flatten.compact.each do |tube|
+ @fiberized ? awatch(tube) : watch(tube)
+ end
+
+ ignore = [@watched_tubes].flatten.compact - [@watch_on_connect].flatten.compact
+ ignore.each { |tube| @fiberized ? aignore(tube) : ignore(tube) }
+ end
+
def reset_tube_state
- prev_used = @used_tube
- prev_watched = @watched_tubes.dup if @watched_tubes
+ @use_on_connect ||= @used_tube
+ @watch_on_connect ||= @watched_tubes.dup if @watched_tubes
@used_tube = 'default'
@watched_tubes = ['default']
@deferrables = []
-
- return [prev_used, prev_watched]
end
def fiber!
@@ -72,9 +82,12 @@ def fiber!
define_method(meth.to_sym) do |*args|
fib = Fiber.current
ameth = :"a#{meth}"
- proc = lambda { |*result| fib.resume(*result) }
- send(ameth, *args, &proc)
- Fiber.yield
+ df = send(ameth, *args)
+ if df
+ df.callback { |*result| fib.resume(*result) }
+ df.errback { fib.resume }
+ Fiber.yield
+ end
end
end
end
@@ -231,8 +244,8 @@ def each_job(timeout = nil, &blk)
if (@fiberized)
work = Proc.new do
job = reserve(timeout)
- blk.call(job)
- EM.next_tick { Fiber.new { work.call }.resume }
+ blk.call(job) if job
+ EM.next_tick { Fiber.new { work.call }.resume }
end
else
work = Proc.new do
@@ -245,7 +258,7 @@ def each_job(timeout = nil, &blk)
EM.next_tick { work.call }
end
end
- end
+ end
work.call
end
@@ -255,6 +268,8 @@ def connected
succeed
@connected = true
@connected_callback.call if @connected_callback
+ @use_on_connect = nil
+ @watch_on_connect = nil
end
def disconnected
@@ -275,9 +290,10 @@ def disconnected
end
end
- prev_used, prev_watched = reset_tube_state
+ reset_tube_state
+ initialize_tube_state
unless @reconnect_proc
- recon = Proc.new { reconnect(prev_used, prev_watched) }
+ recon = Proc.new { @conn.reconnect(@host, @port) }
if @fiberized
@reconnect_proc = Proc.new { Fiber.new { recon.call }.resume }
else
@@ -289,21 +305,14 @@ def disconnected
EM.add_timer(5) { @reconnect_proc.call }
end
- def reconnect(prev_used, prev_watched)
- @conn.reconnect(@host, @port)
-
- use(prev_used) if prev_used
-
- [prev_watched].flatten.compact.each do |tube|
- @fiberized ? awatch(tube) : watch(tube)
- end
- end
-
def reconnect!
@retries = 0
- prev_used, prev_watched = reset_tube_state
- EM.next_tick { reconnect(prev_used, prev_watched) }
+ reset_tube_state
+ EM.next_tick do
+ @conn.reconnect(@host, @port)
+ initialize_tube_state
+ end
end
def add_deferrable(&blk)
View
63 spec/em-jack/connection_spec.rb
@@ -28,6 +28,7 @@
it "should parse tube" do
connection_mock.should_receive(:send).once.with(:use, "leetube")
connection_mock.should_receive(:send).once.with(:watch, "leetube")
+ connection_mock.should_receive(:send).once.with(:ignore, "default")
conn.connected
end
@@ -45,19 +46,42 @@
conn.port.should == 11300
end
- it 'watch and use to provided tube on connect' do
+ it "use_on_connect and watch_on_connect to provided tube" do
+ conn = EMJack::Connection.new(:tube => 'mytube')
+ conn.instance_variable_get("@use_on_connect").should == 'mytube'
+ conn.instance_variable_get("@watch_on_connect").should == ['mytube']
+ end
+ end
+
+ describe "connected" do
+ it 'watch and use the provided tube and ignore the default tube on connect' do
connection_mock.should_receive(:send).once.with(:use, "mytube")
connection_mock.should_receive(:send).once.with(:watch, "mytube")
+ connection_mock.should_receive(:send).once.with(:ignore, "default")
conn = EMJack::Connection.new(:tube => "mytube")
conn.connected
end
+ it "should not ignore the default tube if no tube was provided" do
+ conn = EMJack::Connection.new
+ connection_mock.should_not_receive(:send)
+ conn.connected
+ end
+
+ it "should clear the use and watch on connect variables" do
+ connection_mock.as_null_object
+ conn = EMJack::Connection.new(:tube => 'mytube')
+
+ conn.connected
+ conn.instance_variable_get("@use_on_connect").should be_nil
+ conn.instance_variable_get("@watch_on_connect").should be_nil
+ end
end
describe 'sending commands' do
it "doesn't send the command until we've connected" do
conn = EMJack::Connection.new
- conn.should_not_receive(:send)
+ connection_mock.should_not_receive(:send)
conn.use("mytube")
end
@@ -76,6 +100,17 @@
conn.connected
end
+ it "uses and watches the provided tube when initial connection fails" do
+ EM.stub(:add_timer)
+ connection_mock.stub(:reconnect)
+ conn = EMJack::Connection.new(:tube => 'mytube')
+ connection_mock.should_receive(:send).once.with(:use, "mytube")
+ connection_mock.should_receive(:send).once.with(:watch, "mytube")
+ connection_mock.should_receive(:send).once.with(:ignore, "default")
+ conn.disconnected
+ conn.connected
+ end
+
it 'the "use" command' do
connection_mock.should_receive(:send).once.with(:use, "mytube")
conn.use("mytube")
@@ -295,20 +330,21 @@
EM.should_receive(:add_timer).exactly(1).times.and_yield
connection_mock.as_null_object
end
-
+
it 'reuses a used tube' do
conn.should_receive(:use).with('used')
conn.instance_variable_set(:@used_tube, 'used')
conn.disconnected
end
- it 'reuses a used tube' do
- conn.should_receive(:use).with('default')
+ it 'rewatches a watched tube' do
+ conn.should_receive(:watch).with('watched')
+ conn.instance_variable_set(:@watched_tubes, ['watched'])
conn.disconnected
end
- it 'rewatches a watched tube' do
- conn.should_receive(:watch).with('watched')
+ it "ignores the default tube if it is not on the watch list" do
+ conn.should_receive(:ignore).with('default')
conn.instance_variable_set(:@watched_tubes, ['watched'])
conn.disconnected
end
@@ -332,6 +368,19 @@
conn.instance_variable_set(:@watched_tubes, ['watched'])
conn.disconnected
end
+
+ it "rewatches and reuses previous tubes when first reconnect fails" do
+ EM.should_receive(:add_timer).exactly(1).times.and_yield
+ conn.instance_variable_set(:@used_tube, 'used')
+ conn.instance_variable_set(:@watched_tubes, ['watched'])
+ conn.disconnected
+
+ conn.should_receive(:use).with('used')
+ conn.should_receive(:watch).with('watched')
+ conn.should_receive(:ignore).with('default')
+ conn.disconnected
+ end
+
end
it 'watches and uses previous tubes on disconnect' do
View
65 spec/em-jack/fiber_spec.rb
@@ -9,7 +9,7 @@
EM.add_timer(10) { EM.stop }
Fiber.new do
- bean = EMJack::Connection.new
+ bean = EMJack::Connection.new(:tube => 'emjacktesttube')
bean.fiber!
bean.put("hello!")
@@ -21,36 +21,77 @@
end.resume
end
end
+
it "should process each job" do
EM.run do
EM.add_timer(10) { EM.stop }
-
+
job_body = ''
-
+
f = Fiber.new do
- bean = EMJack::Connection.new
+ bean = EMJack::Connection.new(:tube => 'emjacktesttube')
bean.fiber!
-
+
bean.put("hello!")
- bean.put("bonjour!")
-
+ bean.put("bonjour!")
+
mock = double()
mock.should_receive(:foo).with("hello!")
mock.should_receive(:foo).with("bonjour!")
-
+
bean.each_job(0) do |job|
mock.foo(job.body)
job_body = job.body
job.delete
end
-
+
end
-
+
f.resume
-
+
EM.add_timer(1) { EM.stop unless f.alive?; job_body.should eq "bonjour!" unless f.alive? }
-
+
end
end
+
+ it "should resume the fiber when disconnected" do
+ success = false
+ EM.run do
+ EM.add_timer(10) { EM.stop }
+
+ f = Fiber.new {
+ bean = EMJack::Connection.new(:tube => 'emjacktesttube')
+ bean.fiber!
+
+ EM.add_timer(1) { bean.disconnected }
+ bean.reserve
+
+ success = true
+ EM.stop
+ }.resume
+ end
+
+ success.should be_true
+ end
+
+ it "should not invoke the each_job block when disconnected" do
+ success = true
+ EM.run do
+ EM.add_timer(10) { EM.stop }
+
+ f = Fiber.new {
+ bean = EMJack::Connection.new(:tube => 'emjacktesttube')
+ bean.fiber!
+
+ EM.should_receive(:next_tick) { EM.stop }
+ EM.add_timer(1) { bean.disconnected }
+ bean.each_job do |job|
+ success = false
+ end
+ }.resume
+ end
+
+ success.should be_true
+ end
end
end

0 comments on commit 716a1e7

Please sign in to comment.