Skip to content

Commit

Permalink
RTN16b - recovery of msgSerial on connection recover
Browse files Browse the repository at this point in the history
Fixes #180
  • Loading branch information
mattheworiordan committed Apr 27, 2019
1 parent 384d0b8 commit 58dc55f
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 20 deletions.
30 changes: 16 additions & 14 deletions lib/ably/realtime/connection.rb
Expand Up @@ -66,7 +66,7 @@ class Connection
ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'

# Expected format for a connection recover key
RECOVER_REGEX = /^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+)$/
RECOVER_REGEX = /^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+):(?<msg_serial>\-?\d+)$/

# Defaults for automatic connection recovery and timeouts
DEFAULTS = {
Expand Down Expand Up @@ -137,7 +137,7 @@ def initialize(client, options)
@client = client
@__outgoing_message_queue__ = []
@__pending_message_ack_queue__ = []
reset_client_serial
reset_client_msg_serial

@defaults = DEFAULTS.dup
options.each do |key, val|
Expand Down Expand Up @@ -303,11 +303,11 @@ def internet_up?
# @!attribute [r] recovery_key
# @return [String] recovery key that can be used by another client to recover this connection with the :recover option
def recovery_key
"#{key}:#{serial}" if connection_resumable?
"#{key}:#{serial}:#{client_msg_serial}" if connection_resumable?
end

# Following a new connection being made, the connection ID, connection key
# and message serial need to match the details provided by the server.
# and connection serial need to match the details provided by the server.
#
# @return [void]
# @api private
Expand Down Expand Up @@ -439,9 +439,11 @@ def create_websocket_transport
logger.debug { "Resuming connection key #{key} with serial #{serial}" }
elsif connection_recoverable?
url_params.merge! recover: connection_recover_parts[:recover], connectionSerial: connection_recover_parts[:connection_serial]
recovered_msg_serial = connection_recover_parts[:msg_serial].to_i
logger.debug { "Recovering connection with key #{client.recover}" }
unsafe_once(:connected, :closed, :failed) do
client.disable_automatic_connection_recovery
@client_msg_serial = recovered_msg_serial
end
end

Expand Down Expand Up @@ -541,11 +543,11 @@ def heartbeat_interval
defaults.fetch(:realtime_request_timeout)
end

# Resets the client serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage}
# (see #client_serial)
# Resets the client message serial (msgSerial) sent to Ably for each new {Ably::Models::ProtocolMessage}
# (see #client_msg_serial)
# @api private
def reset_client_serial
@client_serial = -1
def reset_client_msg_serial
@client_msg_serial = -1
end

# When a hearbeat or any other message from Ably is received
Expand All @@ -567,15 +569,15 @@ def time_since_connection_confirmed_alive?

private

# The client serial is incremented for every message that is published that requires an ACK.
# The client message serial (msgSerial) is incremented for every message that is published that requires an ACK.
# Note that this is different to the connection serial that contains the last known serial number
# received from the server.
#
# A message serial number does not guarantee a message has been received, only sent.
# A connection serial guarantees the server has received the message and is thus used for connection recovery and resumes.
# @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
def client_serial
@client_serial
def client_msg_serial
@client_msg_serial
end

def resume_callbacks
Expand All @@ -600,11 +602,11 @@ def add_message_serial_if_ack_required_to(protocol_message)
end

def add_message_serial_to(protocol_message)
@client_serial += 1
protocol_message[:msgSerial] = client_serial
@client_msg_serial += 1
protocol_message[:msgSerial] = client_msg_serial
yield
rescue StandardError => e
@client_serial -= 1
@client_msg_serial -= 1
raise e
end

Expand Down
2 changes: 1 addition & 1 deletion lib/ably/realtime/connection/connection_manager.rb
Expand Up @@ -100,7 +100,7 @@ def connected(protocol_message)
resend_pending_message_ack_queue
else
logger.debug { "ConnectionManager: Connection was not resumed, old connection ID #{connection.id} has been updated with new connection ID #{protocol_message.connection_id} and key #{protocol_message.connection_key}" }
connection.reset_client_serial
connection.reset_client_msg_serial
nack_messages_on_all_channels protocol_message.error
force_reattach_on_channels protocol_message.error
end
Expand Down
4 changes: 2 additions & 2 deletions spec/acceptance/realtime/connection_failures_spec.rb
Expand Up @@ -921,7 +921,7 @@ def fail_if_suspended_or_failed
end
end

it 'retains the client_serial (#RTN15c2, #RTN15c3)' do
it 'retains the client_msg_serial (#RTN15c2, #RTN15c3)' do
last_message = nil
channel = client.channels.get("foo")

Expand Down Expand Up @@ -1103,7 +1103,7 @@ def kill_connection_transport_and_prevent_valid_resume
end
end

it 'resets the client_serial (#RTN15c3)' do
it 'resets the client_msg_serial (#RTN15c3)' do
last_message = nil
channel = client.channels.get("foo")

Expand Down
36 changes: 33 additions & 3 deletions spec/acceptance/realtime/connection_spec.rb
Expand Up @@ -1156,7 +1156,7 @@ def self.available_states
expected_serial += 1 # attach message received
expect(connection.serial).to eql(expected_serial)

expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}")
expect(connection.recovery_key).to eql("#{connection.key}:#{connection.serial}:#{connection.send(:client_msg_serial)}")
stop_reactor
end
end
Expand Down Expand Up @@ -1267,6 +1267,36 @@ def self.available_states
end
end
end

context 'when messages have been published' do
describe 'the new connection' do
it 'uses the correct msgSerial from the old connection' do
msg_serial, recovery_key, connection_id = nil, nil, nil

channel.attach do
expect(connection.send(:client_msg_serial)).to eql(-1) # no messages published yet
connection_id = client.connection.id
connection.transport.__incoming_protocol_msgbus__
channel.publish('event', 'message') do
msg_serial = connection.send(:client_msg_serial)
expect(msg_serial).to eql(0)
recovery_key = client.connection.recovery_key
connection.transition_state_machine! :failed
end
end

connection.on(:failed) do
recover_client = auto_close Ably::Realtime::Client.new(default_options.merge(recover: recovery_key))
recover_client_channel = recover_client.channel(channel_name)
recover_client_channel.attach do
expect(recover_client.connection.id).to eql(connection_id)
expect(recover_client.connection.send(:client_msg_serial)).to eql(msg_serial)
stop_reactor
end
end
end
end
end
end

context 'with :recover option' do
Expand All @@ -1280,7 +1310,7 @@ def self.available_states
end

context 'with invalid formatted value sent to server' do
let(:client_options) { default_options.merge(recover: 'not-a-valid-connection-key:1', log_level: :none) }
let(:client_options) { default_options.merge(recover: 'not-a-valid-connection-key:1:0', log_level: :none) }

it 'sets the #error_reason and moves the connection to FAILED' do
connection.once(:failed) do |state_change|
Expand All @@ -1295,7 +1325,7 @@ def self.available_states
end

context 'with expired (missing) value sent to server' do
let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0', log_level: :fatal) }
let(:client_options) { default_options.merge(recover: 'wVIsgTHAB1UvXh7z-1991d8586:0:0', log_level: :fatal) }

it 'connects but sets the error reason and includes the reason in the state change' do
connection.once(:connected) do |state_change|
Expand Down

0 comments on commit 58dc55f

Please sign in to comment.