KAFKA-9087 Replace log high watermark by future log high watermark wh… #13075

Merged
merged 8 commits into from Jan 7, 2023

@chia7712 chia7712 commented Jan 4, 2023

https://issues.apache.org/jira/browse/KAFKA-9087

[2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to (kafka.server.ReplicaAlterLogDirsThread)
 org.apache.kafka.common.KafkaException: Error processing data for partition metrics_timers-35 offset 4224887
 at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342)
 at scala.Option.foreach(Option.scala:274)
 at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300)
 at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
 at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
 at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299)
 at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
 at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
 at scala.Option.foreach(Option.scala:274)
 at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
 Caused by: java.lang.IllegalStateException: Offset mismatch for the future replica metrics_timers-35: fetched offset = 4224887, log end offset = 0.
 at kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107)
 at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311)

A race between processing LeaderAndIsrRequest and AlterReplicaLogDirsRequest causes the error above (on the V2 message format). The error is also easy to reproduce on V1, since there is no epoch cache. The ReplicaAlterLogDirsThread checks the offset of the "future log" rather than the "log". Hence, my two cents: we can replace log.highWatermark with futureLog.highWatermark to resolve this issue. I tested it on our cluster and it works well (on both V1 and V2).
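To make the failure mode concrete, here is a minimal sketch of the check behind the stack trace. It is a simplified model and not Kafka's actual code: `ModelLog`, `initOffsetBeforeFix`, and `initOffsetAfterFix` are hypothetical names, and the current log's offsets are assumed values chosen to match the logged fetch offset.

```scala
// Simplified model, not Kafka's real classes: ModelLog and the helpers below are hypothetical.
final case class ModelLog(logEndOffset: Long, highWatermark: Long) {
  // A log's high watermark can never exceed its own log end offset.
  require(highWatermark <= logEndOffset, "high watermark is bounded by log end offset")
}

object OffsetMismatchSketch {
  // Mirrors the shape of the check that produced the error above:
  // the future replica only accepts data that starts at its own log end offset.
  def processPartitionData(topicPartition: String, futureLog: ModelLog, fetchOffset: Long): Unit = {
    if (fetchOffset != futureLog.logEndOffset)
      throw new IllegalStateException(
        s"Offset mismatch for the future replica $topicPartition: " +
          s"fetched offset = $fetchOffset, log end offset = ${futureLog.logEndOffset}.")
  }

  // Before the fix (as described in this PR): start from the current log's high watermark.
  def initOffsetBeforeFix(log: ModelLog, futureLog: ModelLog): Long = log.highWatermark

  // After the fix: start from the future log's own high watermark.
  def initOffsetAfterFix(log: ModelLog, futureLog: ModelLog): Long = futureLog.highWatermark

  def main(args: Array[String]): Unit = {
    // Assumed state: the current log is well ahead, the future log was just created.
    val log = ModelLog(logEndOffset = 4224887L, highWatermark = 4224887L)
    val futureLog = ModelLog(logEndOffset = 0L, highWatermark = 0L)

    val before = scala.util.Try(
      processPartitionData("metrics_timers-35", futureLog, initOffsetBeforeFix(log, futureLog)))
    println(before) // a Failure wrapping IllegalStateException

    val after = scala.util.Try(
      processPartitionData("metrics_timers-35", futureLog, initOffsetAfterFix(log, futureLog)))
    println(after) // a Success
  }
}
```

In this model, the initial fetch offset taken from the future log always equals a value the future log can accept when it is empty, which is the situation in the reported error.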

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

ijuma commented Jan 4, 2023

Is it possible to add a test?

@ijuma ijuma requested a review from dajac January 4, 2023 23:57

jolshan commented Jan 5, 2023

So the error occurs when the fetch offset is not the futureLog's log end offset.

Just curious -- is this because the log's highwatermark is lower or higher than the futureLog's?

It seems that the value we put into the InitialFetchState is the first offset we fetch from, so just wanted to clarify the race and why this fixes the issue. (Not doubting you, just trying to understand 😄 )

junrao commented Jan 5, 2023

@jolshan : It seems that this is a regression introduced in #6841. When a future log is just created, the current log's HWM could be higher than the future replica's HWM (since it's bounded by its log end offset). When the future replica starts fetching, we want it to fetch from its local HWM, not the current log's HWM. Otherwise, there will be holes in the future log.
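The "holes" mentioned above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyLog` and its methods are hypothetical, the log is assumed to start at offset 0, and the append performs no validation (the real fetcher rejects the mismatch instead of writing a gap).

```scala
// Hypothetical toy model of a replica log as a run of record offsets; not Kafka code.
object FutureLogHoleSketch {
  final case class ToyLog(offsets: Vector[Long], highWatermark: Long) {
    // The next offset to be written; 0 for an empty log.
    def logEndOffset: Long = offsets.lastOption.map(_ + 1).getOrElse(0L)

    // Updating the high watermark clamps it to the log end offset.
    def withHighWatermark(hw: Long): ToyLog =
      copy(highWatermark = math.min(hw, logEndOffset))

    // Copies records from `source` starting at `fetchOffset`, with no validation.
    def appendFrom(source: ToyLog, fetchOffset: Long): ToyLog =
      copy(offsets = offsets ++ source.offsets.filter(_ >= fetchOffset))

    // Offsets missing between 0 and the log end offset.
    def holes: Vector[Long] =
      (0L until logEndOffset).filterNot(o => offsets.contains(o)).toVector
  }

  def main(args: Array[String]): Unit = {
    // Current log holds offsets 0..9 with a high watermark of 8.
    val current = ToyLog((0L until 10L).toVector, 0L).withHighWatermark(8L)
    // A freshly created future log is empty, so its high watermark is clamped to 0.
    val future = ToyLog(Vector.empty, 0L).withHighWatermark(current.highWatermark)

    val fromCurrentHw = future.appendFrom(current, current.highWatermark)
    val fromFutureHw = future.appendFrom(current, future.highWatermark)

    println(fromCurrentHw.holes) // offsets 0 through 7 are missing
    println(fromFutureHw.holes)  // no missing offsets
  }
}
```

Starting from the current log's high watermark skips everything below it, while starting from the future log's own high watermark copies the log from the beginning.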

chia7712 commented Jan 5, 2023

> is this because the log's highwatermark is lower or higher than the futureLog's?

higher, and thanks to @junrao for the great explanation.

@junrao junrao left a comment

@chia7712 : Thanks for the PR. Just one minor comment.

// this method should use hw of future log to create log dir fetcher. Otherwise, it causes offset mismatch error
val result = rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None)
assertEquals(1, result.size)
assertEquals(0L, result(new TopicPartition(topic, 0)).initOffset)

@junrao Please take a look at line#258

@junrao junrao left a comment

@chia7712 : Thanks for the updated PR. A couple more comments.

@junrao junrao left a comment

@chia7712 : Thanks for the updated PR. LGTM if the tests pass.

chia7712 commented Jan 7, 2023

unrelated error

Build / JDK 11 and Scala 2.13 / kafka.admin.ReassignPartitionsIntegrationTest.testAlterReassignmentThrottle(String).quorum=kraft | 26 sec | 1
Build / JDK 11 and Scala 2.13 / kafka.admin.TopicCommandIntegrationTest.testTopicWithCollidingCharDeletionAndCreateAgain(String).quorum=kraft
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorTopicsIntegrationTest.testGetActiveTopics](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13075/7/testReport/org.apache.kafka.connect.integration/ConnectorTopicsIntegrationTest/Build___JDK_11_and_Scala_2_13___testGetActiveTopics/)

will merge it

@chia7712 chia7712 merged commit b99be76 into apache:trunk Jan 7, 2023
rajinisivaram added a commit to confluentinc/kafka that referenced this pull request Jan 9, 2023
…9-jan-2023

* apache/trunk: (16 commits)
  KAFKA-14570: Fix parenthesis in verifyFullFetchResponsePartitions output (apache#13072)
  MINOR: Remove public mutable fields from ProducerAppendInfo (apache#13091)
  KAFKA-14558: Move/Rewrite LastRecord, TxnMetadata, BatchMetadata, ProducerStateEntry, and ProducerAppendInfo to the storage module (apache#13043)
  KAFKA-14535: Fix flaky EndToEndAuthorization tests which were sensitive to ACL change reordering (apache#13086)
  KAFKA-9087 Replace log high watermark by future log high watermark wh… (apache#13075)
  MINOR: add error reason when controller failed to handle events (apache#13050)
  MINOR: doc: note how JDK-8136913 can affect client SASL (apache#13071)
  2023 (apache#13083)
  KAFKA-14279; Add 3.3.x to core compatibility tests (apache#13076)
  MINOR Fixed doc generation for LogConfig class in genTopicConfigDocs. (apache#13079)
  ...
guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023
apache#13075)

Reviewers: Ismael Juma <ismael@juma.me.uk>, Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>
@chia7712 chia7712 deleted the KAFKA-9087-1 branch March 25, 2024 15:20