Skip to content

Commit

Permalink
HTTP/2 Client Flow-control - tests
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec committed Apr 1, 2023
1 parent 78d3f7b commit c1aa6fd
Show file tree
Hide file tree
Showing 14 changed files with 444 additions and 37 deletions.
6 changes: 6 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
<version.lib.snakeyaml>2.0</version.lib.snakeyaml>
<version.lib.typesafe-config>1.4.2</version.lib.typesafe-config>
<version.lib.tyrus>2.0.4</version.lib.tyrus>
<version.lib.vertx-core>4.3.7</version.lib.vertx-core>
<version.lib.weld-api>5.0.SP3</version.lib.weld-api>
<version.lib.weld>5.1.0.Final</version.lib.weld>
<version.lib.yasson>2.0.4</version.lib.yasson>
Expand Down Expand Up @@ -1385,6 +1386,11 @@
<artifactId>mssql-jdbc</artifactId>
<version>${version.lib.mssql-jdbc}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${version.lib.vertx-core}</version>
</dependency>
<!-- END OF Section 4: Testing -->

<!-- imported boms -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public int read() throws IOException {
return -1;
}
ensureBuffer(512);
if (currentBuffer == null) {
if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read();
Expand All @@ -238,7 +238,7 @@ public int read(byte[] b, int off, int len) throws IOException {
return -1;
}
ensureBuffer(len);
if (currentBuffer == null) {
if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read(b, off, len);
Expand Down
10 changes: 10 additions & 0 deletions nima/http2/http2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.logging</groupId>
<artifactId>helidon-logging-jul</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import java.util.function.BiConsumer;

import static java.lang.System.Logger.Level.INFO;
import static java.lang.System.Logger.Level.DEBUG;

/**
* HTTP/2 Flow control for connection.
*/
public class ConnectionFlowControl {

private static final System.Logger LOGGER = System.getLogger(FlowControl.class.getName());
private static final System.Logger LOGGER_OUTBOUND = System.getLogger(FlowControl.class.getName() + ".ofc");

private final Type type;
private final BiConsumer<Integer, Http2WindowUpdate> windowUpdateWriter;
Expand Down Expand Up @@ -114,7 +114,7 @@ public void resetMaxFrameSize(int maxFrameSize) {
* @param initialWindowSize INIT_WINDOW_SIZE received
*/
public void resetInitialWindowSize(int initialWindowSize) {
LOGGER.log(INFO, () -> String.format("%s OFC STR *: Recv INIT_WINDOW_SIZE %s", type, initialWindowSize));
LOGGER_OUTBOUND.log(DEBUG, () -> String.format("%s OFC STR *: Recv INIT_WINDOW_SIZE %s", type, initialWindowSize));
this.initialWindowSize = initialWindowSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

abstract class FlowControlImpl implements FlowControl {

private static final System.Logger LOGGER = System.getLogger(FlowControl.class.getName());
private static final System.Logger LOGGER_INBOUND = System.getLogger(FlowControl.class.getName() + ".ifc");
private static final System.Logger LOGGER_OUTBOUND = System.getLogger(FlowControl.class.getName() + ".ofc");

private final int streamId;

Expand Down Expand Up @@ -102,17 +103,17 @@ WindowSize streamWindowSize() {
@Override
public void decrementWindowSize(int decrement) {
long strRemaining = streamWindowSize().decrementWindowSize(decrement);
LOGGER.log(DEBUG, () -> String.format("%s IFC STR %d: -%d(%d)", type, streamId(), decrement, strRemaining));
LOGGER_INBOUND.log(DEBUG, () -> String.format("%s IFC STR %d: -%d(%d)", type, streamId(), decrement, strRemaining));
long connRemaining = connectionWindowSize().decrementWindowSize(decrement);
LOGGER.log(DEBUG, () -> String.format("%s IFC STR 0: -%d(%d)", type, decrement, connRemaining));
LOGGER_INBOUND.log(DEBUG, () -> String.format("%s IFC STR 0: -%d(%d)", type, decrement, connRemaining));
}

@Override
public void incrementWindowSize(int increment) {
long strRemaining = streamWindowSize.incrementWindowSize(increment);
LOGGER.log(DEBUG, () -> String.format("%s IFC STR %d: +%d(%d)", type, streamId(), increment, strRemaining));
LOGGER_INBOUND.log(DEBUG, () -> String.format("%s IFC STR %d: +%d(%d)", type, streamId(), increment, strRemaining));
long conRemaining = connectionWindowSize.incrementWindowSize(increment);
LOGGER.log(DEBUG, () -> String.format("%s IFC STR 0: +%d(%d)", type, increment, conRemaining));
LOGGER_INBOUND.log(DEBUG, () -> String.format("%s IFC STR 0: +%d(%d)", type, increment, conRemaining));
}

}
Expand Down Expand Up @@ -144,16 +145,16 @@ WindowSize streamWindowSize() {

public void decrementWindowSize(int decrement) {
long strRemaining = streamWindowSize().decrementWindowSize(decrement);
LOGGER.log(DEBUG, () -> String.format("%s OFC STR %d: -%d(%d)", type, streamId(), decrement, strRemaining));
LOGGER_OUTBOUND.log(DEBUG, () -> String.format("%s OFC STR %d: -%d(%d)", type, streamId(), decrement, strRemaining));

long connRemaining = connectionWindowSize().decrementWindowSize(decrement);
LOGGER.log(DEBUG, () -> String.format("%s OFC STR 0: -%d(%d)", type, decrement, connRemaining));
LOGGER_OUTBOUND.log(DEBUG, () -> String.format("%s OFC STR 0: -%d(%d)", type, decrement, connRemaining));
}

@Override
public long incrementStreamWindowSize(int increment) {
long remaining = streamWindowSize.incrementWindowSize(increment);
LOGGER.log(DEBUG, () -> String.format("%s OFC STR %d: +%d(%d)", type, streamId(), increment, remaining));
LOGGER_OUTBOUND.log(DEBUG, () -> String.format("%s OFC STR %d: +%d(%d)", type, streamId(), increment, remaining));
connectionFlowControl.outbound().triggerUpdate();
return remaining;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Http2FrameData[] split(int size) {
for (int i = 0; i < allFrames; i++) {
boolean lastFrame = allFrames == i + 1;
// only last frame can be smaller than max size
byte[] data = new byte[lastFrame ? lastFrameSize : size];
byte[] data = new byte[lastFrame ? (lastFrameSize != 0 ? lastFrameSize : size) : size];
this.data().read(data);
BufferData bufferData = BufferData.create(data);
splitFrames[i] = new Http2FrameData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import java.util.function.BiConsumer;

import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.INFO;

/**
* Window size container, used with {@link io.helidon.nima.http2.FlowControl}.
*/
abstract class WindowSizeImpl implements WindowSize {

private static final System.Logger LOGGER = System.getLogger(FlowControl.class.getName());
private static final System.Logger LOGGER_INBOUND = System.getLogger(FlowControl.class.getName() + ".ifc");
private static final System.Logger LOGGER_OUTBOUND = System.getLogger(FlowControl.class.getName() + ".ofc");

private final ConnectionFlowControl.Type type;
private final int streamId;
Expand All @@ -52,8 +52,8 @@ public void resetWindowSize(int size) {
// it maintains by the difference between the new value and the old value
remainingWindowSize.updateAndGet(o -> o + size - windowSize);
windowSize = size;
LOGGER.log(DEBUG, () -> String.format("%s OFC STR %d: Recv INITIAL_WINDOW_SIZE %d(%d)",
type, streamId, windowSize, remainingWindowSize.get()));
LOGGER_OUTBOUND.log(DEBUG, () -> String.format("%s OFC STR %d: Recv INITIAL_WINDOW_SIZE %d(%d)",
type, streamId, windowSize, remainingWindowSize.get()));
}

@Override
Expand Down Expand Up @@ -135,6 +135,7 @@ static final class Outbound extends WindowSizeImpl implements WindowSize.Outboun
@Override
public long incrementWindowSize(int increment) {
long remaining = super.incrementWindowSize(increment);
LOGGER_OUTBOUND.log(DEBUG, () -> String.format("%s OFC STR %d: +%d(%d)", type, streamId, increment, remaining));
triggerUpdate();
return remaining;
}
Expand All @@ -151,7 +152,8 @@ public void blockTillUpdate() {
//TODO configurable timeout
updated.get().get(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.log(DEBUG, () -> String.format("%s OFC STR %d: Window depleted, waiting for update.", type, streamId));
LOGGER_OUTBOUND.log(DEBUG, () ->
String.format("%s OFC STR %d: Window depleted, waiting for update.", type, streamId));
}
}
}
Expand Down Expand Up @@ -223,7 +225,7 @@ Context context() {
return context;
}

int streamId(){
int streamId() {
return this.streamId;
}

Expand Down Expand Up @@ -283,7 +285,7 @@ private Simple(Context context, int streamId, BiConsumer<Integer, Http2WindowUpd

@Override
void windowUpdate(ConnectionFlowControl.Type type, int streamId, int increment) {
LOGGER.log(INFO, () -> String.format("%s IFC STR %d: Send WINDOW_UPDATE %s", type, streamId, increment));
LOGGER_INBOUND.log(DEBUG, () -> String.format("%s IFC STR %d: Send WINDOW_UPDATE %s", type, streamId, increment));
windowUpdateWriter().accept(streamId(), new Http2WindowUpdate(increment));
}

Expand All @@ -307,12 +309,12 @@ private Bisection(Context context, int streamId, BiConsumer<Integer, Http2Window

@Override
void windowUpdate(ConnectionFlowControl.Type type, int streamId, int increment) {
LOGGER.log(DEBUG, () -> String.format("%s IFC STR %d: Deferred WINDOW_UPDATE %d, total %d, watermark %d",
type, streamId, increment, delayedIncrement, watermark));
LOGGER_INBOUND.log(DEBUG, () -> String.format("%s IFC STR %d: Deferred WINDOW_UPDATE %d, total %d, watermark %d",
type, streamId, increment, delayedIncrement, watermark));
delayedIncrement += increment;
if (delayedIncrement > watermark) {
LOGGER.log(DEBUG, () -> String.format("%s IFC STR %d: Send WINDOW_UPDATE %d, watermark %d",
type, streamId, delayedIncrement, watermark));
LOGGER_INBOUND.log(DEBUG, () -> String.format("%s IFC STR %d: Send WINDOW_UPDATE %d, watermark %d",
type, streamId, delayedIncrement, watermark));
windowUpdateWriter().accept(streamId(), new Http2WindowUpdate(delayedIncrement));
delayedIncrement = 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.http2;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.stream.Stream;

import io.helidon.common.buffers.BufferData;
import io.helidon.logging.common.LogConfig;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static java.lang.System.Logger.Level.DEBUG;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

class MaxFrameSizeSplitTest {

private static final System.Logger LOGGER = System.getLogger(MaxFrameSizeSplitTest.class.getName());

private static final String TEST_STRING = "Helidon data!!!!";
private static final byte[] TEST_DATA = TEST_STRING.getBytes(StandardCharsets.UTF_8);

@BeforeAll
static void beforeAll() {
LogConfig.configureRuntime();
}

private static Stream<Arguments> splitMultiple() {
return Stream.of(
Arguments.of(17, 1, 16),
Arguments.of(16, 1, 16),
Arguments.of(15, 2, 1),
Arguments.of(14, 2, 2),
Arguments.of(13, 2, 3),
Arguments.of(12, 2, 4),
Arguments.of(11, 2, 5),
Arguments.of(10, 2, 6),
Arguments.of(9, 2, 7),
Arguments.of(8, 2, 8),
Arguments.of(7, 3, 2),
Arguments.of(6, 3, 4),
Arguments.of(5, 4, 1),
Arguments.of(4, 4, 4),
Arguments.of(3, 6, 1),
Arguments.of(2, 8, 2),
Arguments.of(1, 16, 1)
);
}

@ParameterizedTest
@MethodSource
void splitMultiple(int sizeOfFrames,
int numberOfFrames,
int sizeOfLastFrame) {
LOGGER.log(DEBUG, "Splitting " + Arrays.toString(TEST_DATA) + " to frames of max size " + sizeOfFrames);

Http2FrameData frameData = createFrameData(TEST_DATA);
Http2FrameData[] split = frameData.split(sizeOfFrames);
assertThat("Unexpected number of frames", split.length, is(numberOfFrames));

BufferData joined = Stream.of(split)
.collect(() -> BufferData.create(TEST_DATA.length),
(bb, b) -> bb.write(b.data()),
(bb, bb2) -> {
});

assertThat("Result after split and join differs",
joined.readString(joined.available(), StandardCharsets.UTF_8),
is(TEST_STRING));

// Reload data depleted by previous test
split = createFrameData(TEST_DATA).split(sizeOfFrames);

for (int i = 0; i < numberOfFrames - 1; i++) {
Http2FrameData frame = split[i];
assertThat("Only last frame can have endOfStream flag",
frame.header().flags(Http2FrameTypes.DATA).endOfStream(),
is(false));

byte[] bytes = toBytes(frame);
LOGGER.log(DEBUG, i + ". frame: " + Arrays.toString(bytes));
assertThat("Unexpected size of frame " + i, bytes.length, is(sizeOfFrames));
}

Http2FrameData lastFrame = split[numberOfFrames - 1];
assertThat("Last frame is missing endOfStream flag",
lastFrame.header().flags(Http2FrameTypes.DATA).endOfStream(),
is(true));

byte[] bytes = toBytes(lastFrame);
LOGGER.log(DEBUG, numberOfFrames - 1 + ". frame: " + Arrays.toString(bytes));
assertThat("Unexpected size of the last frame", bytes.length, is(sizeOfLastFrame));
}

private Http2FrameData createFrameData(byte[] data) {
Http2FrameHeader http2FrameHeader = Http2FrameHeader.create(data.length,
Http2FrameTypes.DATA,
Http2Flag.DataFlags.create(Http2Flag.DataFlags.END_OF_STREAM),
1);
return new Http2FrameData(http2FrameHeader, BufferData.create(data));
}

private byte[] toBytes(Http2FrameData frameData) {
return toBytes(frameData.data());
}

private byte[] toBytes(BufferData data) {
byte[] b = new byte[data.available()];
data.read(b);
return b;
}
}
23 changes: 23 additions & 0 deletions nima/http2/http2/src/test/resources/logging-test.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright (c) 2023 Oracle and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
handlers=io.helidon.logging.jul.HelidonConsoleHandler
# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n
#java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS.%1$tL %5$s%6$s%n
# Global logging level. Can be overridden by specific loggers
.level=INFO
io.helidon.nima.level=INFO
#io.helidon.nima.http2.MaxFrameSizeSplitTest.level=ALL

0 comments on commit c1aa6fd

Please sign in to comment.