Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/src/main/java/io/ably/lib/http/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public void finalize() {
* @return
* @throws AblyException
*/
<T> T ablyHttpExecute(String path, String method, Param[] headers, Param[] params, RequestBody requestBody, ResponseHandler<T> responseHandler) throws AblyException {
public <T> T ablyHttpExecute(String path, String method, Param[] headers, Param[] params, RequestBody requestBody, ResponseHandler<T> responseHandler) throws AblyException {
int retryCountRemaining = Hosts.isRestFallbackSupported(this.host) ? options.httpMaxRetryCount : 0;
String candidateHost = this.host;
URL url;
Expand Down Expand Up @@ -355,7 +355,7 @@ <T> T ablyHttpExecute(String path, String method, Param[] headers, Param[] param
* @return
* @throws AblyException
*/
<T> T httpExecute(URL url, String method, Param[] headers, RequestBody requestBody, ResponseHandler<T> responseHandler) throws AblyException {
public <T> T httpExecute(URL url, String method, Param[] headers, RequestBody requestBody, ResponseHandler<T> responseHandler) throws AblyException {
return httpExecuteWithRetry(url, method, headers, requestBody, responseHandler, false);
}

Expand All @@ -371,7 +371,7 @@ <T> T httpExecute(URL url, String method, Param[] headers, RequestBody requestBo
* @return
* @throws AblyException
*/
<T> T httpExecute(URL url, Proxy proxy, String method, Param[] headers, RequestBody requestBody, boolean withCredentials, ResponseHandler<T> responseHandler) throws AblyException {
public <T> T httpExecute(URL url, Proxy proxy, String method, Param[] headers, RequestBody requestBody, boolean withCredentials, ResponseHandler<T> responseHandler) throws AblyException {
HttpURLConnection conn = null;
try {
conn = (HttpURLConnection)url.openConnection(proxy);
Expand Down Expand Up @@ -451,7 +451,7 @@ <T> T httpExecute(HttpURLConnection conn, String method, Param[] headers, Reques
* @return
* @throws AblyException
*/
<T> T httpExecuteWithRetry(URL url, String method, Param[] headers, RequestBody requestBody, ResponseHandler<T> responseHandler, boolean allowAblyAuth) throws AblyException {
public <T> T httpExecuteWithRetry(URL url, String method, Param[] headers, RequestBody requestBody, ResponseHandler<T> responseHandler, boolean allowAblyAuth) throws AblyException {
boolean authPending = true, renewPending = true, proxyAuthPending = true;
while(true) {
try {
Expand Down
109 changes: 64 additions & 45 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ public class ConnectionManager implements Runnable, ConnectListener {
* default errors
***********************************/

static ErrorInfo REASON_CLOSED = new ErrorInfo("Connection closed by client", 10000);
static ErrorInfo REASON_DISCONNECTED = new ErrorInfo("Connection temporarily unavailable", 80003);
static ErrorInfo REASON_SUSPENDED = new ErrorInfo("Connection unavailable", 80002);
static ErrorInfo REASON_FAILED = new ErrorInfo("Connection failed", 80000);
static ErrorInfo REASON_REFUSED = new ErrorInfo("Access refused", 40100);
static ErrorInfo REASON_TOO_BIG = new ErrorInfo("Connection closed; message too large", 40000);
static ErrorInfo REASON_NEVER_CONNECTED = new ErrorInfo("Unable to establish connection", 80002);
static ErrorInfo REASON_TIMEDOUT = new ErrorInfo("Unable to establish connection", 80014);
static ErrorInfo REASON_CLOSED = new ErrorInfo("Connection closed by client", 200, 10000);
static ErrorInfo REASON_DISCONNECTED = new ErrorInfo("Connection temporarily unavailable", 503, 80003);
static ErrorInfo REASON_SUSPENDED = new ErrorInfo("Connection unavailable", 503, 80002);
static ErrorInfo REASON_FAILED = new ErrorInfo("Connection failed", 503, 80000);
static ErrorInfo REASON_REFUSED = new ErrorInfo("Access refused", 401, 40100);
static ErrorInfo REASON_TOO_BIG = new ErrorInfo("Connection closed; message too large", 400, 40000);
static ErrorInfo REASON_NEVER_CONNECTED = new ErrorInfo("Unable to establish connection", 503, 80002);
static ErrorInfo REASON_TIMEDOUT = new ErrorInfo("Unable to establish connection", 503, 80014);

/***********************************
* a class encapsulating information
Expand All @@ -52,14 +52,14 @@ public class ConnectionManager implements Runnable, ConnectListener {
public static class StateIndication {
final ConnectionState state;
final ErrorInfo reason;
final boolean fallback;
final String fallback;
final String currentHost;

public StateIndication(ConnectionState state, ErrorInfo reason) {
this(state, reason, false, null);
this(state, reason, null, null);
}

public StateIndication(ConnectionState state, ErrorInfo reason, boolean fallback, String currentHost) {
public StateIndication(ConnectionState state, ErrorInfo reason, String fallback, String currentHost) {
this.state = state;
this.reason = reason;
this.fallback = fallback;
Expand Down Expand Up @@ -224,7 +224,8 @@ synchronized void notifyState(ITransport transport, StateIndication state) {
if(states.get(state.state).terminal)
this.transport = null;
notifyState(state);
}
} else
Log.v(TAG, "notifyState: wrong transport");
}

synchronized void notifyState(StateIndication state) {
Expand Down Expand Up @@ -329,6 +330,18 @@ private void onChannelMessage(ProtocolMessage message) {
}

private synchronized void onConnected(ProtocolMessage message) {
/* Set the http host to try and ensure that realtime and rest use the
* same region:
* - if we're on the default realtime host, set http to the default
* rest host
* - otherwise (the realtime host has been overridden or has fallen
* back), set http to the same as realtime.
*/
if (pendingConnect.host == options.realtimeHost)
ably.http.setHost(options.restHost);
else
ably.http.setHost(pendingConnect.host);

/* if there was a (non-fatal) connection error
* that invalidates an existing connection id, then
* remove all channels attached to the previous id */
Expand Down Expand Up @@ -447,7 +460,7 @@ private void handleStateRequest() {
break;
case connecting:
if(!connectImpl(requestedState)) {
indicatedState = new StateIndication(ConnectionState.failed, new ErrorInfo("Connection failed; no host available", 404, 80000), false, requestedState.currentHost);
indicatedState = new StateIndication(ConnectionState.failed, new ErrorInfo("Connection failed; no host available", 404, 80000), null, requestedState.currentHost);
}

handled = true;
Expand Down Expand Up @@ -495,6 +508,11 @@ private void handleStateChange(StateIndication stateChange) {
/* we were connected, so retry immediately */
requestState(ConnectionState.connecting);
break;
case suspended:
/* Don't allow a second disconnected to make the state come out of suspended. */
Log.v(TAG, "handleStateChange: not moving out of suspended");
stateChange = null;
break;
default:
break;
}
Expand All @@ -518,18 +536,19 @@ private StateIndication checkSuspend(StateIndication stateChange) {
* - the suspend timer has expired, so we're going into suspended state.
*/

if(pendingConnect != null && stateChange.reason == null) {
if (!Hosts.isFallback(pendingConnect.host)) {
if (!checkConnectivity()) {
return new StateIndication(ConnectionState.failed, new ErrorInfo("connection failed", 80000), false, pendingConnect.host);
if(pendingConnect != null && (stateChange.reason == null || stateChange.reason.statusCode >= 500)) {
if (Hosts.isFallback(pendingConnect.host) || checkConnectivity()) {
/* we will try a fallback host */
String hostFallback = Hosts.isRealtimeFallbackSupported(options.realtimeHost)?(Hosts.getFallback(pendingConnect.host)):(null);
if (hostFallback != null) {
Log.v(TAG, "checkSuspend: fallback to " + hostFallback);
requestState(new StateIndication(ConnectionState.connecting, null, hostFallback, pendingConnect.host));
/* returning null ensures we stay in the connecting state */
return null;
}
}

/* we will try a fallback host */
requestState(new StateIndication(ConnectionState.connecting, null, true, pendingConnect.host));
/* returning null ensures we stay in the connecting state */
return null;
}
Log.v(TAG, "checkSuspend: not falling back");
boolean suspendMode = System.currentTimeMillis() > suspendTime;
ConnectionState expiredState = suspendMode ? ConnectionState.suspended : ConnectionState.disconnected;
return new StateIndication(expiredState, stateChange.reason);
Expand Down Expand Up @@ -595,13 +614,17 @@ public void run() {

@Override
public void onTransportAvailable(ITransport transport, TransportParams params) {
this.transport = transport;
}

@Override
public synchronized void onTransportUnavailable(ITransport transport, TransportParams params, ErrorInfo reason) {
if (this.transport != transport) {
/* This is from a transport that we have already abandoned. */
Log.v(TAG, "onTransportUnavailable: wrong transport");
return;
}
ably.auth.onAuthError(reason);
notifyState(new StateIndication(ConnectionState.disconnected, reason, false, transport.getHost()));
notifyState(new StateIndication(ConnectionState.disconnected, reason, null, transport.getHost()));
transport = null;
}

Expand All @@ -620,25 +643,14 @@ private boolean connectImpl(StateIndication request) {
* instance the transport.
* First, choose the transport. (Right now there's only one.)
* Second, choose the host. ConnectParams will use the default
* (or requested) host, unless fallback=true, in which case
* it will choose a fallback host at random */
* (or requested) host, unless fallback!=null, in which case
* checkSuspend has already chosen a fallback host at random */

if(request.fallback) {
String hostFallback = Hosts.isRealtimeFallbackSupported(options.realtimeHost)?(Hosts.getFallback(request.currentHost)):(null);

if (hostFallback == null) {
return false;
}

pendingConnect = new ConnectParams(options);
pendingConnect.host = hostFallback;
ably.http.setHost(hostFallback);
}
else {
pendingConnect = new ConnectParams(options);
pendingConnect.host = options.realtimeHost;
ably.http.setHost(options.restHost);
}
String host = request.fallback;
if (host == null)
host = options.realtimeHost;
pendingConnect = new ConnectParams(options);
pendingConnect.host = host;

/* enter the connecting state */
notifyState(request);
Expand All @@ -652,17 +664,24 @@ private boolean connectImpl(StateIndication request) {
Log.e(getClass().getName(), msg, e);
throw new RuntimeException(msg, e);
}
ITransport oldTransport;
synchronized(this) {
oldTransport = this.transport;
this.transport = transport;
}
if (oldTransport != null)
oldTransport.abort(REASON_TIMEDOUT);
transport.connect(this);

return true;
}

private void closeImpl(StateIndication request) {
boolean connectionExist = state.state == ConnectionState.connected;
/* enter the closing state */
notifyState(request);

/* send a close message on the transport, if any */
if(transport != null) {
/* send a close message on the transport, if connected */
if(connectionExist) {
try {
transport.send(new ProtocolMessage(Action.close));
} catch (AblyException e) {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/transport/Defaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

public class Defaults {
public static final int PROTOCOL_VERSION = 1;
static final List<String> HOST_FALLBACKS = Arrays.asList("A.ably-realtime.com", "B.ably-realtime.com", "C.ably-realtime.com", "D.ably-realtime.com", "E.ably-realtime.com");
public static final List<String> HOST_FALLBACKS = Arrays.asList("A.ably-realtime.com", "B.ably-realtime.com", "C.ably-realtime.com", "D.ably-realtime.com", "E.ably-realtime.com");
public static final String HOST_REST = "rest.ably.io";
public static final String HOST_REALTIME = "realtime.ably.io";
public static final int PORT = 80;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.ably.lib.transport;
package io.ably.lib.test.realtime;

import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Connection;
import io.ably.lib.realtime.ConnectionState;
import io.ably.lib.test.common.Helpers;
import io.ably.lib.test.common.Setup;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.transport.Defaults;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ClientOptions;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -69,13 +71,13 @@ public void connectionmanager_fallback_none_customhost() throws AblyException {
AblyRealtime ably = new AblyRealtime(opts);
ConnectionManager connectionManager = ably.connection.connectionManager;

new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.failed);
new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.disconnected);

/* Verify that,
* - connectionManager is connected
* - connectionManager is connected to the host without any fallback
* - connectionManager is disconnected
* - connectionManager's last host did not have any fallback
*/
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.failed));
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.disconnected));
assertThat(connectionManager.getHost(), is(equalTo(opts.realtimeHost)));
}

Expand Down Expand Up @@ -109,13 +111,13 @@ protected boolean checkConnectivity() {

connectionManager.connect();

new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.failed);
new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.disconnected);

/* Verify that,
* - connectionManager is failed
* - connectionManager is didn't applied any fallback behavior
* - connectionManager is disconnected
* - connectionManager did not apply any fallback behavior
*/
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.failed));
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.disconnected));
assertThat(connectionManager.getHost(), is(equalTo(opts.realtimeHost)));
}

Expand Down Expand Up @@ -143,16 +145,14 @@ public void connectionmanager_fallback_applied() throws AblyException {
AblyRealtime ably = new AblyRealtime(opts);
ConnectionManager connectionManager = ably.connection.connectionManager;

new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.failed);
new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.disconnected);

/* Verify that,
* - connectionManager is connected
* - connectionManager is connected to a fallback host
* - Ably http client is also using the same fallback host
* - connectionManager is disconnected
* - connectionManager's last host was a fallback host
*/
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.failed));
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.disconnected));
assertThat(connectionManager.getHost(), is(not(equalTo(opts.realtimeHost))));
assertThat(ably.http.getHost(), is(equalTo(connectionManager.getHost())));
}

/**
Expand All @@ -174,28 +174,38 @@ public void connectionmanager_reconnect_default_endpoint() throws AblyException
AblyRealtime ably = new AblyRealtime(opts);
ConnectionManager connectionManager = ably.connection.connectionManager;

new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.failed);
System.out.println("waiting for disconnected");
new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.disconnected);
System.out.println("got disconnected");

/* Verify that,
* - connectionManager is connected
* - connectionManager is connected to a fallback host
* - Ably http client is also using the same fallback host
* - connectionManager is disconnected
* - connectionManager's last host was a fallback host
*/
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.failed));
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.disconnected));
assertThat(connectionManager.getHost(), is(not(equalTo(opts.realtimeHost))));
assertThat(ably.http.getHost(), is(equalTo(connectionManager.getHost())));

/* Reconnect */
ably.options.tlsPort = Defaults.TLS_PORT;
System.out.println("about to connect");
ably.connection.connect();

new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.connected);
/* Wait 1s and then get the name of the host it is trying to connect to. */
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
String host = connectionManager.getHost();

new Helpers.ConnectionManagerWaiter(connectionManager).waitFor(ConnectionState.failed);

/* Verify that,
* - connectionManager is connected
* - connectionManager is connected to the host without any fallback
* - connectionManager is failed, because we are using an application key
* that only works on sandbox;
* - connectionManager first tried to connect to the original host, not a fallback one.
*/
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.connected));
assertThat(connectionManager.getHost(), is(equalTo(opts.realtimeHost)));
System.out.println("waiting for failed");
assertThat(connectionManager.getConnectionState().state, is(ConnectionState.failed));
System.out.println("got failed");
assertThat(host, is(equalTo(opts.realtimeHost)));
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package io.ably.lib.transport;
package io.ably.lib.test.realtime;

import io.ably.lib.transport.Defaults;
import io.ably.lib.transport.Hosts;

import org.hamcrest.Matchers;
import org.junit.Test;
Expand Down Expand Up @@ -43,4 +46,4 @@ public void hosts_fallback_traverse_all() {

assertThat(host, is(equalTo(null)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public void connect_fail_suspended() {
ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);

connectionWaiter.waitFor(ConnectionState.suspended);
/* Wait 1s to force bug where it changes to disconnected right after
* changing to suspended. Without this it fails only intermittently
* when that bug is present. */
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
assertEquals("Verify suspended state is reached", ConnectionState.suspended, ably.connection.state);
assertTrue("Verify multiple connect attempts", connectionWaiter.getCount(ConnectionState.connecting) > 1);
assertTrue("Verify multiple connect attempts", connectionWaiter.getCount(ConnectionState.disconnected) > 1);
Expand Down
Loading