Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Split response fix #20

Open
wants to merge 6 commits into from

2 participants

@mdpye

Did not handle case where packet boundaries split a body from its line terminator exactly. Line terminator not consumed, nor anything past it. For example:

OK 5\r\n12345
\r\nDELETED 10\r\n

Considerably simplified multi-packet reading and body-extraction logic.

mdpye added some commits
@mdpye mdpye Merge pull request #1 from DanielWaterworth/closable
Closable
4733bae
@mdpye mdpye Fix deprecation warnings in specs b2755c4
@mdpye mdpye Specify the problem
If one packet contains exactly the declared data size, but not the line
terminator chars, then they will not be consumed and will block
consumption of further commands
d1ac7bb
@mdpye mdpye Fix handling of responses which take bodies
There was miscounting in the consuming of the bodies when the associated
line terminator did not turn up in the same delivery.

Removed the check on the return value of the `handle` methods. They
duplicate the check made in `handles?` and nothing is ever passed to
`handle` without first being tested in `handles?`.
622e275
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 9, 2013
  1. @DanielWaterworth
  2. @DanielWaterworth
Commits on May 8, 2014
  1. @mdpye
  2. @mdpye
  3. @mdpye

    Specify the problem

    mdpye authored
    If one packet contains exactly the declared data size, but not the line
    terminator chars, then they will not be consumed and will block
    consumption of further commands
  4. @mdpye

    Fix handling of responses which take bodies

    mdpye authored
    There was miscounting in the consuming of the bodies when the associated
    line terminator did not turn up in the same delivery.
    
    Removed the check on the return value of the `handle` methods. They
    duplicate the check made in `handles?` and nothing is ever passed to
    `handle` without first being tested in `handles?`.
