Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions core/binary_protocol/src/consensus/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub enum Operation {
/// but skips state machine dispatch at commit time, the metadata
/// plane calls `commit_register` directly. Session number = commit op.
Register = 1,
// Internal metadata operations (journal / replica-only)
CreateTopicWithAssignments = 64,
CreatePartitionsWithAssignments = 65,

// Metadata operations (shard 0)
CreateStream = 128,
Expand Down Expand Up @@ -65,10 +68,25 @@ pub enum Operation {
}

impl Operation {
pub const INTERNAL_START: u8 = Self::CreateTopicWithAssignments as u8;
pub const METADATA_START: u8 = Self::CreateStream as u8;
pub const PARTITION_START: u8 = Self::SendMessages as u8;

/// Internal-only operations reserved for replica / journal use.
#[must_use]
#[inline]
pub const fn is_internal(&self) -> bool {
(*self as u8) >= Self::INTERNAL_START && (*self as u8) < Self::METADATA_START
}

/// Metadata / control-plane operations handled by shard 0.
#[must_use]
#[inline]
pub const fn is_metadata(&self) -> bool {
if self.is_internal() {
return true;
}

matches!(
self,
Self::CreateStream
Expand Down Expand Up @@ -105,13 +123,14 @@ impl Operation {
#[must_use]
#[inline]
pub const fn is_partition(&self) -> bool {
matches!(
self,
Self::SendMessages
| Self::StoreConsumerOffset
| Self::DeleteConsumerOffset
| Self::DeleteSegments
)
matches!(self, Self::DeleteSegments) || (*self as u8) >= Self::PARTITION_START
}

/// Operations clients are allowed to send directly.
#[must_use]
#[inline]
pub const fn is_client_allowed(&self) -> bool {
!matches!(self, Self::Reserved) && !self.is_internal()
}

/// Bidirectional mapping: `Operation` -> client command code.
Expand All @@ -120,7 +139,10 @@ impl Operation {
#[must_use]
pub const fn to_command_code(&self) -> Option<u32> {
match self {
Self::Reserved | Self::Register => None,
Self::Reserved
| Self::Register
| Self::CreateTopicWithAssignments
| Self::CreatePartitionsWithAssignments => None,
Self::CreateStream
| Self::UpdateStream
| Self::DeleteStream
Expand Down Expand Up @@ -217,6 +239,14 @@ mod tests {
assert!(!Operation::SendMessages.is_vsr_reserved());
assert!(!Operation::Register.is_metadata());
assert!(!Operation::Register.is_partition());
assert_eq!(
Operation::CreateTopicWithAssignments.to_command_code(),
None
);
assert_eq!(
Operation::CreatePartitionsWithAssignments.to_command_code(),
None
);
}

#[test]
Expand All @@ -230,8 +260,12 @@ mod tests {

#[test]
fn metadata_vs_partition() {
assert!(Operation::CreateTopicWithAssignments.is_internal());
assert!(Operation::CreateTopicWithAssignments.is_metadata());
assert!(!Operation::CreateTopicWithAssignments.is_client_allowed());
assert!(Operation::CreateStream.is_metadata());
assert!(!Operation::CreateStream.is_partition());
assert!(Operation::CreateStream.is_client_allowed());
assert!(Operation::SendMessages.is_partition());
assert!(!Operation::SendMessages.is_metadata());
assert!(Operation::DeleteSegments.is_partition());
Expand Down
5 changes: 4 additions & 1 deletion core/binary_protocol/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta>
Operation::SendMessages => 22,
Operation::StoreConsumerOffset => 25,
Operation::DeleteConsumerOffset => 26,
Operation::Reserved | Operation::Register => return None,
Operation::CreateTopicWithAssignments
| Operation::CreatePartitionsWithAssignments
| Operation::Reserved
| Operation::Register => return None,
};
Some(&COMMAND_TABLE[idx])
}
Expand Down
1 change: 1 addition & 0 deletions core/binary_protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub use message_view::{
};
pub use primitives::consumer::WireConsumer;
pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, WireName};
pub use primitives::partition_assignment::CreatedPartitionAssignment;
pub use primitives::partitioning::{MAX_MESSAGES_KEY_LENGTH, WirePartitioning};
pub use primitives::permissions::{
WireGlobalPermissions, WirePermissions, WireStreamPermissions, WireTopicPermissions,
Expand Down
1 change: 1 addition & 0 deletions core/binary_protocol/src/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
pub mod consumer;
pub mod identifier;
pub mod partition_assignment;
pub mod partitioning;
pub mod permissions;
pub mod polling_strategy;
Expand Down
75 changes: 75 additions & 0 deletions core/binary_protocol/src/primitives/partition_assignment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::WireError;
use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le};
use bytes::{BufMut, BytesMut};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreatedPartitionAssignment {
Comment thread
numinnex marked this conversation as resolved.
pub partition_id: u32,
pub consensus_group_id: u64,
}

impl WireEncode for CreatedPartitionAssignment {
fn encoded_size(&self) -> usize {
12
}

fn encode(&self, buf: &mut BytesMut) {
buf.put_u32_le(self.partition_id);
buf.put_u64_le(self.consensus_group_id);
}
}

impl WireDecode for CreatedPartitionAssignment {
fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
if buf.len() < 12 {
return Err(WireError::UnexpectedEof {
offset: 0,
need: 12,
have: buf.len(),
});
}

Ok((
Self {
partition_id: read_u32_le(buf, 0)?,
consensus_group_id: read_u64_le(buf, 4)?,
},
12,
))
}
}

#[cfg(test)]
mod tests {
use super::CreatedPartitionAssignment;
use crate::codec::{WireDecode, WireEncode};

#[test]
fn roundtrip() {
let request = CreatedPartitionAssignment {
partition_id: 7,
consensus_group_id: 42,
};
let bytes = request.to_bytes();
let (decoded, consumed) = CreatedPartitionAssignment::decode(&bytes).unwrap();
assert_eq!(consumed, bytes.len());
assert_eq!(decoded, request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::WireError;
use crate::codec::{WireDecode, WireEncode, read_u32_le};
use crate::primitives::partition_assignment::CreatedPartitionAssignment;
use crate::requests::partitions::CreatePartitionsRequest;
use bytes::{BufMut, BytesMut};

fn usize_to_u32(value: usize, context: &str) -> u32 {
u32::try_from(value).unwrap_or_else(|_| panic!("{context} exceeds u32"))
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreatePartitionsWithAssignmentsRequest {
pub request: CreatePartitionsRequest,
pub partitions: Vec<CreatedPartitionAssignment>,
}

impl WireEncode for CreatePartitionsWithAssignmentsRequest {
fn encoded_size(&self) -> usize {
4 + self.request.encoded_size()
+ 4
+ self
.partitions
.iter()
.map(WireEncode::encoded_size)
.sum::<usize>()
}

fn encode(&self, buf: &mut BytesMut) {
buf.put_u32_le(usize_to_u32(
self.request.encoded_size(),
"create partitions request size",
));
self.request.encode(buf);
buf.put_u32_le(usize_to_u32(
self.partitions.len(),
"create partitions partition count",
));
for partition in &self.partitions {
partition.encode(buf);
}
}
}

impl WireDecode for CreatePartitionsWithAssignmentsRequest {
fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
let request_size = read_u32_le(buf, 0)? as usize;
let request_start = 4;
let request_end = request_start + request_size;
if buf.len() < request_end {
return Err(WireError::UnexpectedEof {
offset: request_start,
need: request_size,
have: buf.len().saturating_sub(request_start),
});
}

let request = CreatePartitionsRequest::decode_from(&buf[request_start..request_end])?;
let partitions_count = read_u32_le(buf, request_end)? as usize;
let mut offset = request_end + 4;
let mut partitions = Vec::with_capacity(partitions_count);
for _ in 0..partitions_count {
let (partition, consumed) = CreatedPartitionAssignment::decode(&buf[offset..])?;
offset += consumed;
partitions.push(partition);
}

Ok((
Self {
request,
partitions,
},
offset,
))
}
}

#[cfg(test)]
mod tests {
use super::CreatePartitionsWithAssignmentsRequest;
use crate::WireIdentifier;
use crate::codec::{WireDecode, WireEncode};
use crate::primitives::partition_assignment::CreatedPartitionAssignment;
use crate::requests::partitions::CreatePartitionsRequest;

#[test]
fn roundtrip() {
let request = CreatePartitionsWithAssignmentsRequest {
request: CreatePartitionsRequest {
stream_id: WireIdentifier::numeric(1),
topic_id: WireIdentifier::numeric(2),
partitions_count: 2,
},
partitions: vec![
CreatedPartitionAssignment {
partition_id: 3,
consensus_group_id: 11,
},
CreatedPartitionAssignment {
partition_id: 4,
consensus_group_id: 12,
},
],
};
let bytes = request.to_bytes();
let (decoded, consumed) = CreatePartitionsWithAssignmentsRequest::decode(&bytes).unwrap();
assert_eq!(consumed, bytes.len());
assert_eq!(decoded, request);
}
}
3 changes: 3 additions & 0 deletions core/binary_protocol/src/requests/partitions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
// under the License.

pub mod create_partitions;
pub mod create_partitions_with_assignments;
pub mod delete_partitions;

pub use crate::primitives::partition_assignment::CreatedPartitionAssignment;
pub use create_partitions::CreatePartitionsRequest;
pub use create_partitions_with_assignments::CreatePartitionsWithAssignmentsRequest;
pub use delete_partitions::DeletePartitionsRequest;
Loading
Loading