
KAFKA-15194-Prepend-Offset-as-Filename #14057

Merged
merged 3 commits into from Jul 22, 2023

Conversation

Owen-CH-Leung
Contributor

Prepend the offset information to the filename.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@divijvaidya divijvaidya added the tiered-storage Pull requests associated with KIP-405 (Tiered Storage) label Jul 20, 2023
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX),
Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()),
Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),
Contributor

Ideally, we want the test implementation to be as close to the actual log file implementation as possible. Considering that, could we use LogFileUtils#logFile(File dir, long offset) here? Same for index file names.

Contributor Author

@divijvaidya Thanks for your feedback. I think the actual log file is named as [offset.filetype]. Looking at the implementation of LogFileUtils#logFile(File dir, long offset), I don't think it will allow us to insert a uuid into the middle of the filename.

If we are to keep the [offset-uuid.filetype] pattern, instead of using LogFileUtils#logFile(File dir, long offset), maybe we should make LogFileUtils#filenamePrefixFromOffset(long offset) a public method so that we can construct a real offset prefix using it. What do you think?

FYI, the method that creates these offloaded files is RemoteLogSegmentFileset#openFileset(final File storageDir, final RemoteLogSegmentId id). Currently my PR changes this method to accept RemoteLogSegmentMetadata instead of RemoteLogSegmentId, gets the offset from the metadata, and prepends it to the filename. (So yes, it's not close to the actual log file implementation, as the offset was just "0" without formatting, instead of "0000000".)
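A minimal sketch of the naming change being discussed. Only the startOffset + "-" + uuid pattern comes from the diff; the class and method names below are hypothetical, not Kafka's API:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch of the naming scheme discussed above: prepend the
// segment's start offset (taken from the segment metadata in the PR) to the
// uuid-based filename. Names here are illustrative, not Kafka's API.
public class OffloadedFileNaming {

    static Path logFilePath(String rootPath, String topicPartitionSubpath,
                            long startOffset, String uuidBase64) {
        // Produces <root>/<topic-partition>/<startOffset>-<uuid>.log,
        // matching the startOffset + "-" + uuid pattern in the diff.
        return Paths.get(rootPath, topicPartitionSubpath,
                startOffset + "-" + uuidBase64 + ".log");
    }

    public static void main(String[] args) {
        // With an unformatted offset of 0, the filename starts with a plain "0".
        System.out.println(logFilePath("/storage", "topic-0", 0L, "oAtiIQ95REujbuzNd_lkLQ"));
    }
}
```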

Contributor

> I don't think it will allow us to insert a uuid in the middle as part of the filename.

Ack. I missed that.

> maybe we should make LogFileUtils#filenamePrefixFromOffset(long offset) as a public method so that we can construct a real offset using this method. What do you think?

Yes please. Let's use that.

Contributor Author

@divijvaidya I've changed the code to use LogFileUtils#filenamePrefixFromOffset(long offset). The filename now looks like a real log file name, e.g. 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.log
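For reference, the zero-padded prefix can be sketched as below. This is a standalone approximation that mirrors the observed behavior of LogFileUtils#filenamePrefixFromOffset (a 20-digit, left-zero-padded decimal, so names sort lexicographically by offset); it is not Kafka's implementation:

```java
public class FilenamePrefixSketch {

    // Approximates LogFileUtils#filenamePrefixFromOffset: format the offset as
    // a 20-digit, zero-padded decimal so filenames sort lexicographically by offset.
    static String filenamePrefixFromOffset(long offset) {
        return String.format("%020d", offset);
    }

    public static void main(String[] args) {
        // Prints: 00000000000000000011-oAtiIQ95REujbuzNd_lkLQ.log
        System.out.println(filenamePrefixFromOffset(11L) + "-oAtiIQ95REujbuzNd_lkLQ.log");
    }
}
```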

* / storage-directory / topic-partition-uuidBase64 / oAtiIQ95REujbuzNd_lkLQ.log
* . oAtiIQ95REujbuzNd_lkLQ.index
* . oAtiIQ95REujbuzNd_lkLQ.timeindex
* / storage-directory / topic-partition-uuidBase64 / startOffset-oAtiIQ95REujbuzNd_lkLQ.log
Contributor

nit

Please replace "startOffset" with dummy values.

Contributor Author

Sure. I've added a dummy value there.

@divijvaidya
Contributor

Seems like we have compilation errors: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14057/2/pipeline/9

Please fix them and tag me here once this PR is ready for review again.

@Owen-CH-Leung
Contributor Author

Owen-CH-Leung commented Jul 21, 2023

> Seems like we have compilation errors: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14057/2/pipeline/9
>
> Please fix them. Tag me here. Once this PR is ready for review again.

Seems like the errors from the CI job are not related to what I have changed. Here are the 3 errors reported by the failing CI job:

[Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14057/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:3526:77: the result type of an implicit conversion must be more specific than Object

[Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14057/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:3530:70: the result type of an implicit conversion must be more specific than Object

[Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14057/core/src/test/scala/unit/kafka/utils/TestUtils.scala:1438:15: ambiguous reference to overloaded definition,
both method doReturn in class Mockito of type (x$1: Any, x$2: Object*)org.mockito.stubbing.Stubber
and  method doReturn in class Mockito of type (x$1: Any)org.mockito.stubbing.Stubber
match argument types (kafka.log.UnifiedLog)

In another PR of mine, the CI job on JDK 8 & Scala 2.12 ran successfully. Here are the PR link and the corresponding Jenkins page:

#13773
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13773/11/pipeline/9/

I suspect a rerun could solve the error.

Locally I've run ./gradlew storage:test and it succeeded, so I think it should also pass in Jenkins.

@divijvaidya
Contributor

Actually, you need to rebase onto trunk. 4ea9394 is the commit that fixes the build.

@Owen-CH-Leung
Contributor Author

I've already merged trunk, and this time the JDK 11 + Scala 2.13 build failed:

1: Task failed with an exception.
-----------
* What went wrong:
Execution failed for task ':streams:upgrade-system-tests-25:checkstyleTest'.
> A failure occurred while executing org.gradle.api.plugins.quality.internal.CheckstyleAction
   > Failed to run Gradle Worker Daemon
      > Process 'Gradle Worker Daemon 10' finished with non-zero exit value 1
* Try:
> Run with --stacktrace option to get the stack trace.
> Run with --info or --debug option to get more log output.
> Get more help at https://help.gradle.org/.
==============================================================================
2: Task failed with an exception.
-----------
* What went wrong:
Execution failed for task ':generator:checkstyleMain'.
> A failure occurred while executing org.gradle.api.plugins.quality.internal.CheckstyleAction
   > Failed to run Gradle Worker Daemon
      > Process 'Gradle Worker Daemon 20' finished with non-zero exit value 1

The specific exception is org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:44234'. It seems to me that we just need to restart the failing job and it should succeed. (In another PR of mine, the Gradle tests passed after rebasing with trunk.)

Contributor

@divijvaidya divijvaidya left a comment

Unrelated errors.

[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14057/4/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOffsetTranslationBehindReplicationFlow__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14057/4/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOffsetTranslationBehindReplicationFlow___2/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14057/4/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_17_and_Scala_2_13___testReplicateSourceDefault__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testOffsetTranslationBehindReplicationFlow()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14057/4/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_17_and_Scala_2_13___testOffsetTranslationBehindReplicationFlow__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsZombieSinkTasks](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14057/4/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testAlterSinkConnectorOffsetsZombieSinkTasks/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14057/4/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_17_and_Scala_2_13___testBalancePartitionLeaders__/)

The build failure for JDK 11 is due to known flaky problems, such as:

Unexpected exception thrown.

org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:44234'.
	at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:94)
	at org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
	at org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:47)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException
	at org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:72)
	at org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:52)
	at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:81)
	... 6 more

@divijvaidya divijvaidya merged commit a3204ae into apache:trunk Jul 22, 2023
1 check failed
@divijvaidya
Contributor

Thank you for your contribution @Owen-CH-Leung! Feel free to pick up more KIP-405 related items from its parent ticket.

Cerchie pushed a commit to Cerchie/kafka that referenced this pull request Jul 25, 2023
jeqo pushed a commit to aiven/kafka that referenced this pull request Aug 15, 2023