Skip to content

Commit

Permalink
Merge fix #822 and fix #824 into 3.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
quanticc committed Jan 2, 2021
2 parents 7f5e62b + 53201ba commit 7c68b4e
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 15 deletions.
18 changes: 14 additions & 4 deletions gateway/src/main/java/discord4j/gateway/DefaultGatewayClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,22 +210,28 @@ public Mono<Void> execute(String gatewayUrl) {
Sinks.Empty<Void> ping = Sinks.empty();

// Setup the sending logic from multiple sources into one merged Flux
Mono<Void> onConnected = state.asFlux().filter(s -> s == GatewayConnection.State.CONNECTED).next().then();
Flux<ByteBuf> heartbeatFlux = heartbeats.asFlux()
.flatMap(payload -> Flux.from(payloadWriter.write(payload)));
Flux<ByteBuf> identifyFlux = outbound.asFlux()
.filter(payload -> Opcode.IDENTIFY.equals(payload.getOp()))
.delayUntil(payload -> ping.asMono())
.delayUntil(__ -> ping.asMono())
.flatMap(payload -> Flux.from(payloadWriter.write(payload)))
.transform(identifyLimiter);
Flux<ByteBuf> resumeFlux = outbound.asFlux()
.filter(payload -> Opcode.RESUME.equals(payload.getOp()))
.flatMap(payload -> Flux.from(payloadWriter.write(payload)));
Flux<ByteBuf> payloadFlux = outbound.asFlux()
.filter(payload -> !Opcode.IDENTIFY.equals(payload.getOp()))
.filter(DefaultGatewayClient::isNotStartupPayload)
.delayUntil(__ -> onConnected)
.flatMap(payload -> Flux.from(payloadWriter.write(payload)))
.transform(buf -> Flux.merge(buf, sender.asFlux()))
.transform(new RateLimitOperator<>(outboundLimiterCapacity(), Duration.ofSeconds(60),
reactorResources.getTimerTaskScheduler(),
reactorResources.getPayloadSenderScheduler()));
Flux<ByteBuf> outFlux = Flux.merge(heartbeatFlux, identifyFlux, payloadFlux)
.doOnNext(buf -> logPayload(senderLog, context, buf));
Flux<ByteBuf> outFlux = Flux.merge(heartbeatFlux, identifyFlux, resumeFlux, payloadFlux)
.doOnNext(buf -> logPayload(senderLog, context, buf))
.doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease);

sessionHandler = new GatewayWebsocketHandler(receiver, outFlux, context);

Expand Down Expand Up @@ -350,6 +356,10 @@ private void logPayload(Logger logger, ContextView context, ByteBuf buf) {
.replaceAll("(\"token\": ?\")([A-Za-z0-9._-]*)(\")", "$1hunter2$3")));
}

private static boolean isNotStartupPayload(GatewayPayload<?> payload) {
return !Opcode.IDENTIFY.equals(payload.getOp()) && !Opcode.RESUME.equals(payload.getOp());
}

private static boolean isReadyOrResumed(Dispatch d) {
return Ready.class.isAssignableFrom(d.getClass()) || Resumed.class.isAssignableFrom(d.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import discord4j.gateway.json.GatewayPayload;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

Expand All @@ -36,9 +36,7 @@ public JacksonPayloadWriter(ObjectMapper mapper) {
public Mono<ByteBuf> write(GatewayPayload<?> payload) {
return Mono.create(sink -> sink.onRequest(__ -> {
try {
sink.success(ByteBufAllocator.DEFAULT.buffer()
.touch("discord4j.gateway.payload")
.writeBytes(mapper.writeValueAsBytes(payload)));
sink.success(Unpooled.wrappedBuffer(mapper.writeValueAsBytes(payload)));
} catch (JsonProcessingException e) {
sink.error(Exceptions.propagate(e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,9 @@ public Mono<Tuple2<DisconnectBehavior, CloseStatus>> handle(WebsocketInbound in,
.map(status -> new CloseStatus(status.code(), status.reasonText()))
.doOnNext(status -> {
log.debug(format(context, "Received close status: {}"), status);
if (status.getCode() == 4014) {
close(DisconnectBehavior.stop(new VoiceGatewayException(context, "Disconnected")));
} else {
close(DisconnectBehavior.retryAbruptly(
new VoiceGatewayException(context, "Inbound close status")));
}
// TODO: discord uses code 4014 for both resumable and non-resumable disconnects
// we optimistically issue a retry. might encounter a 4006 if invalid
close(DisconnectBehavior.retryAbruptly(new VoiceGatewayException(context, "Inbound close status")));
});

Mono<Void> outboundEvents = out.sendObject(Flux.merge(outboundClose, outbound.map(TextWebSocketFrame::new)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class VoiceGatewayRetrySpec extends Retry {
public static final List<Integer> NON_RETRYABLE_STATUS_CODES = Arrays.asList(
4004, // Authentication failed
4006, // Session no longer valid
4014, // Disconnected
4016 // Unknown encryption mode
);

Expand Down

0 comments on commit 7c68b4e

Please sign in to comment.