Skip to content

Commit

Permalink
Initial draft of the message headers implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz committed Aug 16, 2023
1 parent dc6d2d9 commit bc1d0b3
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 24 deletions.
2 changes: 2 additions & 0 deletions iggy/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,14 @@ impl MessageClient for IggyClient {
return self.client.read().await.send_messages(command).await;
}

// TODO: Clone headers
let mut messages = Vec::with_capacity(command.messages.len());
for message in &command.messages {
let message = crate::messages::send_messages::Message {
id: message.id,
length: message.length,
payload: message.payload.clone(),
headers: message.headers.clone(),
};
messages.push(message);
}
Expand Down
34 changes: 22 additions & 12 deletions iggy/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ pub enum Error {
TooBigMessagePayload,
#[error("Too many messages")]
TooManyMessages,
#[error("Invalid header key")]
InvalidHeaderKey,
#[error("Invalid header value")]
InvalidHeaderValue,
#[error("Invalid offset: {0}")]
InvalidOffset(u64),
#[error("Failed to read consumers offsets for partition with ID: {0}")]
Expand Down Expand Up @@ -317,12 +321,14 @@ impl Error {
Error::CannotReadMessageLength => 4014,
Error::CannotReadMessagePayload => 4015,
Error::TooBigMessagePayload => 4016,
Error::TooManyMessages => 4017,
Error::EmptyMessagePayload => 4018,
Error::InvalidMessagePayloadLength => 4019,
Error::CannotReadMessageChecksum => 4020,
Error::InvalidMessageChecksum(_, _, _) => 4021,
Error::InvalidKeyValueLength => 4022,
Error::InvalidHeaderKey => 4017,
Error::InvalidHeaderValue => 4018,
Error::TooManyMessages => 4019,
Error::EmptyMessagePayload => 4020,
Error::InvalidMessagePayloadLength => 4021,
Error::CannotReadMessageChecksum => 4022,
Error::InvalidMessageChecksum(_, _, _) => 4023,
Error::InvalidKeyValueLength => 4024,
Error::InvalidOffset(_) => 4100,
Error::CannotReadConsumerOffsets(_) => 4101,
Error::ConsumerGroupNotFound(_, _) => 5000,
Expand Down Expand Up @@ -426,12 +432,14 @@ impl Error {
4014 => "cannot_read_message_length",
4015 => "cannot_read_message_payload",
4016 => "too_big_message_payload",
4017 => "too_many_messages",
4018 => "empty_message_payload",
4019 => "invalid_message_payload_length",
4020 => "cannot_read_message_checksum",
4021 => "invalid_message_checksum",
4022 => "invalid_key_value_length",
4017 => "invalid_header_key",
4018 => "invalid_header_value",
4019 => "too_many_messages",
4020 => "empty_message_payload",
4021 => "invalid_message_payload_length",
4022 => "cannot_read_message_checksum",
4023 => "invalid_message_checksum",
4024 => "invalid_key_value_length",
4100 => "invalid_offset",
4101 => "cannot_read_consumer_offsets",
5000 => "consumer_group_not_found",
Expand Down Expand Up @@ -514,6 +522,8 @@ impl Error {
Error::CannotParseInt(_) => "cannot_parse_int",
Error::CannotParseSlice(_) => "cannot_parse_slice",
Error::TooBigMessagePayload => "too_big_message_payload",
Error::InvalidHeaderKey => "invalid_header_key",
Error::InvalidHeaderValue => "invalid_header_value",
Error::TooManyMessages => "too_many_messages",
Error::WriteError(_) => "write_error",
Error::InvalidOffset(_) => "invalid_offset",
Expand Down
214 changes: 214 additions & 0 deletions iggy/src/header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
use crate::error::Error;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct HeaderKey(String);

impl HeaderKey {
pub fn new(key: String) -> Result<Self, Error> {
if key.is_empty() || key.len() > 255 {
return Err(Error::InvalidHeaderKey);
}

Ok(Self(key))
}

pub fn as_str(&self) -> &str {
&self.0
}
}

impl Hash for HeaderKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct HeaderValue {
pub kind: HeaderKind,
pub value: Vec<u8>,
}

#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub enum HeaderKind {
Raw,
String,
Bool,
Int8,
Int16,
Int32,
Int64,
Int128,
Uint8,
Uint16,
Uint32,
Uint64,
Uint128,
Float32,
Float64,
}

impl Display for HeaderValue {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: ", self.kind)?;
match self.kind {
HeaderKind::Raw => write!(f, "{:?}", self.value),
HeaderKind::String => write!(f, "{}", String::from_utf8_lossy(&self.value)),
HeaderKind::Bool => write!(f, "{}", self.value[0] != 0),
HeaderKind::Int8 => write!(
f,
"{}",
i8::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Int16 => write!(
f,
"{}",
i16::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Int32 => write!(
f,
"{}",
i32::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Int64 => write!(
f,
"{}",
i64::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Int128 => write!(
f,
"{}",
i128::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Uint8 => write!(
f,
"{}",
u8::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Uint16 => write!(
f,
"{}",
u16::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Uint32 => write!(
f,
"{}",
u32::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Uint64 => write!(
f,
"{}",
u64::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Uint128 => write!(
f,
"{}",
u128::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Float32 => write!(
f,
"{}",
f32::from_le_bytes(self.value.clone().try_into().unwrap())
),
HeaderKind::Float64 => write!(
f,
"{}",
f64::from_le_bytes(self.value.clone().try_into().unwrap())
),
}
}
}

impl Display for HeaderKind {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match *self {
HeaderKind::Raw => write!(f, "raw"),
HeaderKind::String => write!(f, "string"),
HeaderKind::Bool => write!(f, "bool"),
HeaderKind::Int8 => write!(f, "int8"),
HeaderKind::Int16 => write!(f, "int16"),
HeaderKind::Int32 => write!(f, "int32"),
HeaderKind::Int64 => write!(f, "int64"),
HeaderKind::Int128 => write!(f, "int128"),
HeaderKind::Uint8 => write!(f, "uint8"),
HeaderKind::Uint16 => write!(f, "uint16"),
HeaderKind::Uint32 => write!(f, "uint32"),
HeaderKind::Uint64 => write!(f, "uint64"),
HeaderKind::Uint128 => write!(f, "uint128"),
HeaderKind::Float32 => write!(f, "float32"),
HeaderKind::Float64 => write!(f, "float64"),
}
}
}

impl HeaderValue {
pub fn raw(value: Vec<u8>) -> Result<Self, Error> {
if value.is_empty() || value.len() > 255 {
return Err(Error::InvalidHeaderValue);
}

Ok(Self {
kind: HeaderKind::Raw,
value,
})
}

pub fn string(value: String) -> Result<Self, Error> {
Self::raw(value.into_bytes())
}

pub fn bool(value: bool) -> Result<Self, Error> {
Self::raw(vec![value as u8])
}

pub fn int8(value: i8) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn int16(value: i16) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn int32(value: i32) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn int64(value: i64) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn int128(value: i128) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn uint8(value: u8) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn uint16(value: u16) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn uint32(value: u32) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn uint64(value: u64) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn uint128(value: u128) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn float32(value: f32) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}

pub fn float64(value: f64) -> Result<Self, Error> {
Self::raw(value.to_le_bytes().to_vec())
}
}
1 change: 1 addition & 0 deletions iggy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod consumer;
pub mod consumer_groups;
pub mod consumer_offsets;
pub mod error;
pub mod header;
pub mod http;
pub mod identifier;
pub mod messages;
Expand Down
23 changes: 21 additions & 2 deletions iggy/src/messages/send_messages.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::bytes_serializable::BytesSerializable;
use crate::command::CommandPayload;
use crate::error::Error;
use crate::header::{HeaderKey, HeaderValue};
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_with::base64::Base64;
use serde_with::serde_as;
use std::collections::HashMap;
use std::fmt::Display;
use std::str::FromStr;

Expand Down Expand Up @@ -43,6 +45,7 @@ pub struct Message {
pub length: u32,
#[serde_as(as = "Base64")]
pub payload: Bytes,
pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Copy, Clone)]
Expand Down Expand Up @@ -210,8 +213,19 @@ impl FromStr for PartitioningKind {

impl Message {
pub fn get_size_bytes(&self) -> u32 {
// ID + Length + Payload
16 + 4 + self.payload.len() as u32
// ID + Length + Payload + Headers
16 + 4 + self.payload.len() as u32 + self.get_headers_size_bytes()
}

fn get_headers_size_bytes(&self) -> u32 {
let mut size = 0;
if let Some(headers) = &self.headers {
for (key, value) in headers {
// Kind + Key + Value
size += 1 + key.as_str().len() as u32 + value.value.len() as u32;
}
}
size
}
}

Expand All @@ -222,6 +236,7 @@ impl Default for Message {
id: 0,
length: payload.len() as u32,
payload,
headers: None,
}
}
}
Expand Down Expand Up @@ -264,6 +279,7 @@ impl BytesSerializable for Partitioning {
}
}

// TODO: Implement headers serialization
impl BytesSerializable for Message {
fn as_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(self.get_size_bytes() as usize);
Expand Down Expand Up @@ -293,6 +309,7 @@ impl BytesSerializable for Message {
id,
length,
payload,
headers: None,
})
}
}
Expand All @@ -318,6 +335,7 @@ impl FromStr for Message {
id,
length,
payload,
headers: None,
})
}
}
Expand Down Expand Up @@ -351,6 +369,7 @@ impl FromStr for SendMessages {
id: message_id,
length: payload.len() as u32,
payload,
headers: None,
};

let command = SendMessages {
Expand Down
2 changes: 1 addition & 1 deletion server/server.http
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ POST {{url}}/streams/{{stream_id}}/topics/{{topic_id}}/messages
Content-Type: application/json

{
"key": {
"partitioning": {
"kind": "partition_id",
"value": "{{partition_id_payload_base64}}"
},
Expand Down

0 comments on commit bc1d0b3

Please sign in to comment.