Skip to content

Commit

Permalink
Merge pull request #236 from ably/fix_issue_233
Browse files Browse the repository at this point in the history
Fixed issue 233, made changes to allow ITransport mocking
  • Loading branch information
paddybyers committed Nov 18, 2016
2 parents 7fc6b24 + 78a23cc commit c94ebcb
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 23 deletions.
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/rest/Auth.java
Expand Up @@ -681,7 +681,7 @@ public TokenAuth getTokenAuth() {

public void onAuthError(ErrorInfo err) {
/* we're only interested in token expiry errors */
if(err.code == 40140)
if(err.code >= 40140 && err.code < 40150)
tokenAuth.clear();
}

Expand Down
Expand Up @@ -231,8 +231,13 @@ synchronized void notifyState(ITransport transport, StateIndication state) {

synchronized void notifyState(StateIndication state) {
Log.v(TAG, "notifyState(): notifying " + state.state + "; id = " + connection.key);
indicatedState = state;
notify();
if (Thread.currentThread() == mgrThread) {
handleStateChange(state);
}
else {
indicatedState = state;
notify();
}
}

public void ping(final CompletionListener listener) {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/transport/Defaults.java
Expand Up @@ -28,7 +28,7 @@ public class Defaults {


public static final String[] TRANSPORTS = new String[]{"web_socket"};
public static final String TRANSPORT = "io.ably.lib.transport.WebSocketTransport$Factory";
public static String TRANSPORT = "io.ably.lib.transport.WebSocketTransport$Factory";
public static final int HTTP_MAX_RETRY_COUNT = 3;

public static int getPort(ClientOptions options) {
Expand Down
Expand Up @@ -41,10 +41,10 @@ public WebSocketTransport getTransport(TransportParams params, ConnectionManager
}

/******************
* private constructor
* protected constructor
******************/

private WebSocketTransport(TransportParams params, ConnectionManager connectionManager) {
protected WebSocketTransport(TransportParams params, ConnectionManager connectionManager) {
this.params = params;
this.connectionManager = connectionManager;
this.channelBinaryMode = params.options.useBinaryProtocol;
Expand Down
23 changes: 18 additions & 5 deletions lib/src/test/java/io/ably/lib/test/common/Helpers.java
Expand Up @@ -296,9 +296,10 @@ public synchronized ErrorInfo waitFor(ConnectionState state) {
*/
public synchronized void waitFor(ConnectionState state, int count) {
Log.d(TAG, "waitFor(state=" + state + ", count=" + count + ")");
while(connection.state != state || stateCounts.get(state).value < count)

while(getStateCount(state) < count)
try { wait(); } catch(InterruptedException e) {}
Log.d(TAG, "waitFor done: state=" + connection.state + ", count=" + stateCounts.get(state).value + ")");
Log.d(TAG, "waitFor done: state=" + connection.state + ", count=" + getStateCount(state) + ")");
}

/**
Expand All @@ -307,16 +308,20 @@ public synchronized void waitFor(ConnectionState state, int count) {
* @param state
* @param count
* @param time timeout in ms
* @return true if state was reached
*/
public synchronized void waitFor(ConnectionState state, int count, long time) {
public synchronized boolean waitFor(ConnectionState state, int count, long time) {
Log.d(TAG, "waitFor(state=" + state + ", count=" + count + ", time=" + time + ")");
long targetTime = System.currentTimeMillis() + time;
long remaining = time;
while((connection.state != state || stateCounts.get(state).value < count) && remaining > 0) {
while(getStateCount(state) < count && remaining > 0) {
try { wait(remaining); } catch(InterruptedException e) {}
remaining = targetTime - System.currentTimeMillis();
}
Log.d(TAG, "waitFor done: state=" + connection.state + ", count=" + stateCounts.get(state).value + ")");
int stateCount = getStateCount(state);
Log.d(TAG, "waitFor done: state=" + connection.state +
", count=" + Integer.toString(stateCount)+ ")");
return stateCount >= count;
}

/**
Expand Down Expand Up @@ -350,6 +355,14 @@ public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChan
}
}

/**
* Helper function
*/
private synchronized int getStateCount(ConnectionState state) {
Counter counter = stateCounts.get(state);
return counter != null ? counter.value : 0;
}

/**
* Internal
*/
Expand Down
Expand Up @@ -7,6 +7,9 @@
import static org.junit.Assert.fail;

import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.transport.ITransport;
import io.ably.lib.transport.WebSocketTransport;
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.realtime.ConnectionStateListener;
import io.ably.lib.rest.Auth;
Expand Down Expand Up @@ -184,16 +187,20 @@ public void connect_token_expire_disconnected() {
final Setup.TestVars optsTestVars = Setup.getTestVars();
ClientOptions optsForToken = optsTestVars.createOptions(optsTestVars.keys[0].keyStr);
optsForToken.logLevel = Log.VERBOSE;
final Auth.AuthOptions authOptions = new Auth.AuthOptions();
authOptions.queryTime = true;
final AblyRest ablyForToken = new AblyRest(optsForToken);
TokenDetails tokenDetails = ablyForToken.auth.requestToken(new TokenParams() {{ ttl = 8000L; }}, null);
final TokenDetails tokenDetails = ablyForToken.auth.requestToken(new TokenParams() {{ ttl = 8000L; }}, authOptions);
assertNotNull("Expected token value", tokenDetails.token);

/* implement callback, using Ably instance with key */
final class TokenGenerator implements TokenCallback {
@Override
public Object getTokenRequest(TokenParams params) throws AblyException {
++cbCount;
return ablyForToken.auth.requestToken(params, null);
if(cbCount++ == 0)
return tokenDetails;
else
return ablyForToken.auth.requestToken(params, authOptions);
}
public int getCbCount() { return cbCount; }
private int cbCount = 0;
Expand All @@ -204,7 +211,6 @@ public Object getTokenRequest(TokenParams params) throws AblyException {
/* create Ably realtime instance without key */
final TestVars testVars = Setup.getTestVars();
ClientOptions opts = testVars.createOptions();
opts.tokenDetails = tokenDetails;
opts.authCallback = authCallback;
opts.logLevel = Log.VERBOSE;
AblyRealtime ably = new AblyRealtime(opts);
Expand All @@ -215,20 +221,17 @@ public Object getTokenRequest(TokenParams params) throws AblyException {
assertEquals("Verify connected state is reached", ConnectionState.connected, ably.connection.state);

/* wait for disconnected state (on token expiry), with timeout */
connectionWaiter.waitFor(ConnectionState.disconnected, 1, 30000L);
assertEquals("Verify disconnected state is reached", ConnectionState.disconnected, ably.connection.state);
assertTrue("Verify disconnected state is reached", connectionWaiter.waitFor(ConnectionState.disconnected, 1, 30000L));

/* wait for connected state (on token renewal) */
connectionWaiter.waitFor(ConnectionState.connected, 1, 30000L);
assertEquals("Verify connected state is reached", ConnectionState.connected, ably.connection.state);
assertTrue("Verify connected state is reached", connectionWaiter.waitFor(ConnectionState.connected, 2, 30000L));

/* verify that our token generator was called */
assertEquals("Expected token generator to be called", 1, authCallback.getCbCount());
assertEquals("Expected token generator to be called", 2, authCallback.getCbCount());

/* end */
ably.close();
connectionWaiter.waitFor(ConnectionState.closed);
assertEquals("Verify closed state is reached", ConnectionState.closed, ably.connection.state);
} catch (AblyException e) {
e.printStackTrace();
fail("init0: Unexpected exception instantiating library");
Expand Down Expand Up @@ -371,6 +374,9 @@ public Object getTokenRequest(TokenParams params) throws AblyException {
};
optsForRealtime.tokenDetails = tokenDetails;
ablyRealtime = new AblyRealtime(optsForRealtime);

(new ConnectionWaiter(ablyRealtime.connection)).waitFor(ConnectionState.connected);

ablyRealtime.connection.on(new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateChange state) {
Expand All @@ -384,8 +390,6 @@ public void onConnectionStateChanged(ConnectionStateChange state) {
connectionWaiter.waitFor(ConnectionState.failed);

List<ConnectionState> correctHistory = Arrays.asList(
ConnectionState.connecting,
ConnectionState.connected,
ConnectionState.disconnected,
ConnectionState.connecting,
ConnectionState.disconnected,
Expand All @@ -406,4 +410,73 @@ public void onConnectionStateChanged(ConnectionStateChange state) {
ablyRealtime.close();
}
}

/**
* Throw exception in authCallback repeatedly and check if connection goes into suspended state
*/
@Test
public void connect_auth_failure_and_suspend_test() {
AblyRealtime ablyRealtime = null;
AblyRest ablyRest = null;
int oldSuspendTimeout = Defaults.TIMEOUT_SUSPEND;
int oldDisconnectTimeout = Defaults.TIMEOUT_DISCONNECT;

try {
/* Make test faster */
Defaults.TIMEOUT_SUSPEND = 2000;
Defaults.TIMEOUT_DISCONNECT = 1000;

final int[] numberOfAuthCalls = new int[] {0};
final boolean[] reachedFinalState = new boolean[] {false};

TestVars testVars = Setup.getTestVars();
ClientOptions opts = testVars.createOptions(testVars.keys[0].keyStr);

ablyRest = new AblyRest(opts);
final TokenDetails tokenDetails = ablyRest.auth.requestToken(new TokenParams() {{ ttl = 5000L; }}, null);
assertNotNull("Expected token value", tokenDetails.token);

ClientOptions optsForRealtime = testVars.createOptions();
optsForRealtime.authCallback = new TokenCallback() {
@Override
public Object getTokenRequest(TokenParams params) throws AblyException {
if (numberOfAuthCalls[0]++ == 0)
return tokenDetails;
else
throw AblyException.fromErrorInfo(new ErrorInfo("Auth failure", 90000));
}
};
ablyRealtime = new AblyRealtime(optsForRealtime);

ablyRealtime.connection.on(new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateChange state) {
System.out.println(String.format("New state: %s", state.current));
synchronized (reachedFinalState) {
reachedFinalState[0] = state.current == ConnectionState.closed ||
state.current == ConnectionState.suspended ||
state.current == ConnectionState.failed;
reachedFinalState.notify();
}
}
});

synchronized (reachedFinalState) {
while (!reachedFinalState[0]) {
try { reachedFinalState.wait(); } catch (InterruptedException e) {}
}
}

assertEquals("Verify suspended state", ablyRealtime.connection.state, ConnectionState.suspended);
} catch (AblyException e) {
e.printStackTrace();
fail("init0: Unexpected exception instantiating library");
} finally {
Defaults.TIMEOUT_SUSPEND = oldSuspendTimeout;
Defaults.TIMEOUT_DISCONNECT = oldDisconnectTimeout;
if (ablyRealtime != null)
ablyRealtime.close();
}
}

}

0 comments on commit c94ebcb

Please sign in to comment.