diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
index fbdce0e21f..2efadd2fc1 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -48,7 +48,8 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
     def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
                  input_topic, input_partition, output_topic, max_messages=-1,
                  transaction_size=1000, transaction_timeout=None, enable_random_aborts=True,
-                 use_group_metadata=False, group_mode=False, producer_block_timeout_ms=None):
+                 use_group_metadata=False, group_mode=False, producer_block_timeout_ms=None,
+                 consumer_default_api_timeout_ms=None):
         super(TransactionalMessageCopier, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.transactional_id = transactional_id
@@ -71,6 +72,7 @@ def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
             "org.apache.kafka.clients.consumer": "TRACE"
         }
         self.producer_block_timeout_ms = producer_block_timeout_ms
+        self.consumer_default_api_timeout_ms = consumer_default_api_timeout_ms
 
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % TransactionalMessageCopier.PERSISTENT_ROOT,
@@ -140,6 +142,9 @@ def start_cmd(self, node, idx):
         if self.group_mode:
             cmd += " --group-mode"
 
+        if self.consumer_default_api_timeout_ms is not None:
+            cmd += " --consumer-default-api-timeout-ms %s" % str(self.consumer_default_api_timeout_ms)
+
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
         cmd += " 2>> %s | tee -a %s &" % (TransactionalMessageCopier.STDERR_CAPTURE, TransactionalMessageCopier.STDOUT_CAPTURE)
diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py
index 62361130af..750210f0c1 100644
--- a/tests/kafkatest/tests/core/group_mode_transactions_test.py
+++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py
@@ -113,7 +113,8 @@ def bounce_brokers(self, clean_shutdown):
         self.kafka.await_no_under_replicated_partitions()
 
     def create_and_start_message_copier(self, input_topic, output_topic, transactional_id,
-                                        producer_block_timeout_ms, transaction_timeout):
+                                        producer_block_timeout_ms, transaction_timeout,
+                                        consumer_default_api_timeout_ms):
         message_copier = TransactionalMessageCopier(
             context=self.test_context,
             num_nodes=1,
@@ -128,7 +129,8 @@ def create_and_start_message_copier(self, input_topic, output_topic, transaction
             transaction_timeout=transaction_timeout,
             use_group_metadata=True,
             group_mode=True,
-            producer_block_timeout_ms=producer_block_timeout_ms
+            producer_block_timeout_ms=producer_block_timeout_ms,
+            consumer_default_api_timeout_ms=consumer_default_api_timeout_ms
         )
         message_copier.start()
         wait_until(lambda: message_copier.alive(message_copier.nodes[0]),
@@ -148,7 +150,7 @@ def bounce_copiers(self, copiers, clean_shutdown, timeout_sec=240):
                 copier.restart(clean_shutdown)
 
     def create_and_start_copiers(self, input_topic, output_topic, num_copiers, producer_block_timeout_ms,
-                                 transaction_timeout):
+                                 transaction_timeout, consumer_default_api_timeout_ms):
         copiers = []
         for i in range(0, num_copiers):
             copiers.append(self.create_and_start_message_copier(
@@ -156,7 +158,8 @@ def create_and_start_copiers(self, input_topic, output_topic, num_copiers, produ
                 output_topic=output_topic,
                 transactional_id="copier-" + str(i),
                 producer_block_timeout_ms=producer_block_timeout_ms,
-                transaction_timeout=transaction_timeout
+                transaction_timeout=transaction_timeout,
+                consumer_default_api_timeout_ms=consumer_default_api_timeout_ms
             ))
         return copiers
 
@@ -233,16 +236,19 @@ def copy_messages_transactionally(self, failure_mode, bounce_target,
         It returns the concurrently consumed messages.
         """
         producer_block_timeout_ms = None
+        consumer_default_api_timeout_ms = None
         transaction_timeout = self.transaction_timeout
         if failure_mode != "clean_bounce" and bounce_target == "brokers":
             # change from the default 60s to 90s to wait for broker recovery
             producer_block_timeout_ms = 90000
             transaction_timeout = 90000
+            consumer_default_api_timeout_ms = 90000
 
         copiers = self.create_and_start_copiers(input_topic=input_topic,
                                                 output_topic=output_topic,
                                                 num_copiers=num_copiers,
                                                 producer_block_timeout_ms=producer_block_timeout_ms,
-                                                transaction_timeout=transaction_timeout)
+                                                transaction_timeout=transaction_timeout,
+                                                consumer_default_api_timeout_ms=consumer_default_api_timeout_ms)
         concurrent_consumer = self.start_consumer(output_topic, group_id="concurrent_consumer")
         clean_shutdown = False
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 5cf2e4389e..20361480ca 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -182,6 +182,15 @@ private static ArgumentParser argParser() {
                 .dest("useGroupMetadata")
                 .help("Whether to use the new transactional commit API with group metadata");
 
+        parser.addArgument("--consumer-default-api-timeout-ms")
+                .action(store())
+                .required(false)
+                .setDefault(60000)
+                .type(Integer.class)
+                .metavar("CONSUMER-DEFAULT-API-TIMEOUT-MS")
+                .dest("consumerDefaultApiTimeoutMs")
+                .help("The default API timeout in milliseconds for the consumer.");
+
         return parser;
     }
 
@@ -224,6 +233,7 @@ private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs
                 "org.apache.kafka.common.serialization.StringDeserializer");
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, parsedArgs.getInt("consumerDefaultApiTimeoutMs"));
 
         return new KafkaConsumer<>(props);
     }
@@ -407,7 +417,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             } catch (ProducerFencedException e) {
                 throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId), e);
             } catch (KafkaException e) {
-                log.debug("Aborting transaction after catching exception", e);
+                log.error("Aborting transaction after catching exception", e);
                 abortTransactionAndResetPosition(producer, consumer);
             }
         }
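For context, a minimal usage sketch (not part of the patch) of how a ducktape test would thread the new knob through to the tool. The test_context and kafka fixtures, the topic names, and the group id are hypothetical placeholders; the 90000 ms value mirrors the broker-bounce branch of copy_messages_transactionally above.

    # Hypothetical sketch: the keyword argument below is rendered into the new
    # --consumer-default-api-timeout-ms flag by start_cmd(), and the Java tool
    # then sets it as default.api.timeout.ms on its KafkaConsumer.
    from kafkatest.services.transactional_message_copier import TransactionalMessageCopier

    copier = TransactionalMessageCopier(
        context=test_context,                  # assumed ducktape TestContext
        num_nodes=1,
        kafka=kafka,                           # assumed running KafkaService fixture
        transactional_id="copier-0",
        consumer_group="copier-group",         # hypothetical group id
        input_topic="input-topic",             # hypothetical topic names
        input_partition=-1,                    # assumption: group mode subscribes, so no fixed partition
        output_topic="output-topic",
        transaction_timeout=90000,
        producer_block_timeout_ms=90000,
        use_group_metadata=True,
        group_mode=True,
        consumer_default_api_timeout_ms=90000  # new knob: 90s to ride out a broker bounce
    )
    copier.start()

Leaving consumer_default_api_timeout_ms as None omits the flag entirely, so the tool falls back to its 60000 ms argparse default, matching the consumer's stock default.api.timeout.ms.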