Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mattheworiordan committed Oct 2, 2016
1 parent 8097b2c commit c6a47fd
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 20 deletions.
26 changes: 13 additions & 13 deletions lib/ably/modules/event_emitter.rb
Expand Up @@ -102,21 +102,21 @@ def emit(event_name, *args)
#
# @return [void]
def off(*event_names, &block)
keys = if event_names.empty?
callbacks.keys
else
event_names
end

if event_names.empty? && block_given?
callbacks[nil].delete_if { |proc_hash| proc_hash[:block] == block }
end

keys.each do |event_name|
if event_names.empty?
if block_given?
callbacks[callbacks_event_coerced(event_name)].delete_if { |proc_hash| proc_hash[:block] == block }
callbacks.each do |event_name, registered_callbacks|
registered_callbacks.delete_if { |proc_hash| proc_hash[:block] == block }
end
else
callbacks[callbacks_event_coerced(event_name)].clear
callbacks.clear
end
else
event_names.each do |event_name|
if block_given?
callbacks[callbacks_event_coerced(event_name)].delete_if { |proc_hash| proc_hash[:block] == block }
else
callbacks[callbacks_event_coerced(event_name)].clear
end
end
end
end
Expand Down
6 changes: 5 additions & 1 deletion lib/ably/realtime/channel.rb
Expand Up @@ -215,7 +215,11 @@ def detach(&success_block)

raise exception_for_state_change_to(:detaching) if failed?

transition_state_machine :detaching if can_transition_to?(:detaching)
if can_transition_to?(:detaching)
transition_state_machine :detaching
else
transition_state_machine! :detached
end
deferrable_for_state_change_to(STATE.Detached, &success_block)
end

Expand Down
9 changes: 9 additions & 0 deletions lib/ably/realtime/channel/channel_manager.rb
Expand Up @@ -55,6 +55,15 @@ def request_reattach(reason: nil)
channel.set_failed_channel_error_reason(reason) if reason
end

def duplicate_attached_received(error)
if error
channel.set_failed_channel_error_reason error
emit_error error
else
logger.debug "ChannelManager: Extra ATTACHED message received for #{channel.state} channel '#{channel.name}'"
end
end

# When continuity on the connection is interrupted or channel becomes suspended (implying loss of continuity)
# then all messages published but awaiting an ACK from Ably should be failed with a NACK
def fail_messages_awaiting_ack(error, immediately: false)
Expand Down
6 changes: 5 additions & 1 deletion lib/ably/realtime/client/incoming_message_dispatcher.rb
Expand Up @@ -99,7 +99,11 @@ def dispatch_protocol_message(*args)
when ACTION.Attach
when ACTION.Attached
get_channel(protocol_message.channel).tap do |channel|
channel.transition_state_machine :attached, reason: protocol_message.error, resumed: protocol_message.channel_resumed?, protocol_message: protocol_message unless channel.attached?
if channel.attached?
channel.manager.duplicate_attached_received protocol_message.error
else
channel.transition_state_machine :attached, reason: protocol_message.error, resumed: protocol_message.channel_resumed?, protocol_message: protocol_message
end
end

when ACTION.Detach
Expand Down
75 changes: 70 additions & 5 deletions spec/acceptance/realtime/channel_spec.rb
Expand Up @@ -307,6 +307,26 @@
end
end

context 'when state is :suspended' do
it 'moves the channel state immediately to DETACHED state (RTL5j)' do
channel.attach do
channel.once(:suspended) do
channel.on do |channel_state_change|
expect(channel_state_change.current).to eq(:detached)
expect(channel.state).to eq(:detached)
EventMachine.add_timer(1) do
stop_reactor
end
end
EventMachine.next_tick do
channel.detach
end
end
channel.transition_state_machine :suspended
end
end
end

context 'when state is :initialized' do
it 'does nothing as there is no channel to detach' do
expect(channel).to be_initialized
Expand All @@ -326,12 +346,57 @@
end
end

describe 'channel recovery in :attaching state' do
context 'the transport is disconnected before the ATTACHED protocol message is received' do
skip 'attach times out and fails if not ATTACHED protocol message received'
skip 'channel is ATTACHED if ATTACHED protocol message is later received'
skip 'sends an ATTACH protocol message in response to a channel message being received on the attaching channel'
describe 'automatic channel recovery' do
let(:realtime_request_timeout) { 2 }
let(:client_options) do
default_options.merge(realtime_request_timeout: 2, log_level: :fatal)
end

context 'when an ATTACH request times out' do
it 'moves to the SUSPENDED state (RTL4f)' do
connection.once(:connected) do
attach_request_sent_at = Time.now
channel.attach
client.connection.__incoming_protocol_msgbus__.unsubscribe
channel.once(:suspended) do
expect(attach_request_sent_at.to_i).to be_within(realtime_request_timeout + 1).of(Time.now.to_i)
stop_reactor
end
end
end
end

context 'if a subsequent ATTACHED is received on an ATTACHED channel' do
it 'ignores the additional ATTACHED' do
channel.attach do
channel.once do |obj|
fail "No state change expected: #{obj}"
end
attached_message = Ably::Models::ProtocolMessage.new(action: 11, channel: channel_name) # ATTACHED
client.connection.__incoming_protocol_msgbus__.publish :protocol_message, attached_message
EventMachine.add_timer(1) do
channel.off
stop_reactor
end
end
end

it 'emits an error if the ATTACHED contains an error' do
channel.attach do
channel.on(:error) do |error|
expect(error.code).to eql(50505)
expect(channel.error_reason.code).to eql(50505)
stop_reactor
end
attached_message = Ably::Models::ProtocolMessage.new(action: 11, channel: channel_name, error: { code: 50505 }) # ATTACHED with error
client.connection.__incoming_protocol_msgbus__.publish :protocol_message, attached_message
end
end
end
# skip 'attach times out and fails if not ATTACHED protocol message received'
# skip 'channel is ATTACHED if ATTACHED protocol message is later received'
# skip 'sends an ATTACH protocol message in response to a channel message being received on the attaching channel'
# end
end

context '#publish' do
Expand Down

0 comments on commit c6a47fd

Please sign in to comment.