Skip to content

Commit

Permalink
KAFKA-3782: Fix transient failure in connect distributed bounce test
Browse files (browse the repository at this point in the history)
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1650 from hachikuji/KAFKA-3782
  • Loading branch information
hachikuji authored and ewencp committed Jul 22, 2016
1 parent f1b37ee commit f5df136
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions tests/kafkatest/tests/connect/connect_distributed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def test_bounce(self, clean):
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
self.cc.start()

- self.source = VerifiableSource(self.cc, tasks=num_tasks)
+ self.source = VerifiableSource(self.cc, tasks=num_tasks, throughput=100)
self.source.start()
self.sink = VerifiableSink(self.cc, tasks=num_tasks)
self.sink.start()
Expand All @@ -344,11 +344,14 @@ def test_bounce(self, clean):
monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
err_msg="Kafka Connect worker didn't successfully join group and start work")
self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
- # If this is a hard bounce, give additional time for the consumer groups to recover. If we don't give
- # some time here, the next bounce may cause consumers to be shut down before they have any time to process
- # data and we can end up with zero data making it through the test.
- if not clean:
-     time.sleep(15)
+
+ # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are
+ # some cases where a restart can cause a rebalance to take the full length of the session timeout
+ # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+ # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to
+ # be shut down before they have any time to process data and we can end up with zero data making it
+ # through the test.
+ time.sleep(15)


self.source.stop()
Expand Down

0 comments on commit f5df136

Please sign in to comment.