[Transaction] Transaction coordinator fence mechanism. #11357
stores.put(tcId, store);
LOG.info("Added new transaction meta store {}", tcId);
while (true) {
    CompletableFuture<Void> future = deque.poll();
This is a busy loop.
Can we add a timeout to poll? That way the thread can suspend instead of spinning.
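A minimal sketch of the timed poll suggested here. It assumes the queue is a `BlockingDeque`, which the diff does not show; the class name `PendingFutureDrainer`, the method `drain`, and the choice to return once a full timeout passes with nothing queued are all hypothetical, not the PR's code.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR.
final class PendingFutureDrainer {
    // Completes queued futures, parking for up to timeoutMs while waiting for the next one.
    // Returns how many futures were completed before the deque stayed empty for a full timeout.
    static int drain(BlockingDeque<CompletableFuture<Void>> deque, long timeoutMs) {
        int completed = 0;
        try {
            while (true) {
                // Timed poll suspends the thread instead of spinning; it returns null on timeout.
                CompletableFuture<Void> future = deque.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (future == null) {
                    return completed;
                }
                future.complete(null);
                completed++;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return completed;
        }
    }
}
```

With a plain `poll()` the loop would keep a core busy whenever the deque is empty; the timed variant trades that for a bounded wait.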
tcLoadSemaphore.release();

while (true) {
    CompletableFuture<Void> future = deque.poll();
The same here
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
|| e instanceof RequestTimeoutException
|| e instanceof ManagedLedgerException
|| e instanceof BrokerPersistenceException
|| e instanceof LookupException
|| e instanceof ReachMaxPendingOpsException
|| e instanceof ConnectException;
|| e instanceof ConnectException)
&& !(e instanceof ManagedLedgerException.ManagedLedgerFencedException);
How is it possible that an object is an instance of the list of classes above and of this class?
It is a subclass of ManagedLedgerException, like ManagedLedgerException.ManagedLedgerNotFoundException and many more.
We should add an "isRetryable" method to ManagedLedgerException;
otherwise this code will be hard to maintain.
cc @merlimat
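One way the "isRetryable" idea could look, as a sketch only. `LedgerException` and `LedgerFencedException` are hypothetical stand-ins for `ManagedLedgerException` and its fenced subclass, and `shouldRetry` is an invented name; none of this is Pulsar's actual API.

```java
import java.net.ConnectException;

// Hypothetical sketch, not Pulsar code.
final class RetryableSketch {
    // Stand-in for ManagedLedgerException: retryable unless a subclass says otherwise.
    static class LedgerException extends Exception {
        LedgerException(String message) {
            super(message);
        }

        boolean isRetryable() {
            return true;
        }
    }

    // Stand-in for the fenced subclass: it is also a LedgerException, but must not be retried.
    static class LedgerFencedException extends LedgerException {
        LedgerFencedException(String message) {
            super(message);
        }

        @Override
        boolean isRetryable() {
            return false;
        }
    }

    // The exception type decides; the caller no longer needs an "instanceof X && !(instanceof Y)" chain.
    static boolean shouldRetry(Throwable e) {
        if (e instanceof LedgerException) {
            return ((LedgerException) e).isRetryable();
        }
        return e instanceof ConnectException;
    }
}
```

The fenced exception matches the parent `instanceof` check and is excluded only by its own override, which is the subclass situation the earlier question asked about.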
@@ -1675,6 +1677,17 @@ public boolean isTopicNsOwnedByBroker(TopicName topicName) {
: CompletableFuture.completedFuture(null)));
}
});
if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
        && serviceUnit.getNamespaceObject().toString().equals(NamespaceName.SYSTEM_NAMESPACE.toString())) {
It is better not to compare the results of toString.
That can lead to unpredictable results.
It is better to add an explicit method to check equality.
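A sketch of the explicit equality check being asked for, assuming a small value class. `NamespaceIdSketch`, its fields, and `isSystemNamespace` are hypothetical and stand in for `NamespaceName`; the `"pulsar"` / `"system"` values are placeholders.

```java
import java.util.Objects;

// Hypothetical value class, not Pulsar's NamespaceName.
final class NamespaceIdSketch {
    static final NamespaceIdSketch SYSTEM = new NamespaceIdSketch("pulsar", "system");

    private final String tenant;
    private final String localName;

    NamespaceIdSketch(String tenant, String localName) {
        this.tenant = Objects.requireNonNull(tenant);
        this.localName = Objects.requireNonNull(localName);
    }

    // Explicit, named check: callers never depend on how toString() happens to format the value.
    boolean isSystemNamespace() {
        return equals(SYSTEM);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof NamespaceIdSketch)) {
            return false;
        }
        NamespaceIdSketch other = (NamespaceIdSketch) o;
        return tenant.equals(other.tenant) && localName.equals(other.localName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tenant, localName);
    }

    @Override
    public String toString() {
        return tenant + "/" + localName;
    }
}
```

Comparing fields keeps the check stable even if the string format later changes.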
final long requestId = command.getRequestId();
if (log.isDebugEnabled()) {
    command.getPartitionsList().forEach(partion ->
        log.debug("Receive add published partition to txn request {} "
            + "from {} with txnId {}, topic: [{}]", requestId, remoteAddress, txnID, partion));
}

if (!service.getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
    BrokerServiceException.NotAllowedException ex =
Can we extract a method for this code? It is repeated several times.
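A rough sketch of what such an extracted helper might look like. The name `rejectIfDisabled`, the `BiConsumer` error sink, and the message text are all assumptions for illustration; the real handlers build a `BrokerServiceException.NotAllowedException` and write a response on the connection, which is not reproduced here.

```java
import java.util.function.BiConsumer;

// Hypothetical helper, not part of the PR.
final class TransactionGuardSketch {
    // Single place for the "coordinator disabled" rejection that each handler repeats inline.
    // Returns true when the request was rejected, so the caller can return early.
    static boolean rejectIfDisabled(boolean coordinatorEnabled, long requestId,
                                    BiConsumer<Long, String> errorSink) {
        if (coordinatorEnabled) {
            return false; // nothing rejected; the caller continues with the request
        }
        errorSink.accept(requestId, "Transaction coordinator is not enabled on this broker");
        return true;
    }
}
```

Each handler would then start with one guard call instead of its own copy of the check.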
Thanks for your contribution. For this PR, do we need to update docs? (The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)
Are Pulsar 2.8.0 clients able to connect to a 2.9.0 broker if they enable transactions?
It looks like there is a wire protocol change (tcClientConnect).
I am aware of projects that are starting to use transactions, as we declared them "ready" in 2.8.0.
We should not break compatibility with 2.8 clients.
@@ -691,7 +692,11 @@ protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authorit
}
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
    internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
    if (topicName.toString().startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())) {
We should have a better way to detect a system topic. IIRC we added something for this.
&& serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
TransactionMetadataStoreService metadataStoreService =
        this.getPulsar().getTransactionMetadataStoreService();
// if have store belongs to this bundle, remove and close the store
Typo: "if the store", not "if have store".
@@ -396,6 +396,7 @@ private static void verifyCoordinatorStats(String state,
private void initTransaction(int coordinatorSize) throws Exception {
    admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), coordinatorSize);
    admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
    pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
Please close this client; otherwise the new assignment below will overwrite the reference and we will leak a PulsarClient.
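The leak and its fix can be sketched with a stand-in type. `FakeClient` is hypothetical and only mimics an `AutoCloseable` client; the test in the PR would call `close()` on the real `PulsarClient` before reassigning the field.

```java
// Hypothetical sketch; FakeClient stands in for a closeable client such as PulsarClient.
final class ClientReassignSketch {
    static final class FakeClient implements AutoCloseable {
        private boolean closed;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private FakeClient client = new FakeClient();

    FakeClient current() {
        return client;
    }

    // Close the old instance before the field is overwritten, so no open client is orphaned.
    void replaceClient() {
        FakeClient old = client;
        if (old != null) {
            old.close();
        }
        client = new FakeClient();
    }
}
```

Without the `close()` call the old instance would stay open with no remaining reference to it.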
@@ -66,11 +75,17 @@ protected void cleanup() throws Exception {
}

@Test
public void testAddAndRemoveTransactionMetadataStore() {
public void testCloseLock() {
Remove the empty test case?
LGTM
…mand. (#12301)

## Motivation
Now that #11357 has merged: #11357 uses the `SUCCESS` command to handle tcClientConnectRequest, which is not conducive to later expansion. So add an individual response for `CommandTcClientConnectRequest`.

## Implementation
Add the command:
```
message CommandTcClientConnectResponse {
    required uint64 request_id = 1;
    optional ServerError error = 2;
    optional string message = 3;
}
```
To ensure that the new client is compatible with the old broker, the protocol version is updated to 19:
```
v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse
```
If the broker protocol version is > 18, the client should send TcClientConnectCommand.
If the broker protocol version is <= 18, the client does not need to send TcClientConnectCommand.
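The version rule in the commit message can be sketched as a single predicate. The class and method names are hypothetical; only the threshold (send the command when the broker's protocol version is above 18, that is 19 or later) comes from the text.

```java
// Hypothetical sketch of the client-side gate, not the PR's code.
final class TcConnectGateSketch {
    // Protocol version that introduced the TcClientConnect request and response commands.
    static final int V19 = 19;

    // Older brokers do not know the command, so the client skips it for them.
    static boolean shouldSendTcClientConnect(int brokerProtocolVersion) {
        return brokerProtocolVersion >= V19;
    }
}
```

For example, a broker advertising version 18 would be skipped, while one advertising 19 would receive the command.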
link PIP: 95
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (yes)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)