From caf74a65184f51c24e4f70324df15920ebb3e1ba Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Wed, 18 Jul 2018 14:07:32 +0200 Subject: [PATCH 1/2] Proposed API changes --- .../bootstrap/Http2MultiplexingRequester.java | 28 ++++++++++++++++--- .../impl/bootstrap/HttpAsyncRequester.java | 25 +++++++++++++++-- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java index b72d59d109..db46f8bf19 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java @@ -51,6 +51,7 @@ import org.apache.hc.core5.http.impl.DefaultAddressResolver; import org.apache.hc.core5.http.impl.bootstrap.AsyncRequester; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; import org.apache.hc.core5.http.nio.CapacityChannel; @@ -115,20 +116,29 @@ public void setValidateAfterInactivity(final TimeValue timeValue) { connPool.setValidateAfterInactivity(timeValue); } - public Cancellable execute( + public final Cancellable execute( final AsyncClientExchangeHandler exchangeHandler, + final AsyncPushConsumer pushConsumer, final Timeout timeout, final HttpContext context) { Args.notNull(exchangeHandler, "Exchange handler"); Args.notNull(timeout, "Timeout"); Args.notNull(context, "Context"); final CancellableExecution cancellableExecution = new CancellableExecution(); - execute(exchangeHandler, cancellableExecution, timeout, context); + execute(exchangeHandler, pushConsumer, cancellableExecution, timeout, context); return cancellableExecution; } + public final Cancellable execute( + final AsyncClientExchangeHandler exchangeHandler, + final Timeout timeout, + final HttpContext context) { + return execute(exchangeHandler, null, timeout, context); + } + private void execute( final AsyncClientExchangeHandler exchangeHandler, + final AsyncPushConsumer pushConsumer, final CancellableDependency cancellableDependency, final Timeout timeout, final HttpContext context) { @@ -236,6 +246,7 @@ public void cancelled() { public final Future execute( final AsyncRequestProducer requestProducer, final AsyncResponseConsumer responseConsumer, + final AsyncPushConsumer pushConsumer, final Timeout timeout, final HttpContext context, final FutureCallback callback) { @@ -261,16 +272,25 @@ public void cancelled() { } }); - execute(exchangeHandler, future, timeout, context != null ? context : HttpCoreContext.create()); + execute(exchangeHandler, pushConsumer, future, timeout, context != null ? context : HttpCoreContext.create()); return future; } + public final Future execute( + final AsyncRequestProducer requestProducer, + final AsyncResponseConsumer responseConsumer, + final Timeout timeout, + final HttpContext context, + final FutureCallback callback) { + return execute(requestProducer, responseConsumer, null, timeout, context, callback); + } + public final Future execute( final AsyncRequestProducer requestProducer, final AsyncResponseConsumer responseConsumer, final Timeout timeout, final FutureCallback callback) { - return execute(requestProducer, responseConsumer, timeout, null, callback); + return execute(requestProducer, responseConsumer, null, timeout, null, callback); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java index 315aae6b4b..12cc060270 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java @@ -50,6 +50,7 @@ import org.apache.hc.core5.http.impl.DefaultAddressResolver; import org.apache.hc.core5.http.nio.AsyncClientEndpoint; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; import org.apache.hc.core5.http.nio.CapacityChannel; @@ -249,8 +250,9 @@ public Future connect(final HttpHost host, final Timeout ti return connect(host, timeout, null, null); } - public void execute( + public final void execute( final AsyncClientExchangeHandler exchangeHandler, + final AsyncPushConsumer pushConsumer, final Timeout timeout, final HttpContext context) { Args.notNull(exchangeHandler, "Exchange handler"); @@ -363,9 +365,17 @@ public void cancelled() { } } + public final void execute( + final AsyncClientExchangeHandler exchangeHandler, + final Timeout timeout, + final HttpContext context) { + execute(exchangeHandler, null, timeout, context); + } + public final Future execute( final AsyncRequestProducer requestProducer, final AsyncResponseConsumer responseConsumer, + final AsyncPushConsumer pushConsumer, final Timeout timeout, final HttpContext context, final FutureCallback callback) { @@ -391,16 +401,25 @@ public void cancelled() { } }); - execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create()); + execute(exchangeHandler, pushConsumer, timeout, context != null ? context : HttpCoreContext.create()); return future; } + public final Future execute( + final AsyncRequestProducer requestProducer, + final AsyncResponseConsumer responseConsumer, + final Timeout timeout, + final HttpContext context, + final FutureCallback callback) { + return execute(requestProducer, responseConsumer, null, timeout, context, callback); + } + public final Future execute( final AsyncRequestProducer requestProducer, final AsyncResponseConsumer responseConsumer, final Timeout timeout, final FutureCallback callback) { - return execute(requestProducer, responseConsumer, timeout, null, callback); + return execute(requestProducer, responseConsumer, null, timeout, null, callback); } private class InternalAsyncClientEndpoint extends AsyncClientEndpoint { From 08103e1311058355817ff8396f8931cb7bcacfd4 Mon Sep 17 00:00:00 2001 From: Alex Osborne Date: Thu, 19 Jul 2018 17:10:39 +0900 Subject: [PATCH 2/2] Implement AsyncPushConsumer execution API --- .../nio/AbstractHttp2StreamMultiplexer.java | 20 +++++++++++++---- .../nio/ClientHttp2StreamMultiplexer.java | 3 ++- .../nio/ClientPushHttp2StreamHandler.java | 22 +++++++++++-------- .../nio/ServerHttp2StreamMultiplexer.java | 2 ++ .../bootstrap/Http2MultiplexingRequester.java | 2 +- .../http/nio/command/ExecutionCommand.java | 10 ++++++++- 6 files changed, 43 insertions(+), 16 deletions(-) diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java index f17e3a84cb..77a9262851 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractHttp2StreamMultiplexer.java @@ -61,6 +61,7 @@ import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics; import org.apache.hc.core5.http.impl.CharCodingSupport; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncPushProducer; import org.apache.hc.core5.http.nio.command.ExecutionCommand; import org.apache.hc.core5.http.nio.command.ShutdownCommand; @@ -177,7 +178,7 @@ public String getId() { } abstract Http2StreamHandler createRemotelyInitiatedStream( - Http2StreamChannel channel, HttpProcessor httpProcessor, BasicHttpConnectionMetrics connMetrics) throws IOException; + Http2StreamChannel channel, HttpProcessor httpProcessor, AsyncPushConsumer pushConsumer, BasicHttpConnectionMetrics connMetrics) throws IOException; private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException { for (;;) { @@ -600,6 +601,7 @@ private void processPendingCommands() throws IOException, HttpException { localConfig.getInitialWindowSize(), remoteConfig.getInitialWindowSize()); final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler(); + final AsyncPushConsumer pushConsumer = executionCommand.getPushConsumer(); final CancellableDependency cancellableDependency = executionCommand.getCancellableDependency(); final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext()); context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession()); @@ -610,7 +612,7 @@ private void processPendingCommands() throws IOException, HttpException { connMetrics, exchangeHandler, context); - final Http2Stream stream = new Http2Stream(channel, streamHandler, false); + final Http2Stream stream = new Http2Stream(channel, streamHandler, pushConsumer, false); streamMap.put(streamId, stream); if (stream.isOutputReady()) { @@ -754,7 +756,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio localConfig.getInitialWindowSize(), remoteConfig.getInitialWindowSize()); final Http2StreamHandler streamHandler = createRemotelyInitiatedStream( - channel, httpProcessor, connMetrics); + channel, httpProcessor, null, connMetrics); stream = new Http2Stream(channel, streamHandler, true); if (stream.isOutputReady()) { stream.produceOutput(); @@ -933,7 +935,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio localConfig.getInitialWindowSize(), remoteConfig.getInitialWindowSize()); final Http2StreamHandler streamHandler = createRemotelyInitiatedStream( - channel, httpProcessor, connMetrics); + channel, httpProcessor, stream.pushConsumer, connMetrics); final Http2Stream promisedStream = new Http2Stream(channel, streamHandler, true); streamMap.put(promisedStreamId, promisedStream); @@ -1476,14 +1478,24 @@ private static class Http2Stream { private final Http2StreamChannelImpl channel; private final Http2StreamHandler handler; + private final AsyncPushConsumer pushConsumer; private final boolean remoteInitiated; private Http2Stream( final Http2StreamChannelImpl channel, final Http2StreamHandler handler, final boolean remoteInitiated) { + this(channel, handler, null, remoteInitiated); + } + + private Http2Stream( + final Http2StreamChannelImpl channel, + final Http2StreamHandler handler, + final AsyncPushConsumer pushConsumer, + final boolean remoteInitiated) { this.channel = channel; this.handler = handler; + this.pushConsumer = pushConsumer; this.remoteInitiated = remoteInitiated; } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java index d6dcf59c92..fb8f3ffa32 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientHttp2StreamMultiplexer.java @@ -83,11 +83,12 @@ public ClientHttp2StreamMultiplexer( Http2StreamHandler createRemotelyInitiatedStream( final Http2StreamChannel channel, final HttpProcessor httpProcessor, + final AsyncPushConsumer pushConsumer, final BasicHttpConnectionMetrics connMetrics) throws IOException { final HttpCoreContext context = HttpCoreContext.create(); context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession()); context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails()); - return new ClientPushHttp2StreamHandler(channel, httpProcessor, connMetrics, pushHandlerFactory, context); + return new ClientPushHttp2StreamHandler(channel, httpProcessor, connMetrics, pushHandlerFactory, pushConsumer, context); } @Override diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java index dfbf816ea6..f79e645fc5 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientPushHttp2StreamHandler.java @@ -72,11 +72,13 @@ class ClientPushHttp2StreamHandler implements Http2StreamHandler { final HttpProcessor httpProcessor, final BasicHttpConnectionMetrics connMetrics, final HandlerFactory pushHandlerFactory, + final AsyncPushConsumer pushConsumer, final HttpCoreContext context) { this.internalOutputChannel = outputChannel; this.httpProcessor = httpProcessor; this.connMetrics = connMetrics; this.pushHandlerFactory = pushHandlerFactory; + this.exchangeHandler = pushConsumer; this.context = context; this.failed = new AtomicBoolean(false); this.done = new AtomicBoolean(false); @@ -99,16 +101,18 @@ public void consumePromise(final List
headers) throws HttpException, IOE request = DefaultH2RequestConverter.INSTANCE.convert(headers); - final AsyncPushConsumer handler; - try { - handler = pushHandlerFactory != null ? pushHandlerFactory.create(request, context) : null; - } catch (final ProtocolException ex) { - throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, ex.getMessage()); - } - if (handler == null) { - throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused"); + if (exchangeHandler == null) { + final AsyncPushConsumer handler; + try { + handler = pushHandlerFactory != null ? pushHandlerFactory.create(request, context) : null; + } catch (final ProtocolException ex) { + throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, ex.getMessage()); + } + if (handler == null) { + throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused"); + } + exchangeHandler = handler; } - exchangeHandler = handler; context.setProtocolVersion(HttpVersion.HTTP_2); context.setAttribute(HttpCoreContext.HTTP_REQUEST, request); diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java index b2f8139cb6..c8d84b5dcd 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerHttp2StreamMultiplexer.java @@ -30,6 +30,7 @@ import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.protocol.HttpCoreContext; @@ -76,6 +77,7 @@ public ServerHttp2StreamMultiplexer( Http2StreamHandler createRemotelyInitiatedStream( final Http2StreamChannel channel, final HttpProcessor httpProcessor, + final AsyncPushConsumer pushConsumer, final BasicHttpConnectionMetrics connMetrics) throws IOException { final HttpCoreContext context = HttpCoreContext.create(); context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession()); diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java index db46f8bf19..98de1287e5 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/Http2MultiplexingRequester.java @@ -220,7 +220,7 @@ public void failed(final Exception cause) { exchangeHandler.failed(cause); } - }, cancellableDependency, context), Command.Priority.NORMAL); + }, pushConsumer, cancellableDependency, context), Command.Priority.NORMAL); } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java index f4006ab1fe..dc6edbd1fe 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/command/ExecutionCommand.java @@ -29,6 +29,7 @@ import org.apache.hc.core5.concurrent.CancellableDependency; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.reactor.Command; import org.apache.hc.core5.util.Args; @@ -41,14 +42,17 @@ public final class ExecutionCommand implements Command { private final AsyncClientExchangeHandler exchangeHandler; + private final AsyncPushConsumer pushConsumer; private final CancellableDependency cancellableDependency; private final HttpContext context; public ExecutionCommand( final AsyncClientExchangeHandler exchangeHandler, + final AsyncPushConsumer pushConsumer, final CancellableDependency cancellableDependency, final HttpContext context) { this.exchangeHandler = Args.notNull(exchangeHandler, "Handler"); + this.pushConsumer = pushConsumer; this.cancellableDependency = cancellableDependency; this.context = context; } @@ -56,7 +60,7 @@ public ExecutionCommand( public ExecutionCommand( final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) { - this(exchangeHandler, null, context); + this(exchangeHandler, null, null, context); } public AsyncClientExchangeHandler getExchangeHandler() { @@ -71,6 +75,10 @@ public HttpContext getContext() { return context; } + public AsyncPushConsumer getPushConsumer() { + return pushConsumer; + } + @Override public boolean cancel() { exchangeHandler.cancel();