From cd5e6f4700a4387f9383b84aca0ee9c4639b1033 Mon Sep 17 00:00:00 2001 From: jinxing Date: Wed, 9 Dec 2015 21:49:07 +0800 Subject: [PATCH 1/4] KAFKA-2837: fix transient failure kafka.api.ProducerBounceTest > testBrokerFailure --- .../test/scala/integration/kafka/api/ProducerBounceTest.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 29e146e6b871..9d7ba2a94e11 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -25,14 +25,12 @@ import org.junit.Assert._ import org.junit.{After, Before, Test} class ProducerBounceTest extends KafkaServerTestHarness { - private val producerBufferSize = 30000 - private val serverMessageMaxBytes = producerBufferSize/2 + private val producerBufferSize = 1024L * 1024L val numServers = 2 val overridingProps = new Properties() overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) - overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString) // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) From 2bcf010c73923bb24bbd9cece7e39983b2bdce0c Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 10 Dec 2015 12:47:39 +0800 Subject: [PATCH 2/4] KAFKA-2837: WIP --- core/src/test/scala/unit/kafka/utils/TestUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 7c928c5456cd..835a2dababef 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -455,7 +455,7 @@ object TestUtils extends Logging { */ def createNewProducer(brokerList: String, acks: Int = -1, - metadataFetchTimeout: Long = 3000L, + maxBlockMs: Long = Long.MaxValue, blockOnBufferFull: Boolean = true, bufferSize: Long = 1024L * 1024L, retries: Int = 0, @@ -468,7 +468,7 @@ object TestUtils extends Logging { val producerProps = props.getOrElse(new Properties) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) - producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) + producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) From 7118e11813e445bca3eab65a23028e76138b136a Mon Sep 17 00:00:00 2001 From: jinxing Date: Thu, 10 Dec 2015 12:51:43 +0800 Subject: [PATCH 3/4] KAFKA-2837: WIP --- .../test/scala/integration/kafka/api/ProducerBounceTest.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 9d7ba2a94e11..29e146e6b871 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -25,12 +25,14 @@ import org.junit.Assert._ import org.junit.{After, Before, Test} class ProducerBounceTest extends KafkaServerTestHarness { - private val producerBufferSize = 1024L * 1024L + private val producerBufferSize = 30000 + private val serverMessageMaxBytes = producerBufferSize/2 val numServers = 2 val overridingProps = new Properties() overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) + overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString) // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) From 310dd6b34547b52aad21a35dcf631bda3e15ab64 Mon Sep 17 00:00:00 2001 From: jinxing Date: Fri, 11 Dec 2015 11:43:32 +0800 Subject: [PATCH 4/4] KAFKA-2837: WIP --- .../scala/integration/kafka/api/ProducerBounceTest.scala | 6 +++--- .../kafka/api/ProducerFailureHandlingTest.scala | 8 ++++---- core/src/test/scala/unit/kafka/utils/TestUtils.scala | 1 - 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 29e146e6b871..369c3b7cd60c 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -66,9 +66,9 @@ class ProducerBounceTest extends KafkaServerTestHarness { override def setUp() { super.setUp() - producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize) - producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) - producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize) + producer1 = TestUtils.createNewProducer(brokerList, acks = 0, bufferSize = producerBufferSize) + producer2 = TestUtils.createNewProducer(brokerList, acks = 1, bufferSize = producerBufferSize) + producer3 = TestUtils.createNewProducer(brokerList, acks = -1, bufferSize = producerBufferSize) } @After diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 8ba7fad6d9b6..7b0910c8094f 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -63,9 +63,9 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { override def setUp() { super.setUp() - producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize) - producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) - producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize) + producer1 = TestUtils.createNewProducer(brokerList, acks = 0, maxBlockMs = 3000L, bufferSize = producerBufferSize) + producer2 = TestUtils.createNewProducer(brokerList, acks = 1, maxBlockMs = 3000L, bufferSize = producerBufferSize) + producer3 = TestUtils.createNewProducer(brokerList, acks = -1, maxBlockMs = 3000L, bufferSize = producerBufferSize) } @After @@ -134,7 +134,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers) // producer with incorrect broker list - producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) + producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, maxBlockMs = 3000L, bufferSize = producerBufferSize) // send a record with incorrect broker list val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 835a2dababef..ecec866b85b6 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -456,7 +456,6 @@ object TestUtils extends Logging { def createNewProducer(brokerList: String, acks: Int = -1, maxBlockMs: Long = Long.MaxValue, - blockOnBufferFull: Boolean = true, bufferSize: Long = 1024L * 1024L, retries: Int = 0, lingerMs: Long = 0,