Skip to content

Commit

Permalink
Fix timing issues with ACK/message received
Browse files Browse the repository at this point in the history
I incorrectly assumed messages always came before ACKs, but this is no longer the case. This ensures we remove that assumption
  • Loading branch information
mattheworiordan committed Feb 24, 2017
1 parent 37b1600 commit 744290b
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 45 deletions.
36 changes: 28 additions & 8 deletions spec/acceptance/realtime/connection_spec.rb
Expand Up @@ -615,16 +615,23 @@ def expect_ordered_phases
end
end

it 'is set to 0 when a message sent ACK is received' do
channel.publish('event', 'data') do
it 'is set to 0 when a message is received back' do
channel.publish('event', 'data')
channel.subscribe do
expect(connection.serial).to eql(0)
stop_reactor
end
end

it 'is set to 1 when the second message sent ACK is received' do
it 'is set to 1 when the second message is received' do
channel.publish('event', 'data') do
channel.publish('event', 'data') do
channel.publish('event', 'data')
end

messages = []
channel.subscribe do |message|
messages << message
if messages.length == 2
expect(connection.serial).to eql(1)
stop_reactor
end
Expand Down Expand Up @@ -884,11 +891,16 @@ def self.available_states
expect(connection.serial).to eql(expected_serial)

channel.attach do
channel.publish('event', 'data') do
channel.publish('event', 'data')
channel.subscribe do
channel.unsubscribe

expected_serial += 1 # attach message received
expect(connection.serial).to eql(expected_serial)

channel.publish('event', 'data') do
channel.publish('event', 'data')
channel.subscribe do
channel.unsubscribe
expected_serial += 1 # attach message received
expect(connection.serial).to eql(expected_serial)

Expand Down Expand Up @@ -1302,6 +1314,14 @@ def self.available_states
end

context 'ConnectionStateChange object' do
def unbind
if connection.transport
connection.transport.unbind
else
EventMachine.add_timer(0.005) { unbind }
end
end

it 'has current state' do
connection.on(:connected) do |connection_state_change|
expect(connection_state_change.current).to eq(:connected)
Expand Down Expand Up @@ -1365,7 +1385,7 @@ def self.available_states
expect(connection_state_change.retry_in).to eql(0)
stop_reactor
end
EventMachine.add_timer(0.005) { connection.transport.unbind }
unbind
end
end

Expand All @@ -1386,7 +1406,7 @@ def self.available_states
expect(connection_state_change.retry_in).to be > 0
stop_reactor
end
EventMachine.add_timer(0.005) { connection.transport.unbind }
unbind
end
connection.transport.unbind
end
Expand Down
113 changes: 76 additions & 37 deletions spec/acceptance/realtime/presence_spec.rb
Expand Up @@ -468,7 +468,14 @@ def presence_action(method_name, data)

context 'once server sync is complete' do
it 'behaves like an Enumerable allowing direct access to current members' do
when_all(presence_client_one.enter, presence_client_two.enter) do
presence_client_one.enter
presence_client_two.enter

entered = 0
presence_client_one.subscribe(:enter) do
entered += 1
next unless entered == 2

presence_anonymous_client.members.once(:in_sync) do
expect(presence_anonymous_client.members.count).to eql(2)
member_ids = presence_anonymous_client.members.map(&:member_key)
Expand All @@ -495,7 +502,10 @@ def presence_action(method_name, data)

context 'when attaching to a channel with members present' do
it 'is false and the presence channel will subsequently be synced' do
presence_client_one.enter do
presence_client_one.enter
presence_client_one.subscribe(:enter) do
presence_client_one.unsubscribe :enter

channel_anonymous_client.attach do
expect(channel_anonymous_client.presence).to_not be_sync_complete
channel_anonymous_client.presence.get(wait_for_sync: true) do
Expand Down Expand Up @@ -680,36 +690,40 @@ def setup_members_on(presence)
it 'waits until sync is complete', em_timeout: 30 do # allow for slow connections and lots of messages
enter_expected_count.times do |index|
EventMachine.add_timer(index / 10) do
presence_client_one.enter_client("client:#{index}") do |message|
entered << message
next unless entered.count == enter_expected_count
presence_client_one.enter_client("client:#{index}")
end
end

presence_anonymous_client.get(wait_for_sync: true) do |members|
expect(members.map(&:client_id).uniq.count).to eql(enter_expected_count)
expect(members.count).to eql(enter_expected_count)
stop_reactor
end
end
presence_client_one.subscribe(:enter) do |message|
entered << message
next unless entered.count == enter_expected_count

presence_anonymous_client.get(wait_for_sync: true) do |members|
expect(members.map(&:client_id).uniq.count).to eql(enter_expected_count)
expect(members.count).to eql(enter_expected_count)
stop_reactor
end
end
end
end

context 'by default' do
it 'it does not wait for sync', em_timeout: 30 do # allow for slow connections and lots of messages
enter_expected_count.times do |index|
EventMachine.add_timer(index / 10) do
presence_client_one.enter_client("client:#{index}") do |message|
entered << message
next unless entered.count == enter_expected_count

