New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10702; Skip bookkeeping of empty transactions #9632
Conversation
Addressing this problem more generally so that we can also handle small transactions is difficult because of the need to maintain the index. I believe there is still room for improvement by looking only at the aborted transactions, but this is more complex and I need to think it through. |
// without any associated data will not have any impact on the last stable offset | ||
// and would not need to be reflected in the transaction index. | ||
val completedTxn = updatedEntry.currentTxnFirstOffset.map { firstOffset => | ||
CompletedTxn(producerId, firstOffset, offset, endTxnMarker.controlType == ControlRecordType.ABORT) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you check my understanding? If we have a a non-empty currentTxnFirstOffset value (indicating a non-empty transaction), we'll return a valid CompletedTxn, otherwise we will return None. For the empty transactions this means that we aren't accumulating completed transactions. This saves us from having to call lastStableOffset on every empty completed transaction https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1240?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that is right. Additionally, we are not adding the transaction to the list of started transactions which are accumulated in the ProducerAppendInfo
.
Note that this change is causing |
The changes appear to be good to me and resolves the performance issue. I'll hold off on a final verdict until you've investigated the failure. |
For a little more background about the
The idea is that the LSO is stuck if an append to the transaction index fails. However, because we have already updated producer state before the index write, we are left with an inconsistency. The LSO will reflect an ongoing transaction which is not reflected in any of the producer states. The test case that is failing is validating the behavior when the index write fails. It works like this:
The test fails because the second append no longer attempts to write to the transaction index. I can change the test of course, but I was disturbed about the underlying assumption that the write of the transaction marker can be retried on the So I think we need a separate jira to fix this issue. The question then is whether it should block this patch or not. I am thinking not at the moment. The test fails because there is no second append to the transaction index, but this is not required for correctness, and the LSO remains stuck as expected in any case. Basically I'd say we're no worse than before. I will add a commit which alters the test case so that it can pass and we can discuss tightening up the failure logic in a separate jira. |
Note I filed https://issues.apache.org/jira/browse/KAFKA-10778 to introduce synchronous log fencing after IO errors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji : Thanks for the PR. LGTM
…t-for-generated-requests * apache-github/trunk: (405 commits) KAFKA-6687: restrict DSL to allow only Streams from the same source topics (apache#9609) MINOR: Small cleanups in `AlterIsr` handling logic (apache#9663) MINOR: Increase unit test coverage of method ProcessorTopology#updateSourceTopics() (apache#9654) MINOR: fix reading SSH output in Streams system tests (apache#9665) KAFKA-10770: Remove duplicate defination of Metrics#getTags (apache#9659) KAFKA-10722: Described the types of the used state stores (apache#9607) KAFKA-10702; Skip bookkeeping of empty transactions (apache#9632) MINOR: Remove erroneous extra <code> in design doc (apache#9657) KAFKA-10736 Convert transaction coordinator metadata schemas to use g… (apache#9611) MINOR: Update vagrant/tests readme (apache#9650) KAFKA-10720: Document prohibition on header mutation by SMTs (apache#9597) KAFKA-10713: Stricter protocol parsing in hostnames (apache#9593) KAFKA-10565: Only print console producer prompt with a tty (apache#9644) MINOR: fix listeners doc to close <code> properly (apache#9655) MINOR: Remove unnecessary statement from WorkerConnector#doRun (apache#9653) KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (apache#9648) KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert (apache#9629) MINOR: Upgrade to Scala 2.13.4 (apache#9643) MINOR: Update build and test dependencies (apache#9645) MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit (apache#9642) ... clients/src/main/java/org/apache/kafka/clients/ClientRequest.java clients/src/main/java/org/apache/kafka/clients/NetworkClient.java clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
Compacted topics can accumulate a large number of empty transaction markers as the data from the transactions gets cleaned. For each transaction, there is some bookkeeping that leaders and followers must do to keep the transaction index up to date. The cost of this overhead can degrade performance when a replica needs to catch up if the log has mostly empty or small transactions. This patch improves the cost by skipping over empty transactions since these will have no effect on the last stable offset and do not need to be reflected in the transaction index.
Committer Checklist (excluded from commit message)