This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[BUG] NOT_COORDINATOR Error always happen randomly when enable transactionCoordinator #1698

Open
fengyongshe opened this issue Jan 17, 2023 · 1 comment

fengyongshe commented Jan 17, 2023

Describe the bug
When the transaction coordinator is enabled by setting kafkaTransactionCoordinatorEnabled=true, the following exception keeps occurring in the Kafka client:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 10000 milliseconds while awaiting InitProducerId

The client debug log shows:
[2023-01-17 01:07:14,223] DEBUG [Producer clientId=producer-xgq0e21b, transactionalId=xgq0e21b] Received INIT_PRODUCER_ID response from node 1599868786 for request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-xgq0e21b, correlationId=62): InitProducerIdResponseData(throttleTimeMs=0, errorCode=16, producerId=-1, producerEpoch=-1) (org.apache.kafka.clients.NetworkClient)

errorCode=16 means NOT_COORDINATOR.

This is caused by two inconsistent functions for computing the partition id of a transactional id. The partition id is used in two places:

  1. Find the broker that owns the topic for the TransactionCoordinator (based on __transaction_state-{partition-id}). The partition id is generated in TransactionCoordinator:
    public static int partitionFor(String transactionalId, int transactionLogNumPartitions) {
        return MathUtils.signSafeMod(
                transactionalId.hashCode(),
                transactionLogNumPartitions
        );
    }

  2. Get the TransactionMetadata from the cache according to the partition id. The partition id is generated in TransactionStateManager:
    public int partitionFor(String transactionalId) {
        return Utils.abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
    }

For example, with props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "qlf8pp81") in the Kafka client, the two places compute different partition ids for "qlf8pp81":
16 when finding the topic broker for the TransactionCoordinator
34 when getting the TransactionMetadata from the transaction cache
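
The divergence between the two formulas can be reproduced outside the broker. The sketch below is not the KoP code: signSafeMod and abs are local stand-ins whose bodies are assumptions about how MathUtils.signSafeMod and Utils.abs behave (a modulo that is never negative, and an absolute value that maps Integer.MIN_VALUE to 0). The class and method names are hypothetical.

```java
// Local stand-ins for the two helpers named in this report. Their bodies are
// assumptions about the real MathUtils.signSafeMod and Utils.abs.
final class PartitionMismatchDemo {

    // Assumed behaviour of MathUtils.signSafeMod: a remainder that is never negative.
    static int signSafeMod(long dividend, int divisor) {
        int mod = (int) (dividend % divisor);
        return mod < 0 ? mod + divisor : mod;
    }

    // Assumed behaviour of Utils.abs: absolute value, with Integer.MIN_VALUE mapped to 0.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Formula shown for TransactionCoordinator.partitionFor in this report.
    static int coordinatorPartition(int hash, int numPartitions) {
        return signSafeMod(hash, numPartitions);
    }

    // Formula shown for TransactionStateManager.partitionFor in this report.
    static int stateManagerPartition(int hash, int numPartitions) {
        return abs(hash) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 50;

        // Non-negative hash: both formulas agree (6 and 6).
        // Negative hash: they disagree (hash=-6 gives coordinator=44, stateManager=6).
        for (int hash : new int[] {6, -6}) {
            System.out.println("hash=" + hash
                    + " coordinator=" + coordinatorPartition(hash, numPartitions)
                    + " stateManager=" + stateManagerPartition(hash, numPartitions));
        }

        // The transactional ids from this report, for comparison with the values quoted above.
        for (String id : new String[] {"qlf8pp81", "xgq0e21b"}) {
            int hash = id.hashCode();
            System.out.println(id + " hash=" + hash
                    + " coordinator=" + coordinatorPartition(hash, numPartitions)
                    + " stateManager=" + stateManagerPartition(hash, numPartitions));
        }
    }
}
```

Under these assumptions the formulas agree for non-negative hash codes and, for a negative hash code that is not a multiple of the partition count, return two values that add up to the partition count. The pair 16 and 34 reported above adds up to 50, which fits that pattern.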

Because the two functions disagree, the lookup in TransactionStateManager misses the cache and returns the error:

    Map<String, TransactionMetadata> metadataMap = transactionMetadataCache.get(partitionId);
    if (metadataMap == null) {
        return new ErrorsAndData<>(Errors.NOT_COORDINATOR, Optional.empty());
    }

To Reproduce
Steps to reproduce the behavior:
  0. Enable transactions by setting kafkaTransactionCoordinatorEnabled=true and transactionLogNumPartitions=50.

  1. Choose a transactional id for which the two functions generate different partition ids, e.g. qlf8pp81 or xgq0e21b. Ensure that there is no TransactionMetadata for the transactional id in transactionMetadataCache (Map<Integer {partitionId}, Map<tx_id, TransactionMetadata>>). You can add debug logging in TransactionStateManager to print the cache:
    log.debug("The Cache content before Add Transaction State: {}", transactionMetadataCache);
    Example cache content: {2={}, 22={}, 6={}, 26={}}

  2. Use a transaction in a Kafka client example. The key part is shown below:
    .....
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xgq0e21b") ;
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    producer.initTransactions();
    producer.beginTransaction();
    Future metadataFuture = producer.send(new ProducerRecord<>("cptest", "world"));
    ....
    producer.commitTransaction();
    producer.close();

  3. Run the test. The exception occurs. With the debug logging added in TransactionStateManager, the broker prints:

2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - The partitionId : 44 for transactionId: xgq0e21b
2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - The Cache content before Add Transaction State: {2={}, 22={}, 6={}, 26={}}
2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - Transaction Metadata Cache content for partition Id:44 for metadataMap:null
2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - There is not coordiantor for partitionId: 44
2023-01-17T01:06:47,850+0800 [pulsar-io-4-16] DEBUG io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager - The Content in transactionMetadataCache : {2={}, 22={}, 6={}, 26={}}
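
The lookup miss in these log lines can be replayed with a toy model of the cache. This is only a sketch: CacheLookupModel, loadPartition and lookup are hypothetical names, the metadata is a plain String, and the Result enum stands in for the Kafka error codes. Only the cache keys {2, 22, 6, 26} and partition id 44 come from the log above.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the partition-keyed cache lookup; not the KoP classes themselves.
final class CacheLookupModel {
    enum Result { NONE, NOT_COORDINATOR }

    // partition id -> (transactional id -> metadata); metadata is a plain String here.
    private final Map<Integer, Map<String, String>> cache = new HashMap<>();

    // Models a broker taking ownership of a partition: an empty map is registered.
    void loadPartition(int partitionId) {
        cache.putIfAbsent(partitionId, new HashMap<>());
    }

    // Mirrors the null check quoted in this report: no map for the partition
    // means this broker is not treated as the coordinator for it.
    Result lookup(int partitionId) {
        return cache.get(partitionId) == null ? Result.NOT_COORDINATOR : Result.NONE;
    }

    public static void main(String[] args) {
        CacheLookupModel model = new CacheLookupModel();
        for (int partition : new int[] {2, 22, 6, 26}) {
            model.loadPartition(partition);
        }
        System.out.println("partition 44 -> " + model.lookup(44)); // NOT_COORDINATOR
        System.out.println("partition 6 -> " + model.lookup(6));   // NONE
    }
}
```

In this model a request is answered with NOT_COORDINATOR whenever the partition id computed for the cache lookup is not one of the partitions registered on the broker, regardless of whether the broker was correctly chosen by the other formula.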

Expected behavior
Use the same function to generate the partition id (partitionFor(String transactionalId)) in both TransactionCoordinator and TransactionStateManager. This would resolve the bug.
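
One way to picture this is a single helper that both call sites delegate to. The sketch below is an illustration, not a patch: TransactionPartitions is a hypothetical class, and the formula is the TransactionStateManager one quoted in this report, written out with the assumption that Utils.abs maps Integer.MIN_VALUE to 0.

```java
// Hypothetical shared helper; the class name is illustrative and not part of KoP.
final class TransactionPartitions {
    private TransactionPartitions() {
    }

    // Same arithmetic as the TransactionStateManager snippet in this report,
    // with Integer.MIN_VALUE mapped to 0 (assumed to match Utils.abs).
    static int partitionFor(String transactionalId, int numPartitions) {
        int hash = transactionalId.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 50;
        for (String id : new String[] {"qlf8pp81", "xgq0e21b"}) {
            // Both the broker lookup and the cache lookup would call the same method.
            int forBrokerLookup = partitionFor(id, numPartitions);
            int forCacheLookup = partitionFor(id, numPartitions);
            System.out.println(id + " -> " + forBrokerLookup + " / " + forCacheLookup);
        }
    }
}
```

Which formula becomes the shared one is a design choice. This sketch follows the suggestion above and keeps the TransactionStateManager version.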

When a bundle is loaded onto a broker, kop.NamespaceBundleOwnershipListenerImpl is triggered and puts an empty TransactionMetadata map into the cache for each transaction state partition that belongs to the bundle.

@nextadmin-beep

I have the same problem.
