KAFKA-14053: Transactional producer should bump the epoch and skip ab… #12392
base: trunk
Conversation
…orting when a client side timeout is encountered

When a transactional batch encounters a delivery or request timeout, it can still be in flight. In this situation, if the transaction is aborted, the abort marker might get appended to the log earlier than the in-flight batch. This can block the LSO of a partition indefinitely, or violate the processing guarantees. To avoid this, on a client-side timeout, the transactional producer should skip aborting (EndTxnRequest) and bump the epoch instead. Since this is a fencing bump, the producer cannot safely continue, resulting in a fatal error.
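For context, a minimal sketch of a client setup that can run into this situation (the topic, bootstrap address, and timeout values are illustrative assumptions, not part of the patch):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnTimeoutRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");
        // Deliberately tight timeouts, so a slow broker makes batches expire on the client
        // side while the produce request may still be in flight on the broker.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 4_000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5_000);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            try {
                producer.commitTransaction();
            } catch (KafkaException e) {
                // The batch hit delivery.timeout.ms but may still be in flight. Before this
                // fix, aborting here could append the abort marker ahead of that batch.
                producer.abortTransaction();
            }
        }
    }
}
```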
Force-pushed from bcb201a to 99f6fad
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
```diff
@@ -728,18 +779,11 @@ synchronized void maybeResolveSequences() {
             } else {
                 // We would enter this branch if all in flight batches were ultimately expired in the producer.
                 if (isTransactional()) {
-                    // For the transactional producer, we bump the epoch if possible, otherwise we transition to a fatal error
+                    // For the transactional producer, we bump the epoch if possible, then transition to a fatal error
```
It's a behaviour change here: when the transactional producer reaches this state, we'll do an epoch bump and then it'll be a fatal error.
Could you explain how it actually changed? What's the difference between flipping the epochBumpRequired flag and going to abortable, versus going to FATAL_BUMPABLE_ERROR?
Was the producer still usable after the abortable transition (and the handled abort)?
The existing epochBumpRequired flag is used to bump the epoch after an abort. It is usually used to reset the sequence numbers for the producer, and keeps the producer in a usable state.
In the case I'm trying to fix, we have to skip the abort and immediately go to the bump. This means that the producer will bump during a transaction, which is handled as a fence by the coordinator. Because of this, there is no way to safely get a new (bumped) epoch with this specific producer instance, and we need to handle this case as a fatal error.
After the InitProducerId request is successful, we transition into the old FATAL_ERROR state.
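As a rough illustration of the difference between the two paths (a minimal sketch with made-up state names, not the actual TransactionManager internals):

```java
// Simplified sketch only; these names approximate, but are not, the real state machine.
class EpochBumpPaths {
    enum State { READY, IN_TRANSACTION, ABORTING_TRANSACTION, INITIALIZING, FATAL_ERROR }

    // Existing epochBumpRequired path: EndTxn(ABORT) completes first, then the bump merely
    // resets sequence numbers, so the producer instance stays usable.
    State recoverableBumpAfterAbort() {
        return State.READY; // ABORTING_TRANSACTION -> InitProducerId(bump) -> READY
    }

    // Path added by this fix: the abort is skipped because batches may still be in flight.
    // A bump during an open transaction is treated as fencing by the coordinator, so this
    // producer instance can never obtain a safe new epoch.
    State fencingBumpOnClientTimeout() {
        return State.FATAL_ERROR; // IN_TRANSACTION -> InitProducerId(fencing bump) -> FATAL_ERROR
    }
}
```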
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
…ed when in FATAL_BUMPABLE_ERROR state, fixed transaction state machine transition, fixed error message
Force-pushed from 3edb282 to cc29862
…RROR state in Sender
@hachikuji would you please review this PR as well?
@showuon @artemlivshits Can you please take a look at this PR? This is the issue we had a thread about on the dev list.
So I talked this over with Daniel on a call, and I approve the changes in this form. The summary is that although there is another option to solve this problem without running into a fatal error state, it would require increasing the epoch in the producer (plus working around the case when it reaches Short.MAX_VALUE), which to me seems like something we shouldn't do (we should leave this functionality with the brokers). Therefore, overall, running into the fatal state seems like a safer and easier option from a clean-code perspective, although keeping the producer alive might be slightly better from the users' perspective (but they need to manage errors anyway, so it doesn't seem to be a huge problem).
I'll review it this week. Sorry for the delay.
```java
// If an epoch bump is possible, try to fence the current transaction by bumping
if (canBumpEpoch()) {
    log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
    InitProducerIdRequestData requestData = new InitProducerIdRequestData()
            .setTransactionalId(transactionalId)
            .setTransactionTimeoutMs(transactionTimeoutMs)
            .setProducerId(producerIdAndEpoch.producerId)
            .setProducerEpoch(producerIdAndEpoch.epoch);
    InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
            false, true);
    enqueueRequest(handler);
} else {
    log.info("Cannot bump epoch, transitioning into fatal error");
    transitionToFatalError(failure);
```
Let me make sure I understand your problem and solution. Are you saying the issue happens only when the "timed out" transactional ID is not re-used, and the abort marker arrives earlier than the transaction records? Is my understanding correct?
And what we are trying to do is force-bump the epoch when encountering a timeout exception, to let the fencing mechanism help us abort previous in-flight transactions. And next, we enter the fatal error state as before. Is that right?
If so, then I have a question: what if the initPid request fails (i.e. we fail to bump the epoch)? What will happen? Will the pending transactions still occur?
Thank you.
Out-of-order messages can occur even when the transactional.id is reused. The issue I encountered was caused by a valid producer aborting the transaction "too soon" - where "too soon" means that all of the last batches were timed out due to delivery.timeout.ms, but they were still in flight. So the issue occurs with a single producer, without any fencing or transactional.id reuse.
Yes, that summary is right. Bump to fence the in-flight requests, then discard the producer.
If the initPid fails, there can be 2 scenarios:
- The transaction times out due to transaction.timeout.ms - in this case, the coordinator bumps the epoch, practically achieving the same fencing I am trying to implement here.
- The transactional.id is reused by a new producer instance - in this case, the usual fencing happens.
So I believe that the essential change here is that the producer must not abort when encountering a client-side timeout.
As for the producer going into a fatal state - I was thinking about a possible workaround for that, and I think the producer can be kept in a usable state, but it involves the epoch being increased on the client side. If this fatal-state solution is not acceptable, I can work on another version of the change which involves this client-side bump. I was hesitant to do so because I wasn't sure if the protocol allows such things, but since the idempotent producer does the same, my guess is that it is safe.
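For what it's worth, that client-side bump would look roughly like the sketch below (illustrative only; setProducerIdAndEpoch as used here is an assumed helper, and this is the alternative being discussed, not what the PR does):

```java
import org.apache.kafka.common.utils.ProducerIdAndEpoch;

// Sketch of a local, client-side epoch bump, in the spirit of what the idempotent
// producer does internally when it bumps its own epoch.
ProducerIdAndEpoch current = producerIdAndEpoch;
if (current.epoch == Short.MAX_VALUE) {
    // A local bump cannot wrap around; this case would need a full InitProducerId round trip.
    throw new IllegalStateException("Producer epoch exhausted; cannot bump locally");
}
setProducerIdAndEpoch(new ProducerIdAndEpoch(current.producerId, (short) (current.epoch + 1)));
```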
hi @showuon, does this explanation and solution make sense? Or should I look into the other solution, in which the producer stays in a usable state?
@urbandan , thanks for the explanation. I think this is the best solution we can think of so far. But I'd like to hear @hachikuji @dajac @guozhangwang 's thoughts. Thanks.
```java
        .setProducerEpoch(producerIdAndEpoch.epoch);
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
        false, true);
enqueueRequest(handler);
```
We enqueue the request here - when will we actually send it out?
At this point we enter FATAL_BUMPABLE_ERROR, which still allows the Sender to send requests - see the changes in the Sender class and in TransactionManager#maybeTerminateRequestWithError.
If the producer is closed gracefully, we will try to send this last InitProducerId request. After the InitProducerId request succeeds, we transition into FATAL_ERROR and won't send anything else.
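A rough sketch of that gating as described here (names are approximations of the real TransactionManager code, not the exact implementation):

```java
// Hypothetical sketch of the request gating described above.
synchronized void maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
    if (hasFatalError()) {
        requestHandler.fatalError(lastError);
    } else if (hasFatalBumpableError() && !(requestHandler instanceof InitProducerIdHandler)) {
        // In FATAL_BUMPABLE_ERROR only the final fencing InitProducerId may still go out;
        // every other request fails as if the producer were already in FATAL_ERROR.
        requestHandler.fatalError(lastError);
    }
}
```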
```diff
-assertTrue(transactionManager.hasAbortableError());
-assertTrue(transactionManager.hasOngoingTransaction());
+assertTrue(transactionManager.hasFatalBumpableError());
+assertFalse(transactionManager.hasOngoingTransaction());
 assertFalse(transactionManager.isCompleting());
 assertTrue(transactionManager.transactionContainsPartition(tp0));

-TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-runUntil(abortResult::isCompleted);
-assertTrue(abortResult.isSuccessful());
-assertFalse(transactionManager.hasOngoingTransaction());
-assertFalse(transactionManager.transactionContainsPartition(tp0));
+assertThrows(KafkaException.class, () -> transactionManager.beginAbort());
```
So, it looks like after this patch, when a batch expiration or timeout error occurs, the producer will enter the fatal error state after bumping the epoch. But before this patch, we would abort and continue the transactional work. Is that right?
Sorry, I didn't realize this situation. This will impact current user behavior, so we need more discussion. I'll ping some experts in this PR, and hope they will help provide comments.
cc @artemlivshits @ijuma
Yes, that is correct. That abort is causing the issue. The producer just assumes that the batches failed, but it is possible that they are still in-flight. When that happens, the abort marker might get processed earlier than the batch. I've seen this in action, and it corrupts the affected partition permanently.
If it is better to keep the producer in a usable state, I can give it a shot. I had one experiment in which I tried keeping the producer usable by increasing the epoch on the client side once. I believe that it is safe to do, as the fencing bump will increase the epoch, and the coordinator will never return that epoch to any clients.
Please let me know what you think @ijuma @artemlivshits @showuon
I personally like the solution of having the producer enter the fatal error state. But I'd like to hear others' opinions, since it will affect the producer's behavior.
I'm wondering, is there a way we could mitigate this on the server side? Is it possible to prevent writing the late records after the abort marker? I might be missing something though, so let me know.
@jolshan would that mean that each record sent by the producer would have to include the id of a specific transaction (not just the transactional id of the producer?)
If the transactional producer sends a transaction id to the coordinator with each record rather than just the producer's id (in which case the coordinator determines whether there is a transaction going on by the order of Start Transaction, Send Record, End Transaction), then this could work. Otherwise, I don't think it's possible to mitigate on the server side.
@showuon I believe that I've seen this bug cause violation of EOS with a transactional producer in the case of a broker failure (90% sure). I'd much rather deal with the producer crashing than deal with incorrect behavior. However, if it's possible to fix this issue without causing a producer crash that would be really nice (:
I was thinking that for some longer-term work we could potentially distinguish transactions by having a bit of extra state server-side and by bumping the epoch after each transaction. But maybe this is too large of a change for now.
I think you also came to the conclusion of an epoch bump but through a different path.
@jolshan not sure about the impact it would have on the overhead of transactions, but having a unique ID per transaction doesn't really seem necessary to me
I'm not suggesting a unique transactional ID, but simply bumping the epoch would give us a unique identifier for the transaction in combination with the producer and/or transaction ID. Again -- this is something I'm considering as a longer term change, and there could be flaws.
@jolshan sorry for the confusion, I understood that the uniqueness would be achieved through the epoch bumps - I just don't really see the added value of it.
Yeah. No worries. I think what I was thinking of would require a bit more effort -- but the idea is that if the server knew the difference between individual transactions, then it could make better decisions about new writes and markers. (I.e., potentially we could avoid appending the records of an old transaction after a marker for that transaction is appended.) But I also think this idea needs a bit more thought and could require more work than what you are proposing here.
@artemlivshits @ijuma @hachikuji Can you please take a look at this PR? Trying to fix a bug in the transactional producer. Thanks in advance!
This looks good to me. We'd probably need some committers (I'm not one) to look at it since it involves a slight change to txn producer behavior (fatal error when there's a timeout with in-flight messages as opposed to just continuing on).
Thank you for the PR, Daniel!
@dajac @hachikuji Any chance you can take a look at this? This is a painful issue in transactional producers, with some serious consequences (partition corruption).
Can you elaborate a bit more on this idea? Is this the implementation in the PR now, or was it an idea to avoid the fatal error?
@jolshan It is an idea; the first version of the PR was trying to implement that, but the current state of the PR is based on the fatal state. The idea of keeping the producer in a reusable state is kind of tricky. The issue is that to fix the bug, we need to bump the epoch instead of aborting. In short, as I wrote in the other thread:
Thanks for all the discussion here, and sorry for the late arrival. I have seen this issue in practice as well, often in the context of hanging transactions caused by late-arriving writes.

I think the basic idea in the patch here is to bump the epoch when we abort a transaction, in order to fence off writes that are in flight. Do I have that right? This is in the spirit of an idea that's been on my mind for a while. The only difference is that I was considering a server-side implementation. The basic thought is to have the coordinator bump the epoch after every transaction and return the bumped epoch in the response:

```
EndTxnResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
```

The tuple of (ProducerId, ProducerEpoch) would then uniquely identify each transaction.

There is still a hole, however, which I think @jolshan was describing above. We cannot assume clients will always add partitions correctly to the transaction before beginning to write to the partition. We need server-side validation; otherwise, hanging transactions will always be possible. We have seen this so many times by now.

My suggestion here is to let us get a KIP out in the next couple of weeks with a good server-side solution. We may still need a client-side approach for compatibility with older brokers though, so maybe we can leave the PR open.
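To make the server-side idea concrete, the client's end-transaction handling could adopt the bumped epoch from the response. This is purely a sketch of the proposal; the producerId/producerEpoch accessors on EndTxnResponseData did not exist in the protocol at the time of this discussion:

```java
// Sketch of the proposed flow only; these response fields are assumptions from the proposal.
void handleEndTxnResponse(EndTxnResponseData data) {
    if (data.errorCode() == Errors.NONE.code()) {
        // The coordinator bumped the epoch while ending the transaction, so the
        // (producerId, producerEpoch) pair uniquely identifies the finished transaction.
        producerIdAndEpoch = new ProducerIdAndEpoch(data.producerId(), data.producerEpoch());
    }
}
```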
@hachikuji Thanks for the feedback. Overall I agree that a server-side solution might be safer, and I'm interested in the KIP. |
I'm not sure if the fix addresses the following scenario:
@artemlivshits The scenario you mentioned is already covered, even without this change - when a transaction times out, the transaction coordinator bumps the epoch, so it already fences off the "stuck" produce request. |
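For reference, a sketch of the standard producer setting that drives that coordinator-side safety net (the value here is illustrative; this config is not introduced by this PR):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");
// If the transaction is not completed within this window, the coordinator aborts it and
// bumps the epoch, fencing any produce requests still in flight from the old epoch.
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60_000);
```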
@urbandan By the way, KIP-890 is now available to review 😄 |
…orting when a delivery timeout is encountered
When a transactional batch encounters delivery or request timeout, it can still be in-flight. In this situation, if the transaction is aborted, the abort marker might get appended to the log earlier than the in-flight batch. This can cause the LSO of a partition to be blocked infinitely, or can violate the processing guarantees.
To avoid this situation, on a client side timeout, the transactional producer should skip aborting (EndTxnRequest), and bump the epoch instead. Since this is a fencing bump, the producer cannot safely continue, resulting in a fatal error.
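For applications, the practical consequence is that this timeout now surfaces as a fatal error. A hedged sketch of a recovery pattern under that behavior (createProducer() is a hypothetical application-side factory, not a Kafka API):

```java
try {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    producer.close();            // fenced (possibly by its own fencing bump); not reusable
    producer = createProducer(); // fresh instance calls initTransactions() for a new epoch
} catch (KafkaException e) {
    // With this change, a client-side timeout surfaces as a fatal error instead of an
    // abortable one, so even abortTransaction() would throw; recreate the producer.
    producer.close();
    producer = createProducer();
}
```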