From 48c1977c7dfb8d5c741ae999f5a228fbdfb3874c Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Mon, 5 Feb 2024 19:47:28 +0800 Subject: [PATCH] fix(e2e): modify max block ms for hard bounce Signed-off-by: Curtis Wan --- .../services/transactional_message_copier.py | 6 +++++- .../tests/core/group_mode_transactions_test.py | 18 +++++++++++++----- .../tools/TransactionalMessageCopier.java | 10 ++++++++++ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py index 675c7d7153..fbdce0e21f 100644 --- a/tests/kafkatest/services/transactional_message_copier.py +++ b/tests/kafkatest/services/transactional_message_copier.py @@ -48,7 +48,7 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group, input_topic, input_partition, output_topic, max_messages=-1, transaction_size=1000, transaction_timeout=None, enable_random_aborts=True, - use_group_metadata=False, group_mode=False): + use_group_metadata=False, group_mode=False, producer_block_timeout_ms=None): super(TransactionalMessageCopier, self).__init__(context, num_nodes) self.kafka = kafka self.transactional_id = transactional_id @@ -70,6 +70,7 @@ def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group, "org.apache.kafka.clients.producer": "TRACE", "org.apache.kafka.clients.consumer": "TRACE" } + self.producer_block_timeout_ms = producer_block_timeout_ms def _worker(self, idx, node): node.account.ssh("mkdir -p %s" % TransactionalMessageCopier.PERSISTENT_ROOT, @@ -124,6 +125,9 @@ def start_cmd(self, node, idx): cmd += " --input-partition %s" % str(self.input_partition) cmd += " --transaction-size %s" % str(self.transaction_size) + if self.producer_block_timeout_ms is not None: + cmd += " --producer-block-timeout-ms %s" % str(self.producer_block_timeout_ms) + if self.transaction_timeout is not None: cmd += " --transaction-timeout %s" % str(self.transaction_timeout) diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py index 37a6da39d3..bb75578025 100644 --- a/tests/kafkatest/tests/core/group_mode_transactions_test.py +++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py @@ -112,7 +112,8 @@ def bounce_brokers(self, clean_shutdown): self.kafka.await_no_under_replicated_partitions() - def create_and_start_message_copier(self, input_topic, output_topic, transactional_id): + def create_and_start_message_copier(self, input_topic, output_topic, transactional_id, + producer_block_timeout_ms): message_copier = TransactionalMessageCopier( context=self.test_context, num_nodes=1, @@ -126,7 +127,8 @@ def create_and_start_message_copier(self, input_topic, output_topic, transaction transaction_size=self.transaction_size, transaction_timeout=self.transaction_timeout, use_group_metadata=True, - group_mode=True + group_mode=True, + producer_block_timeout_ms=producer_block_timeout_ms ) message_copier.start() wait_until(lambda: message_copier.alive(message_copier.nodes[0]), @@ -145,13 +147,14 @@ def bounce_copiers(self, copiers, clean_shutdown, timeout_sec=240): str(copier.progress_percent()))) copier.restart(clean_shutdown) - def create_and_start_copiers(self, input_topic, output_topic, num_copiers): + def create_and_start_copiers(self, input_topic, output_topic, num_copiers, producer_block_timeout_ms): copiers = [] for i in range(0, num_copiers): copiers.append(self.create_and_start_message_copier( input_topic=input_topic, output_topic=output_topic, - transactional_id="copier-" + str(i) + transactional_id="copier-" + str(i), + producer_block_timeout_ms=producer_block_timeout_ms )) return copiers @@ -227,9 +230,14 @@ def copy_messages_transactionally(self, failure_mode, bounce_target, It returns the concurrently consumed messages. """ + producer_block_timeout_ms = None + if failure_mode != "clean_bounce" and bounce_target == "brokers": + # change from the default 60s to 90s to wait for broker recovery + producer_block_timeout_ms = 90000 copiers = self.create_and_start_copiers(input_topic=input_topic, output_topic=output_topic, - num_copiers=num_copiers) + num_copiers=num_copiers, + producer_block_timeout_ms=producer_block_timeout_ms) concurrent_consumer = self.start_consumer(output_topic, group_id="concurrent_consumer") clean_shutdown = False diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index 289327e570..5cf2e4389e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -151,6 +151,15 @@ private static ArgumentParser argParser() { .dest("transactionalId") .help("The transactionalId to assign to the producer"); + parser.addArgument("--producer-block-timeout-ms") + .action(store()) + .required(false) + .setDefault(60000) + .type(Integer.class) + .metavar("PRODUCER-BLOCK-TIMEOUT-MS") + .dest("producerBlockTimeoutMs") + .help("The maximum time in milliseconds the producer will block for during a send request."); + parser.addArgument("--enable-random-aborts") .action(storeTrue()) .type(Boolean.class) @@ -190,6 +199,7 @@ private static KafkaProducer createProducer(Namespace parsedArgs props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512"); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, parsedArgs.getInt("transactionTimeout")); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, parsedArgs.getInt("producerBlockTimeoutMs")); return new KafkaProducer<>(props); }