Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15511: Catch CorruptIndexException instead of CorruptRecordException #14459

Merged
merged 4 commits into from Sep 29, 2023

Conversation

iit2009060
Copy link
Contributor

@iit2009060 iit2009060 commented Sep 27, 2023

Scenario
The bug occurs when RemoteIndexCache does not have an entry but CorruptedIndex File exists in the desired path.
Current behaviour
It will throw the CorruptIndex Exception and does not recover or replace the CorruptedIndexFile with new File fetch from remote storage.
Desired Behaviour
It should catch the CorruptIndex Exception and should refetch it from remote storage and overwrite the corrupted file.

Testing
Written Unit Test to test the above scenario

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Contributor

@divijvaidya divijvaidya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Left some minor comments.

@divijvaidya divijvaidya added the tiered-storage Pull requests associated with KIP-405 (Tiered Storage) label Sep 27, 2023
@@ -310,7 +310,7 @@ private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegment
if (Files.exists(indexFile.toPath())) {
try {
index = readIndex.apply(indexFile);
} catch (CorruptRecordException ex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can CorruptRecordException be thrown in some scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hangleton The RemoteIndexCache caches only index files which needs to be checked for corruption. There is no such record concept in RemoteIndexCache.
@divijvaidya @satishd can add more details here.

Copy link
Contributor

@Hangleton Hangleton Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I wanted to confirm that it is indeed the case although we are dealing with indexes here. The rationale being that CorruptRecordException may have been introduced purposefully to cover a use case. Was this not covered by a test in the initial PR, and if it was, did the test make an incorrect assumption, or exercised an invalid (that is, impossible) scenario?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this not covered by a test in the initial PR

@Hangleton no we don't have a test case to execute this code. In fact, the bug fixed this PR was discovered while working on the task [1] to add more unit test for remote index cache.

[1] https://issues.apache.org/jira/browse/KAFKA-15169

@iit2009060 iit2009060 force-pushed the KAFKA-15511 branch 2 times, most recently from e81ca46 to 5bc38e4 Compare September 27, 2023 16:31
@divijvaidya
Copy link
Contributor

Test failures are unrelated to Tiered Storage code.

[Build / JDK 21 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing__/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/org.apache.kafka.connect.integration/ConnectWorkerIntegrationTest/Build___JDK_11_and_Scala_2_13___testBrokerCoordinator/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/org.apache.kafka.connect.integration/ConnectorRestartApiIntegrationTest/Build___JDK_11_and_Scala_2_13___testMultiWorkerRestartOnlyConnector/)
[Build / JDK 11 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaAssign(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_11_and_Scala_2_13___testNoConsumeWithDescribeAclViaAssign_String__quorum_kraft/)
[Build / JDK 11 and Scala 2.13 / kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/kafka.server/DescribeClusterRequestTest/Build___JDK_11_and_Scala_2_13___testDescribeClusterRequestExcludingClusterAuthorizedOperations_String__quorum_kraft/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_8_and_Scala_2_12___testReplicateSourceDefault__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_8_and_Scala_2_12___testReplicateSourceDefault___2/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testOffsetSyncsTopicsOnTarget()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testOffsetSyncsTopicsOnTarget__/)
[Build / JDK 8 and Scala 2.12 / kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitions(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDescribeUnderMinIsrPartitions_String__quorum_kraft/)
[Build / JDK 8 and Scala 2.12 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoGroupAcl(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_8_and_Scala_2_12___testNoGroupAcl_String__quorum_kraft/)
[Build / JDK 8 and Scala 2.12 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserFails(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_8_and_Scala_2_12___testDescribeTokenForOtherUserFails_String__quorum_kraft/)
[Build / JDK 8 and Scala 2.12 / kafka.api.GroupEndToEndAuthorizationTest.testProduceConsumeTopicAutoCreateTopicCreateAcl(String).quorum=zk](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/kafka.api/GroupEndToEndAuthorizationTest/Build___JDK_8_and_Scala_2_12___testProduceConsumeTopicAutoCreateTopicCreateAcl_String__quorum_zk/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/6/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/)

Copy link
Contributor

@divijvaidya divijvaidya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change looks good to me.

@divijvaidya
Copy link
Contributor

@Hangleton @satishd since you folks participated in the review for this PR, do you have any additional comments?

Copy link
Collaborator

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for fixing this issue!

Do you know the reason behind CorruptIndexException directly extends the RuntimeException instead of KafkaException?

@@ -598,4 +615,13 @@ class RemoteIndexCacheTest {
timeIndex.flush()
}
}

private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
val pw = new PrintWriter((remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME),rlsMetadata)))
Copy link
Collaborator

@kamalcph kamalcph Sep 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after comma and remove extra brackets

@divijvaidya
Copy link
Contributor

Unrelated test failures

[Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchFromFollowerIntegrationTest.testFollowerCompleteDelayedFetchesOnReplication(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/7/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testFollowerCompleteDelayedFetchesOnReplication_String__quorum_kraft/)
[Build / JDK 21 and Scala 2.13 / kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserPasses(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/7/testReport/junit/kafka.api/DelegationTokenEndToEndAuthorizationWithOwnerTest/Build___JDK_21_and_Scala_2_13___testDescribeTokenForOtherUserPasses_String__quorum_kraft/)
[Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/7/testReport/junit/kafka.api/TransactionsTest/Build___JDK_11_and_Scala_2_13___testBumpTransactionalEpoch_String__quorum_kraft/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/7/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testNoOpRecordWriteAfterTimeout()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/7/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testNoOpRecordWriteAfterTimeout__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14459/7/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)

@iit2009060
Copy link
Contributor Author

@divijvaidya It seems i don't have permission to merge it even after the approval.

@divijvaidya divijvaidya merged commit 13b119a into apache:trunk Sep 29, 2023
1 check failed
divijvaidya pushed a commit that referenced this pull request Sep 29, 2023
A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
@divijvaidya
Copy link
Contributor

Merged to trunk and 3.6.

Thank you for the contribution @iit2009060. For Apache projects, only the committers can merge code into the repository.

@showuon
Copy link
Contributor

showuon commented Oct 2, 2023

Thanks for the fix @iit2009060 ! Nice find!

rreddy-22 pushed a commit to rreddy-22/kafka-rreddy that referenced this pull request Oct 3, 2023
…#14459)

A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
k-wall pushed a commit to k-wall/kafka that referenced this pull request Nov 21, 2023
…#14459)

A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Nov 22, 2023
…#14459)

A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
tiered-storage Pull requests associated with KIP-405 (Tiered Storage)
Projects
None yet
6 participants