Skip to content
Permalink
Browse files

Use `PooledByteBufAllocator` for netty internally (#942)

Motivation:

We configure Netty to use ST unpooled `BufferAllocator`.
However, there are a lot of cases when netty needs to allocate
buffers internally and these buffers will never be exposed to the
users and will be properly released by netty. For example, when
netty writes data to the transport or when it cumulates for
internal state. Allocation of unpooled memory (especially direct
memory) significantly hurts performance and increases GC
pressure.

Modifications:

- Configure netty to use it's `PooledByteBufAllocator.DEFAULT`
allocator;
- Use allocator from the netty's ctx in `HttpObjectEncoder`
instead of hardcoded `POOLED_ALLOCATOR`;
- Remove `RecvByteBufAllocator` with pooled allocator that
is not required anymore;
- Do not use `CopyByteBufHandlerChannelInitializer` for h2;
- Copy h2 `DATA_FRAME`s from pooled to unpooled memory
before propagating payload body to the user;
- Do not override `ByteBufAllocator` for `SslHandler` anymore;
- `BufferAllocators.DEFAULT_ALLOCATOR` should prefer heap
memory by default;

Result:

1. Improved performance of gRPC:
- Aggregated API: RPS increased by 30-37%, p99 latency reduced
by x1.8-5.7, depending on payload body size;
- Streaming API: RPS increased by 23-29%, p99 latency reduced
by x2.1-4.7, depending on payload body size;
2. Improved performance of HTTP:
- Aggregated API: RPS increased by 7-13%, depending on payload
body size, no p99 latency change;
- Streaming API: RPS increased by 5-9%, depending on payload
body size, no p99 latency change;
3. Less code to maintain.
  • Loading branch information
idelpivnitskiy committed Feb 22, 2020
1 parent 1653fc7 commit 82e04313758da7dbc54422bbce6607c390976deb
Showing with 393 additions and 651 deletions.
  1. +3 −1 servicetalk-benchmarks/src/jmh/java/io/servicetalk/http/netty/HttpResponseDecoderBenchmark.java
  2. +1 −1 servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/BufferAllocators.java
  3. +3 −4 servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/BufferUtils.java
  4. +3 −3 servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/ServiceTalkBufferAllocator.java
  5. +36 −11 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractH2DuplexHandler.java
  6. +2 −4 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ToStH1ClientDuplexHandler.java
  7. +2 −4 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ToStH1ServerDuplexHandler.java
  8. +17 −12 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpClientChannelInitializer.java
  9. +6 −5 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpObjectDecoder.java
  10. +4 −5 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpObjectEncoder.java
  11. +11 −7 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpRequestDecoder.java
  12. +10 −7 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpResponseDecoder.java
  13. +12 −6 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java
  14. +5 −2 servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/StreamingConnectionFactory.java
  15. +2 −1 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestDecoderTest.java
  16. +3 −1 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestEncoderTest.java
  17. +3 −2 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpResponseDecoderTest.java
  18. +2 −8 ...p-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpClientChannelInitializer.java
  19. +10 −13 servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpConnector.java
  20. +16 −8 servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java
  21. +2 −8 ...p-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerChannelInitializer.java
  22. +15 −6 ...rt-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/ByteToMessageDecoder.java
  23. +101 −0 ...l/src/main/java/io/servicetalk/transport/netty/internal/CopyByteBufHandlerChannelInitializer.java
  24. +0 −152 ...src/main/java/io/servicetalk/transport/netty/internal/PooledRecvByteBufAllocatorInitializers.java
  25. +1 −1 ...y-internal/src/main/java/io/servicetalk/transport/netty/internal/SslClientChannelInitializer.java
  26. +1 −1 ...y-internal/src/main/java/io/servicetalk/transport/netty/internal/SslServerChannelInitializer.java
  27. +6 −378 ...port-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WrappingSslContext.java
  28. +116 −0 ...-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/CopyByteBufHandlerTest.java
@@ -38,7 +38,9 @@
import java.util.ArrayDeque;

import static io.netty.handler.codec.http.HttpConstants.SP;
import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
import static io.servicetalk.buffer.netty.BufferAllocators.PREFER_DIRECT_ALLOCATOR;
import static io.servicetalk.buffer.netty.BufferUtils.getByteBufAllocator;
import static io.servicetalk.buffer.netty.BufferUtils.toByteBuf;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.netty.HttpObjectEncoder.CRLF_SHORT;
@@ -84,7 +86,7 @@ public void setup() {
responseByteBuf = toByteBuf(responseBuffer.slice());

channel = new EmbeddedChannel(new HttpResponseDecoder(new ArrayDeque<>(),
DefaultHttpHeadersFactory.INSTANCE, 8192, 8192));
getByteBufAllocator(DEFAULT_ALLOCATOR), DefaultHttpHeadersFactory.INSTANCE, 8192, 8192));
}

