Skip to content

Commit

Permalink
[Java] Add pings and server timeout (#3027)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy committed Oct 10, 2018
1 parent be4fe6c commit 2ee3517
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
Expand Down Expand Up @@ -39,9 +43,36 @@ public class HubConnection {
private ConnectionState connectionState = null;
private HttpClient httpClient;
private String stopError;
private Timer pingTimer = null;
private AtomicLong nextServerTimeout = new AtomicLong();
private AtomicLong nextPingActivation = new AtomicLong();
private Duration keepAliveInterval = Duration.ofSeconds(15);
private Duration serverTimeout = Duration.ofSeconds(30);
private Duration tickRate = Duration.ofSeconds(1);
private CompletableFuture<Void> handshakeResponseFuture;
private Duration handshakeResponseTimeout = Duration.ofSeconds(15);

public void setServerTimeout(Duration serverTimeout) {
this.serverTimeout = serverTimeout;
}

public Duration getServerTimeout() {
return this.serverTimeout;
}

public void setKeepAliveInterval(Duration keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
}

public Duration getKeepAliveInterval() {
return this.keepAliveInterval;
}

// For testing purposes
void setTickRate(Duration tickRate) {
this.tickRate = tickRate;
}

HubConnection(String url, Transport transport, boolean skipNegotiate, Logger logger, HttpClient httpClient, Single<String> accessTokenProvider, Duration handshakeResponseTimeout) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("A valid url is required.");
Expand Down Expand Up @@ -79,6 +110,7 @@ public class HubConnection {
this.skipNegotiate = skipNegotiate;

this.callback = (payload) -> {
resetServerTimeout();
if (!handshakeReceived) {
int handshakeLength = payload.indexOf(RECORD_SEPARATOR) + 1;
String handshakeResponseString = payload.substring(0, handshakeLength - 1);
Expand Down Expand Up @@ -245,6 +277,29 @@ public Completable start() {
hubConnectionState = HubConnectionState.CONNECTED;
connectionState = new ConnectionState(this);
logger.log(LogLevel.Information, "HubConnection started.");

resetServerTimeout();
this.pingTimer = new Timer();
this.pingTimer.schedule(new TimerTask() {
@Override
public void run() {
try {
if (System.currentTimeMillis() > nextServerTimeout.get()) {
stop("Server timeout elapsed without receiving a message from the server.");
return;
}

if (System.currentTimeMillis() > nextPingActivation.get()) {
sendHubMessage(PingMessage.getInstance());
}
} catch (Exception e) {
logger.log(LogLevel.Warning, String.format("Error sending ping: %s", e.getMessage()));
// The connection is probably in a bad or closed state now, cleanup the timer so
// it stops triggering
pingTimer.cancel();
}
}
}, new Date(0), tickRate.toMillis());
} finally {
hubConnectionStateLock.unlock();
}
Expand Down Expand Up @@ -399,11 +454,21 @@ public <T> Single<T> invoke(Class<T> returnType, String method, Object... args)
private void sendHubMessage(HubMessage message) throws Exception {
String serializedMessage = protocol.writeMessage(message);
if (message.getMessageType() == HubMessageType.INVOCATION) {
logger.log(LogLevel.Debug, "Sending %d message '%s'.", message.getMessageType().value, ((InvocationMessage)message).getInvocationId());
logger.log(LogLevel.Debug, "Sending %s message '%s'.", message.getMessageType().name(), ((InvocationMessage)message).getInvocationId());
} else {
logger.log(LogLevel.Debug, "Sending %d message.", message.getMessageType().value);
logger.log(LogLevel.Debug, "Sending %s message.", message.getMessageType().name());
}
transport.send(serializedMessage);

resetKeepAlive();
}

private void resetServerTimeout() {
this.nextServerTimeout.set(System.currentTimeMillis() + serverTimeout.toMillis());
}

private void resetKeepAlive() {
this.nextPingActivation.set(System.currentTimeMillis() + keepAliveInterval.toMillis());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

class PingMessage extends HubMessage
{
int type = HubMessageType.PING.value;

private static PingMessage instance = new PingMessage();

private PingMessage()
Expand Down
Loading

0 comments on commit 2ee3517

Please sign in to comment.