Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,12 @@ public void release(String channelName) {
@Override
public void onMessage(ProtocolMessage msg) {
String channelName = msg.channel;
Channel channel;
synchronized(this) { channel = channels.get(channelName); }
Channel channel = null;
synchronized(this) {
if (channels.containsKey(channelName)) {
Comment thread
ikbalkaya marked this conversation as resolved.
channel = channels.get(channelName);
}
}
if(channel == null) {
Log.e(TAG, "Received channel message for non-existent channel");
return;
Expand Down
44 changes: 38 additions & 6 deletions lib/src/test/java/io/ably/lib/test/common/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -632,24 +632,56 @@ public static RawProtocolMonitor createMonitor(Action sendAction, Action recvAct
* Wait for a given number of messages
*/
public void waitForRecv() {
waitForRecv(1);
waitForRecv(1, 6000000);
}
public void waitForSend() {
waitForSend(1);
waitForSend(1, 6000000);
}
public void waitForRecv(int count) {
waitForRecv(count, 6000000);
}
public void waitForSend(int count) {
waitForSend(count, 6000000);
}

/**
* Wait for a given number of messages
* @param count
*/
public synchronized void waitForRecv(int count) {
public synchronized void waitForRecv(int count, long timeoutInMillis) {
long timeoutAt = System.currentTimeMillis() + timeoutInMillis;
while(receivedMessages.size() < count) {
try { wait(); } catch(InterruptedException e) {}
synchronized (this) {
try {
if (System.currentTimeMillis() > timeoutAt || receivedMessages.size() >= count) {
break;
}

wait();
} catch(InterruptedException e) {}
}
}

if (receivedMessages.size() < count) {
throw new AssertionError("Did not receive expected number of messages");
}
}
public synchronized void waitForSend(int count) {
public synchronized void waitForSend(int count, long timeoutInMillis) {
long timeoutAt = System.currentTimeMillis() + timeoutInMillis;
while(sentMessages.size() < count) {
try { wait(); } catch(InterruptedException e) {}
synchronized (this) {
try {
if (System.currentTimeMillis() > timeoutAt || sentMessages.size() >= count) {
break;
}

wait();
} catch(InterruptedException e) {}
}
}

if (sentMessages.size() < count) {
throw new AssertionError("Did not send expected number of messages");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2007,6 +2007,41 @@ public void no_messages_when_channel_state_not_attached() {
}
}

/*
* Checks that the DETACHED message sent by the server when a channel is released is dropped.
*/
@Test
public void detach_message_to_released_channel_is_dropped() throws AblyException {
AblyRealtime ably = null;
long oldRealtimeTimeout = Defaults.realtimeRequestTimeout;
final String channelName = "detach_message_to_released_channel_is_dropped";

try {
DebugOptions opts = createOptions(testVars.keys[0].keyStr);
Helpers.RawProtocolMonitor monitor = Helpers.RawProtocolMonitor.createReceiver(ProtocolMessage.Action.detached);
opts.protocolListener = monitor;

/* Make test faster */
Defaults.realtimeRequestTimeout = 1000;
opts.channelRetryTimeout = 1000;

ably = new AblyRealtime(opts);
Channel channel = ably.channels.get(channelName);
channel.attach();
(new ChannelWaiter(channel)).waitFor(ChannelState.attached);

// Listen for detach messages and release the channel
ably.channels.release(channelName);
monitor.waitForRecv(1, 10000);

assertFalse(ably.channels.containsKey("messages_to_non_existent_channels_are_dropped"));
} finally {
if (ably != null)
ably.close();
Defaults.realtimeRequestTimeout = oldRealtimeTimeout;
}
}

class DetachingProtocolListener implements DebugOptions.RawProtocolListener {

public Channel theChannel;
Expand Down