-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile. #11060
Conversation
b666a13
to
bf18df8
Compare
ff4a2eb
to
aa44348
Compare
@junrao This is a minor refactoring. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd : Thanks for the PR. One high level comment below.
|
||
try Files.createFile(file.toPath) // create the file if it doesn't exist | ||
catch { case _: FileAlreadyExistsException => } | ||
logDir: String) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need CheckpointFile? Could we just extend SnapshotFile to take an IOExceptionHandler and logDir?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started with that while refactoring the existing class. I implemented IOExceptionHandler
class which takes logDir
and logDirFailureChannel
, and reuse it in both LeaderepochCheckpointFile
and OffsetsCheckpointFile
. I preferred to keep SnapshotFile
simple and implementors can have their own logic in extending write and read methods. This allows more custom logic for read/write but not only IOException
handling.
In both cases, I had to create a new class, and I preferred to extend the SnapshotFile
class.
Having said that, I do not have strong opinions on this, I am fine with either way.
Thanks @junrao for the comment, replied inline. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd : Thanks for the reply. A couple of more minor comments.
} | ||
|
||
Utils.atomicMoveWithFallback(tempPath, path) | ||
snapshotFile.write(entries.toSeq.asJava); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for ;
* | ||
* @param <T> entry type. | ||
*/ | ||
public class SnapshotFile<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since KRaft introduced Snapshot for metadata, perhaps we could call this CheckpointFile and rename CheckpointFile to sth like CheckpointFileWithFailureHandler.
Thanks @junrao for the comments. Addressed them with the latest commit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd : Thanks for the updated PR. The test failures seem to be related to this PR?
[2021-08-27 16ː48ː04,662] ERROR Error while reading checkpoint file /tmp/kafka-14845667023918760346/replication-offset-checkpoint (kafka.server.LogDirFailureChannel:76)
java.nio.file.NoSuchFileException: /tmp/kafka-14845667023918760346/replication-offset-checkpoint
at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
at java.base/java.nio.file.Files.newByteChannel(Files.java:375)
at java.base/java.nio.file.Files.newByteChannel(Files.java:426)
at java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420)
at java.base/java.nio.file.Files.newInputStream(Files.java:160)
at java.base/java.nio.file.Files.newBufferedReader(Files.java:2916)
at java.base/java.nio.file.Files.newBufferedReader(Files.java:2948)
at org.apache.kafka.server.common.CheckpointFile.read(CheckpointFile.java:107)
at kafka.server.checkpoints.CheckpointFileWithFailureHandler.read(CheckpointFileWithFailureHandler.scala:48)
at kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:70)
at kafka.server.checkpoints.LazyOffsetCheckpointMap.offsets$lzycompute(OffsetCheckpointFile.scala:94)
at kafka.server.checkpoints.LazyOffsetCheckpointMap.offsets(OffsetCheckpointFile.scala:94)
at kafka.server.checkpoints.LazyOffsetCheckpointMap.fetch(OffsetCheckpointFile.scala:97)
at kafka.server.checkpoints.LazyOffsetCheckpoints.fetch(OffsetCheckpointFile.scala:89)
at kafka.cluster.Partition.updateHighWatermark$1(Partition.scala:338)
at kafka.cluster.Partition.createLog(Partition.scala:351)
at kafka.cluster.Partition.maybeCreate$1(Partition.scala:324)
at kafka.cluster.Partition.createLogIfNotExists(Partition.scala:331)
at kafka.cluster.Partition.$anonfun$makeLeader$1(Partition.scala:536)
at kafka.cluster.Partition.makeLeader(Partition.scala:520)
at kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$3(ReplicaManager.scala:2130)
at scala.Option.foreach(Option.scala:437)
at kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$2(ReplicaManager.scala:2127)
at kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$2$adapted(ReplicaManager.scala:2126)
at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
at kafka.server.ReplicaManager.applyLocalLeadersDelta(ReplicaManager.scala:2126)
at kafka.server.ReplicaManager.applyDelta(ReplicaManager.scala:2103)
at kafka.server.ReplicaManagerTest.testDeltaToLeaderOrFollowerMarksPartitionOfflineIfLogCantBeCreated(ReplicaManagerTest.scala:3237)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
… server-common module and introduced it as SnapshotFile. - Refactored CheckpointFile to server-common module as a Java class and it is reused by LeaderCheckpointFile, OffsetCheckpointFile. - This will be used by CommittedOffsetsFile which checkpoints remote log metadata partitions with respective offsets in the default RemoteLogMetadataManager implementation. - Existing tests are available for LeaderCheckpointFile, OffsetCheckpointFile.
e6c5497
to
e38b775
Compare
- This fixes the test failures in ReplicaManagerTest.
7c33cb8
to
0c828ec
Compare
Thanks @junrao for pointing that out. Earlier test failure is fixed with the latest commit and we do not see that failure now. The latest run failures are not related to the changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd : Thanks for the updated PR. LGTM
… server-common module and introduced it as SnapshotFile. (apache#11060) MINOR Refactored the existing CheckpointFile in core module, moved to server-common module. Refactored CheckpointFile to server-common module as a Java class and it is reused by LeaderCheckpointFile, OffsetCheckpointFile. This will be used by CommittedOffsetsFile which checkpoints remote log metadata partitions with respective offsets in the default RemoteLogMetadataManager implementation. Existing tests are available for LeaderCheckpointFile, OffsetCheckpointFile. Reviewers: Jun Rao <junrao@gmail.com>
… server-common module and introduced it as SnapshotFile. (apache#11060) MINOR Refactored the existing CheckpointFile in core module, moved to server-common module. Refactored CheckpointFile to server-common module as a Java class and it is reused by LeaderCheckpointFile, OffsetCheckpointFile. This will be used by CommittedOffsetsFile which checkpoints remote log metadata partitions with respective offsets in the default RemoteLogMetadataManager implementation. Existing tests are available for LeaderCheckpointFile, OffsetCheckpointFile. Reviewers: Jun Rao <junrao@gmail.com>
MINOR Refactored the existing CheckpointFile in core module, moved to server-common module.
Committer Checklist (excluded from commit message)