Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.x] HTTP/2 Client with flow-control #6399

Merged
merged 17 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void sendHeaders(Metadata headers) {
streamWriter.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS),
FlowControl.NOOP);
FlowControl.Outbound.NOOP);
}

@Override
Expand All @@ -175,7 +175,8 @@ public void sendMessage(RES message) {
Http2Flag.DataFlags.create(0),
streamId);

streamWriter.write(new Http2FrameData(header, bufferData), FlowControl.NOOP);
//FIXME: FC and MAX_FRAME_SIZE
streamWriter.writeData(new Http2FrameData(header, bufferData), FlowControl.Outbound.NOOP);
}

@Override
Expand All @@ -187,7 +188,7 @@ public void close(Status status, Metadata trailers) {
streamWriter.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
FlowControl.NOOP);
FlowControl.Outbound.NOOP);
currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void init() {
streamWriter.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
FlowControl.NOOP);
FlowControl.Outbound.NOOP);
currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public int read() throws IOException {
return -1;
}
ensureBuffer(512);
if (currentBuffer == null) {
if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read();
Expand All @@ -238,7 +238,7 @@ public int read(byte[] b, int off, int len) throws IOException {
return -1;
}
ensureBuffer(len);
if (currentBuffer == null) {
if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read(b, off, len);
Expand Down
15 changes: 15 additions & 0 deletions nima/http2/http2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.logging</groupId>
<artifactId>helidon-logging-jul</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.http2;

import java.time.Duration;
import java.util.function.BiConsumer;

import io.helidon.common.Builder;

import static java.lang.System.Logger.Level.DEBUG;

/**
* HTTP/2 Flow control for connection.
*/
public class ConnectionFlowControl {

private static final System.Logger LOGGER_OUTBOUND = System.getLogger(FlowControl.class.getName() + ".ofc");

private final Type type;
private final BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter;
private final Duration timeout;
private final WindowSize.Inbound inboundConnectionWindowSize;
private final WindowSize.Outbound outboundConnectionWindowSize;

private volatile int maxFrameSize = WindowSize.DEFAULT_MAX_FRAME_SIZE;
private volatile int initialWindowSize = WindowSize.DEFAULT_WIN_SIZE;

private ConnectionFlowControl(Type type,
int initialWindowSize,
int maxFrameSize,
BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter,
Duration timeout) {
this.type = type;
this.windowUpdateWriter = windowUpdateWriter;
this.timeout = timeout;
this.inboundConnectionWindowSize =
WindowSize.createInbound(type,
0,
initialWindowSize,
maxFrameSize,
windowUpdateWriter);
outboundConnectionWindowSize =
WindowSize.createOutbound(type, 0, this);
}

/**
* Create connection HTTP/2 flow-control for server side.
*
* @param windowUpdateWriter method called for sending WINDOW_UPDATE frames to the client.
* @return Connection HTTP/2 flow-control
*/
public static ConnectionFlowControlBuilder serverBuilder(BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter) {
return new ConnectionFlowControlBuilder(Type.SERVER, windowUpdateWriter);
}

/**
* Create connection HTTP/2 flow-control for client side.
*
* @param windowUpdateWriter method called for sending WINDOW_UPDATE frames to the server.
* @return Connection HTTP/2 flow-control
*/
public static ConnectionFlowControlBuilder clientBuilder(BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter) {
return new ConnectionFlowControlBuilder(Type.CLIENT, windowUpdateWriter);
}

/**
* Create stream specific inbound and outbound flow control.
*
* @param streamId stream id
* @return stream flow control
*/
public StreamFlowControl createStreamFlowControl(int streamId) {
return new StreamFlowControl(type, streamId, this, windowUpdateWriter);
}

/**
* Increment outbound connection flow control window, called when WINDOW_UPDATE is received.
*
* @param increment number of bytes other side has requested on top of actual demand
* @return outbound window size after increment
*/
public long incrementOutboundConnectionWindowSize(int increment) {
return outboundConnectionWindowSize.incrementWindowSize(increment);
}

/**
* Decrement inbound connection flow control window, called when DATA frame is received.
*
* @param decrement received DATA frame size in bytes
* @return inbound window size after decrement
*/
public long decrementInboundConnectionWindowSize(int decrement) {
return inboundConnectionWindowSize.decrementWindowSize(decrement);
}

/**
* Reset MAX_FRAME_SIZE for all streams, existing and future ones.
*
* @param maxFrameSize to split data frames according to when larger
*/
public void resetMaxFrameSize(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}

/**
* Reset an initial window size value for outbound flow control windows of a new streams.
* Don't forget to call stream.flowControl().outbound().resetStreamWindowSize(...) for each stream
* to align window size of existing streams.
*
* @param initialWindowSize INIT_WINDOW_SIZE received
*/
public void resetInitialWindowSize(int initialWindowSize) {
if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
LOGGER_OUTBOUND.log(DEBUG, String.format("%s OFC STR *: Recv INIT_WINDOW_SIZE %s", type, initialWindowSize));
}
this.initialWindowSize = initialWindowSize;
}

/**
* Connection outbound flow control window,
* decrements when DATA are sent and increments when WINDOW_UPDATE or INIT_WINDOW_SIZE is received.
* Blocks sending when window is depleted.
*
* @return connection outbound flow control window
*/
public WindowSize.Outbound outbound() {
return outboundConnectionWindowSize;
}

/**
* Connection inbound window is always manipulated by respective stream flow control,
* therefore package private is enough.
*
* @return connection inbound flow control window
*/
WindowSize.Inbound inbound() {
return inboundConnectionWindowSize;
}

int maxFrameSize() {
return maxFrameSize;
}

int initialWindowSize() {
return initialWindowSize;
}

Duration timeout() {
return timeout;
}

enum Type {
SERVER, CLIENT;
}

/**
* Connection flow control builder.
*/
public static class ConnectionFlowControlBuilder implements Builder<ConnectionFlowControlBuilder, ConnectionFlowControl> {

private static final Duration DEFAULT_TIMEOUT = Duration.ofMillis(100);
private final Type type;
private final BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter;
private int initialWindowSize = WindowSize.DEFAULT_WIN_SIZE;
private int maxFrameSize = WindowSize.DEFAULT_MAX_FRAME_SIZE;
private Duration blockTimeout = DEFAULT_TIMEOUT;

ConnectionFlowControlBuilder(Type type, BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter) {
this.type = type;
this.windowUpdateWriter = windowUpdateWriter;
}

/**
* Outbound flow control INITIAL_WINDOW_SIZE setting for new HTTP/2 connections.
*
* @param initialWindowSize units of octets
* @return updated builder
*/
public ConnectionFlowControlBuilder initialWindowSize(int initialWindowSize) {
this.initialWindowSize = initialWindowSize;
return this;
}

/**
* Initial MAX_FRAME_SIZE setting for new HTTP/2 connections.
* Maximum size of data frames in bytes we are prepared to accept from the other size.
* Default value is 2^14(16_384).
*
* @param maxFrameSize data frame size in bytes between 2^14(16_384) and 2^24-1(16_777_215)
* @return updated client
*/
public ConnectionFlowControlBuilder maxFrameSize(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
return this;
}

/**
* Timeout for blocking between windows size check iterations.
*
* @param timeout duration
* @return updated builder
*/
public ConnectionFlowControlBuilder blockTimeout(Duration timeout) {
this.blockTimeout = timeout;
return this;
}

@Override
public ConnectionFlowControl build() {
return new ConnectionFlowControl(type, initialWindowSize, maxFrameSize, windowUpdateWriter, blockTimeout);
}
}
}