Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion tests/kafkatest/services/transactional_message_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
input_topic, input_partition, output_topic, max_messages=-1,
transaction_size=1000, transaction_timeout=None, enable_random_aborts=True,
use_group_metadata=False, group_mode=False):
use_group_metadata=False, group_mode=False, producer_block_timeout_ms=None):
super(TransactionalMessageCopier, self).__init__(context, num_nodes)
self.kafka = kafka
self.transactional_id = transactional_id
Expand All @@ -70,6 +70,7 @@ def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
"org.apache.kafka.clients.producer": "TRACE",
"org.apache.kafka.clients.consumer": "TRACE"
}
self.producer_block_timeout_ms = producer_block_timeout_ms

def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % TransactionalMessageCopier.PERSISTENT_ROOT,
Expand Down Expand Up @@ -124,6 +125,9 @@ def start_cmd(self, node, idx):
cmd += " --input-partition %s" % str(self.input_partition)
cmd += " --transaction-size %s" % str(self.transaction_size)

if self.producer_block_timeout_ms is not None:
cmd += " --producer-block-timeout-ms %s" % str(self.producer_block_timeout_ms)

if self.transaction_timeout is not None:
cmd += " --transaction-timeout %s" % str(self.transaction_timeout)

Expand Down
18 changes: 13 additions & 5 deletions tests/kafkatest/tests/core/group_mode_transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ def bounce_brokers(self, clean_shutdown):

self.kafka.await_no_under_replicated_partitions()

def create_and_start_message_copier(self, input_topic, output_topic, transactional_id):
def create_and_start_message_copier(self, input_topic, output_topic, transactional_id,
producer_block_timeout_ms):
message_copier = TransactionalMessageCopier(
context=self.test_context,
num_nodes=1,
Expand All @@ -126,7 +127,8 @@ def create_and_start_message_copier(self, input_topic, output_topic, transaction
transaction_size=self.transaction_size,
transaction_timeout=self.transaction_timeout,
use_group_metadata=True,
group_mode=True
group_mode=True,
producer_block_timeout_ms=producer_block_timeout_ms
)
message_copier.start()
wait_until(lambda: message_copier.alive(message_copier.nodes[0]),
Expand All @@ -145,13 +147,14 @@ def bounce_copiers(self, copiers, clean_shutdown, timeout_sec=240):
str(copier.progress_percent())))
copier.restart(clean_shutdown)

def create_and_start_copiers(self, input_topic, output_topic, num_copiers):
def create_and_start_copiers(self, input_topic, output_topic, num_copiers, producer_block_timeout_ms):
    """Create and start ``num_copiers`` transactional message copiers.

    Each copier consumes from ``input_topic`` and produces to
    ``output_topic`` under a unique transactional id of the form
    ``"copier-<i>"``.

    :param input_topic: topic the copiers consume from.
    :param output_topic: topic the copiers produce to.
    :param num_copiers: number of copier services to create and start.
    :param producer_block_timeout_ms: block timeout forwarded to each
        copier's producer, or None to keep the copier's default.
    :return: list of the started copier services.
    """
    copiers = []
    for i in range(num_copiers):
        copiers.append(self.create_and_start_message_copier(
            input_topic=input_topic,
            output_topic=output_topic,
            # unique per-copier transactional id (diff-paste artifact that
            # duplicated this argument line has been removed)
            transactional_id="copier-" + str(i),
            producer_block_timeout_ms=producer_block_timeout_ms
        ))
    return copiers

Expand Down Expand Up @@ -227,9 +230,14 @@ def copy_messages_transactionally(self, failure_mode, bounce_target,

It returns the concurrently consumed messages.
"""
producer_block_timeout_ms = None
if failure_mode != "clean_bounce" and bounce_target == "brokers":
# change from the default 60s to 90s to wait for broker recovery
producer_block_timeout_ms = 90000
copiers = self.create_and_start_copiers(input_topic=input_topic,
output_topic=output_topic,
num_copiers=num_copiers)
num_copiers=num_copiers,
producer_block_timeout_ms=producer_block_timeout_ms)
concurrent_consumer = self.start_consumer(output_topic,
group_id="concurrent_consumer")
clean_shutdown = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ private static ArgumentParser argParser() {
.dest("transactionalId")
.help("The transactionalId to assign to the producer");

parser.addArgument("--producer-block-timeout-ms")
.action(store())
.required(false)
.setDefault(60000)
.type(Integer.class)
.metavar("PRODUCER-BLOCK-TIMEOUT-MS")
.dest("producerBlockTimeoutMs")
.help("The maximum time in milliseconds the producer will block for during a send request.");

parser.addArgument("--enable-random-aborts")
.action(storeTrue())
.type(Boolean.class)
Expand Down Expand Up @@ -190,6 +199,7 @@ private static KafkaProducer<String, String> createProducer(Namespace parsedArgs
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, parsedArgs.getInt("transactionTimeout"));
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, parsedArgs.getInt("producerBlockTimeoutMs"));

return new KafkaProducer<>(props);
}
Expand Down