[Transaction] Transaction timeout implementation.#8750
[Transaction] Transaction timeout implementation.#8750congbobo184 wants to merge 15 commits intoapache:masterfrom
Conversation
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
…184_transaction_timeout_implementation
…184_transaction_timeout_implementation
…184_transaction_timeout_implementation
…tion_timeout_implementation # Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java # pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java # pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java # pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java # pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
|
/pulsarbot run-failure-checks |
1 similar comment
|
/pulsarbot run-failure-checks |
| } else { | ||
| String[] strings = positionString.split(":"); | ||
| if (strings.length != 2) { | ||
| throw new IndexOutOfBoundsException(); |
There was a problem hiding this comment.
can you add a message ? like "invalid position "+positionString
it will ease debugging issues in the future
There was a problem hiding this comment.
sure, it is a good idea!
| throw new IndexOutOfBoundsException(); | ||
| } | ||
| long ledgerId = Long.parseLong(strings[0]); | ||
| long entryId = Long.parseLong(strings[1]); |
There was a problem hiding this comment.
can you please catch "NumberFormatException" and then rethrow a meaningful error message ?
| messageIdList.add(new MessageIdImpl( | ||
| messageIdData.getLedgerId(), messageIdData.getEntryId(), messageIdData.getPartition())); | ||
| messageIdData.recycle(); | ||
| //TODO when pending ack buffer finish this logic can remove |
There was a problem hiding this comment.
can we link an issue for this TODO ?
| }); | ||
| } catch (Exception e) { | ||
| resultFuture.completeExceptionally(e); | ||
| private void finalityEndTransaction(TxnID txnID, TxnStatus newStatus, TxnStatus expectedStatus) { |
There was a problem hiding this comment.
what about renaming "finalityEndTransaction" to "completeEndTransaction" ?
| } | ||
| this.cursor = managedLedger.newNonDurableCursor(currentLoadPosition); | ||
|
|
||
| new Thread(() -> new TopicTransactionBufferReplayer(new TransactionBufferReplayCallback() { |
There was a problem hiding this comment.
can we give a name to this thread ?
probably it should be marked as "deamon"
we should also ensure that the thread ends when we are shutting down this TopicTransactionBuffer
otherwise we will have a zoombie thread that is still retaining references to this object and possibly corrupting its status.
one question isn't it too heavyweight to start a thread per each topic ?
There was a problem hiding this comment.
yes, you are right. we should control the thread start and close.
one question isn't it too heavyweight to start a thread per each topic ?
we async init TopicTransactionBuffer, do you have any better way to do it?
| positionsSort.add((PositionImpl) position); | ||
| } | ||
| } | ||
| }).start()).start(); |
There was a problem hiding this comment.
we should not start a Thread in a constructor, especially when the Thread refers to the object that is beeing created, we could have bad memory leaks in case we are not properly releasing all of the resources
There was a problem hiding this comment.
it is good idea, i will change it. :)
|
@eolivelli thank for you comment :) |
Motivation
in order to handle the transaction timeout.
when aborting and committing finish we should change the status in transaction coordinator.
implement
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)