Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/rest/Auth.java
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ public TokenAuth getTokenAuth() {

public void onAuthError(ErrorInfo err) {
/* we're only interested in token expiry errors */
if(err.code == 40140)
if(err.code >= 40140 && err.code < 40150)
tokenAuth.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,13 @@ synchronized void notifyState(ITransport transport, StateIndication state) {

synchronized void notifyState(StateIndication state) {
Log.v(TAG, "notifyState(): notifying " + state.state + "; id = " + connection.key);
indicatedState = state;
notify();
if (Thread.currentThread() == mgrThread) {
handleStateChange(state);
}
else {
indicatedState = state;
notify();
}
}

public void ping(final CompletionListener listener) {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/transport/Defaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class Defaults {


public static final String[] TRANSPORTS = new String[]{"web_socket"};
public static final String TRANSPORT = "io.ably.lib.transport.WebSocketTransport$Factory";
public static String TRANSPORT = "io.ably.lib.transport.WebSocketTransport$Factory";
public static final int HTTP_MAX_RETRY_COUNT = 3;

public static int getPort(ClientOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public WebSocketTransport getTransport(TransportParams params, ConnectionManager
}

/******************
* private constructor
* protected constructor
******************/

private WebSocketTransport(TransportParams params, ConnectionManager connectionManager) {
protected WebSocketTransport(TransportParams params, ConnectionManager connectionManager) {
this.params = params;
this.connectionManager = connectionManager;
this.channelBinaryMode = params.options.useBinaryProtocol;
Expand Down
23 changes: 18 additions & 5 deletions lib/src/test/java/io/ably/lib/test/common/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,10 @@ public synchronized ErrorInfo waitFor(ConnectionState state) {
*/
public synchronized void waitFor(ConnectionState state, int count) {
Log.d(TAG, "waitFor(state=" + state + ", count=" + count + ")");
while(connection.state != state || stateCounts.get(state).value < count)

while(getStateCount(state) < count)
try { wait(); } catch(InterruptedException e) {}
Log.d(TAG, "waitFor done: state=" + connection.state + ", count=" + stateCounts.get(state).value + ")");
Log.d(TAG, "waitFor done: state=" + connection.state + ", count=" + getStateCount(state) + ")");
}

/**
Expand All @@ -307,16 +308,20 @@ public synchronized void waitFor(ConnectionState state, int count) {
* @param state
* @param count
* @param time timeout in ms
* @return true if state was reached
*/
public synchronized void waitFor(ConnectionState state, int count, long time) {
public synchronized boolean waitFor(ConnectionState state, int count, long time) {
Log.d(TAG, "waitFor(state=" + state + ", count=" + count + ", time=" + time + ")");
long targetTime = System.currentTimeMillis() + time;
long remaining = time;
while((connection.state != state || stateCounts.get(state).value < count) && remaining > 0) {
while(getStateCount(state) < count && remaining > 0) {
try { wait(remaining); } catch(InterruptedException e) {}
remaining = targetTime - System.currentTimeMillis();
}
Log.d(TAG, "waitFor done: state=" + connection.state + ", count=" + stateCounts.get(state).value + ")");
int stateCount = getStateCount(state);
Log.d(TAG, "waitFor done: state=" + connection.state +
", count=" + Integer.toString(stateCount)+ ")");
return stateCount >= count;
}

/**
Expand Down Expand Up @@ -350,6 +355,14 @@ public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChan
}
}

/**
* Helper function
*/
private synchronized int getStateCount(ConnectionState state) {
Counter counter = stateCounts.get(state);
return counter != null ? counter.value : 0;
}

/**
* Internal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import static org.junit.Assert.fail;

import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.transport.ITransport;
import io.ably.lib.transport.WebSocketTransport;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.realtime.ConnectionStateListener;
import io.ably.lib.rest.Auth;
Expand Down Expand Up @@ -184,16 +187,20 @@ public void connect_token_expire_disconnected() {
final Setup.TestVars optsTestVars = Setup.getTestVars();
ClientOptions optsForToken = optsTestVars.createOptions(optsTestVars.keys[0].keyStr);
optsForToken.logLevel = Log.VERBOSE;
final Auth.AuthOptions authOptions = new Auth.AuthOptions();
authOptions.queryTime = true;
final AblyRest ablyForToken = new AblyRest(optsForToken);
TokenDetails tokenDetails = ablyForToken.auth.requestToken(new TokenParams() {{ ttl = 8000L; }}, null);
final TokenDetails tokenDetails = ablyForToken.auth.requestToken(new TokenParams() {{ ttl = 8000L; }}, authOptions);
assertNotNull("Expected token value", tokenDetails.token);

/* implement callback, using Ably instance with key */
final class TokenGenerator implements TokenCallback {
@Override
public Object getTokenRequest(TokenParams params) throws AblyException {
++cbCount;
return ablyForToken.auth.requestToken(params, null);
if(cbCount++ == 0)
return tokenDetails;
else
return ablyForToken.auth.requestToken(params, authOptions);
}
public int getCbCount() { return cbCount; }
private int cbCount = 0;
Expand All @@ -204,7 +211,6 @@ public Object getTokenRequest(TokenParams params) throws AblyException {
/* create Ably realtime instance without key */
final TestVars testVars = Setup.getTestVars();
ClientOptions opts = testVars.createOptions();
opts.tokenDetails = tokenDetails;
opts.authCallback = authCallback;
opts.logLevel = Log.VERBOSE;
AblyRealtime ably = new AblyRealtime(opts);
Expand All @@ -215,20 +221,17 @@ public Object getTokenRequest(TokenParams params) throws AblyException {
assertEquals("Verify connected state is reached", ConnectionState.connected, ably.connection.state);

/* wait for disconnected state (on token expiry), with timeout */
connectionWaiter.waitFor(ConnectionState.disconnected, 1, 30000L);
assertEquals("Verify disconnected state is reached", ConnectionState.disconnected, ably.connection.state);
assertTrue("Verify disconnected state is reached", connectionWaiter.waitFor(ConnectionState.disconnected, 1, 30000L));

/* wait for connected state (on token renewal) */
connectionWaiter.waitFor(ConnectionState.connected, 1, 30000L);
assertEquals("Verify connected state is reached", ConnectionState.connected, ably.connection.state);
assertTrue("Verify connected state is reached", connectionWaiter.waitFor(ConnectionState.connected, 2, 30000L));

/* verify that our token generator was called */
assertEquals("Expected token generator to be called", 1, authCallback.getCbCount());
assertEquals("Expected token generator to be called", 2, authCallback.getCbCount());

/* end */
ably.close();
connectionWaiter.waitFor(ConnectionState.closed);
assertEquals("Verify closed state is reached", ConnectionState.closed, ably.connection.state);
} catch (AblyException e) {
e.printStackTrace();
fail("init0: Unexpected exception instantiating library");
Expand Down Expand Up @@ -371,6 +374,9 @@ public Object getTokenRequest(TokenParams params) throws AblyException {
};
optsForRealtime.tokenDetails = tokenDetails;
ablyRealtime = new AblyRealtime(optsForRealtime);

(new ConnectionWaiter(ablyRealtime.connection)).waitFor(ConnectionState.connected);

ablyRealtime.connection.on(new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateChange state) {
Expand All @@ -384,8 +390,6 @@ public void onConnectionStateChanged(ConnectionStateChange state) {
connectionWaiter.waitFor(ConnectionState.failed);

List<ConnectionState> correctHistory = Arrays.asList(
ConnectionState.connecting,
ConnectionState.connected,
ConnectionState.disconnected,
ConnectionState.connecting,
ConnectionState.disconnected,
Expand All @@ -406,4 +410,73 @@ public void onConnectionStateChanged(ConnectionStateChange state) {
ablyRealtime.close();
}
}

/**
* Throw exception in authCallback repeatedly and check if connection goes into suspended state
*/
@Test
public void connect_auth_failure_and_suspend_test() {
AblyRealtime ablyRealtime = null;
AblyRest ablyRest = null;
int oldSuspendTimeout = Defaults.TIMEOUT_SUSPEND;
int oldDisconnectTimeout = Defaults.TIMEOUT_DISCONNECT;

try {
/* Make test faster */
Defaults.TIMEOUT_SUSPEND = 2000;
Defaults.TIMEOUT_DISCONNECT = 1000;

final int[] numberOfAuthCalls = new int[] {0};
final boolean[] reachedFinalState = new boolean[] {false};

TestVars testVars = Setup.getTestVars();
ClientOptions opts = testVars.createOptions(testVars.keys[0].keyStr);

ablyRest = new AblyRest(opts);
final TokenDetails tokenDetails = ablyRest.auth.requestToken(new TokenParams() {{ ttl = 5000L; }}, null);
assertNotNull("Expected token value", tokenDetails.token);

ClientOptions optsForRealtime = testVars.createOptions();
optsForRealtime.authCallback = new TokenCallback() {
@Override
public Object getTokenRequest(TokenParams params) throws AblyException {
if (numberOfAuthCalls[0]++ == 0)
return tokenDetails;
else
throw AblyException.fromErrorInfo(new ErrorInfo("Auth failure", 90000));
}
};
ablyRealtime = new AblyRealtime(optsForRealtime);

ablyRealtime.connection.on(new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateChange state) {
System.out.println(String.format("New state: %s", state.current));
synchronized (reachedFinalState) {
reachedFinalState[0] = state.current == ConnectionState.closed ||
state.current == ConnectionState.suspended ||
state.current == ConnectionState.failed;
reachedFinalState.notify();
}
}
});

synchronized (reachedFinalState) {
while (!reachedFinalState[0]) {
try { reachedFinalState.wait(); } catch (InterruptedException e) {}
}
}

assertEquals("Verify suspended state", ablyRealtime.connection.state, ConnectionState.suspended);
} catch (AblyException e) {
e.printStackTrace();
fail("init0: Unexpected exception instantiating library");
} finally {
Defaults.TIMEOUT_SUSPEND = oldSuspendTimeout;
Defaults.TIMEOUT_DISCONNECT = oldDisconnectTimeout;
if (ablyRealtime != null)
ablyRealtime.close();
}
}

}