Skip to content

Commit

Permalink
Support write of read-only Buffer(s) for HTTP/2 (#1171)
Browse files Browse the repository at this point in the history
Motivation:

ServiceTalk provides a default `BufferAllocator` backed by netty's
`ByteBufAllocator`, but also provides a `ReadOnlyBufferAllocator`
implementation that does not depend on netty. `AbstractH2DuplexHandler`
currently supports only `ServiceTalkBufferAllocator`.

Modifications:
- Share `encodeAndRetain` logic between HTTP/1.x and HTTP/2 encoders;
- Add a test that verified both HTTP versions support all `BufferAllocator`s;
- Share `HttpProtocol` enum between different tests;
- Implement `ReadOnlyBufferAllocator.toString()` for better output for test
parameters;

Result:

HTTP/2 encoder supports `Buffer`s produced by `ReadOnlyBufferAllocator`.
  • Loading branch information
idelpivnitskiy committed Oct 10, 2020
1 parent c00c1b2 commit 597e8cd
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,9 @@ public Buffer wrap(byte[] bytes, int offset, int len) {
public Buffer wrap(ByteBuffer buffer) {
return new ReadOnlyByteBuffer(buffer);
}

@Override
public String toString() {
return this.getClass().getSimpleName() + "(directByDefault: " + preferDirect + ')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.servicetalk.http.netty.H2ToStH1Utils.H2StreamRefusedException;
import io.servicetalk.http.netty.H2ToStH1Utils.H2StreamResetException;
import io.servicetalk.transport.api.ConnectionObserver.StreamObserver;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.CloseHandler;

import io.netty.buffer.ByteBuf;
Expand All @@ -41,8 +40,8 @@
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
import static io.servicetalk.buffer.netty.BufferUtils.toByteBuf;
import static io.servicetalk.buffer.netty.BufferUtils.toByteBufNoThrow;
import static io.servicetalk.http.netty.H2ToStH1Utils.h1HeadersToH2Headers;
import static io.servicetalk.http.netty.HttpObjectEncoder.encodeAndRetain;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.channelError;

abstract class AbstractH2DuplexHandler extends ChannelDuplexHandler {
Expand Down Expand Up @@ -76,14 +75,7 @@ public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
}

static void writeBuffer(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ByteBuf byteBuf = toByteBufNoThrow((Buffer) msg);
if (byteBuf == null) {
final IllegalArgumentException cause = new IllegalArgumentException("unsupported Buffer type:" + msg);
promise.setFailure(cause);
ChannelCloseUtils.close(ctx, cause);
} else {
ctx.write(new DefaultHttp2DataFrame(byteBuf.retain(), false), promise);
}
ctx.write(new DefaultHttp2DataFrame(encodeAndRetain((Buffer) msg), false), promise);
}

final void writeTrailers(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,14 +333,14 @@ private static void writeBufferToByteBuf(Buffer src, ByteBuf dstByteBuf, Buffer
}
}

private static ByteBuf encodeAndRetain(Buffer msg) {
static ByteBuf encodeAndRetain(Buffer msg) {
// We still want to retain the objects we encode because otherwise folks may hold on to references of objects
// with a 0 reference count and get an IllegalReferenceCountException.
// TODO(scott): add support for file region
return toByteBuf(msg).retain();
}

static ByteBuf toByteBuf(Buffer buffer) {
private static ByteBuf toByteBuf(Buffer buffer) {
ByteBuf byteBuf = toByteBufNoThrow(buffer);
return byteBuf != null ? byteBuf : wrappedBuffer(buffer.toNioBuffer());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout;
import io.servicetalk.http.api.BlockingHttpClient;
import io.servicetalk.http.api.BlockingHttpConnection;
import io.servicetalk.http.api.HttpProtocolConfig;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.transport.api.HostAndPort;
Expand All @@ -40,8 +39,6 @@

import static io.servicetalk.http.api.HttpSerializationProviders.textDeserializer;
import static io.servicetalk.http.api.HttpSerializationProviders.textSerializer;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h1Default;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
import static java.lang.String.valueOf;
Expand All @@ -57,32 +54,18 @@
@RunWith(Parameterized.class)
public class HttpConnectionContextSocketOptionTest {

private enum Protocol {

HTTP_1_1(h1Default(), false),
HTTP_2(h2Default(), true);

final HttpProtocolConfig config;
final boolean autoRead;

Protocol(HttpProtocolConfig config, boolean autoRead) {
this.config = config;
this.autoRead = autoRead;
}
}

@Rule
public final Timeout timeout = new ServiceTalkTestTimeout();

private final Protocol protocol;
private final HttpProtocol protocol;

public HttpConnectionContextSocketOptionTest(Protocol protocol) {
public HttpConnectionContextSocketOptionTest(HttpProtocol protocol) {
this.protocol = protocol;
}

@Parameters(name = "protocol={0}")
public static Object[] data() {
return Protocol.values();
return HttpProtocol.values();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.http.api.HttpProtocolConfig;
import io.servicetalk.http.api.HttpProtocolVersion;

import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h1Default;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default;

enum HttpProtocol {
HTTP_1(h1Default(), HTTP_1_1),
HTTP_2(h2Default(), HTTP_2_0);

final HttpProtocolConfig config;
final HttpProtocolVersion version;

HttpProtocol(HttpProtocolConfig config, HttpProtocolVersion version) {
this.config = config;
this.version = version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpProtocolConfig;
import io.servicetalk.http.api.HttpProtocolVersion;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpConnection;
Expand Down Expand Up @@ -53,14 +51,12 @@
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH;
import static io.servicetalk.http.api.HttpHeaderValues.ZERO;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.api.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED;
import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED_SERVER;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h1Default;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h2Default;
import static io.servicetalk.http.netty.HttpProtocol.HTTP_1;
import static io.servicetalk.http.netty.HttpProtocol.HTTP_2;
import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO;
import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ERROR_BEFORE_READ;
import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ERROR_DURING_READ;
Expand All @@ -82,20 +78,7 @@
@RunWith(Parameterized.class)
public class HttpTransportObserverTest extends AbstractNettyHttpServerTest {

private enum Protocol {
HTTP_1(h1Default(), HTTP_1_1),
HTTP_2(h2Default(), HTTP_2_0);

private final HttpProtocolConfig config;
private final HttpProtocolVersion version;

Protocol(HttpProtocolConfig config, HttpProtocolVersion version) {
this.config = config;
this.version = version;
}
}

private final Protocol protocol;
private final HttpProtocol protocol;

private final TransportObserver clientTransportObserver;
private final ConnectionObserver clientConnectionObserver;
Expand All @@ -117,7 +100,7 @@ private enum Protocol {
private final CountDownLatch requestReceived = new CountDownLatch(1);
private final CountDownLatch processRequest = new CountDownLatch(1);

public HttpTransportObserverTest(Protocol protocol) {
public HttpTransportObserverTest(HttpProtocol protocol) {
super(CACHED, CACHED_SERVER);
this.protocol = protocol;
protocol(protocol.config);
Expand Down Expand Up @@ -176,8 +159,8 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
}

@Parameters(name = "protocol={0}")
public static Protocol[] data() {
return Protocol.values();
public static HttpProtocol[] data() {
return HttpProtocol.values();
}

@Test
Expand All @@ -187,7 +170,7 @@ public void connectionEstablished() throws Exception {

verify(clientTransportObserver).onNewConnection();
verify(serverTransportObserver, await()).onNewConnection();
if (protocol == Protocol.HTTP_1) {
if (protocol == HTTP_1) {
verify(clientConnectionObserver).connectionEstablished(any(ConnectionInfo.class));
verify(serverConnectionObserver, await()).connectionEstablished(any(ConnectionInfo.class));

Expand All @@ -205,7 +188,7 @@ public void connectionEstablished() throws Exception {

verifyNoMoreInteractions(clientTransportObserver, clientDataObserver, clientMultiplexedObserver,
serverTransportObserver, serverDataObserver, serverMultiplexedObserver);
if (protocol != Protocol.HTTP_2) {
if (protocol != HTTP_2) {
// HTTP/2 coded adds additional write/flush events related to connection preface. Also, it may emit more
// flush events on the pipeline after the connection is closed.
verifyNoMoreInteractions(clientConnectionObserver, serverConnectionObserver);
Expand Down Expand Up @@ -243,7 +226,7 @@ public void testRequestResponse(StreamingHttpRequest request, HttpResponseStatus
verify(serverReadObserver, atLeastOnce()).requestedToRead(anyLong());
verify(serverReadObserver, atLeastOnce()).itemRead();

if (protocol == Protocol.HTTP_2) {
if (protocol == HTTP_2) {
// HTTP/1.x has a single write publisher across all requests that does not complete after each response
verify(serverWriteObserver, await()).writeComplete();
}
Expand All @@ -256,7 +239,7 @@ public void testRequestResponse(StreamingHttpRequest request, HttpResponseStatus
verify(clientReadObserver, atLeastOnce()).itemRead();
verify(clientReadObserver).readComplete();

if (protocol == Protocol.HTTP_2) {
if (protocol == HTTP_2) {
verify(clientStreamObserver, await()).streamClosed();
verify(serverStreamObserver, await()).streamClosed();
}
Expand All @@ -282,11 +265,11 @@ public void testServerFailsResponsePayloadBody(String path, boolean serverReadCo

ExecutionException e = assertThrows(ExecutionException.class,
() -> response.payloadBody().ignoreElements().toFuture().get());
Class<? extends Throwable> causeType = protocol == Protocol.HTTP_1 ?
Class<? extends Throwable> causeType = protocol == HTTP_1 ?
ClosedChannelException.class : H2StreamResetException.class;
assertThat(e.getCause(), instanceOf(causeType));

if (protocol == Protocol.HTTP_2) {
if (protocol == HTTP_2) {
connection.closeGracefully();
}
assertConnectionClosed();
Expand Down Expand Up @@ -319,7 +302,7 @@ public void testServerFailsResponsePayloadBody(String path, boolean serverReadCo
verify(clientReadObserver, atLeastOnce()).itemRead();
verify(clientReadObserver).readFailed(any(causeType));

if (protocol == Protocol.HTTP_1) {
if (protocol == HTTP_1) {
verify(clientConnectionObserver).connectionClosed(); // FIXME should we see connection RST here?
verify(serverConnectionObserver).connectionClosed(DELIBERATE_EXCEPTION);
} else {
Expand Down Expand Up @@ -352,7 +335,7 @@ public void clientFailsRequestPayloadBody() throws Exception {
assertThat(e.getCause(), is(DELIBERATE_EXCEPTION));
processRequest.countDown();

if (protocol == Protocol.HTTP_2) {
if (protocol == HTTP_2) {
connection.closeGracefully();
}
assertConnectionClosed();
Expand All @@ -367,7 +350,7 @@ public void clientFailsRequestPayloadBody() throws Exception {
verify(serverReadObserver, atLeastOnce()).requestedToRead(anyLong());
verify(serverReadObserver, atLeastOnce()).itemRead();
verify(serverReadObserver, atMostOnce()).readCancelled();
if (protocol == Protocol.HTTP_1) {
if (protocol == HTTP_1) {
verify(serverReadObserver, atMostOnce()).readFailed(any(IOException.class));
} else {
verify(serverReadObserver, await()).readFailed(any(H2StreamResetException.class));
Expand All @@ -381,7 +364,7 @@ public void clientFailsRequestPayloadBody() throws Exception {
verify(clientReadObserver, atLeastOnce()).requestedToRead(anyLong());
verify(clientReadObserver).readCancelled();

if (protocol == Protocol.HTTP_1) {
if (protocol == HTTP_1) {
verify(clientConnectionObserver).connectionClosed(DELIBERATE_EXCEPTION);
verify(serverConnectionObserver).connectionClosed(any(IOException.class));
} else {
Expand All @@ -397,7 +380,7 @@ public void clientFailsRequestPayloadBody() throws Exception {
}

private void verifyNewReadAndNewWrite(int nonMultiplexedTimes) {
if (protocol == Protocol.HTTP_1) {
if (protocol == HTTP_1) {
verify(clientDataObserver).onNewRead();
verify(clientDataObserver).onNewWrite();
verify(serverDataObserver, await().times(nonMultiplexedTimes)).onNewRead();
Expand Down
Loading

0 comments on commit 597e8cd

Please sign in to comment.