@Benchmark
@@ -25,7 +25,7 @@
/**
* Default {@link BufferAllocator} whose {@link Buffer}s are typically backed by Netty buffers.
*/
public static final BufferAllocator DEFAULT_ALLOCATOR = BufferUtils.PREFER_DIRECT_ALLOCATOR_WITHOUT_ZEROING;
public static final BufferAllocator DEFAULT_ALLOCATOR = BufferUtils.PREFER_HEAP_ALLOCATOR;

/**
* Default {@link BufferAllocator} whose {@link Buffer}s are typically backed by Netty buffers and prefers direct
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,9 +34,8 @@
*/
public final class BufferUtils {

static final BufferAllocator PREFER_HEAP_ALLOCATOR = new ServiceTalkBufferAllocator(false, false);
static final BufferAllocator PREFER_DIRECT_ALLOCATOR = new ServiceTalkBufferAllocator(true, false);
static final BufferAllocator PREFER_DIRECT_ALLOCATOR_WITHOUT_ZEROING = new ServiceTalkBufferAllocator(true, true);
static final BufferAllocator PREFER_HEAP_ALLOCATOR = new ServiceTalkBufferAllocator(false);
static final BufferAllocator PREFER_DIRECT_ALLOCATOR = new ServiceTalkBufferAllocator(true);

private BufferUtils() {
// no instances
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,9 +44,9 @@

private final boolean noZeroing;

ServiceTalkBufferAllocator(boolean preferDirect, boolean tryNoZeroing) {
ServiceTalkBufferAllocator(boolean preferDirect) {
super(preferDirect);
this.noZeroing = tryNoZeroing && useDirectBufferWithoutZeroing();
this.noZeroing = useDirectBufferWithoutZeroing();
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package io.servicetalk.http.netty;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.netty.H2ToStH1Utils.H2StreamRefusedException;
@@ -31,18 +32,24 @@
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2ResetFrame;
import io.netty.util.ReferenceCountUtil;

import javax.annotation.Nullable;

import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
import static io.servicetalk.buffer.netty.BufferUtils.newBufferFrom;
import static io.servicetalk.buffer.netty.BufferUtils.toByteBuf;
import static io.servicetalk.buffer.netty.BufferUtils.toByteBufNoThrow;
import static io.servicetalk.http.netty.H2ToStH1Utils.h1HeadersToH2Headers;

abstract class AbstractH2DuplexHandler extends ChannelDuplexHandler {

final BufferAllocator allocator;
final HttpHeadersFactory headersFactory;
final CloseHandler closeHandler;

AbstractH2DuplexHandler(HttpHeadersFactory headersFactory, final CloseHandler closeHandler) {
AbstractH2DuplexHandler(BufferAllocator allocator, HttpHeadersFactory headersFactory, CloseHandler closeHandler) {
this.allocator = allocator;
this.headersFactory = headersFactory;
this.closeHandler = closeHandler;
}
@@ -85,14 +92,32 @@ final void writeTrailers(ChannelHandlerContext ctx, Object msg, ChannelPromise p
}

final void readDataFrame(ChannelHandlerContext ctx, Object msg) {
Http2DataFrame dataFrame = (Http2DataFrame) msg;
if (dataFrame.content().isReadable()) {
ctx.fireChannelRead(newBufferFrom(dataFrame.content()));
} else {
dataFrame.release();
}
if (dataFrame.isEndStream()) {
ctx.fireChannelRead(headersFactory.newEmptyTrailers());
Object toRelease = msg;
try {
Http2DataFrame dataFrame = (Http2DataFrame) msg;
if (dataFrame.content().isReadable()) {
// Copy to unpooled memory before passing to the user
Buffer data = allocator.newBuffer(dataFrame.content().readableBytes());
ByteBuf nettyData = toByteBuf(data);
nettyData.writeBytes(dataFrame.content());
toRelease = release(dataFrame);
ctx.fireChannelRead(data);
} else {
toRelease = release(dataFrame);
}
if (dataFrame.isEndStream()) {
ctx.fireChannelRead(headersFactory.newEmptyTrailers());
}
} finally {
if (toRelease != null) {
ReferenceCountUtil.release(toRelease);
}
}
}

@Nullable
private static Http2DataFrame release(Http2DataFrame dataFrame) {
dataFrame.release();
return null;
}
}
@@ -1,5 +1,5 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,15 +53,13 @@
final class H2ToStH1ClientDuplexHandler extends AbstractH2DuplexHandler {
private boolean readHeaders;
private final HttpScheme scheme;
private final BufferAllocator allocator;
@Nullable
private HttpRequestMethod method;

H2ToStH1ClientDuplexHandler(boolean sslEnabled, BufferAllocator allocator, HttpHeadersFactory headersFactory,
CloseHandler closeHandler) {
super(headersFactory, closeHandler);
super(allocator, headersFactory, closeHandler);
this.scheme = sslEnabled ? HttpScheme.HTTPS : HttpScheme.HTTP;
this.allocator = allocator;
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,12 +51,10 @@

final class H2ToStH1ServerDuplexHandler extends AbstractH2DuplexHandler {
private boolean readHeaders;
private final BufferAllocator allocator;

H2ToStH1ServerDuplexHandler(BufferAllocator allocator, HttpHeadersFactory headersFactory,
CloseHandler closeHandler) {
super(headersFactory, closeHandler);
this.allocator = allocator;
super(allocator, headersFactory, closeHandler);
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,9 @@
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;

@@ -29,26 +31,29 @@

final class HttpClientChannelInitializer implements ChannelInitializer {

private final H1ProtocolConfig config;
private final CloseHandler closeHandler;
private final ChannelInitializer delegate;

/**
* Creates a new instance.
* @param config {@link H1ProtocolConfig}
* @param closeHandler observes protocol state events
*/
HttpClientChannelInitializer(H1ProtocolConfig config, CloseHandler closeHandler) {
this.config = config;
this.closeHandler = closeHandler;
HttpClientChannelInitializer(final ByteBufAllocator alloc, final H1ProtocolConfig config,
final CloseHandler closeHandler) {
// H1 slices passed memory chunks into headers and payload body without copying and will emit them to the
// user-code. Therefore, ByteBufs must be copied to unpooled memory before HttpObjectDecoder.
this.delegate = new CopyByteBufHandlerChannelInitializer(alloc).andThen(channel -> {
final Queue<HttpRequestMethod> methodQueue = new ArrayDeque<>(min(8, config.maxPipelinedRequests()));
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpResponseDecoder(methodQueue, alloc, config.headersFactory(),
config.maxStartLineLength(), config.maxHeaderFieldLength(), closeHandler));
pipeline.addLast(new HttpRequestEncoder(methodQueue,
config.headersEncodedSizeEstimate(), config.trailersEncodedSizeEstimate(), closeHandler));
});
}

@Override
public void init(final Channel channel) {
Queue<HttpRequestMethod> methodQueue = new ArrayDeque<>(min(8, config.maxPipelinedRequests()));
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpResponseDecoder(methodQueue, config.headersFactory(),
config.maxStartLineLength(), config.maxHeaderFieldLength(), closeHandler));
pipeline.addLast(new HttpRequestEncoder(methodQueue,
config.headersEncodedSizeEstimate(), config.trailersEncodedSizeEstimate(), closeHandler));
delegate.init(channel);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -124,8 +124,10 @@
/**
* Creates a new instance with the specified parameters.
*/
protected HttpObjectDecoder(HttpHeadersFactory headersFactory, int maxStartLineLength, int maxHeaderFieldLength,
protected HttpObjectDecoder(final ByteBufAllocator alloc, final HttpHeadersFactory headersFactory,
final int maxStartLineLength, final int maxHeaderFieldLength,
final CloseHandler closeHandler) {
super(alloc);
this.closeHandler = closeHandler;
if (maxStartLineLength <= 0) {
throw new IllegalArgumentException("maxStartLineLength: " + maxStartLineLength + " (expected >0)");
@@ -407,11 +409,10 @@ protected final void decode(ChannelHandlerContext ctx, ByteBuf buffer) {
}

@Override
protected final ByteBuf swapAndCopyCumulation(final ByteBufAllocator alloc,
final ByteBuf cumulation,
protected final ByteBuf swapAndCopyCumulation(final ByteBuf cumulation,
final ByteBuf in) {
final int readerIndex = cumulation.readerIndex();
ByteBuf newCumulation = super.swapAndCopyCumulation(alloc, cumulation, in);
final ByteBuf newCumulation = super.swapAndCopyCumulation(cumulation, in);
cumulationIndex -= readerIndex - newCumulation.readerIndex();
return newCumulation;
}
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -61,7 +61,6 @@
import static io.servicetalk.http.api.HeaderUtils.isTransferEncodingChunked;
import static io.servicetalk.http.netty.HeaderUtils.calculateContentLength;
import static io.servicetalk.http.netty.HttpKeepAlive.shouldClose;
import static io.servicetalk.transport.netty.internal.PooledRecvByteBufAllocatorInitializers.POOLED_ALLOCATOR;
import static java.lang.Long.toHexString;
import static java.lang.Math.max;
import static java.nio.charset.StandardCharsets.US_ASCII;
@@ -125,7 +124,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
// We prefer a direct allocation here because it is expected the resulted encoded Buffer will be written
// to a socket. In order to do the write to the socket the memory typically needs to be allocated in direct
// memory and will be copied to direct memory if not. Using a direct buffer will avoid the copy.
ByteBuf byteBuf = POOLED_ALLOCATOR.directBuffer((int) headersEncodedSizeAccumulator);
ByteBuf byteBuf = ctx.alloc().directBuffer((int) headersEncodedSizeAccumulator);
Buffer stBuf = newBufferFrom(byteBuf);

// Encode the message.
@@ -256,7 +255,7 @@ private static void encodeChunkedContent(ChannelHandlerContext ctx, Buffer msg,
PromiseCombiner promiseCombiner) {
if (contentLength > 0) {
String lengthHex = toHexString(contentLength);
ByteBuf buf = POOLED_ALLOCATOR.directBuffer(lengthHex.length() + 2);
ByteBuf buf = ctx.alloc().directBuffer(lengthHex.length() + 2);
buf.writeCharSequence(lengthHex, US_ASCII);
writeShortBE(buf, CRLF_SHORT);
promiseCombiner.add(ctx.write(buf));
@@ -274,7 +273,7 @@ private void encodeAndWriteTrailers(ChannelHandlerContext ctx, HttpHeaders heade
if (headers.isEmpty()) {
ctx.write(ZERO_CRLF_CRLF_BUF.duplicate(), promise);
} else {
ByteBuf buf = POOLED_ALLOCATOR.directBuffer((int) trailersEncodedSizeAccumulator);
ByteBuf buf = ctx.alloc().directBuffer((int) trailersEncodedSizeAccumulator);
writeMediumBE(buf, ZERO_CRLF_MEDIUM);
encodeHeaders(headers, buf);
writeShortBE(buf, CRLF_SHORT);
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
import io.servicetalk.transport.netty.internal.CloseHandler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;

import java.util.Queue;
@@ -36,14 +37,17 @@

private final Queue<HttpRequestMethod> methodQueue;

HttpRequestDecoder(Queue<HttpRequestMethod> methodQueue,
HttpHeadersFactory headersFactory, int maxStartLineLength, int maxHeaderFieldLength) {
this(methodQueue, headersFactory, maxStartLineLength, maxHeaderFieldLength, UNSUPPORTED_PROTOCOL_CLOSE_HANDLER);
HttpRequestDecoder(final Queue<HttpRequestMethod> methodQueue, final ByteBufAllocator alloc,
final HttpHeadersFactory headersFactory, final int maxStartLineLength,
final int maxHeaderFieldLength) {
this(methodQueue, alloc, headersFactory, maxStartLineLength, maxHeaderFieldLength,
UNSUPPORTED_PROTOCOL_CLOSE_HANDLER);
}

HttpRequestDecoder(Queue<HttpRequestMethod> methodQueue, HttpHeadersFactory headersFactory,
int maxStartLineLength, int maxHeaderFieldLength, CloseHandler closeHandler) {
super(headersFactory, maxStartLineLength, maxHeaderFieldLength, closeHandler);
HttpRequestDecoder(final Queue<HttpRequestMethod> methodQueue, final ByteBufAllocator alloc,
final HttpHeadersFactory headersFactory, final int maxStartLineLength,
final int maxHeaderFieldLength, final CloseHandler closeHandler) {
super(alloc, headersFactory, maxStartLineLength, maxHeaderFieldLength, closeHandler);
this.methodQueue = requireNonNull(methodQueue);
}

0 comments on commit 82e0431

Please sign in to comment.
You can’t perform that action at this time.