This page is out of date. Refresh to see the latest.
View
2  em-jack.gemspec
@@ -17,7 +17,7 @@ Gem::Specification.new do |s|
s.add_dependency 'eventmachine', ['>= 0.12.10']
- s.add_development_dependency 'bundler', ['~> 1.0.13']
+ s.add_development_dependency 'bundler', ['~> 1.3.5']
s.add_development_dependency 'rake', ['>= 0.8.7']
s.add_development_dependency 'rspec', ['~> 2.6']
View
116 lib/em-jack/connection.rb
@@ -50,6 +50,8 @@ def initialize(opts = {})
@watch_on_connect = [@tube]
initialize_tube_state
end
+
+ @intentionally_closed = false
end
def initialize_tube_state
@@ -272,39 +274,51 @@ def connected
@connected_callback.call if @connected_callback
@use_on_connect = nil
@watch_on_connect = nil
+
+ @conn.close_connection_after_writing if @intentionally_closed
+ end
+
+ def disconnect
+ @intentionally_closed = true
+ @conn.close_connection_after_writing if connected?
+ @close_df = EM::DefaultDeferrable.new
end
def disconnected
@connected = false
- d = @deferrables.dup
+ if @intentionally_closed
+ @close_df.succeed
+ else
+ d = @deferrables.dup
- ## if reconnecting, need to fail ourself to remove any callbacks
- fail
+ ## if reconnecting, need to fail ourself to remove any callbacks
+ fail
- set_deferred_status(nil)
- d.each { |df| df.fail(:disconnected) }
+ set_deferred_status(nil)
+ d.each { |df| df.fail(:disconnected) }
- if @retries >= RETRY_COUNT
- if @disconnected_callback
- @disconnected_callback.call
- else
- raise EMJack::Disconnected
+ if @retries >= RETRY_COUNT
+ if @disconnected_callback
+ @disconnected_callback.call
+ else
+ raise EMJack::Disconnected
+ end
end
- end
- reset_tube_state
- initialize_tube_state
- unless @reconnect_proc
- recon = Proc.new { @conn.reconnect(@host, @port) }
- if @fiberized
- @reconnect_proc = Proc.new { Fiber.new { recon.call }.resume }
- else
- @reconnect_proc = recon
+ reset_tube_state
+ initialize_tube_state
+ unless @reconnect_proc
+ recon = Proc.new { @conn.reconnect(@host, @port) }
+ if @fiberized
+ @reconnect_proc = Proc.new { Fiber.new { recon.call }.resume }
+ else
+ @reconnect_proc = recon
+ end
end
- end
- @retries += 1
- EM.add_timer(5) { @reconnect_proc.call }
+ @retries += 1
+ EM.add_timer(5) { @reconnect_proc.call }
+ end
end
def reconnect!
@@ -348,52 +362,34 @@ def connected?
def received(data)
@data << data
- until @data.empty?
- idx = @data.index(/\r\n/)
- break if idx.nil?
+ wait = false
+ while !wait && (idx = @data.index(/\r\n/))
+ idx += 2 # Include the line terminator
+ keyword = @data[0..idx]
- first = @data[0..(idx + 1)]
- df = @deferrables.shift
- handled, skip = false, false
EMJack::Connection.handlers.each do |h|
- handles, bytes = h.handles?(first)
-
+ handles, needs_body = h.handles?(keyword)
next unless handles
- bytes = bytes.to_i
-
- if bytes > 0
- # if this handler requires us to receive a body make sure we can get
- # the full length of body. If not, we'll go around and wait for more
- # data to be received
- body, @data = extract_body!(bytes, @data) unless bytes <= 0
- break if body.nil?
- else
- @data = @data[(@data.index(/\r\n/) + 2)..-1]
- end
-
- handled = h.handle(df, first, body, self)
- break if handled
- end
- @deferrables.unshift(df) unless handled
+ if needs_body
+ body_len = needs_body.to_i
+ if @data.length >= idx + body_len + 2 # Remember the line terminator
+ # Body is available
+ body = @data.slice(idx, body_len)
+ idx += body_len + 2
+ else
+ # We need to read more before we have all of the body
+ wait = true
+ break
+ end
+ end
- # not handled means there wasn't enough data to process a complete response
- break unless handled
- next unless @data.index(/\r\n/)
+ h.handle(@deferrables.shift, keyword, body, self)
- @data = "" if @data.nil?
+ @data = @data[idx..-1] || ""
+ end
end
end
- def extract_body!(bytes, data)
- rem = data[(data.index(/\r\n/) + 2)..-1]
- return [nil, data] if rem.bytesize < bytes
-
- body = rem[0..(bytes - 1)]
- data = rem[(bytes + 2)..-1]
- data = "" if data.nil?
-
- [body, data]
- end
end
end
View
29 spec/em-jack/connection_spec.rb
@@ -8,12 +8,12 @@
end
let(:connection_mock) do
- connection_mock = mock(:conn)
+ connection_mock = double(:conn)
connection_mock
end
before(:each) do
- EM.stub!(:connect).and_return(connection_mock)
+ EM.stub(:connect).and_return(connection_mock)
end
describe "uri connection string" do
@@ -404,11 +404,11 @@
5.times { conn.disconnected }
conn.connected
- lambda { conn.disconnected }.should_not raise_error(EMJack::Disconnected)
+ lambda { conn.disconnected }.should_not raise_error
end
it 'handles deferrables added during the fail phase' do
- EM.stub!(:add_timer)
+ EM.stub(:add_timer)
count = 0
blk = Proc.new do
@@ -425,6 +425,15 @@
conn.disconnected
count.should == 1
end
+
+ it "does not reconnect when the connection was closed intentionally" do
+ conn.stub(:reconnect) { fail }
+ connection_mock.should_receive :close_connection_after_writing
+
+ conn.disconnect
+ conn.disconnected
+ conn.connected?.should == false
+ end
end
describe 'beanstalk responses' do
@@ -568,6 +577,18 @@
conn.received("RESERVED 9 #{(msg1 + msg2).length}\r\n#{msg1}#{msg2}")
conn.received("\r\n")
end
+
+ it 'multiple responses with the \r\n in a separate packet' do
+ df.should_receive(:succeed).with do |stats|
+ stats['id'].should == 2
+ end
+
+ df2 = conn.add_deferrable
+ df2.should_receive(:fail).with(:deadline_soon)
+
+ conn.received("OK 142\r\n---\nid: 2\ntube: default\nstate: reserved\npri: 65536\nage: 2\ndelay: 0\nttr: 3\ntime-left: 0\nreserves: 1\ntimeouts: 0\nreleases: 0\nburies: 0\nkicks: 0\n")
+ conn.received("\r\nDEADLINE_SOON\r\n")
+ end
end
context 'passed blocks' do
View
2  spec/em-jack/job_spec.rb
@@ -1,7 +1,7 @@
require 'spec_helper'
describe EMJack::Job do
- let (:conn ) { mock(:conn) }
+ let (:conn) { double(:conn) }
it 'converts jobid to an integer' do
j = EMJack::Job.new(nil, "1", "body")
Something went wrong with that request. Please try again.