Integration test to send larger message size data in produce
apoorvmittal10 committed Mar 9, 2023
1 parent 7e69329 commit 24efcbe
Showing 2 changed files with 62 additions and 0 deletions.
@@ -19,6 +19,7 @@
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
@@ -30,6 +31,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
@@ -3101,6 +3103,58 @@ public void produceAvroWithRecordSchemaSubjectStrategyAndLatestVersion_returnsBa
assertEquals(400, actual.getErrorCode());
}

@Test
public void produceBinaryWithLargerSizeMessage() throws Exception {
String clusterId = testEnv.kafkaCluster().getClusterId();
ByteString key = ByteString.copyFromUtf8("foo");
// The Kafka broker and producer are configured to accept messages of up to 20971520 bytes
// (20 MiB), but KafkaProducer counts the key, value, headers, and per-record framing
// overhead toward that limit, so the test produces a 20971420-byte value to leave headroom.
int valueSize = ((2<<20) * 10) - 100;
byte[] value = generateBinaryData(valueSize);
ProduceRequest request =
ProduceRequest.builder()
.setKey(
ProduceRequestData.builder()
.setFormat(EmbeddedFormat.BINARY)
.setData(BinaryNode.valueOf(key.toByteArray()))
.build())
.setValue(
ProduceRequestData.builder()
.setFormat(EmbeddedFormat.BINARY)
.setData(BinaryNode.valueOf(value))
.build())
.setOriginalSize(0L)
.build();

Response response =
testEnv
.kafkaRest()
.target()
.path("/v3/clusters/" + clusterId + "/topics/" + TOPIC_NAME + "/records")
.request()
.accept(MediaType.APPLICATION_JSON)
.post(Entity.entity(request, MediaType.APPLICATION_JSON));
assertEquals(Status.OK.getStatusCode(), response.getStatus());

ProduceResponse actual = readProduceResponse(response);
assertTrue(actual.getValue().isPresent());
assertEquals(valueSize, actual.getValue().get().getSize());

ConsumerRecord<byte[], byte[]> produced =
testEnv
.kafkaCluster()
.getRecord(
TOPIC_NAME,
actual.getPartitionId(),
actual.getOffset(),
new ByteArrayDeserializer(),
new ByteArrayDeserializer());
assertEquals(key, ByteString.copyFrom(produced.key()));
assertEquals(valueSize, produced.serializedValueSize());
assertEquals(Arrays.toString(value), Arrays.toString(produced.value()));
}

private static ProduceResponse readProduceResponse(Response response) {
response.bufferEntity();
try {
@@ -3119,4 +3173,10 @@ private static ImmutableList<ErrorResponse> readErrorResponses(Response response
return ImmutableList.copyOf(
response.readEntity(new GenericType<MappingIterator<ErrorResponse>>() {}));
}

private static byte[] generateBinaryData(int messageSize) {
byte[] data = new byte[messageSize];
Arrays.fill(data, (byte) 1);
return data;
}
}
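
For context on the arithmetic in the test's size comment above, here is a minimal standalone sketch (not part of the commit; the class name is illustrative) that reproduces the size budget:

public class MessageSizeBudget {
  public static void main(String[] args) {
    // 2 << 20 == 2_097_152 bytes (2 MiB), so the configured cap is 20_971_520 bytes (20 MiB).
    int maxMessageBytes = (2 << 20) * 10;
    // The test leaves 100 bytes of headroom for the 3-byte key plus per-record framing overhead.
    int valueSize = maxMessageBytes - 100; // 20_971_420, matching the test's valueSize
    System.out.println("cap=" + maxMessageBytes + " value=" + valueSize);
  }
}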
@@ -42,6 +42,7 @@ public final class DefaultKafkaRestTestEnvironment
.addSuperUser("schema-registry")
.setCertificates(certificates, "kafka-1", "kafka-2", "kafka-3")
.setConfig("ssl.client.auth", "required")
.setConfig("message.max.bytes", String.valueOf((2<<20) * 10))
.setNumBrokers(3)
.setSecurityProtocol(SecurityProtocol.SASL_SSL)
.setZookeeper(zookeeper)
@@ -59,6 +60,7 @@ public final class DefaultKafkaRestTestEnvironment
.setCertificates(certificates, "kafka-rest")
.setConfig("producer.max.block.ms", "5000")
.setConfig("ssl.client.authentication", "REQUIRED")
.setConfig("producer.max.request.size", String.valueOf((2<<20) * 10))
.setKafkaCluster(kafkaCluster)
.setKafkaUser("kafka-rest", "kafka-rest-pass")
.setSchemaRegistry(schemaRegistry)
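The two settings raised above work as a pair: message.max.bytes caps what the brokers accept, while producer.max.request.size caps what the producer will send (the producer. prefix is how Kafka REST forwards configs to its embedded producer); both must be at least the intended record size. A minimal sketch of the equivalent configuration for a plain KafkaProducer, assuming a placeholder bootstrap address:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
// Producer-side cap on the serialized size of a single request.
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf((2 << 20) * 10)); // 20 MiB
// The broker-side cap is a server config: message.max.bytes=20971520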
