-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix transaction buffer lookup #10257
Fix transaction buffer lookup #10257
Conversation
CompletableFuture<ClientCnx> siFuture = getClientCnx(topic); | ||
siFuture.whenComplete((si, cause) -> { | ||
if (null != cause) { | ||
cache.asMap().remove(topic, siFuture); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure I understand this "remove"
probably it is not allowed to modify the cache inside the same loader, especially for the same key
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it may happen that when we execute "whenComplete" are still inside the execution of this loader, for instance if getClientCnx
returns a completed CompletableFuture and so probably we will encounter some bad error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall is good.
I left a question, please take a look before merging
op.recycle(); | ||
} | ||
} else { | ||
cache.refresh(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it better to use "refresh" or "invalidate" here ?
refresh tries to load a new value, asynchronously, if we received an error, it is probable that we will see an error during refresh as well
https://guava.dev/releases/19.0/api/docs/com/google/common/cache/LoadingCache.html#refresh(K)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"invalidate" is better, i will change. good check thanks.
@@ -90,6 +90,8 @@ | |||
import org.apache.pulsar.broker.admin.AdminResource; | |||
import org.apache.pulsar.broker.cache.ConfigurationCacheService; | |||
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; | |||
import org.apache.pulsar.broker.intercept.BrokerInterceptor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these two lines seems unrelated
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@eolivelli please review again. thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Motivation
now transaction buffer client handle transaction coordinator command by find topic address and create connect. it can't init
PersistentTopic
in broker, so the command will always fail.implement
PersistentTopic
.Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)