Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion tests/kafkatest/services/transactional_message_copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
input_topic, input_partition, output_topic, max_messages=-1,
transaction_size=1000, transaction_timeout=None, enable_random_aborts=True,
use_group_metadata=False, group_mode=False, producer_block_timeout_ms=None):
use_group_metadata=False, group_mode=False, producer_block_timeout_ms=None,
consumer_default_api_timeout_ms=None):
super(TransactionalMessageCopier, self).__init__(context, num_nodes)
self.kafka = kafka
self.transactional_id = transactional_id
Expand All @@ -71,6 +72,7 @@ def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group,
"org.apache.kafka.clients.consumer": "TRACE"
}
self.producer_block_timeout_ms = producer_block_timeout_ms
self.consumer_default_api_timeout_ms = consumer_default_api_timeout_ms

def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % TransactionalMessageCopier.PERSISTENT_ROOT,
Expand Down Expand Up @@ -140,6 +142,9 @@ def start_cmd(self, node, idx):
if self.group_mode:
cmd += " --group-mode"

if self.consumer_default_api_timeout_ms is not None:
cmd += " --consumer-default-api-timeout-ms %s" % str(self.consumer_default_api_timeout_ms)

if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
cmd += " 2>> %s | tee -a %s &" % (TransactionalMessageCopier.STDERR_CAPTURE, TransactionalMessageCopier.STDOUT_CAPTURE)
Expand Down
16 changes: 11 additions & 5 deletions tests/kafkatest/tests/core/group_mode_transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ def bounce_brokers(self, clean_shutdown):
self.kafka.await_no_under_replicated_partitions()

def create_and_start_message_copier(self, input_topic, output_topic, transactional_id,
producer_block_timeout_ms, transaction_timeout):
producer_block_timeout_ms, transaction_timeout,
consumer_default_api_timeout_ms):
message_copier = TransactionalMessageCopier(
context=self.test_context,
num_nodes=1,
Expand All @@ -128,7 +129,8 @@ def create_and_start_message_copier(self, input_topic, output_topic, transaction
transaction_timeout=transaction_timeout,
use_group_metadata=True,
group_mode=True,
producer_block_timeout_ms=producer_block_timeout_ms
producer_block_timeout_ms=producer_block_timeout_ms,
consumer_default_api_timeout_ms=consumer_default_api_timeout_ms
)
message_copier.start()
wait_until(lambda: message_copier.alive(message_copier.nodes[0]),
Expand All @@ -148,15 +150,16 @@ def bounce_copiers(self, copiers, clean_shutdown, timeout_sec=240):
copier.restart(clean_shutdown)

def create_and_start_copiers(self, input_topic, output_topic, num_copiers, producer_block_timeout_ms,
transaction_timeout):
transaction_timeout, consumer_default_api_timeout_ms):
copiers = []
for i in range(0, num_copiers):
copiers.append(self.create_and_start_message_copier(
input_topic=input_topic,
output_topic=output_topic,
transactional_id="copier-" + str(i),
producer_block_timeout_ms=producer_block_timeout_ms,
transaction_timeout=transaction_timeout
transaction_timeout=transaction_timeout,
consumer_default_api_timeout_ms=consumer_default_api_timeout_ms
))
return copiers

Expand Down Expand Up @@ -233,16 +236,19 @@ def copy_messages_transactionally(self, failure_mode, bounce_target,
It returns the concurrently consumed messages.
"""
producer_block_timeout_ms = None
consumer_default_api_timeout_ms = None
transaction_timeout = self.transaction_timeout
if failure_mode != "clean_bounce" and bounce_target == "brokers":
# change from the default 60s to 90s to wait for broker recovery
producer_block_timeout_ms = 90000
transaction_timeout = 90000
consumer_default_api_timeout_ms = 90000
copiers = self.create_and_start_copiers(input_topic=input_topic,
output_topic=output_topic,
num_copiers=num_copiers,
producer_block_timeout_ms=producer_block_timeout_ms,
transaction_timeout=transaction_timeout)
transaction_timeout=transaction_timeout,
consumer_default_api_timeout_ms=consumer_default_api_timeout_ms)
concurrent_consumer = self.start_consumer(output_topic,
group_id="concurrent_consumer")
clean_shutdown = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ private static ArgumentParser argParser() {
.dest("useGroupMetadata")
.help("Whether to use the new transactional commit API with group metadata");

parser.addArgument("--consumer-default-api-timeout-ms")
.action(store())
.required(false)
.setDefault(60000)
.type(Integer.class)
.metavar("CONSUMER-DEFAULT-API-TIMEOUT-MS")
.dest("consumerDefaultApiTimeoutMs")
.help("The default API timeout in milliseconds for the consumer.");

return parser;
}

Expand Down Expand Up @@ -224,6 +233,7 @@ private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, parsedArgs.getInt("consumerDefaultApiTimeoutMs"));

return new KafkaConsumer<>(props);
}
Expand Down Expand Up @@ -407,7 +417,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
} catch (ProducerFencedException e) {
throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId), e);
} catch (KafkaException e) {
log.debug("Aborting transaction after catching exception", e);
log.error("Aborting transaction after catching exception", e);
abortTransactionAndResetPosition(producer, consumer);
}
}
Expand Down