-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Transaction] Transaction pending ack persistent #8881
[Transaction] Transaction pending ack persistent #8881
Conversation
…tion_pendingack_persistent # Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a great work.
I believe this is a fundamental implementation in order to ship Transactions to the users
pulsar-broker/generate_protobuf.sh
Outdated
|
||
|
||
PROTOC=${PROTOC:-protoc} | ||
${PROTOC} --java_out=pulsar-broker/src/main/java pulsar-broker/src/main/proto/TransactionPendingAck.proto |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this file ?
docker pull $IMAGE | ||
|
||
WORKDIR=/workdir | ||
docker run -i \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this file ?
@@ -161,6 +162,12 @@ void setReplicated(boolean replicated) { | |||
|
|||
@Override | |||
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { | |||
if (pendingAckHandle instanceof PendingAckHandleImpl) { | |||
if (!((PendingAckHandleImpl) pendingAckHandle).checkIfReady()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add checkIdReady
to PendingAckHandle
?
using instanceof
is not a good practice, we should leverage polymorphism
...r/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
Show resolved
Hide resolved
...r/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
Show resolved
Hide resolved
.../main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
Show resolved
Hide resolved
try { | ||
Thread.sleep(1); | ||
} catch (InterruptedException e) { | ||
//no-op |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not good,
the contract is to exit the current activity in case of InterrupedException or at least set Thread.interrupted
flag
...er/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
Show resolved
Hide resolved
InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() { | ||
@Override | ||
public void openCursorComplete(ManagedCursor cursor, Object ctx) { | ||
if (timer == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about creating a method in parent class MLPendingAckStoreProvider
void ensureTimer() {
if (timer == null) {
synchronized(this) {
initialize();
}
}
}
IMHO code will be more readable
this.timer.newTimeout(this, intervalTime, TimeUnit.MILLISECONDS); | ||
} catch (Exception e) { | ||
log.error("PendingAck timer task error!", e); | ||
if ("Cursor was already closed".equals(e.getCause().getMessage())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is very error prone,
can we detect an instance of a specific class of exception ?
…tion_pendingack_persistent # Conflicts: # pom.xml # pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
…://github.com/congbobo184/pulsar into congbobo184_transaction_pendingack_persistent # Conflicts: # pom.xml # pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTimerTask.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
…tion_pendingack_persistent
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@gaoran10 Please help review this PR. |
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
# Conflicts: # pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java delete pending ack recover exception
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java # pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@codelipenghui @eolivelli please review again. thanks. |
} catch (BrokerServiceException e) { | ||
if (e instanceof ConsumerBusyException) { | ||
log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, | ||
consumerName); | ||
} else if (e instanceof SubscriptionBusyException) { | ||
log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); | ||
} | ||
|
||
decrementUsageCount(); | ||
future.completeExceptionally(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to catch the exception here? If the exception occurs, you already have .exceptionally
to deal with this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the close is Is a synchronous method, so we should catch the exception.
} catch (BrokerServiceException e) { | ||
if (e instanceof ConsumerBusyException) { | ||
}).exceptionally(e -> { | ||
if (e.getCause() instanceof ConsumerBusyException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to get the Cause first and use it in the followings.
|
||
public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pendingack"; | ||
|
||
private static final String PENDING_ACK_STORE_CURSOR_NAME = "pendingack"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private static final String PENDING_ACK_STORE_CURSOR_NAME = "pendingack"; | |
private static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state"; |
|
||
private final ManagedCursor cursor; | ||
|
||
public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pendingack"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pendingack"; | |
public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; |
Motivation
in order to handle pending ack persistent.
implement
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)