diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index d4c4225a0bf8..b9757bad6d0a 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -329,7 +329,7 @@ def test_bounce(self, clean):
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 
-        self.source = VerifiableSource(self.cc, tasks=num_tasks)
+        self.source = VerifiableSource(self.cc, tasks=num_tasks, throughput=100)
         self.source.start()
         self.sink = VerifiableSink(self.cc, tasks=num_tasks)
         self.sink.start()
@@ -344,11 +344,14 @@ def test_bounce(self, clean):
                     monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
                                        err_msg="Kafka Connect worker didn't successfully join group and start work")
                 self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
-                # If this is a hard bounce, give additional time for the consumer groups to recover. If we don't give
-                # some time here, the next bounce may cause consumers to be shut down before they have any time to process
-                # data and we can end up with zero data making it through the test.
-                if not clean:
-                    time.sleep(15)
+
+                # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are
+                # some cases where a restart can cause a rebalance to take the full length of the session timeout
+                # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+                # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to
+                # be shut down before they have any time to process data and we can end up with zero data making it
+                # through the test.
+                time.sleep(15)
 
         self.source.stop()
         self.sink.stop()