Skip to content

Commit

Permalink
Add configuration for max missed heartbeat ACKs
Browse files Browse the repository at this point in the history
  • Loading branch information
quanticc committed Apr 13, 2020
1 parent 365c9d6 commit 4fd54c7
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 21 deletions.
17 changes: 16 additions & 1 deletion core/src/main/java/discord4j/core/shard/GatewayBootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public class GatewayBootstrap<O extends GatewayOptions> {
private VoiceConnectionFactory voiceConnectionFactory = defaultVoiceConnectionFactory();
private EntityRetrievalStrategy entityRetrievalStrategy = null;
private DispatchEventMapper dispatchEventMapper = null;
private int maxMissedHeartbeatAck = 1;

/**
* Create a default {@link GatewayBootstrap} based off the given {@link DiscordClient} that provides an instance
Expand Down Expand Up @@ -168,6 +169,7 @@ public static GatewayBootstrap<GatewayOptions> create(DiscordClient client) {
this.voiceConnectionFactory = source.voiceConnectionFactory;
this.entityRetrievalStrategy = source.entityRetrievalStrategy;
this.dispatchEventMapper = source.dispatchEventMapper;
this.maxMissedHeartbeatAck = source.maxMissedHeartbeatAck;
}

/**
Expand Down Expand Up @@ -546,6 +548,19 @@ public GatewayBootstrap<O> setDispatchEventMapper(DispatchEventMapper dispatchEv
return this;
}

/**
* Set the maximum number of missed heartbeat acknowledge payloads each connection to Gateway will allow before
* triggering an automatic reconnect. A missed acknowledge is counted if a client does not receive a heartbeat
* ACK between its attempts at sending heartbeats.
*
* @param maxMissedHeartbeatAck a non-negative number representing the maximum number of allowed
* @return this builder
*/
public GatewayBootstrap<O> setMaxMissedHeartbeatAck(int maxMissedHeartbeatAck) {
this.maxMissedHeartbeatAck = Math.max(0, maxMissedHeartbeatAck);
return this;
}

/**
* Connect to the Discord Gateway upon subscription to acquire a {@link GatewayDiscordClient} instance and use it
* in a declarative way, releasing the object once the derived usage {@link Function} completes, and the underlying
Expand Down Expand Up @@ -896,7 +911,7 @@ private DispatchEventMapper initDispatchEventMapper() {
private O buildOptions(GatewayDiscordClient gateway, IdentifyOptions identify, PayloadTransformer identifyLimiter) {
GatewayOptions options = new GatewayOptions(client.getCoreResources().getToken(),
gateway.getGatewayResources().getGatewayReactorResources(), initPayloadReader(), initPayloadWriter(),
reconnectOptions, identify, gatewayObserver, identifyLimiter);
reconnectOptions, identify, gatewayObserver, identifyLimiter, maxMissedHeartbeatAck);
return this.optionsModifier.apply(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ static class CustomOptions extends GatewayOptions {
public CustomOptions(GatewayOptions parent, String foo) {
super(parent.getToken(), parent.getReactorResources(), parent.getPayloadReader(),
parent.getPayloadWriter(), parent.getReconnectOptions(), parent.getIdentifyOptions(),
parent.getInitialObserver(), parent.getIdentifyLimiter());
parent.getInitialObserver(), parent.getIdentifyLimiter(), parent.getMaxMissedHeartbeatAck());
this.foo = foo;
}

Expand Down
38 changes: 22 additions & 16 deletions gateway/src/main/java/discord4j/gateway/DefaultGatewayClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class DefaultGatewayClient implements GatewayClient {
private final GatewayObserver observer;
private final PayloadTransformer identifyLimiter;
private final ResettableInterval heartbeat;
private final int maxMissedHeartbeatAck;

// reactive pipelines
private final EmitterProcessor<ByteBuf> receiver = EmitterProcessor.create(false);
Expand All @@ -108,6 +109,7 @@ public class DefaultGatewayClient implements GatewayClient {
private final AtomicReference<String> sessionId = new AtomicReference<>("");
private final AtomicLong lastSent = new AtomicLong(0);
private final AtomicLong lastAck = new AtomicLong(0);
private final AtomicInteger missedAck = new AtomicInteger(0);
private final AtomicLong responseTime = new AtomicLong(0);
private volatile MonoProcessor<Void> disconnectNotifier;
private volatile GatewayWebsocketHandler sessionHandler;
Expand All @@ -128,6 +130,7 @@ public DefaultGatewayClient(GatewayOptions options) {
this.identifyOptions = Objects.requireNonNull(options.getIdentifyOptions());
this.observer = options.getInitialObserver();
this.identifyLimiter = Objects.requireNonNull(options.getIdentifyLimiter());
this.maxMissedHeartbeatAck = Math.max(0, options.getMaxMissedHeartbeatAck());
// TODO: consider exposing OverflowStrategy to GatewayOptions
this.receiverSink = receiver.sink(FluxSink.OverflowStrategy.BUFFER);
this.senderSink = sender.sink(FluxSink.OverflowStrategy.ERROR);
Expand All @@ -144,6 +147,7 @@ public Mono<Void> execute(String gatewayUrl) {
disconnectNotifier = MonoProcessor.create();
lastAck.set(0);
lastSent.set(0);
missedAck.set(0);

MonoProcessor<Void> ping = MonoProcessor.create();

Expand Down Expand Up @@ -227,17 +231,18 @@ public Mono<Void> execute(String gatewayUrl) {
lastAck.compareAndSet(0, now);
long delay = now - lastAck.get();
if (lastSent.get() - lastAck.get() > 0) {
log.warn(format(context, "Missing heartbeat ACK for {} (tick: {}, seq: {})"),
Duration.ofNanos(delay), t, sequence.get());
sessionHandler.error(new GatewayException(context,
"Reconnecting due to zombie or failed connection"));
return Mono.empty();
} else {
log.debug(format(context, "Sending heartbeat {} after last ACK"),
Duration.ofNanos(delay));
lastSent.set(now);
return Mono.just(GatewayPayload.heartbeat(ImmutableHeartbeat.of(sequence.get())));
if (missedAck.incrementAndGet() > maxMissedHeartbeatAck) {
log.warn(format(context, "Missing heartbeat ACK for {} (tick: {}, seq: {})"),
Duration.ofNanos(delay), t, sequence.get());
sessionHandler.error(new GatewayException(context,
"Reconnecting due to zombie or failed connection"));
return Mono.empty();
}
}
log.debug(format(context, "Sending heartbeat {} after last ACK"),
Duration.ofNanos(delay));
lastSent.set(now);
return Mono.just(GatewayPayload.heartbeat(ImmutableHeartbeat.of(sequence.get())));
})
.doOnNext(heartbeatSink::next)
.then();
Expand Down Expand Up @@ -320,12 +325,12 @@ private Retry<ReconnectContext> retryFactory() {
}

private static final List<Integer> nonRetryableStatusCodes = Arrays.asList(
4004, // Authentication failed
4010, // Invalid shard
4011, // Sharding required
4012, // Invalid API version
4013, // Invalid intent(s)
4014 // Disallowed intent(s)
4004, // Authentication failed
4010, // Invalid shard
4011, // Sharding required
4012, // Invalid API version
4013, // Invalid intent(s)
4014 // Disallowed intent(s)
);

private boolean isRetryable(Throwable t) {
Expand Down Expand Up @@ -469,6 +474,7 @@ public Duration getResponseTime() {
/////////////////////////////////

void ackHeartbeat() {
missedAck.set(0);
responseTime.set(lastAck.updateAndGet(x -> System.nanoTime()) - lastSent.get());
}

Expand Down
8 changes: 7 additions & 1 deletion gateway/src/main/java/discord4j/gateway/GatewayOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ public class GatewayOptions {
private final IdentifyOptions identifyOptions;
private final GatewayObserver initialObserver;
private final PayloadTransformer identifyLimiter;
private final int maxMissedHeartbeatAck;

public GatewayOptions(String token, GatewayReactorResources reactorResources, PayloadReader payloadReader,
PayloadWriter payloadWriter, ReconnectOptions reconnectOptions,
IdentifyOptions identifyOptions, GatewayObserver initialObserver,
PayloadTransformer identifyLimiter) {
PayloadTransformer identifyLimiter, int maxMissedHeartbeatAck) {
this.token = Objects.requireNonNull(token, "token");
this.reactorResources = Objects.requireNonNull(reactorResources, "reactorResources");
this.payloadReader = Objects.requireNonNull(payloadReader, "payloadReader");
Expand All @@ -50,6 +51,7 @@ public GatewayOptions(String token, GatewayReactorResources reactorResources, Pa
this.identifyOptions = Objects.requireNonNull(identifyOptions, "identifyOptions");
this.initialObserver = Objects.requireNonNull(initialObserver, "initialObserver");
this.identifyLimiter = Objects.requireNonNull(identifyLimiter, "identifyLimiter");
this.maxMissedHeartbeatAck = maxMissedHeartbeatAck;
}

public String getToken() {
Expand Down Expand Up @@ -83,4 +85,8 @@ public GatewayObserver getInitialObserver() {
public PayloadTransformer getIdentifyLimiter() {
return identifyLimiter;
}

public int getMaxMissedHeartbeatAck() {
return maxMissedHeartbeatAck;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public void test() {
reconnectOptions,
new IdentifyOptions(new ShardInfo(0, 1), null, Possible.absent(), true),
GatewayObserver.NOOP_LISTENER,
new RateLimitTransformer(1, Duration.ofSeconds(6))
new RateLimitTransformer(1, Duration.ofSeconds(6)),
1
);
GatewayClient gatewayClient = new DefaultGatewayClient(gatewayOptions);
gatewayClient.dispatch().subscribe(dispatch -> {
Expand Down Expand Up @@ -131,7 +132,8 @@ public void testShards() throws InterruptedException {
exit.countDown();
}
},
transformer
transformer,
1
);
GatewayClient shard = new DefaultGatewayClient(gatewayOptions);
latches.add(next);
Expand Down

0 comments on commit 4fd54c7

Please sign in to comment.