diff --git a/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java b/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java index e8150a4b..3265ad64 100644 --- a/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java +++ b/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java @@ -174,7 +174,8 @@ private void receiveRepliesLoop() { : null); // read data from stream - final PbjGrpcDatagramReader datagramReader = new PbjGrpcDatagramReader(); + final PbjGrpcDatagramReader datagramReader = + new PbjGrpcDatagramReader(grpcClient.getConfig().maxIncomingBufferSize()); while (isStreamOpen() && !clientStream.trailers().isDone() && clientStream.hasEntity()) { final Http2FrameData frameData; try { diff --git a/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcClientConfig.java b/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcClientConfig.java index 549aa0f8..ceecbf18 100644 --- a/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcClientConfig.java +++ b/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcClientConfig.java @@ -11,6 +11,10 @@ /** * Configuration for PBJ GRPC client. * @param maxSize the maximum size of messages that the client is able to receive, defaults to Codec.DEFAULT_MAX_SIZE. + * @param maxIncomingBufferSize the max size of an incoming buffer for receiving messages. Must be larger than + * the `maxSize` to account for protobuf metadata as well as support high rate of ingress + * of multiple messages, especially in case of server or bidi streaming. + * Defaults to Codec.DEFAULT_MAX_SIZE * 5. */ public record PbjGrpcClientConfig( /** A read timeout. Duration.ofSeconds(10) is a good default. */ @@ -37,7 +41,8 @@ public record PbjGrpcClientConfig( * Note that the encoding must be registered as a `Decompressor` with `GrpcCompression` to actually be supported. */ Set acceptEncodings, - int maxSize) { + int maxSize, + int maxIncomingBufferSize) { /** For backward compatibility before encodings were introduced. */ public PbjGrpcClientConfig(Duration readTimeout, Tls tls, Optional authority, String contentType) { @@ -48,7 +53,8 @@ public PbjGrpcClientConfig(Duration readTimeout, Tls tls, Optional autho contentType, GrpcCompression.IDENTITY, GrpcCompression.getDecompressorNames(), - Codec.DEFAULT_MAX_SIZE); + Codec.DEFAULT_MAX_SIZE, + Codec.DEFAULT_MAX_SIZE * 5); } /** For backward compatibility before maxSize was introduced. */ @@ -59,6 +65,14 @@ public PbjGrpcClientConfig( String contentType, String encoding, Set acceptEncodings) { - this(readTimeout, tls, authority, contentType, encoding, acceptEncodings, Codec.DEFAULT_MAX_SIZE); + this( + readTimeout, + tls, + authority, + contentType, + encoding, + acceptEncodings, + Codec.DEFAULT_MAX_SIZE, + Codec.DEFAULT_MAX_SIZE * 5); } } diff --git a/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReader.java b/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReader.java index 42785cf5..4616d589 100644 --- a/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReader.java +++ b/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReader.java @@ -34,10 +34,8 @@ record Datagram(int compressedFlag, BufferData data) {} */ static final int PREFIX_LENGTH = 5; - // These are arbitrary limits, but they seem sane and support the immediate use-case. - // It would be nice to make them configurable in the future. + // This is an arbitrary initial size, but it seems reasonable. private static final int INITIAL_BUFFER_SIZE = 1024; - private static final int MAX_BUFFER_SIZE = 10 * 1024 * 1024; /** * A buffer for incoming data. We copy the incoming BufferData objects into this buffer @@ -49,7 +47,12 @@ record Datagram(int compressedFlag, BufferData data) {} * byte arrays when adding data to the buffer. Also, it doesn't support a capacity limit which * may be important to prevent OOM. For this reason, we use a low-level circular byte array here. */ - private byte[] buffer = new byte[INITIAL_BUFFER_SIZE]; + private byte[] buffer; + + /** + * The maximum size of the buffer. + */ + private final int maxBufferSize; /** Where we read data from. */ private int readPosition = 0; @@ -60,6 +63,11 @@ record Datagram(int compressedFlag, BufferData data) {} /** The length of the actual data added to the reader. Note that the buffer is circular. */ private int length = 0; + PbjGrpcDatagramReader(final int maxBufferSize) { + this.maxBufferSize = maxBufferSize; + this.buffer = new byte[Math.min(INITIAL_BUFFER_SIZE, maxBufferSize)]; + } + /** * Add a new piece of data to this reader. It may be a complete GRPC datagram, or a piece of it, * maybe even a piece containing a tail of one datagram and a head of another. @@ -161,12 +169,12 @@ private void ensureCapacity(final int minCapacity) { } final int newLength = buffer.length + (minCapacity - currentCapacity); - if (newLength > MAX_BUFFER_SIZE) { + if (newLength > maxBufferSize) { throw new BufferOverflowException(); } // Prefer to double the size each time. But resort to the newLength if it's greater, and respect the max limit. - final int actualNewLength = Math.min(Math.max(buffer.length * 2, newLength), MAX_BUFFER_SIZE); + final int actualNewLength = Math.min(Math.max(buffer.length * 2, newLength), maxBufferSize); final byte[] newBuffer = new byte[actualNewLength]; if (length > 0) { diff --git a/pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReaderTest.java b/pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReaderTest.java index 86ad6d28..d20b5ee5 100644 --- a/pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReaderTest.java +++ b/pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcDatagramReaderTest.java @@ -13,9 +13,11 @@ import org.junit.jupiter.api.Test; public class PbjGrpcDatagramReaderTest { + private static final int MAX_BUFFER_SIZE = 10 * 1024 * 1024; + @Test void checkBufferOverflow() { - PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(); + PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE); // First, test the happy case, fill up the buffer to the current max limit (note that it's hard-coded here): BufferData goodData = BufferData.create("a".repeat(10 * 1024 * 1024)); @@ -28,7 +30,7 @@ void checkBufferOverflow() { @Test void testSingleCompleteDatagram() { - PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(); + PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE); // Trivial case of a zero-size datagram BufferData zeroData = BufferData.create(new byte[] {0, 0, 0, 0, 0}); @@ -66,7 +68,7 @@ void testSingleCompleteDatagram() { @Test void testSplitDatagram() { - PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(); + PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE); // This is very similar to the many bytes long datagram test above, but we feed the reader // little by little instead of adding the entire datagram at once: @@ -97,7 +99,7 @@ void testSplitDatagram() { @Test void testFlipCircularBuffer() { - PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(); + PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE); // The initial size is currently 1024, so fill it up almost completely: String dataString = "a".repeat(1000); @@ -177,7 +179,7 @@ private void testDatagrams(final PbjGrpcDatagramReader reader, final List acceptEncodings, - int maxSize) { + int maxSize, + int maxIncomingBufferSize) { final Tls tls = Tls.builder().enabled(false).build(); final WebClient webClient = WebClient.builder().baseUri("http://localhost:" + port).tls(tls).build(); @@ -59,7 +61,14 @@ public static GrpcClient createGrpcClient( requestOptions.authority().isPresent() ? requestOptions.authority() : Optional.of("localhost:" + port); final PbjGrpcClientConfig config = new PbjGrpcClientConfig( - READ_TIMEOUT, tls, authority, requestOptions.contentType(), encoding, acceptEncodings, maxSize); + READ_TIMEOUT, + tls, + authority, + requestOptions.contentType(), + encoding, + acceptEncodings, + maxSize, + maxIncomingBufferSize); return new PbjGrpcClient(webClient, config); } diff --git a/pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcClientComprehensiveTest.java b/pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcClientComprehensiveTest.java index 4bf9886d..b4232b0d 100644 --- a/pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcClientComprehensiveTest.java +++ b/pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcClientComprehensiveTest.java @@ -124,7 +124,8 @@ void testUnaryMethodReceivingExtraLargePayload() { GrpcTestUtils.PROTO_OPTIONS, GrpcCompression.IDENTITY, GrpcCompression.getDecompressorNames(), - Codec.DEFAULT_MAX_SIZE * 2); + Codec.DEFAULT_MAX_SIZE * 2, + Codec.DEFAULT_MAX_SIZE * 5); final GreeterInterface.GreeterClient client = new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS); diff --git a/pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcCompressionTest.java b/pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcCompressionTest.java index 8e30e727..44612acf 100644 --- a/pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcCompressionTest.java +++ b/pbj-integration-tests/src/test/java/com/hedera/pbj/integration/test/grpc/GrpcCompressionTest.java @@ -30,7 +30,12 @@ void testPbjGrpcClientCompressionWithGoogleGrpcServer() { HelloReply.newBuilder().message("Hello " + request.name()).build()); try (final GrpcClient grpcClient = GrpcTestUtils.createGrpcClient( - port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"), Codec.DEFAULT_MAX_SIZE)) { + port.port(), + GrpcTestUtils.PROTO_OPTIONS, + "gzip", + Set.of("gzip"), + Codec.DEFAULT_MAX_SIZE, + Codec.DEFAULT_MAX_SIZE * 5)) { final GreeterInterface.GreeterClient client = new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS); @@ -109,7 +114,12 @@ void testPbjGrpcServerCompressionWithPbjClient() { HelloReply.newBuilder().message("Hello " + request.name()).build()); try (final GrpcClient grpcClient = GrpcTestUtils.createGrpcClient( - port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"), Codec.DEFAULT_MAX_SIZE)) { + port.port(), + GrpcTestUtils.PROTO_OPTIONS, + "gzip", + Set.of("gzip"), + Codec.DEFAULT_MAX_SIZE, + Codec.DEFAULT_MAX_SIZE * 5)) { final GreeterInterface.GreeterClient client = new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);