channel_anonymous_client.attach do
presence_anonymous_client.get do |members|
expect(presence_anonymous_client.members).to_not be_in_sync
expect(members.count).to eql(0)
stop_reactor
end
end
enter_expected_count.times do |indx|
EventMachine.add_timer(indx / 10) do
presence_client_one.enter_client "client:#{indx}"
end
end

presence_client_one.subscribe(:enter) do |message|
entered << message
next unless entered.count == enter_expected_count

channel_anonymous_client.attach do
presence_anonymous_client.get do |members|
expect(presence_anonymous_client.members).to_not be_in_sync
expect(members.count).to eql(0)
stop_reactor
end
end
end
Expand Down Expand Up @@ -896,15 +910,25 @@ def setup_members_on(presence)

context 'and sync is complete' do
it 'does not cache members that have left' do
presence_client_one.enter enter_data do
enter_ack = false

presence_client_one.subscribe(:enter) do
presence_client_one.unsubscribe :enter

expect(presence_client_one.members).to be_in_sync
expect(presence_client_one.members.send(:members).count).to eql(1)
presence_client_one.leave data
end

presence_client_one.enter(enter_data) do
enter_ack = true
end

presence_client_one.subscribe(:leave) do |presence_message|
presence_client_one.unsubscribe :leave
expect(presence_message.data).to eql(data)
expect(presence_client_one.members.send(:members).count).to eql(0)
expect(enter_ack).to eql(true)
stop_reactor
end
end
Expand Down Expand Up @@ -1290,7 +1314,9 @@ def connect_members_deferrables
# skip 'it fails if the connection changes to failed state'

it 'returns the current members on the channel' do
presence_client_one.enter do
presence_client_one.enter
presence_client_one.subscribe(:enter) do
presence_client_one.unsubscribe :enter
presence_client_one.get do |members|
expect(members.count).to eq(1)

Expand Down Expand Up @@ -1351,7 +1377,10 @@ def connect_members_deferrables
end

it 'does not wait for SYNC to complete if :wait_for_sync option is false' do
presence_client_one.enter do
presence_client_one.enter
presence_client_one.subscribe(:enter) do
presence_client_one.unsubscribe :enter

presence_client_two.get(wait_for_sync: false) do |members|
expect(members.count).to eql(0)
stop_reactor
Expand All @@ -1362,11 +1391,13 @@ def connect_members_deferrables
context 'when a member enters and then leaves' do
it 'has no members' do
presence_client_one.enter do
presence_client_one.leave do
presence_client_one.get do |members|
expect(members.count).to eq(0)
stop_reactor
end
presence_client_one.leave
end

presence_client_one.subscribe(:leave) do
presence_client_one.get do |members|
expect(members.count).to eq(0)
stop_reactor
end
end
end
Expand Down Expand Up @@ -1524,7 +1555,10 @@ def connect_members_deferrables

context 'REST #get' do
it 'returns current members' do
presence_client_one.enter(data_payload) do
presence_client_one.enter data_payload
presence_client_one.subscribe(:enter) do
presence_client_one.unsubscribe :enter

members_page = channel_rest_client_one.presence.get
this_member = members_page.items.first

Expand All @@ -1538,7 +1572,10 @@ def connect_members_deferrables

it 'returns no members once left' do
presence_client_one.enter(data_payload) do
presence_client_one.leave do
presence_client_one.leave
presence_client_one.subscribe(:leave) do
presence_client_one.unsubscribe :leave

members_page = channel_rest_client_one.presence.get
expect(members_page.items.count).to eql(0)
stop_reactor
Expand Down Expand Up @@ -1654,7 +1691,8 @@ def connect_members_deferrables

context '#get' do
it 'returns a list of members with decrypted data' do
encrypted_channel.presence.enter(data) do
encrypted_channel.presence.enter(data)
encrypted_channel.presence.subscribe(:enter) do
encrypted_channel.presence.get do |members|
member = members.first
expect(member.encoding).to be_nil
Expand All @@ -1667,7 +1705,8 @@ def connect_members_deferrables

context 'REST #get' do
it 'returns a list of members with decrypted data' do
encrypted_channel.presence.enter(data) do
encrypted_channel.presence.enter(data)
encrypted_channel.presence.subscribe(:enter) do
member = channel_rest_client_one.presence.get.items.first
expect(member.encoding).to be_nil
expect(member.data).to eql(data)
Expand Down Expand Up @@ -1738,7 +1777,7 @@ def connect_members_deferrables
context 'connection failure mid-way through a large member sync' do
let(:members_count) { 250 }
let(:sync_pages_received) { [] }
let(:client_options) { default_options.merge(log_level: :error) }
let(:client_options) { default_options.merge(log_level: :fatal) }

it 'resumes the SYNC operation', em_timeout: 15 do
when_all(*members_count.times.map do |index|
Expand Down

0 comments on commit 744290b

Please sign in to comment.