From 311aa67c7af557ab6da521bf7dd8c5098d8f56de Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Fri, 2 Jun 2017 15:25:00 -0700 Subject: [PATCH 01/10] Add a concurrent consumer in the transactions system tests. This will exercise the abort index --- .../kafkatest/tests/core/transactions_test.py | 81 +++++++++++++------ 1 file changed, 55 insertions(+), 26 deletions(-) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index a98a1c9e5ab37..1e52310df0fb7 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -84,7 +84,6 @@ def seed_messages(self): message_validator=is_int, max_messages=self.num_seed_messages, enable_idempotence=True) - seed_producer.start() wait_until(lambda: seed_producer.num_acked >= self.num_seed_messages, timeout_sec=seed_timeout_sec, @@ -93,28 +92,8 @@ def seed_messages(self): return seed_producer.acked def get_messages_from_output_topic(self): - consumer = ConsoleConsumer(context=self.test_context, - num_nodes=1, - kafka=self.kafka, - topic=self.output_topic, - new_consumer=True, - message_validator=is_int, - from_beginning=True, - consumer_timeout_ms=5000, - isolation_level="read_committed") - consumer.start() - # ensure that the consumer is up. - wait_until(lambda: consumer.alive(consumer.nodes[0]) == True, - timeout_sec=60, - err_msg="Consumer failed to start for %ds" %\ - 60) - # wait until the consumer closes, which will be 5 seconds after - # receiving the last message. - wait_until(lambda: consumer.alive(consumer.nodes[0]) == False, - timeout_sec=60, - err_msg="Consumer failed to consume %d messages in %ds" %\ - (self.num_seed_messages, 60)) - return consumer.messages_consumed[1] + consumer = self.start_consumer(self.output_topic, group_id="verifying_consumer") + return self.drain_consumer(consumer) def bounce_brokers(self, clean_shutdown): for node in self.kafka.nodes: @@ -170,8 +149,47 @@ def create_and_start_copiers(self): )) return copiers + def start_consumer(self, topic_to_read, group_id): + consumer = ConsoleConsumer(context=self.test_context, + num_nodes=1, + kafka=self.kafka, + topic=topic_to_read, + group_id=group_id, + new_consumer=True, + message_validator=is_int, + from_beginning=True, + consumer_timeout_ms=5000, + isolation_level="read_committed") + consumer.start() + # ensure that the consumer is up. + wait_until(lambda: consumer.alive(consumer.nodes[0]) == True, + timeout_sec=60, + err_msg="Consumer failed to start for %ds" %\ + 60) + return consumer + + def drain_consumer(self, consumer): + # wait until the consumer closes, which will be 5 seconds after + # receiving the last message. + wait_until(lambda: consumer.alive(consumer.nodes[0]) == False, + timeout_sec=60, + err_msg="Consumer failed to consume %d messages in %ds" %\ + (self.num_seed_messages, 60)) + return consumer.messages_consumed[1] + def copy_messages_transactionally(self, failure_mode, bounce_target): + """Copies messages transactionally from the seeded input topic to the + output topic, either bouncing brokers or clients in a hard and soft + way as it goes. + + This method also consumes messages in read_committed mode from the + output topic while the bounces and copy is going on. + + It returns the concurrently consumed messages. + """ copiers = self.create_and_start_copiers() + concurrent_consumer = self.start_consumer(self.output_topic, + group_id="concurrent_consumer") clean_shutdown = False if failure_mode == "clean_bounce": clean_shutdown = True @@ -187,21 +205,32 @@ def copy_messages_transactionally(self, failure_mode, bounce_target): err_msg="%s - Failed to copy all messages in %ds." %\ (copier.transactional_id, 60)) self.logger.info("finished copying messages") + return self.drain_consumer(concurrent_consumer) - @cluster(num_nodes=8) + @cluster(num_nodes=9) @matrix(failure_mode=["clean_bounce", "hard_bounce"], - bounce_target=["brokers", "clients"]) + bounce_target=["clients"]) def test_transactions(self, failure_mode, bounce_target): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol self.kafka.start() input_messages = self.seed_messages() - self.copy_messages_transactionally(failure_mode, bounce_target) + concurrently_consumed_messages = self.copy_messages_transactionally(failure_mode, bounce_target) output_messages = self.get_messages_from_output_topic() + + concurrently_consumed_message_set = set(concurrently_consumed_messages) output_message_set = set(output_messages) input_message_set = set(input_messages) + num_dups = abs(len(output_messages) - len(output_message_set)) + num_dups_in_concurrent_consumer = abs(len(concurrently_consumed_messages) + - len(concurrently_consumed_message_set)) assert num_dups == 0, "Detected %d duplicates in the output stream" % num_dups assert input_message_set == output_message_set, "Input and output message sets are not equal. Num input messages %d. Num output messages %d" %\ (len(input_message_set), len(output_message_set)) + + assert num_dups_in_concurrent_consumer == 0, "Detected %d dups in concurrently consumed messages" % num_dups_in_concurrent_consumer + assert input_message_set == concurrently_consumed_message_set, \ + "Input and concurrently consumed output message sets are not equal. Num input messages: %d. Num concurrently_consumed_messages: %d" %\ + (len(input_message_set), len(concurrently_consumed_message_set)) From 36e917e8ad8091d53fd3f4721cffe77b8cdcb467 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Fri, 2 Jun 2017 15:29:47 -0700 Subject: [PATCH 02/10] Bounce brokers as well --- tests/kafkatest/tests/core/transactions_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 1e52310df0fb7..350b97f5108cb 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -209,7 +209,7 @@ def copy_messages_transactionally(self, failure_mode, bounce_target): @cluster(num_nodes=9) @matrix(failure_mode=["clean_bounce", "hard_bounce"], - bounce_target=["clients"]) + bounce_target=["clients", "brokers"]) def test_transactions(self, failure_mode, bounce_target): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol From 39299f9c9a80fb67bb19511fdb296eee5367a310 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Fri, 2 Jun 2017 16:00:09 -0700 Subject: [PATCH 03/10] Collect debug and data logs for transactions tests --- tests/kafkatest/tests/core/transactions_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 350b97f5108cb..0e6c98fbdbbe2 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -214,6 +214,8 @@ def test_transactions(self, failure_mode, bounce_target): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol + self.kafka.logs["kafka_data"]["collect_default"] = True + self.kafka.logs["kafka_operational_logs_debug"]["collect_default"] = True self.kafka.start() input_messages = self.seed_messages() concurrently_consumed_messages = self.copy_messages_transactionally(failure_mode, bounce_target) From 2927a6867befcf4f9c178b6168b5e49442a5247f Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Sat, 3 Jun 2017 13:40:07 -0700 Subject: [PATCH 04/10] Add some debug logging --- .../kafka/clients/producer/internals/TransactionManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 2842cd180aaba..34ccbe4abfed7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -397,7 +397,6 @@ synchronized TxnRequestHandler nextRequestHandler() { nextRequestHandler = pendingRequests.poll(); } - if (nextRequestHandler != null) log.trace("{}Request {} dequeued for sending", logPrefix, nextRequestHandler.requestBuilder()); From 60ec2c31d581fb43c30dc84f004e2282910846a9 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Mon, 5 Jun 2017 17:50:38 -0700 Subject: [PATCH 05/10] Tweak timeouts and wait conditions so that the concurrent consumer doesn't close prematurely when there are broker bounces --- tests/kafkatest/tests/core/transactions_test.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 0e6c98fbdbbe2..5d9d647fe99a6 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -45,8 +45,8 @@ def __init__(self, test_context): # Test parameters self.num_input_partitions = 2 self.num_output_partitions = 3 - self.num_seed_messages = 20000 - self.transaction_size = 500 + self.num_seed_messages = 100000 + self.transaction_size = 750 self.first_transactional_id = "my-first-transactional-id" self.second_transactional_id = "my-second-transactional-id" self.consumer_group = "transactions-test-consumer-group" @@ -158,18 +158,18 @@ def start_consumer(self, topic_to_read, group_id): new_consumer=True, message_validator=is_int, from_beginning=True, - consumer_timeout_ms=5000, + consumer_timeout_ms=15000, isolation_level="read_committed") consumer.start() # ensure that the consumer is up. - wait_until(lambda: consumer.alive(consumer.nodes[0]) == True, + wait_until(lambda: (len(consumer.messages_consumed[1]) > 0) == True, timeout_sec=60, - err_msg="Consumer failed to start for %ds" %\ + err_msg="Consumer failed to consume any messages for for %ds" %\ 60) return consumer def drain_consumer(self, consumer): - # wait until the consumer closes, which will be 5 seconds after + # wait until the consumer closes, which will be 15 seconds after # receiving the last message. wait_until(lambda: consumer.alive(consumer.nodes[0]) == False, timeout_sec=60, From 07b378f14914d1a99668304483e0b59c6da4e461 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Tue, 6 Jun 2017 11:23:17 -0700 Subject: [PATCH 06/10] Fix timing issues in test, broker hard bounce now passes --- tests/kafkatest/tests/core/transactions_test.py | 12 ++++++------ .../kafka/tools/TransactionalMessageCopier.java | 5 +---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 5d9d647fe99a6..b0962b4f5ba7d 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -158,7 +158,6 @@ def start_consumer(self, topic_to_read, group_id): new_consumer=True, message_validator=is_int, from_beginning=True, - consumer_timeout_ms=15000, isolation_level="read_committed") consumer.start() # ensure that the consumer is up. @@ -171,10 +170,11 @@ def start_consumer(self, topic_to_read, group_id): def drain_consumer(self, consumer): # wait until the consumer closes, which will be 15 seconds after # receiving the last message. - wait_until(lambda: consumer.alive(consumer.nodes[0]) == False, + wait_until(lambda: len(consumer.messages_consumed[1]) >= self.num_seed_messages, timeout_sec=60, - err_msg="Consumer failed to consume %d messages in %ds" %\ - (self.num_seed_messages, 60)) + err_msg="Consumer consumed only %d out of %d messages in %ds" %\ + (len(consumer.messages_consumed[1]), self.num_seed_messages, 60)) + consumer.stop() return consumer.messages_consumed[1] def copy_messages_transactionally(self, failure_mode, bounce_target): @@ -208,8 +208,8 @@ def copy_messages_transactionally(self, failure_mode, bounce_target): return self.drain_consumer(concurrent_consumer) @cluster(num_nodes=9) - @matrix(failure_mode=["clean_bounce", "hard_bounce"], - bounce_target=["clients", "brokers"]) + @matrix(failure_mode=["hard_bounce", "clean_bounce"], + bounce_target=["brokers", "clients"]) def test_transactions(self, failure_mode, bounce_target): security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index c79c8546b4504..83167852b16aa 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -255,10 +255,7 @@ public void run() { try { while (0 < remainingMessages.get()) { - if ((((double) numMessagesProcessed.get() / maxMessages) * 100) % 10 == 0) { - // print status for every 10% we progress. - System.out.println(statusAsJson(numMessagesProcessed.get(), remainingMessages.get(), transactionalId)); - } + System.out.println(statusAsJson(numMessagesProcessed.get(), remainingMessages.get(), transactionalId)); if (isShuttingDown.get()) break; int messagesInCurrentTransaction = 0; From 61e2ffcd9b16df43dacb0109263282d05fd6b254 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Tue, 6 Jun 2017 13:43:03 -0700 Subject: [PATCH 07/10] Look up the coordinator again whenever there is a disconnect --- .../org/apache/kafka/clients/producer/internals/Sender.java | 2 ++ .../kafka/clients/producer/internals/TransactionManager.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 8b3957fd17ed5..b15c05484cdf6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -356,6 +356,8 @@ private boolean maybeSendTransactionalRequest(long now) { } catch (IOException e) { log.debug("{}Disconnect from {} while trying to send request {}. Going " + "to back off and retry", transactionManager.logPrefix, targetNode, requestBuilder); + if (nextRequestHandler.needsCoordinator()) + transactionManager.lookupCoordinator(nextRequestHandler); } time.sleep(retryBackoffMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 34ccbe4abfed7..056407f6816fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -610,6 +610,8 @@ public void onComplete(ClientResponse response) { clearInFlightRequestCorrelationId(); if (response.wasDisconnected()) { log.trace("{}Disconnected from {}. Will retry.", logPrefix, response.destination()); + if (this.needsCoordinator()) + lookupCoordinator(this.coordinatorType(), this.coordinatorKey()); reenqueue(); } else if (response.versionMismatch() != null) { fatalError(response.versionMismatch()); From 2ea7503333c6047f373c86bafa2626f565a7721a Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Tue, 6 Jun 2017 13:56:29 -0700 Subject: [PATCH 08/10] Address PR Comments --- tests/kafkatest/tests/core/transactions_test.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index b0962b4f5ba7d..80f8edad999ad 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -163,13 +163,18 @@ def start_consumer(self, topic_to_read, group_id): # ensure that the consumer is up. wait_until(lambda: (len(consumer.messages_consumed[1]) > 0) == True, timeout_sec=60, - err_msg="Consumer failed to consume any messages for for %ds" %\ + err_msg="Consumer failed to consume any messages for %ds" %\ 60) return consumer def drain_consumer(self, consumer): - # wait until the consumer closes, which will be 15 seconds after - # receiving the last message. + # wait until we read at least the expected number of messages. + # This is a safe check because both failure modes will be caught: + # 1. If we have 'num_seed_messages' but there are duplicates, then + # this is checked for later. + # + # 2. If we never reach 'num_seed_messages', then this will cause the + # test to fail. wait_until(lambda: len(consumer.messages_consumed[1]) >= self.num_seed_messages, timeout_sec=60, err_msg="Consumer consumed only %d out of %d messages in %ds" %\ From abe516964817d5bdeceb391f7fc7012c82259db7 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Tue, 6 Jun 2017 15:03:56 -0700 Subject: [PATCH 09/10] Add tests for disconnect cases --- .../clients/producer/internals/Sender.java | 5 +- .../internals/TransactionManager.java | 2 +- .../internals/TransactionManagerTest.java | 71 ++++++++++++++++++- 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index b15c05484cdf6..1a5b3212e8b29 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -356,8 +356,11 @@ private boolean maybeSendTransactionalRequest(long now) { } catch (IOException e) { log.debug("{}Disconnect from {} while trying to send request {}. Going " + "to back off and retry", transactionManager.logPrefix, targetNode, requestBuilder); - if (nextRequestHandler.needsCoordinator()) + if (nextRequestHandler.needsCoordinator()) { + // We break here so that we pick up the FindCoordinator request immediately. transactionManager.lookupCoordinator(nextRequestHandler); + break; + } } time.sleep(retryBackoffMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 056407f6816fb..dcd7a1f5f9971 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -609,7 +609,7 @@ public void onComplete(ClientResponse response) { } else { clearInFlightRequestCorrelationId(); if (response.wasDisconnected()) { - log.trace("{}Disconnected from {}. Will retry.", logPrefix, response.destination()); + log.debug("{}Disconnected from {}. Will retry.", logPrefix, response.destination()); if (this.needsCoordinator()) lookupCoordinator(this.coordinatorType(), this.coordinatorKey()); reenqueue(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index f661bafeca73c..4f2f10f7a76f2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -551,7 +551,76 @@ public void testDisconnectAndRetry() { } @Test - public void testCoordinatorLost() { + public void testLookupCoordinatorOnDisconnectAfterSend() { + // This is called from the initTransactions method in the producer as the first order of business. + // It finds the coordinator and then gets a PID. + final long pid = 13131L; + final short epoch = 1; + TransactionalRequestResult initPidResult = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + prepareInitPidResponse(Errors.NONE, true, pid, epoch); + // send pid to coordinator, should get disconnected before receiving the response, and resend the + // FindCoordinator and InitPid requests. + sender.run(time.milliseconds()); + + assertEquals(null, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + assertFalse(initPidResult.isCompleted()); + assertFalse(transactionManager.hasProducerId()); + + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + assertFalse(initPidResult.isCompleted()); + prepareInitPidResponse(Errors.NONE, false, pid, epoch); + sender.run(time.milliseconds()); // get pid and epoch + + assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed. + assertTrue(transactionManager.hasProducerId()); + assertEquals(pid, transactionManager.producerIdAndEpoch().producerId); + assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch); + } + + @Test + public void testLookupCoordinatorOnDisconnectBeforeSend() { + // This is called from the initTransactions method in the producer as the first order of business. + // It finds the coordinator and then gets a PID. + final long pid = 13131L; + final short epoch = 1; + TransactionalRequestResult initPidResult = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); // one loop to realize we need a coordinator. + sender.run(time.milliseconds()); // next loop to find coordintor. + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + client.disconnect(brokerNode.idString()); + client.blackout(brokerNode, 100); + // send pid to coordinator. Should get disconnected before the send and resend the FindCoordinator + // and InitPid requests. + sender.run(time.milliseconds()); + time.sleep(110); // waiting for the blackout period for the node to expire. + + assertEquals(null, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + assertFalse(initPidResult.isCompleted()); + assertFalse(transactionManager.hasProducerId()); + + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + assertFalse(initPidResult.isCompleted()); + prepareInitPidResponse(Errors.NONE, false, pid, epoch); + sender.run(time.milliseconds()); // get pid and epoch + + assertTrue(initPidResult.isCompleted()); // The future should only return after the second round of retries succeed. + assertTrue(transactionManager.hasProducerId()); + assertEquals(pid, transactionManager.producerIdAndEpoch().producerId); + assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch); + } + + @Test + public void testLookupCoordinatorOnNotCoordinatorError() { // This is called from the initTransactions method in the producer as the first order of business. // It finds the coordinator and then gets a PID. final long pid = 13131L; From 5ae38ee1076cd442b0c0b3536e2e1a8e6a8a1ce3 Mon Sep 17 00:00:00 2001 From: Apurva Mehta Date: Tue, 6 Jun 2017 15:35:31 -0700 Subject: [PATCH 10/10] Bump the drain timeout as it is too low on jenkins --- tests/kafkatest/tests/core/transactions_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 80f8edad999ad..c284bb6f62097 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -176,9 +176,9 @@ def drain_consumer(self, consumer): # 2. If we never reach 'num_seed_messages', then this will cause the # test to fail. wait_until(lambda: len(consumer.messages_consumed[1]) >= self.num_seed_messages, - timeout_sec=60, + timeout_sec=90, err_msg="Consumer consumed only %d out of %d messages in %ds" %\ - (len(consumer.messages_consumed[1]), self.num_seed_messages, 60)) + (len(consumer.messages_consumed[1]), self.num_seed_messages, 90)) consumer.stop() return consumer.messages_consumed[1]