Skip to content

Commit

Permalink
Allow enabling unpooled inbound allocation method
Browse files Browse the repository at this point in the history
  • Loading branch information
quanticc committed Oct 23, 2020
1 parent 8417bee commit eae077b
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class DefaultGatewayClient implements GatewayClient {
private final PayloadTransformer identifyLimiter;
private final ResettableInterval heartbeat;
private final int maxMissedHeartbeatAck;
private final boolean unpooled;

// reactive pipelines
private final EmitterProcessor<ByteBuf> receiver = EmitterProcessor.create(false);
Expand Down Expand Up @@ -133,6 +134,7 @@ public DefaultGatewayClient(GatewayOptions options) {
this.observer = options.getInitialObserver();
this.identifyLimiter = Objects.requireNonNull(options.getIdentifyLimiter());
this.maxMissedHeartbeatAck = Math.max(0, options.getMaxMissedHeartbeatAck());
this.unpooled = options.isUnpooled();
// TODO: consider exposing OverflowStrategy to GatewayOptions
this.receiverSink = receiver.sink(FluxSink.OverflowStrategy.BUFFER);
this.senderSink = sender.sink(FluxSink.OverflowStrategy.ERROR);
Expand Down Expand Up @@ -203,7 +205,7 @@ public Mono<Void> execute(String gatewayUrl) {
.then();

// Subscribe the receiver to process and transform the inbound payloads into Dispatch events
Mono<Void> receiverFuture = receiver.map(ByteBuf::retain)
Mono<Void> receiverFuture = receiver.map(buf -> unpooled ? buf : buf.retain())
.doOnNext(buf -> logPayload(receiverLog, context, buf))
.flatMap(payloadReader::read)
.doOnDiscard(ByteBuf.class, DefaultGatewayClient::safeRelease)
Expand Down
14 changes: 14 additions & 0 deletions gateway/src/main/java/discord4j/gateway/GatewayOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,20 @@ public class GatewayOptions {
private final GatewayObserver initialObserver;
private final PayloadTransformer identifyLimiter;
private final int maxMissedHeartbeatAck;
private final boolean unpooled;

public GatewayOptions(String token, GatewayReactorResources reactorResources, PayloadReader payloadReader,
PayloadWriter payloadWriter, ReconnectOptions reconnectOptions,
IdentifyOptions identifyOptions, GatewayObserver initialObserver,
PayloadTransformer identifyLimiter, int maxMissedHeartbeatAck) {
this(token, reactorResources, payloadReader, payloadWriter, reconnectOptions, identifyOptions, initialObserver,
identifyLimiter, maxMissedHeartbeatAck, false);
}

public GatewayOptions(String token, GatewayReactorResources reactorResources, PayloadReader payloadReader,
PayloadWriter payloadWriter, ReconnectOptions reconnectOptions,
IdentifyOptions identifyOptions, GatewayObserver initialObserver,
PayloadTransformer identifyLimiter, int maxMissedHeartbeatAck, boolean unpooled) {
this.token = Objects.requireNonNull(token, "token");
this.reactorResources = Objects.requireNonNull(reactorResources, "reactorResources");
this.payloadReader = Objects.requireNonNull(payloadReader, "payloadReader");
Expand All @@ -52,6 +61,7 @@ public GatewayOptions(String token, GatewayReactorResources reactorResources, Pa
this.initialObserver = Objects.requireNonNull(initialObserver, "initialObserver");
this.identifyLimiter = Objects.requireNonNull(identifyLimiter, "identifyLimiter");
this.maxMissedHeartbeatAck = maxMissedHeartbeatAck;
this.unpooled = unpooled;
}

public String getToken() {
Expand Down Expand Up @@ -89,4 +99,8 @@ public PayloadTransformer getIdentifyLimiter() {
public int getMaxMissedHeartbeatAck() {
return maxMissedHeartbeatAck;
}

public boolean isUnpooled() {
return unpooled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class GatewayWebsocketHandler {
private final Flux<ByteBuf> outbound;
private final MonoProcessor<DisconnectBehavior> sessionClose;
private final Context context;
private final boolean unpooled;

/**
* Create a new handler with the given data pipelines.
Expand All @@ -68,10 +69,16 @@ public class GatewayWebsocketHandler {
* @param context the Reactor {@link Context} that owns this handler, to enrich logging
*/
public GatewayWebsocketHandler(FluxSink<ByteBuf> inbound, Flux<ByteBuf> outbound, Context context) {
this(inbound, outbound, context, false);
}

public GatewayWebsocketHandler(FluxSink<ByteBuf> inbound, Flux<ByteBuf> outbound, Context context,
boolean unpooled) {
this.inbound = inbound;
this.outbound = outbound;
this.sessionClose = MonoProcessor.create();
this.context = context;
this.unpooled = unpooled;
}

/**
Expand Down Expand Up @@ -121,7 +128,7 @@ public Mono<Tuple2<DisconnectBehavior, CloseStatus>> handle(WebsocketInbound in,
.map(WebSocketFrame::content)
.transformDeferred(decompressor::completeMessages)
.doOnNext(inbound::next)
.doOnNext(GatewayWebsocketHandler::safeRelease)
.doOnNext(this::safeRelease)
.then();

return Mono.zip(outboundEvents, inboundEvents)
Expand Down Expand Up @@ -160,8 +167,8 @@ public void error(Throwable error) {
close(DisconnectBehavior.retryAbruptly(error));
}

private static void safeRelease(ByteBuf buf) {
if (buf.refCnt() > 0) {
private void safeRelease(ByteBuf buf) {
if (!unpooled && buf.refCnt() > 0) {
try {
buf.release();
} catch (IllegalReferenceCountException e) {
Expand Down
14 changes: 9 additions & 5 deletions gateway/src/main/java/discord4j/gateway/ZlibDecompressor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
*/
package discord4j.gateway;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.*;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

Expand All @@ -40,9 +37,15 @@ public class ZlibDecompressor {

private final ByteBufAllocator allocator;
private final Inflater context = new Inflater();
private final boolean unpooled;

public ZlibDecompressor(ByteBufAllocator allocator) {
this(allocator, false);
}

public ZlibDecompressor(ByteBufAllocator allocator, boolean unpooled) {
this.allocator = allocator;
this.unpooled = unpooled;
}

public Flux<ByteBuf> completeMessages(Flux<ByteBuf> payloads) {
Expand All @@ -62,7 +65,8 @@ public Flux<ByteBuf> completeMessages(Flux<ByteBuf> payloads) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (InflaterOutputStream inflater = new InflaterOutputStream(out, context)) {
inflater.write(ByteBufUtil.getBytes(buf, buf.readerIndex(), buf.readableBytes(), false));
return allocator.buffer().writeBytes(out.toByteArray()).asReadOnly();
ByteBuf outBuffer = unpooled ? Unpooled.buffer() : allocator.buffer();
return outBuffer.writeBytes(out.toByteArray()).asReadOnly();
} catch (IOException e) {
throw Exceptions.propagate(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,24 @@ public JacksonPayloadReader(ObjectMapper mapper, boolean lenient) {
@Override
public Mono<GatewayPayload<?>> read(ByteBuf buf) {
return Mono.create(sink -> {
sink.onRequest(__ -> {
try {
GatewayPayload<?> value = mapper.readValue(
ByteBufUtil.getBytes(buf, buf.readerIndex(), buf.readableBytes(), false),
new TypeReference<GatewayPayload<?>>() {});
sink.success(value);
} catch (IOException | IllegalArgumentException e) {
if (lenient) {
// if eof input - just ignore
if (buf.readableBytes() > 0) {
log.warn("Error while decoding JSON ({}): {}", e.toString(),
new String(ByteBufUtil.getBytes(buf), StandardCharsets.UTF_8));
}
sink.success();
} else {
sink.error(Exceptions.propagate(e));
sink.onDispose(() -> ReferenceCountUtil.release(buf));
try {
GatewayPayload<?> value = mapper.readValue(
ByteBufUtil.getBytes(buf, buf.readerIndex(), buf.readableBytes(), false),
new TypeReference<GatewayPayload<?>>() {});
sink.success(value);
} catch (IOException | IllegalArgumentException e) {
if (lenient) {
// if eof input - just ignore
if (buf.readableBytes() > 0) {
log.warn("Error while decoding JSON ({}): {}", e.toString(),
new String(ByteBufUtil.getBytes(buf), StandardCharsets.UTF_8));
}
sink.success();
} else {
sink.error(Exceptions.propagate(e));
}
});
sink.onDispose(() -> ReferenceCountUtil.release(buf));
}
});
}
}

0 comments on commit eae077b

Please sign in to comment.