feat(inkless): implement InitDisklessLog Controller API#531
Conversation
959f497 to
5e0d79a
Compare
Persist disklessStartOffset and producer states into KRaft metadata. Only the leader is allowed to call InitDisklessLog. New fields are also persisted in the KRaft snapshots through PartitionRegistration. To avoid conflicts with upstream changes to the metadata records, new metadata fields are written directly in the unknown tagged fields of PartitionRecord and PartitionChangeRecord.
5e0d79a to
4573f9f
Compare
metadata/src/main/java/org/apache/kafka/controller/InitDisklessLogRequestData.java
Outdated
Show resolved
Hide resolved
| int partitionId, | ||
| long disklessStartOffset, | ||
| int leaderEpoch, | ||
| List<ProducerState> producerStates |
There was a problem hiding this comment.
Does ProducerState need to be persisted on KRaft? I wonder if it could be sent from the leader to DisklessController directly
There was a problem hiding this comment.
Yes it needs to be because it might happen that the broker that is able to successfully call InitDisklessLog on the controller (so the real leader) is not the same that calls the Diskless Controller (it might be another one, even a zombie leader in a split brain scenario). To keep this protocol 100% safe without any other step involved we also need to "commit" the producer state, so that once InitDisklessLog controller API is called the partition state is completely frozen.
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
Show resolved
Hide resolved
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
Show resolved
Hide resolved
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
Show resolved
Hide resolved
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
Show resolved
Hide resolved
| buf.putLong(s.assignedOffset()); | ||
| buf.putLong(s.batchMaxTimestamp()); | ||
| } | ||
| buf.flip(); |
There was a problem hiding this comment.
nit: can be dropped as the backing array doesn't care about position
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
Outdated
Show resolved
Hide resolved
* feat(inkless): implement InitDisklessLog Controller API Persist disklessStartOffset and producer states into KRaft metadata. Only the leader is allowed to call InitDisklessLog. New fields are also persisted in the KRaft snapshots through PartitionRegistration. To avoid conflicts with upstream changes to the metadata records, new metadata fields are written directly in the unknown tagged fields of PartitionRecord and PartitionChangeRecord. (cherry picked from commit 0070155) # Conflicts: # metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
* feat(inkless): implement InitDisklessLog Controller API Persist disklessStartOffset and producer states into KRaft metadata. Only the leader is allowed to call InitDisklessLog. New fields are also persisted in the KRaft snapshots through PartitionRegistration. To avoid conflicts with upstream changes to the metadata records, new metadata fields are written directly in the unknown tagged fields of PartitionRecord and PartitionChangeRecord.
* feat(inkless): implement InitDisklessLog Controller API Persist disklessStartOffset and producer states into KRaft metadata. Only the leader is allowed to call InitDisklessLog. New fields are also persisted in the KRaft snapshots through PartitionRegistration. To avoid conflicts with upstream changes to the metadata records, new metadata fields are written directly in the unknown tagged fields of PartitionRecord and PartitionChangeRecord. (cherry picked from commit 0070155) # Conflicts: # metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
Persist disklessStartOffset and producer states into KRaft metadata.
Only the leader is allowed to call InitDisklessLog.
To avoid conflicts with upstream changes to the metadata records, new metadata fields are written directly in the unknown tagged fields.
New fields are also persisted in the KRaft snapshots through PartitionRegistration.