diff --git a/common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteable.java b/common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteable.java
index a32aab05db3..7de85f74cf1 100644
--- a/common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteable.java
+++ b/common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteable.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteableImpl.java b/common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteableImpl.java
index 6ea4979ce5e..64b98cbffa3 100644
--- a/common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteableImpl.java
+++ b/common/uri/src/main/java/io/helidon/common/uri/UriQueryWriteableImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandler.java b/nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandler.java
index 4f455c49cd8..8d70c5ba092 100644
--- a/nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandler.java
+++ b/nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandler.java
@@ -151,7 +151,7 @@ public void sendHeaders(Metadata headers) {
streamWriter.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS),
- FlowControl.NOOP);
+ FlowControl.Outbound.NOOP);
}
@Override
@@ -175,7 +175,8 @@ public void sendMessage(RES message) {
Http2Flag.DataFlags.create(0),
streamId);
- streamWriter.write(new Http2FrameData(header, bufferData), FlowControl.NOOP);
+ //FIXME: FC and MAX_FRAME_SIZE
+ streamWriter.writeData(new Http2FrameData(header, bufferData), FlowControl.Outbound.NOOP);
}
@Override
@@ -187,7 +188,7 @@ public void close(Status status, Metadata trailers) {
streamWriter.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
- FlowControl.NOOP);
+ FlowControl.Outbound.NOOP);
currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL;
}
diff --git a/nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandlerNotFound.java b/nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandlerNotFound.java
index da8f7315660..19e49811926 100644
--- a/nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandlerNotFound.java
+++ b/nima/grpc/webserver/src/main/java/io/helidon/nima/grpc/webserver/GrpcProtocolHandlerNotFound.java
@@ -50,7 +50,7 @@ public void init() {
streamWriter.writeHeaders(http2Headers,
streamId,
Http2Flag.HeaderFlags.create(Http2Flag.END_OF_HEADERS | Http2Flag.END_OF_STREAM),
- FlowControl.NOOP);
+ FlowControl.Outbound.NOOP);
currentStreamState = Http2StreamState.HALF_CLOSED_LOCAL;
}
diff --git a/nima/http/media/media/src/main/java/io/helidon/nima/http/media/ReadableEntityBase.java b/nima/http/media/media/src/main/java/io/helidon/nima/http/media/ReadableEntityBase.java
index 211482f1071..6460f4bb2c1 100644
--- a/nima/http/media/media/src/main/java/io/helidon/nima/http/media/ReadableEntityBase.java
+++ b/nima/http/media/media/src/main/java/io/helidon/nima/http/media/ReadableEntityBase.java
@@ -226,7 +226,7 @@ public int read() throws IOException {
return -1;
}
ensureBuffer(512);
- if (currentBuffer == null) {
+ if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read();
@@ -238,7 +238,7 @@ public int read(byte[] b, int off, int len) throws IOException {
return -1;
}
ensureBuffer(len);
- if (currentBuffer == null) {
+ if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read(b, off, len);
diff --git a/nima/http2/http2/pom.xml b/nima/http2/http2/pom.xml
index 9db9c531a8c..36b5495c78c 100644
--- a/nima/http2/http2/pom.xml
+++ b/nima/http2/http2/pom.xml
@@ -46,6 +46,21 @@
hamcrest-all
test
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+ io.helidon.logging
+ helidon-logging-jul
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/ConnectionFlowControl.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/ConnectionFlowControl.java
new file mode 100644
index 00000000000..e26bdd2b248
--- /dev/null
+++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/ConnectionFlowControl.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2023 Oracle and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.nima.http2;
+
+import java.time.Duration;
+import java.util.function.BiConsumer;
+
+import io.helidon.common.Builder;
+
+import static java.lang.System.Logger.Level.DEBUG;
+
+/**
+ * HTTP/2 Flow control for connection.
+ */
+public class ConnectionFlowControl {
+
+ private static final System.Logger LOGGER_OUTBOUND = System.getLogger(FlowControl.class.getName() + ".ofc");
+
+ private final Type type;
+ private final BiConsumer windowUpdateWriter;
+ private final Duration timeout;
+ private final WindowSize.Inbound inboundConnectionWindowSize;
+ private final WindowSize.Outbound outboundConnectionWindowSize;
+
+ private volatile int maxFrameSize = WindowSize.DEFAULT_MAX_FRAME_SIZE;
+ private volatile int initialWindowSize = WindowSize.DEFAULT_WIN_SIZE;
+
+ private ConnectionFlowControl(Type type,
+ int initialWindowSize,
+ int maxFrameSize,
+ BiConsumer windowUpdateWriter,
+ Duration timeout) {
+ this.type = type;
+ this.windowUpdateWriter = windowUpdateWriter;
+ this.timeout = timeout;
+ this.inboundConnectionWindowSize =
+ WindowSize.createInbound(type,
+ 0,
+ initialWindowSize,
+ maxFrameSize,
+ windowUpdateWriter);
+ outboundConnectionWindowSize =
+ WindowSize.createOutbound(type, 0, this);
+ }
+
+ /**
+ * Create connection HTTP/2 flow-control for server side.
+ *
+ * @param windowUpdateWriter method called for sending WINDOW_UPDATE frames to the client.
+ * @return Connection HTTP/2 flow-control
+ */
+ public static ConnectionFlowControlBuilder serverBuilder(BiConsumer windowUpdateWriter) {
+ return new ConnectionFlowControlBuilder(Type.SERVER, windowUpdateWriter);
+ }
+
+ /**
+ * Create connection HTTP/2 flow-control for client side.
+ *
+ * @param windowUpdateWriter method called for sending WINDOW_UPDATE frames to the server.
+ * @return Connection HTTP/2 flow-control
+ */
+ public static ConnectionFlowControlBuilder clientBuilder(BiConsumer windowUpdateWriter) {
+ return new ConnectionFlowControlBuilder(Type.CLIENT, windowUpdateWriter);
+ }
+
+ /**
+ * Create stream specific inbound and outbound flow control.
+ *
+ * @param streamId stream id
+ * @return stream flow control
+ */
+ public StreamFlowControl createStreamFlowControl(int streamId) {
+ return new StreamFlowControl(type, streamId, this, windowUpdateWriter);
+ }
+
+ /**
+ * Increment outbound connection flow control window, called when WINDOW_UPDATE is received.
+ *
+ * @param increment number of bytes other side has requested on top of actual demand
+ * @return outbound window size after increment
+ */
+ public long incrementOutboundConnectionWindowSize(int increment) {
+ return outboundConnectionWindowSize.incrementWindowSize(increment);
+ }
+
+ /**
+ * Decrement inbound connection flow control window, called when DATA frame is received.
+ *
+ * @param decrement received DATA frame size in bytes
+ * @return inbound window size after decrement
+ */
+ public long decrementInboundConnectionWindowSize(int decrement) {
+ return inboundConnectionWindowSize.decrementWindowSize(decrement);
+ }
+
+ /**
+ * Reset MAX_FRAME_SIZE for all streams, existing and future ones.
+ *
+ * @param maxFrameSize to split data frames according to when larger
+ */
+ public void resetMaxFrameSize(int maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ }
+
+ /**
+ * Reset an initial window size value for outbound flow control windows of a new streams.
+ * Don't forget to call stream.flowControl().outbound().resetStreamWindowSize(...) for each stream
+ * to align window size of existing streams.
+ *
+ * @param initialWindowSize INIT_WINDOW_SIZE received
+ */
+ public void resetInitialWindowSize(int initialWindowSize) {
+ if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
+ LOGGER_OUTBOUND.log(DEBUG, String.format("%s OFC STR *: Recv INIT_WINDOW_SIZE %s", type, initialWindowSize));
+ }
+ this.initialWindowSize = initialWindowSize;
+ }
+
+ /**
+ * Connection outbound flow control window,
+ * decrements when DATA are sent and increments when WINDOW_UPDATE or INIT_WINDOW_SIZE is received.
+ * Blocks sending when window is depleted.
+ *
+ * @return connection outbound flow control window
+ */
+ public WindowSize.Outbound outbound() {
+ return outboundConnectionWindowSize;
+ }
+
+ /**
+ * Connection inbound window is always manipulated by respective stream flow control,
+ * therefore package private is enough.
+ *
+ * @return connection inbound flow control window
+ */
+ WindowSize.Inbound inbound() {
+ return inboundConnectionWindowSize;
+ }
+
+ int maxFrameSize() {
+ return maxFrameSize;
+ }
+
+ int initialWindowSize() {
+ return initialWindowSize;
+ }
+
+ Duration timeout() {
+ return timeout;
+ }
+
+ enum Type {
+ SERVER, CLIENT;
+ }
+
+ /**
+ * Connection flow control builder.
+ */
+ public static class ConnectionFlowControlBuilder implements Builder {
+
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofMillis(100);
+ private final Type type;
+ private final BiConsumer windowUpdateWriter;
+ private int initialWindowSize = WindowSize.DEFAULT_WIN_SIZE;
+ private int maxFrameSize = WindowSize.DEFAULT_MAX_FRAME_SIZE;
+ private Duration blockTimeout = DEFAULT_TIMEOUT;
+
+ ConnectionFlowControlBuilder(Type type, BiConsumer windowUpdateWriter) {
+ this.type = type;
+ this.windowUpdateWriter = windowUpdateWriter;
+ }
+
+ /**
+ * Outbound flow control INITIAL_WINDOW_SIZE setting for new HTTP/2 connections.
+ *
+ * @param initialWindowSize units of octets
+ * @return updated builder
+ */
+ public ConnectionFlowControlBuilder initialWindowSize(int initialWindowSize) {
+ this.initialWindowSize = initialWindowSize;
+ return this;
+ }
+
+ /**
+ * Initial MAX_FRAME_SIZE setting for new HTTP/2 connections.
+ * Maximum size of data frames in bytes we are prepared to accept from the other size.
+ * Default value is 2^14(16_384).
+ *
+ * @param maxFrameSize data frame size in bytes between 2^14(16_384) and 2^24-1(16_777_215)
+ * @return updated client
+ */
+ public ConnectionFlowControlBuilder maxFrameSize(int maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ return this;
+ }
+
+ /**
+ * Timeout for blocking between windows size check iterations.
+ *
+ * @param timeout duration
+ * @return updated builder
+ */
+ public ConnectionFlowControlBuilder blockTimeout(Duration timeout) {
+ this.blockTimeout = timeout;
+ return this;
+ }
+
+ @Override
+ public ConnectionFlowControl build() {
+ return new ConnectionFlowControl(type, initialWindowSize, maxFrameSize, windowUpdateWriter, blockTimeout);
+ }
+ }
+}
diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControl.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControl.java
index 774fb70921b..ceaa4d20192 100644
--- a/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControl.java
+++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControl.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,64 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.helidon.nima.http2;
/**
* Flow control used by HTTP/2 for backpressure.
*/
public interface FlowControl {
- /**
- * No-op flow control, used for connection related frames.
- */
- FlowControl NOOP = new FlowControl() {
- @Override
- public void resetStreamWindowSize(long increment) {
- }
-
- @Override
- public void decrementWindowSize(int decrement) {
- }
-
- @Override
- public boolean incrementStreamWindowSize(int increment) {
- return false;
- }
-
- @Override
- public int getRemainingWindowSize() {
- return Integer.MAX_VALUE;
- }
-
- @Override
- public Http2FrameData[] split(Http2FrameData frame) {
- return new Http2FrameData[] {frame};
- }
-
- @Override
- public boolean blockTillUpdate() {
- return false;
- }
- };
-
- /**
- * Create a flow control for a stream.
- *
- * @param streamId stream id
- * @param streamInitialWindowSize initial window size for stream
- * @param connectionWindowSize connection window size
- * @return a new flow control
- */
- static FlowControl create(int streamId, int streamInitialWindowSize, WindowSize connectionWindowSize) {
- return new FlowControlImpl(streamId, streamInitialWindowSize, connectionWindowSize);
- }
-
- /**
- * Reset stream window size.
- *
- * @param increment increment
- */
- void resetStreamWindowSize(long increment);
/**
* Decrement window size.
@@ -80,12 +28,11 @@ static FlowControl create(int streamId, int streamInitialWindowSize, WindowSize
void decrementWindowSize(int decrement);
/**
- * Increment stream window size.
+ * Reset stream window size.
*
- * @param increment increment in bytes
- * @return {@code true} if succeeded, {@code false} if timed out
+ * @param size new window size
*/
- boolean incrementStreamWindowSize(int increment);
+ void resetStreamWindowSize(int size);
/**
* Remaining window size in bytes.
@@ -95,17 +42,55 @@ static FlowControl create(int streamId, int streamInitialWindowSize, WindowSize
int getRemainingWindowSize();
/**
- * Split frame into frames that can be sent.
- *
- * @param frame frame to split
- * @return result
+ * Inbound flow control used by HTTP/2 for backpressure.
*/
- Http2FrameData[] split(Http2FrameData frame);
+ interface Inbound extends FlowControl {
+
+ /**
+ * Increment window size.
+ *
+ * @param increment increment in bytes
+ */
+ void incrementWindowSize(int increment);
+ }
/**
- * Block until a window size update happens.
- *
- * @return {@code true} if window update happened, {@code false} in case of timeout
+ * Outbound flow control used by HTTP/2 for backpressure.
*/
- boolean blockTillUpdate();
+ interface Outbound extends FlowControl {
+
+ /**
+ * No-op outbound flow control, used for connection related frames.
+ */
+ Outbound NOOP = new FlowControlNoop.Outbound();
+
+ /**
+ * Increment stream window size.
+ *
+ * @param increment increment in bytes
+ * @return {@code true} if succeeded, {@code false} if timed out
+ */
+ long incrementStreamWindowSize(int increment);
+
+ /**
+ * Split frame into frames that can be sent.
+ *
+ * @param frame frame to split
+ * @return result
+ */
+ Http2FrameData[] cut(Http2FrameData frame);
+
+ /**
+ * Block until a window size update happens.
+ *
+ */
+ void blockTillUpdate();
+
+ /**
+ * MAX_FRAME_SIZE setting last received from the other side or default.
+ * @return MAX_FRAME_SIZE
+ */
+ int maxFrameSize();
+ }
+
}
diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControlImpl.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControlImpl.java
index d5f204e5514..14f5d29c75f 100644
--- a/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControlImpl.java
+++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControlImpl.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,96 +13,183 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package io.helidon.nima.http2;
-import io.helidon.common.buffers.BufferData;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+
+import static java.lang.System.Logger.Level.DEBUG;
+
+abstract class FlowControlImpl implements FlowControl {
-class FlowControlImpl implements FlowControl {
+ private static final System.Logger LOGGER_INBOUND = System.getLogger(FlowControl.class.getName() + ".ifc");
+ private static final System.Logger LOGGER_OUTBOUND = System.getLogger(FlowControl.class.getName() + ".ofc");
private final int streamId;
- private final WindowSize connectionWindowSize;
- private final WindowSize streamWindowSize;
- FlowControlImpl(int streamId, int streamInitialWindowSize, WindowSize connectionWindowSize) {
+ FlowControlImpl(int streamId) {
this.streamId = streamId;
- this.connectionWindowSize = connectionWindowSize;
- this.streamWindowSize = new WindowSize(streamInitialWindowSize);
}
- @Override
- public void resetStreamWindowSize(long increment) {
- streamWindowSize.resetWindowSize(increment);
- }
+ abstract WindowSize connectionWindowSize();
- @Override
- public void decrementWindowSize(int decrement) {
- connectionWindowSize.decrementWindowSize(decrement);
- streamWindowSize.decrementWindowSize(decrement);
- }
+ abstract WindowSize streamWindowSize();
@Override
- public boolean incrementStreamWindowSize(int increment) {
- boolean overflow = streamWindowSize.incrementWindowSize(increment);
- connectionWindowSize.triggerUpdate();
- return overflow;
+ public void resetStreamWindowSize(int size) {
+ streamWindowSize().resetWindowSize(size);
}
@Override
public int getRemainingWindowSize() {
- return Integer.min(connectionWindowSize.getRemainingWindowSize(), streamWindowSize.getRemainingWindowSize());
- }
-
- @Override
- public Http2FrameData[] split(Http2FrameData frame) {
- return split(getRemainingWindowSize(), frame);
- }
-
- @Override
- public boolean blockTillUpdate() {
- return connectionWindowSize.blockTillUpdate();
+ return Math.max(0,
+ Integer.min(
+ connectionWindowSize().getRemainingWindowSize(),
+ streamWindowSize().getRemainingWindowSize()
+ )
+ );
}
@Override
public String toString() {
return "FlowControlImpl{"
+ "streamId=" + streamId
- + ", connectionWindowSize=" + connectionWindowSize
- + ", streamWindowSize=" + streamWindowSize
+ + ", connectionWindowSize=" + connectionWindowSize()
+ + ", streamWindowSize=" + streamWindowSize()
+ '}';
}
- private Http2FrameData[] split(int size, Http2FrameData frame) {
- int length = frame.header().length();
- if (length <= size || length == 0) {
- return new Http2FrameData[] {frame};
+ protected int streamId() {
+ return this.streamId;
+ }
+
+ static class Inbound extends FlowControlImpl implements FlowControl.Inbound {
+
+ private final WindowSize.Inbound connectionWindowSize;
+ private final WindowSize.Inbound streamWindowSize;
+ private final ConnectionFlowControl.Type type;
+
+ Inbound(ConnectionFlowControl.Type type,
+ int streamId,
+ int streamInitialWindowSize,
+ int streamMaxFrameSize,
+ WindowSize.Inbound connectionWindowSize,
+ BiConsumer windowUpdateStreamWriter) {
+ super(streamId);
+ this.type = type;
+ if (streamInitialWindowSize == 0) {
+ throw new IllegalArgumentException("Window size in bytes for stream-level flow control was not set.");
+ }
+ Objects.requireNonNull(connectionWindowSize, "Window size in bytes for connection-level flow control was not set.");
+ Objects.requireNonNull(windowUpdateStreamWriter, "Stream-level window update writer was not set.");
+ this.connectionWindowSize = connectionWindowSize;
+ this.streamWindowSize = WindowSize.createInbound(type,
+ streamId,
+ streamInitialWindowSize,
+ streamMaxFrameSize,
+ windowUpdateStreamWriter);
+ }
+
+ @Override
+ WindowSize connectionWindowSize() {
+ return connectionWindowSize;
+ }
+
+ @Override
+ WindowSize streamWindowSize() {
+ return streamWindowSize;
+ }
+
+ @Override
+ public void decrementWindowSize(int decrement) {
+ long strRemaining = streamWindowSize().decrementWindowSize(decrement);
+ if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
+ LOGGER_INBOUND.log(DEBUG, String.format("%s IFC STR %d: -%d(%d)", type, streamId(), decrement, strRemaining));
+ }
+ long connRemaining = connectionWindowSize().decrementWindowSize(decrement);
+ if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
+ LOGGER_INBOUND.log(DEBUG, String.format("%s IFC STR 0: -%d(%d)", type, decrement, connRemaining));
+ }
+ }
+
+ @Override
+ public void incrementWindowSize(int increment) {
+ long strRemaining = streamWindowSize.incrementWindowSize(increment);
+ if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
+ LOGGER_INBOUND.log(DEBUG, String.format("%s IFC STR %d: +%d(%d)", type, streamId(), increment, strRemaining));
+ }
+ long conRemaining = connectionWindowSize.incrementWindowSize(increment);
+ if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
+ LOGGER_INBOUND.log(DEBUG, String.format("%s IFC STR 0: +%d(%d)", type, increment, conRemaining));
+ }
+ }
+
+ }
+
+ static class Outbound extends FlowControlImpl implements FlowControl.Outbound {
+
+ private final ConnectionFlowControl.Type type;
+ private final ConnectionFlowControl connectionFlowControl;
+ private final WindowSize.Outbound streamWindowSize;
+
+ Outbound(ConnectionFlowControl.Type type,
+ int streamId,
+ ConnectionFlowControl connectionFlowControl) {
+ super(streamId);
+ this.type = type;
+ this.connectionFlowControl = connectionFlowControl;
+ this.streamWindowSize = WindowSize.createOutbound(type, streamId, connectionFlowControl);
}
- if (size == 0) {
- return new Http2FrameData[0];
+ @Override
+ WindowSize connectionWindowSize() {
+ return connectionFlowControl.outbound();
}
- byte[] data1 = new byte[size];
- byte[] data2 = new byte[length - size];
+ @Override
+ WindowSize streamWindowSize() {
+ return streamWindowSize;
+ }
- frame.data().read(data1);
- frame.data().read(data2);
+ @Override
+ public void decrementWindowSize(int decrement) {
+ long strRemaining = streamWindowSize().decrementWindowSize(decrement);
+ if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
+ LOGGER_OUTBOUND.log(DEBUG, String.format("%s OFC STR %d: -%d(%d)", type, streamId(), decrement, strRemaining));
+ }
- BufferData bufferData1 = BufferData.create(data1);
- BufferData bufferData2 = BufferData.create(data2);
+ long connRemaining = connectionWindowSize().decrementWindowSize(decrement);
+ if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
+ LOGGER_OUTBOUND.log(DEBUG, String.format("%s OFC STR 0: -%d(%d)", type, decrement, connRemaining));
- Http2FrameData frameData1 = new Http2FrameData(Http2FrameHeader.create(bufferData1.available(),
- Http2FrameTypes.DATA,
- Http2Flag.DataFlags.create(0),
- frame.header().streamId()),
- bufferData1);
+ }
+ }
- Http2FrameData frameData2 = new Http2FrameData(Http2FrameHeader.create(bufferData2.available(),
- Http2FrameTypes.DATA,
- Http2Flag.DataFlags.create(0),
- frame.header().streamId()),
- bufferData2);
+ @Override
+ public long incrementStreamWindowSize(int increment) {
+ long remaining = streamWindowSize.incrementWindowSize(increment);
+ if (LOGGER_OUTBOUND.isLoggable(DEBUG)) {
+ LOGGER_OUTBOUND.log(DEBUG, String.format("%s OFC STR %d: +%d(%d)", type, streamId(), increment, remaining));
+ }
+ connectionFlowControl.outbound().triggerUpdate();
+ return remaining;
+ }
+
+ @Override
+ public Http2FrameData[] cut(Http2FrameData frame) {
+ return frame.cut(getRemainingWindowSize());
+ }
- return new Http2FrameData[] {frameData1, frameData2};
+ @Override
+ public void blockTillUpdate() {
+ connectionFlowControl.outbound().blockTillUpdate();
+ streamWindowSize.blockTillUpdate();
+ }
+
+ @Override
+ public int maxFrameSize() {
+ return connectionFlowControl.maxFrameSize();
+ }
}
+
}
diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControlNoop.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControlNoop.java
new file mode 100644
index 00000000000..14296f8fc8c
--- /dev/null
+++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/FlowControlNoop.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.helidon.nima.http2;
+
+import java.util.function.BiConsumer;
+
+class FlowControlNoop implements FlowControl {
+
+ @Override
+ public void decrementWindowSize(int decrement) {
+ }
+
+ @Override
+ public void resetStreamWindowSize(int size) {
+ }
+
+ @Override
+ public int getRemainingWindowSize() {
+ return Integer.MAX_VALUE;
+ }
+
+
+ // Even NOOP sends WINDOW_UPDATE frames
+ static class Inbound extends FlowControlNoop implements FlowControl.Inbound {
+
+ private final WindowSize.Inbound connectionWindowSize;
+ private final WindowSize.Inbound streamWindowSize;
+
+ Inbound(int streamId,
+ WindowSize.Inbound connectionWindowSize,
+ BiConsumer windowUpdateStreamWriter) {
+ this.connectionWindowSize = connectionWindowSize;
+ this.streamWindowSize = WindowSize.createInboundNoop(streamId, windowUpdateStreamWriter);
+ }
+
+ @Override
+ public void incrementWindowSize(int increment) {
+ streamWindowSize.incrementWindowSize(increment);
+ connectionWindowSize.incrementWindowSize(increment);
+ }
+
+ }
+
+ static class Outbound extends FlowControlNoop implements FlowControl.Outbound {
+
+ @Override
+ public long incrementStreamWindowSize(int increment) {
+ return WindowSize.MAX_WIN_SIZE;
+ }
+
+ @Override
+ public Http2FrameData[] cut(Http2FrameData frame) {
+ return new Http2FrameData[] {frame};
+ }
+
+ @Override
+ public void blockTillUpdate() {
+ }
+
+ @Override
+ public int maxFrameSize() {
+ return WindowSize.DEFAULT_MAX_FRAME_SIZE;
+ }
+ }
+
+}
diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2ConnectionWriter.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2ConnectionWriter.java
index 74075aa7efa..998a3febee8 100755
--- a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2ConnectionWriter.java
+++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2ConnectionWriter.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,7 +35,7 @@ public class Http2ConnectionWriter implements Http2StreamWriter {
private final Lock streamLock = new ReentrantLock(true);
private final SocketContext ctx;
private final Http2FrameListener listener;
- private final Http2Headers.DynamicTable responseDynamicTable;
+ private final Http2Headers.DynamicTable outboundDynamicTable;
private final Http2HuffmanEncoder responseHuffman;
private final BufferData headerBuffer = BufferData.growing(512);
@@ -52,20 +52,24 @@ public Http2ConnectionWriter(SocketContext ctx, DataWriter writer, List {
- noLockWrite(flowControl, frame);
- return null;
- });
+ public void write(Http2FrameData frame) {
+ lockedWrite(frame);
+ }
+
+ @Override
+ public void writeData(Http2FrameData frame, FlowControl.Outbound flowControl) {
+ for (Http2FrameData f : frame.split(flowControl.maxFrameSize())) {
+ splitAndWrite(f, flowControl);
+ }
}
@Override
- public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlags flags, FlowControl flowControl) {
+ public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlags flags, FlowControl.Outbound flowControl) {
// this is executing in the thread of the stream
// we must enforce parallelism of exactly 1, to make sure the dynamic table is updated
// and then immediately written
@@ -73,7 +77,7 @@ public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlag
return withStreamLock(() -> {
int written = 0;
headerBuffer.clear();
- headers.write(responseDynamicTable, responseHuffman, headerBuffer);
+ headers.write(outboundDynamicTable, responseHuffman, headerBuffer);
Http2FrameHeader frameHeader = Http2FrameHeader.create(headerBuffer.available(),
Http2FrameTypes.HEADERS,
flags,
@@ -81,7 +85,7 @@ public int writeHeaders(Http2Headers headers, int streamId, Http2Flag.HeaderFlag
written += frameHeader.length();
written += Http2FrameHeader.LENGTH;
- noLockWrite(flowControl, new Http2FrameData(frameHeader, headerBuffer));
+ noLockWrite(new Http2FrameData(frameHeader, headerBuffer));
return written;
});
@@ -92,7 +96,7 @@ public int writeHeaders(Http2Headers headers,
int streamId,
Http2Flag.HeaderFlags flags,
Http2FrameData dataFrame,
- FlowControl flowControl) {
+ FlowControl.Outbound flowControl) {
// this is executing in the thread of the stream
// we must enforce parallelism of exactly 1, to make sure the dynamic table is updated
// and then immediately written
@@ -101,7 +105,7 @@ public int writeHeaders(Http2Headers headers,
int bytesWritten = 0;
headerBuffer.clear();
- headers.write(responseDynamicTable, responseHuffman, headerBuffer);
+ headers.write(outboundDynamicTable, responseHuffman, headerBuffer);
bytesWritten += headerBuffer.available();
Http2FrameHeader frameHeader = Http2FrameHeader.create(headerBuffer.available(),
@@ -110,8 +114,8 @@ public int writeHeaders(Http2Headers headers,
streamId);
bytesWritten += Http2FrameHeader.LENGTH;
- noLockWrite(flowControl, new Http2FrameData(frameHeader, headerBuffer));
- noLockWrite(flowControl, dataFrame);
+ noLockWrite(new Http2FrameData(frameHeader, headerBuffer));
+ writeData(dataFrame, flowControl);
bytesWritten += Http2FrameHeader.LENGTH;
bytesWritten += dataFrame.header().length();
@@ -127,7 +131,14 @@ public int writeHeaders(Http2Headers headers,
*/
public void updateHeaderTableSize(long newSize) throws InterruptedException {
withStreamLock(() -> {
- responseDynamicTable.protocolMaxTableSize(newSize);
+ outboundDynamicTable.protocolMaxTableSize(newSize);
+ return null;
+ });
+ }
+
+ private void lockedWrite(Http2FrameData frame) {
+ withStreamLock(() -> {
+ noLockWrite(frame);
return null;
});
}
@@ -145,40 +156,7 @@ private T withStreamLock(Callable callable) {
}
}
- private void noLockWrite(FlowControl flowControl, Http2FrameData frame) {
- if (frame.header().type() == Http2FrameTypes.DATA.type()) {
- splitAndWrite(frame, flowControl);
- } else {
- writeFrameInternal(frame);
- }
- }
-
- private void splitAndWrite(Http2FrameData frame, FlowControl flowControl) {
- Http2FrameData[] splitFrames = flowControl.split(frame);
- if (splitFrames.length == 1) {
- // windows are wide enough
- writeFrameInternal(frame);
- flowControl.decrementWindowSize(frame.header().length());
- } else if (splitFrames.length == 0) {
- // block until window update
- if (!flowControl.blockTillUpdate()) {
- // no timeout
- splitAndWrite(frame, flowControl);
- }
- } else if (splitFrames.length == 2) {
- // write send-able part and block until window update with the rest
- writeFrameInternal(splitFrames[0]);
- flowControl.decrementWindowSize(frame.header().length());
- if (!flowControl.blockTillUpdate()) {
- // no timeout
- splitAndWrite(splitFrames[1], flowControl);
- } else {
- //TODO discarded frames after timeout
- }
- }
- }
-
- private void writeFrameInternal(Http2FrameData frame) {
+ private void noLockWrite(Http2FrameData frame) {
Http2FrameHeader frameHeader = frame.header();
listener.frameHeader(ctx, frameHeader);
@@ -194,6 +172,28 @@ private void writeFrameInternal(Http2FrameData frame) {
}
}
+ private void splitAndWrite(Http2FrameData frame, FlowControl.Outbound flowControl) {
+ Http2FrameData currFrame = frame;
+ while (true) {
+ Http2FrameData[] splitFrames = flowControl.cut(currFrame);
+ if (splitFrames.length == 1) {
+ // windows are wide enough
+ lockedWrite(currFrame);
+ flowControl.decrementWindowSize(currFrame.header().length());
+ break;
+ } else if (splitFrames.length == 0) {
+ // block until window update
+ flowControl.blockTillUpdate();
+ } else if (splitFrames.length == 2) {
+ // write send-able part and block until window update with the rest
+ lockedWrite(splitFrames[0]);
+ flowControl.decrementWindowSize(currFrame.header().length());
+ flowControl.blockTillUpdate();
+ currFrame = splitFrames[1];
+ }
+ }
+ }
+
// TODO use for fastpath
// private void noLockWrite(Http2FrameData... frames) {
// List toWrite = new LinkedList<>();
diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2FrameData.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2FrameData.java
index fe8ec9140fc..6cac83d87d3 100644
--- a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2FrameData.java
+++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2FrameData.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,4 +25,102 @@
* @param data frame data
*/
public record Http2FrameData(Http2FrameHeader header, BufferData data) {
+
+ /**
+ * Split this frame to smaller frames of maximum frame size.
+ *
+ * @param size maximum frame size
+ * @return array of
+ */
+ public Http2FrameData[] split(int size) {
+ int length = this.header().length();
+
+ // Already smaller than max size
+ if (length <= size || length == 0) {
+ return new Http2FrameData[] {this};
+ }
+
+ // Zero max size fast path
+ if (size == 0) {
+ return new Http2FrameData[0];
+ }
+
+ // End of stream flag is set only to the last frame in the array
+ boolean endOfStream = this.header().flags(Http2FrameTypes.DATA).endOfStream();
+
+ int lastFrameSize = length % size;
+
+ // Avoid creating 0 length last frame
+ int allFrames = (length / size) + (lastFrameSize != 0 ? 1 : 0);
+ Http2FrameData[] splitFrames = new Http2FrameData[allFrames];
+
+ for (int i = 0; i < allFrames; i++) {
+ boolean lastFrame = allFrames == i + 1;
+ // only last frame can be smaller than max size
+ byte[] data = new byte[lastFrame ? (lastFrameSize != 0 ? lastFrameSize : size) : size];
+ this.data().read(data);
+ BufferData bufferData = BufferData.create(data);
+ splitFrames[i] = new Http2FrameData(
+ Http2FrameHeader.create(bufferData.available(),
+ Http2FrameTypes.DATA,
+ Http2Flag.DataFlags.create(endOfStream && lastFrame
+ ? Http2Flag.END_OF_STREAM
+ : 0),
+ this.header().streamId()),
+ bufferData);
+ }
+ return splitFrames;
+ }
+
+ /**
+ * Cut the frame of given size from larger frame,
+ * returns two frames, first of given size, second with the rest of the data.
+ *
+ * @param size maximum frame size of the first frame
+ * @return array of 0,1 or 2 frames
+ */
+ public Http2FrameData[] cut(int size) {
+ int length = this.header().length();
+
+ // Already smaller than max size
+ if (length <= size || length == 0) {
+ return new Http2FrameData[] {this};
+ }
+
+ // Zero max size fast path
+ if (size == 0) {
+ return new Http2FrameData[0];
+ }
+
+ // End of stream flag is set only to the last frame in the array
+ boolean endOfStream = this.header.flags(Http2FrameTypes.DATA).endOfStream();
+
+ byte[] data1 = new byte[size];
+ byte[] data2 = new byte[length - size];
+
+ this.data().read(data1);
+ this.data().read(data2);
+
+ BufferData bufferData1 = BufferData.create(data1);
+ BufferData bufferData2 = BufferData.create(data2);
+
+ Http2FrameData frameData1 =
+ new Http2FrameData(Http2FrameHeader.create(bufferData1.available(),
+ Http2FrameTypes.DATA,
+ Http2Flag.DataFlags.create(0),
+ this.header().streamId()),
+ bufferData1);
+
+ Http2FrameData frameData2 =
+ new Http2FrameData(Http2FrameHeader.create(bufferData2.available(),
+ Http2FrameTypes.DATA,
+ Http2Flag.DataFlags.create(endOfStream
+ ? Http2Flag.END_OF_STREAM
+ : 0),
+ this.header().streamId()),
+ bufferData2);
+
+ return new Http2FrameData[] {frameData1, frameData2};
+ }
+
}
diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Headers.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Headers.java
index ae3c9d66010..e9ed7ac5513 100644
--- a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Headers.java
+++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Headers.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -779,12 +779,12 @@ enum StaticHeader implements IndexedHeaderRecord {
for (StaticHeader predefinedHeader : StaticHeader.values()) {
BY_INDEX.put(predefinedHeader.index(), predefinedHeader);
maxIndex = Math.max(maxIndex, predefinedHeader.index);
+ // Indexed headers may be referenced either with or without value, so we need to store them in both tables
if (predefinedHeader.hasValue()) {
BY_NAME_VALUE.computeIfAbsent(predefinedHeader.headerName().lowerCase(), it -> new HashMap<>())
.put(predefinedHeader.value(), predefinedHeader);
- } else {
- BY_NAME_NO_VALUE.put(predefinedHeader.headerName().lowerCase(), predefinedHeader);
}
+ BY_NAME_NO_VALUE.putIfAbsent(predefinedHeader.headerName().lowerCase(), predefinedHeader);
}
MAX_INDEX = maxIndex;
diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Setting.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Setting.java
index 401fad64f83..ce82da8770c 100644
--- a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Setting.java
+++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Setting.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -74,7 +74,7 @@ public interface Http2Setting {
int identifier();
/**
- * Typed default value of this settiing.
+ * Typed default value of this setting.
*
* @return default value
*/
diff --git a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Settings.java b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Settings.java
index 0739c7aa5f5..8aa31644446 100644
--- a/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Settings.java
+++ b/nima/http2/http2/src/main/java/io/helidon/nima/http2/Http2Settings.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,10 +25,13 @@
import io.helidon.common.buffers.BufferData;
import io.helidon.common.socket.SocketContext;
+import static java.lang.System.Logger.Level.DEBUG;
+
/**
* HTTP settings frame.
*/
public final class Http2Settings implements Http2Frame {
+ private static final System.Logger LOGGER = System.getLogger(Http2Settings.class.getName());
private final Map values;
Http2Settings(Map values) {
@@ -36,7 +39,7 @@ public final class Http2Settings implements Http2Frame
}
/**
- * Create emtpy settings frame.
+ * Create empty settings frame.
*
* @return settings frame
*/
@@ -81,6 +84,10 @@ public static Http2Settings create(BufferData frame) {
return new Http2Settings(values);
}
+ private static String toString(String setting, int code, Object value) {
+ return String.format("[%s (0x%02x):%s]", setting, code, value);
+ }
+
@SuppressWarnings("unchecked")
@Override
public Http2FrameData toFrameData(Http2Settings settings, int streamId, Http2Flag.SettingsFlags flags) {
@@ -89,9 +96,11 @@ public Http2FrameData toFrameData(Http2Settings settings, int streamId, Http2Fla
values.values().forEach(it -> {
Object value = it.value();
Http2Setting