From 7fd69f625bbe1d35c18e7ea8c28024c246dd13da Mon Sep 17 00:00:00 2001 From: Aggelos Avgerinos Date: Thu, 12 Feb 2015 22:32:59 +0200 Subject: [PATCH] Remove connections that error with Beaneater::NotConnected error from the pool --- lib/beaneater/pool.rb | 11 +++++++---- test/pool_test.rb | 12 ++++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/lib/beaneater/pool.rb b/lib/beaneater/pool.rb index e49069e..86ebc9e 100644 --- a/lib/beaneater/pool.rb +++ b/lib/beaneater/pool.rb @@ -140,14 +140,17 @@ def safe_transmit(grace_period=1, &block) retries = 1 begin yield - rescue DrainingError, EOFError, Errno::ECONNRESET, Errno::EPIPE => ex - # TODO remove faulty connections from pool? - # https://github.com/kr/beanstalk-client-ruby/blob/master/lib/beanstalk-client/connection.rb#L405-410 + rescue NotConnected, DrainingError, EOFError, Errno::ECONNRESET, Errno::EPIPE => ex if retries < MAX_RETRIES retries += 1 sleep(grace_period) if grace_period retry else # finished retrying, fail out + if ex.is_a?(NotConnected) + connection = @connections.delete(ex.connection) + connection.close + end + ex.is_a?(DrainingError) ? raise(ex) : raise(NotConnected, "Could not connect!") end end @@ -165,4 +168,4 @@ def host_from_env end end # Pool -end # Beaneater \ No newline at end of file +end # Beaneater diff --git a/test/pool_test.rb b/test/pool_test.rb index 9067909..d4395b5 100644 --- a/test/pool_test.rb +++ b/test/pool_test.rb @@ -152,6 +152,18 @@ assert_equal 'INSERTED', res[:status] end + it "removes the connection with Beaneater::NotConnected" do + first_connection = @bp.connections.first + connections_count = @bp.connections.count + first_connection.stubs(:transmit). + raises(Beaneater::NotConnected.new(first_connection)) + + assert_raises(Beaneater::NotConnected) do + @bp.transmit_to_all "puts 0 0 10 2 \r\nxy" + end + assert_equal @bp.connections.count, connections_count - 1 + end + it "should retry 3 times for temporary failed connection with EOFError" do TCPSocket.any_instance.expects(:write).times(3) TCPSocket.any_instance.expects(:readline).raises(EOFError).then.