Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ private void receiveRepliesLoop() {
: null);

// read data from stream
final PbjGrpcDatagramReader datagramReader = new PbjGrpcDatagramReader();
final PbjGrpcDatagramReader datagramReader =
new PbjGrpcDatagramReader(grpcClient.getConfig().maxIncomingBufferSize());
while (isStreamOpen() && !clientStream.trailers().isDone() && clientStream.hasEntity()) {
final Http2FrameData frameData;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
/**
* Configuration for PBJ GRPC client.
* @param maxSize the maximum size of messages that the client is able to receive, defaults to Codec.DEFAULT_MAX_SIZE.
* @param maxIncomingBufferSize the max size of an incoming buffer for receiving messages. Must be larger than
* the `maxSize` to account for protobuf metadata as well as support high rate of ingress
* of multiple messages, especially in case of server or bidi streaming.
* Defaults to Codec.DEFAULT_MAX_SIZE * 5.
*/
public record PbjGrpcClientConfig(
/** A read timeout. Duration.ofSeconds(10) is a good default. */
Expand All @@ -37,7 +41,8 @@ public record PbjGrpcClientConfig(
* Note that the encoding must be registered as a `Decompressor` with `GrpcCompression` to actually be supported.
*/
Set<String> acceptEncodings,
int maxSize) {
int maxSize,
int maxIncomingBufferSize) {

/** For backward compatibility before encodings were introduced. */
public PbjGrpcClientConfig(Duration readTimeout, Tls tls, Optional<String> authority, String contentType) {
Expand All @@ -48,7 +53,8 @@ public PbjGrpcClientConfig(Duration readTimeout, Tls tls, Optional<String> autho
contentType,
GrpcCompression.IDENTITY,
GrpcCompression.getDecompressorNames(),
Codec.DEFAULT_MAX_SIZE);
Codec.DEFAULT_MAX_SIZE,
Codec.DEFAULT_MAX_SIZE * 5);
}

/** For backward compatibility before maxSize was introduced. */
Expand All @@ -59,6 +65,14 @@ public PbjGrpcClientConfig(
String contentType,
String encoding,
Set<String> acceptEncodings) {
this(readTimeout, tls, authority, contentType, encoding, acceptEncodings, Codec.DEFAULT_MAX_SIZE);
this(
readTimeout,
tls,
authority,
contentType,
encoding,
acceptEncodings,
Codec.DEFAULT_MAX_SIZE,
Codec.DEFAULT_MAX_SIZE * 5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ record Datagram(int compressedFlag, BufferData data) {}
*/
static final int PREFIX_LENGTH = 5;

// These are arbitrary limits, but they seem sane and support the immediate use-case.
// It would be nice to make them configurable in the future.
// This is an arbitrary initial size, but it seems reasonable.
private static final int INITIAL_BUFFER_SIZE = 1024;
private static final int MAX_BUFFER_SIZE = 10 * 1024 * 1024;

/**
* A buffer for incoming data. We copy the incoming BufferData objects into this buffer
Expand All @@ -49,7 +47,12 @@ record Datagram(int compressedFlag, BufferData data) {}
* byte arrays when adding data to the buffer. Also, it doesn't support a capacity limit which
* may be important to prevent OOM. For this reason, we use a low-level circular byte array here.
*/
private byte[] buffer = new byte[INITIAL_BUFFER_SIZE];
private byte[] buffer;

/**
* The maximum size of the buffer.
*/
private final int maxBufferSize;

/** Where we read data from. */
private int readPosition = 0;
Expand All @@ -60,6 +63,11 @@ record Datagram(int compressedFlag, BufferData data) {}
/** The length of the actual data added to the reader. Note that the buffer is circular. */
private int length = 0;

PbjGrpcDatagramReader(final int maxBufferSize) {
this.maxBufferSize = maxBufferSize;
this.buffer = new byte[Math.min(INITIAL_BUFFER_SIZE, maxBufferSize)];
}

/**
* Add a new piece of data to this reader. It may be a complete GRPC datagram, or a piece of it,
* maybe even a piece containing a tail of one datagram and a head of another.
Expand Down Expand Up @@ -161,12 +169,12 @@ private void ensureCapacity(final int minCapacity) {
}

final int newLength = buffer.length + (minCapacity - currentCapacity);
if (newLength > MAX_BUFFER_SIZE) {
if (newLength > maxBufferSize) {
throw new BufferOverflowException();
}

// Prefer to double the size each time. But resort to the newLength if it's greater, and respect the max limit.
final int actualNewLength = Math.min(Math.max(buffer.length * 2, newLength), MAX_BUFFER_SIZE);
final int actualNewLength = Math.min(Math.max(buffer.length * 2, newLength), maxBufferSize);
final byte[] newBuffer = new byte[actualNewLength];

if (length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import org.junit.jupiter.api.Test;

public class PbjGrpcDatagramReaderTest {
private static final int MAX_BUFFER_SIZE = 10 * 1024 * 1024;

@Test
void checkBufferOverflow() {
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);

// First, test the happy case, fill up the buffer to the current max limit (note that it's hard-coded here):
BufferData goodData = BufferData.create("a".repeat(10 * 1024 * 1024));
Expand All @@ -28,7 +30,7 @@ void checkBufferOverflow() {

@Test
void testSingleCompleteDatagram() {
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);

// Trivial case of a zero-size datagram
BufferData zeroData = BufferData.create(new byte[] {0, 0, 0, 0, 0});
Expand Down Expand Up @@ -66,7 +68,7 @@ void testSingleCompleteDatagram() {

@Test
void testSplitDatagram() {
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);

// This is very similar to the many bytes long datagram test above, but we feed the reader
// little by little instead of adding the entire datagram at once:
Expand Down Expand Up @@ -97,7 +99,7 @@ void testSplitDatagram() {

@Test
void testFlipCircularBuffer() {
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);

// The initial size is currently 1024, so fill it up almost completely:
String dataString = "a".repeat(1000);
Expand Down Expand Up @@ -177,7 +179,7 @@ private void testDatagrams(final PbjGrpcDatagramReader reader, final List<String

@Test
void testEnlargePartiallyFilledBuffer() {
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader();
PbjGrpcDatagramReader reader = new PbjGrpcDatagramReader(MAX_BUFFER_SIZE);

// Add two datagrams of 1000 bytes, which will enlarge the initial 1024 bytes buffer
testDatagrams(reader, List.of("a".repeat(1000), "b".repeat(1000)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ static GreeterInterface.GreeterClient createClient(final int port, final String[
grpcClient = GrpcTestUtils.createGrpcClient(port, GrpcTestUtils.PROTO_OPTIONS);
} else {
grpcClient = GrpcTestUtils.createGrpcClient(
port, GrpcTestUtils.PROTO_OPTIONS, encodings[0], Set.of(encodings), Codec.DEFAULT_MAX_SIZE);
port,
GrpcTestUtils.PROTO_OPTIONS,
encodings[0],
Set.of(encodings),
Codec.DEFAULT_MAX_SIZE,
Codec.DEFAULT_MAX_SIZE * 5);
}

return new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ public static GrpcClient createGrpcClient(final int port, final ServiceInterface
requestOptions,
GrpcCompression.IDENTITY,
GrpcCompression.getDecompressorNames(),
Codec.DEFAULT_MAX_SIZE);
Codec.DEFAULT_MAX_SIZE,
Codec.DEFAULT_MAX_SIZE * 5);
}

public static GrpcClient createGrpcClient(
final int port,
final ServiceInterface.RequestOptions requestOptions,
String encoding,
Set<String> acceptEncodings,
int maxSize) {
int maxSize,
int maxIncomingBufferSize) {
final Tls tls = Tls.builder().enabled(false).build();
final WebClient webClient =
WebClient.builder().baseUri("http://localhost:" + port).tls(tls).build();
Expand All @@ -59,7 +61,14 @@ public static GrpcClient createGrpcClient(
requestOptions.authority().isPresent() ? requestOptions.authority() : Optional.of("localhost:" + port);

final PbjGrpcClientConfig config = new PbjGrpcClientConfig(
READ_TIMEOUT, tls, authority, requestOptions.contentType(), encoding, acceptEncodings, maxSize);
READ_TIMEOUT,
tls,
authority,
requestOptions.contentType(),
encoding,
acceptEncodings,
maxSize,
maxIncomingBufferSize);

return new PbjGrpcClient(webClient, config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ void testUnaryMethodReceivingExtraLargePayload() {
GrpcTestUtils.PROTO_OPTIONS,
GrpcCompression.IDENTITY,
GrpcCompression.getDecompressorNames(),
Codec.DEFAULT_MAX_SIZE * 2);
Codec.DEFAULT_MAX_SIZE * 2,
Codec.DEFAULT_MAX_SIZE * 5);
final GreeterInterface.GreeterClient client =
new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ void testPbjGrpcClientCompressionWithGoogleGrpcServer() {
HelloReply.newBuilder().message("Hello " + request.name()).build());

try (final GrpcClient grpcClient = GrpcTestUtils.createGrpcClient(
port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"), Codec.DEFAULT_MAX_SIZE)) {
port.port(),
GrpcTestUtils.PROTO_OPTIONS,
"gzip",
Set.of("gzip"),
Codec.DEFAULT_MAX_SIZE,
Codec.DEFAULT_MAX_SIZE * 5)) {
final GreeterInterface.GreeterClient client =
new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);

Expand Down Expand Up @@ -109,7 +114,12 @@ void testPbjGrpcServerCompressionWithPbjClient() {
HelloReply.newBuilder().message("Hello " + request.name()).build());

try (final GrpcClient grpcClient = GrpcTestUtils.createGrpcClient(
port.port(), GrpcTestUtils.PROTO_OPTIONS, "gzip", Set.of("gzip"), Codec.DEFAULT_MAX_SIZE)) {
port.port(),
GrpcTestUtils.PROTO_OPTIONS,
"gzip",
Set.of("gzip"),
Codec.DEFAULT_MAX_SIZE,
Codec.DEFAULT_MAX_SIZE * 5)) {
final GreeterInterface.GreeterClient client =
new GreeterInterface.GreeterClient(grpcClient, GrpcTestUtils.PROTO_OPTIONS);

Expand Down
Loading