KAFKA-4203: Align broker default for max.message.bytes with Java producer default (#4154)

Also: improve the error message, add a test, and make minor code quality fixes.
Verified that the test fails if the broker default for max message bytes is lower or higher than the currently set value.

Reviewers: Andrew Choi <andchoi@linkedin.com>, Viktor Somogyi <viktorsomogyi@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
ijuma committed Jan 29, 2020
1 parent b4d7560 commit bd5a1c4
Showing 4 changed files with 36 additions and 13 deletions.
17 changes: 8 additions & 9 deletions clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -321,7 +321,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
                   Serializer<V> valueSerializer,
                   ProducerMetadata metadata,
                   KafkaClient kafkaClient,
-                  ProducerInterceptors interceptors,
+                  ProducerInterceptors<K, V> interceptors,
                   Time time) {
         ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
                 valueSerializer));
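
The only change in this hunk swaps the raw ProducerInterceptors type for its parameterized form. A minimal sketch of what the raw type gives up; the generic holder below is an illustrative stand-in, not the real ProducerInterceptors API:

// Illustrative stand-in for a generic class like ProducerInterceptors<K, V>.
class Holder<K, V> {
    void accept(K key, V value) { }
}

class RawTypeDemo {
    public static void main(String[] args) {
        Holder<String, byte[]> typed = new Holder<>();
        typed.accept("key", new byte[0]);   // key/value types checked at compile time

        Holder raw = new Holder();          // raw type: unchecked warning
        raw.accept(42, "not bytes");        // compiles anyway; the compiler can no longer catch the mismatch
    }
}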
@@ -486,7 +486,7 @@ private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
         int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);
 
-        if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
+        if (deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
             if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
                 // throw an exception if the user explicitly set an inconsistent value
                 throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
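
The dropped deliveryTimeoutMs < Integer.MAX_VALUE clause was redundant: the Math.min above caps lingerAndRequestTimeoutMs at Integer.MAX_VALUE, so a deliveryTimeoutMs of Integer.MAX_VALUE can never be strictly less than it. A self-contained sketch of that invariant, with made-up config values:

public class DeliveryTimeoutCheckDemo {
    public static void main(String[] args) {
        int lingerMs = 5;
        int requestTimeoutMs = Integer.MAX_VALUE; // extreme value to exercise the cap
        int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);

        int deliveryTimeoutMs = Integer.MAX_VALUE;
        // Old condition: deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs
        // New condition: deliveryTimeoutMs < lingerAndRequestTimeoutMs
        // Both evaluate to false here, because lingerAndRequestTimeoutMs never exceeds Integer.MAX_VALUE.
        System.out.println(deliveryTimeoutMs < lingerAndRequestTimeoutMs); // false
    }
}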
@@ -545,7 +545,7 @@ private static int configureInflightRequests(ProducerConfig config) {
 
     private static short configureAcks(ProducerConfig config, Logger log) {
         boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
-        short acks = Short.valueOf(config.getString(ProducerConfig.ACKS_CONFIG));
+        short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
 
         if (config.idempotenceEnabled()) {
             if (!userConfiguredAcks)
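
Short.parseShort returns the primitive short directly, while Short.valueOf returns a boxed Short that the assignment immediately unboxes; the parsing behaviour, including NumberFormatException on bad input, is the same. A two-line comparison:

public class ParseShortDemo {
    public static void main(String[] args) {
        short viaValueOf = Short.valueOf("1");  // allocates (or caches) a Short, then auto-unboxes
        short viaParse = Short.parseShort("1"); // yields the primitive with no wrapper object
        System.out.println(viaValueOf == viaParse); // true
    }
}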
@@ -1051,12 +1051,11 @@ private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long
      * Validate that the record size isn't too large
      */
     private void ensureValidRecordSize(int size) {
-        if (size > this.maxRequestSize)
+        if (size > maxRequestSize)
             throw new RecordTooLargeException("The message is " + size +
-                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
-                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
-                    " configuration.");
-        if (size > this.totalMemorySize)
+                    " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
+                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
+        if (size > totalMemorySize)
             throw new RecordTooLargeException("The message is " + size +
                     " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                     ProducerConfig.BUFFER_MEMORY_CONFIG +
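
The reworked message embeds the configured limit itself rather than only naming the config. A rough sketch of the text the new code produces, with a made-up record size (ProducerConfig.MAX_REQUEST_SIZE_CONFIG is the string "max.request.size"):

public class RecordSizeMessageDemo {
    public static void main(String[] args) {
        int size = 2000000;           // hypothetical serialized record size
        int maxRequestSize = 1048576; // producer default for max.request.size
        System.out.println("The message is " + size +
            " bytes when serialized which is larger than " + maxRequestSize +
            ", which is the value of the max.request.size configuration.");
    }
}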
@@ -1348,7 +1347,7 @@ private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> in
         }
 
         public void onCompletion(RecordMetadata metadata, Exception exception) {
-            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1);
+            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1L, -1, -1);
             this.interceptors.onAcknowledgement(metadata, exception);
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
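
Long.valueOf(-1L) and the bare literal -1L are equivalent when the target parameter is a Long: the compiler inserts the boxing call itself, so the explicit form was just noise. A minimal illustration; the checksum parameter name is borrowed from RecordMetadata for flavour:

public class AutoboxDemo {
    static void report(Long checksum) {
        System.out.println(checksum);
    }

    public static void main(String[] args) {
        report(Long.valueOf(-1L)); // explicit boxing
        report(-1L);               // identical: javac emits Long.valueOf(-1L) here too
    }
}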
2 changes: 1 addition & 1 deletion clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

@@ -280,7 +280,7 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(MAX_REQUEST_SIZE_CONFIG,
                                         Type.INT,
-                                        1 * 1024 * 1024,
+                                        1024 * 1024,
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -55,7 +55,7 @@ object Defaults {
   val BrokerIdGenerationEnable = true
   val MaxReservedBrokerId = 1000
   val BrokerId = -1
-  val MessageMaxBytes = 1000000 + Records.LOG_OVERHEAD
+  val MessageMaxBytes = 1024 * 1024 + Records.LOG_OVERHEAD
   val NumNetworkThreads = 3
   val NumIoThreads = 8
   val BackgroundThreads = 10
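
This line is the heart of the commit: the broker's default max.message.bytes now covers a full default-size producer request plus the log overhead. Records.LOG_OVERHEAD is 12 bytes (an 8-byte offset plus a 4-byte size prefix), so the defaults compare as in this sketch, with the constants inlined rather than pulled from the Kafka classes:

public class DefaultAlignmentDemo {
    public static void main(String[] args) {
        int logOverhead = 12;                     // Records.LOG_OVERHEAD: 8-byte offset + 4-byte size
        int producerMaxRequestSize = 1024 * 1024; // producer default max.request.size (1048576)

        int oldBrokerDefault = 1000000 + logOverhead;     // 1000012: below the producer default
        int newBrokerDefault = 1024 * 1024 + logOverhead; // 1048588: fits a max-size request

        System.out.println(oldBrokerDefault >= producerMaxRequestSize); // false: defaults were misaligned
        System.out.println(newBrokerDefault >= producerMaxRequestSize); // true: aligned by this commit
    }
}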
28 changes: 26 additions & 2 deletions core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala

@@ -21,10 +21,13 @@ import java.util.Properties
 import java.util.concurrent.{ExecutionException, Future, TimeUnit}
 
 import kafka.log.LogConfig
+import kafka.server.{Defaults, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
-import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException, TimeoutException}
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
+import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
 import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.utils.ByteUtils
 import org.junit.Assert._
 import org.junit.Test
+import org.scalatest.Assertions.intercept
@@ -168,4 +171,25 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     verifySendFailure(send(producer2)) // should fail send since buffer is full
     verifySendSuccess(future2) // previous batch should be completed and sent now
   }
+
+  @Test
+  def testSendRecordBatchWithMaxRequestSizeAndHigher(): Unit = {
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val producer = registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
+
+    val keyLengthSize = 1
+    val headerLengthSize = 1
+    val valueLengthSize = 3
+    val overhead = Records.LOG_OVERHEAD + DefaultRecordBatch.RECORD_BATCH_OVERHEAD + DefaultRecord.MAX_RECORD_OVERHEAD +
+      keyLengthSize + headerLengthSize + valueLengthSize
+    val valueSize = Defaults.MessageMaxBytes - overhead
+
+    val record0 = new ProducerRecord(topic, new Array[Byte](0), new Array[Byte](valueSize))
+    assertEquals(record0.value.length, producer.send(record0).get.serializedValueSize)
+
+    val record1 = new ProducerRecord(topic, new Array[Byte](0), new Array[Byte](valueSize + 1))
+    assertEquals(classOf[RecordTooLargeException], intercept[ExecutionException](producer.send(record1).get).getCause.getClass)
+  }
+
 }
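
The three small constants in the new test are the varint length prefixes of the record format: a zero-length key and an empty header set each take one byte to encode, and a value of roughly one MiB needs a three-byte varint. A sketch of where those numbers come from, using ByteUtils.sizeOfVarint (the test imports ByteUtils; the concrete value length below only approximates what the test computes):

import org.apache.kafka.common.utils.ByteUtils;

public class VarintSizeDemo {
    public static void main(String[] args) {
        System.out.println(ByteUtils.sizeOfVarint(0));       // 1 -> keyLengthSize: zero-length key
        System.out.println(ByteUtils.sizeOfVarint(0));       // 1 -> headerLengthSize: zero headers
        System.out.println(ByteUtils.sizeOfVarint(1048489)); // 3 -> valueLengthSize: ~1 MiB value
    }
}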
