Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.1-edge.1" }
integration = { path = "core/integration" }
keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
lazy_static = "1.5.0"
left-right = "0.11"
log = "0.4.29"
metadata = { path = "core/metadata" }
mimalloc = "0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::str::from_utf8;
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `name` - unique consumer group name, max length is 255 characters.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreateConsumerGroup {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::fmt::Display;
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `group_id` - unique consumer group ID (numeric or name).
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct DeleteConsumerGroup {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/partitions/create_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::fmt::Display;
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `partitions_count` - number of partitions in the topic to create, max value is 1000.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreatePartitions {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/partitions/delete_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::fmt::Display;
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `partitions_count` - number of partitions in the topic to delete, max value is 1000.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct DeletePartitions {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::str::from_utf8;
/// It has additional payload:
/// - `name` - unique name of the token, must be between 3 and 30 characters long.
/// - `expiry` - expiry of the token.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CreatePersonalAccessToken {
/// Unique name of the token, must be between 3 and 30 characters long.
pub name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::str::from_utf8;
/// `DeletePersonalAccessToken` command is used to delete a personal access token for the authenticated user.
/// It has additional payload:
/// - `name` - unique name of the token, must be between 3 and 30 characters long.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DeletePersonalAccessToken {
/// Unique name of the token, must be between 3 and 30 characters long.
pub name: String,
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/segments/delete_segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::fmt::Display;
/// - `topic_id` - unique topic ID (numeric or name).
/// - `partition_id` - unique partition ID (numeric or name).
/// - `segments_count` - number of segments in the partition to delete.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct DeleteSegments {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/streams/create_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::str::from_utf8;
/// `CreateStream` command is used to create a new stream.
/// It has additional payload:
/// - `name` - unique stream name (string), max length is 255 characters.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreateStream {
/// Unique stream name (string), max length is 255 characters.
pub name: String,
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/streams/delete_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::fmt::Display;
/// `DeleteStream` command is used to delete an existing stream.
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct DeleteStream {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/streams/purge_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::fmt::Display;
/// `PurgeStream` command is used to purge stream data (all the messages from its topics).
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct PurgeStream {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/streams/update_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::str::from_utf8;
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
/// - `name` - unique stream name (string), max length is 255 characters.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct UpdateStream {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/topics/create_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use std::str::from_utf8;
/// Can't be lower than segment size in the config.
/// - `replication_factor` - replication factor for the topic.
/// - `name` - unique topic name, max length is 255 characters.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreateTopic {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/topics/delete_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::fmt::Display;
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct DeleteTopic {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/topics/purge_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::fmt::Display;
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct PurgeTopic {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/topics/update_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use std::str::from_utf8;
/// Can't be lower than segment size in the config.
/// - `replication_factor` - replication factor for the topic.
/// - `name` - unique topic name, max length is 255 characters.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct UpdateTopic {
/// Unique stream ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/users/change_password.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::str::from_utf8;
/// - `user_id` - unique user ID (numeric or name).
/// - `current_password` - current password, must be between 3 and 100 characters long.
/// - `new_password` - new password, must be between 3 and 100 characters long.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct ChangePassword {
/// Unique user ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/users/create_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::str::from_utf8;
/// - `password` - password of the user, must be between 3 and 100 characters long.
/// - `status` - status of the user, can be either `active` or `inactive`.
/// - `permissions` - optional permissions of the user. If not provided, user will have no permissions.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreateUser {
/// Unique name of the user, must be between 3 and 50 characters long.
pub username: String,
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/users/delete_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::fmt::Display;
/// `DeleteUser` command is used to delete a user by unique ID.
/// It has additional payload:
/// - `user_id` - unique user ID (numeric or name).
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct DeleteUser {
/// Unique user ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/users/update_permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::fmt::Display;
/// It has additional payload:
/// - `user_id` - unique user ID (numeric or name).
/// - `permissions` - new permissions (optional)
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct UpdatePermissions {
/// Unique user ID (numeric or name).
#[serde(skip)]
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/commands/users/update_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::str::from_utf8;
/// - `user_id` - unique user ID (numeric or name).
/// - `username` - new username (optional), if provided, must be between 3 and 50 characters long.
/// - `status` - new status (optional)
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct UpdateUser {
#[serde(skip)]
pub user_id: Identifier,
Expand Down
16 changes: 16 additions & 0 deletions core/common/src/types/consensus/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ where
}
}

/// Get the message body as zero-copy `Bytes`.
///
/// Returns an empty `Bytes` if there is no body.
#[inline]
#[allow(unused)]
pub fn body_bytes(&self) -> Bytes {
let header_size = size_of::<H>();
let total_size = self.header().size() as usize;

if total_size > header_size {
self.buffer.slice(header_size..total_size)
} else {
Bytes::new()
}
}

/// Get the complete message as bytes (header + body).
#[inline]
#[allow(unused)]
Expand Down
3 changes: 2 additions & 1 deletion core/metadata/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ readme = "../../../README.md"

[dependencies]
ahash = { workspace = true }
bytes = { workspace = true }
consensus = { path = "../consensus" }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
left-right = { workspace = true }
message_bus = { path = "../message_bus" }
paste = "1.0"
slab = "0.4.11"
tracing = { workspace = true }
Loading
Loading