Skip to content

Commit

Permalink
First working implementation of the message headers for all the avail…
Browse files Browse the repository at this point in the history
…able transports
  • Loading branch information
spetz committed Aug 18, 2023
1 parent ba5e968 commit 7c73bd3
Show file tree
Hide file tree
Showing 18 changed files with 574 additions and 132 deletions.
2 changes: 1 addition & 1 deletion configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"allowed_origins": [
"*"
],
"allowed_headers": [],
"allowed_headers": ["content-type"],
"exposed_headers": [],
"allow_credentials": false,
"allow_private_network": false
Expand Down
2 changes: 1 addition & 1 deletion configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ address = "0.0.0.0:3000"
enabled = true
allowed_methods = [ "GET", "POST", "PUT", "DELETE" ]
allowed_origins = [ "*" ]
allowed_headers = [ ]
allowed_headers = [ "content-type" ]
exposed_headers = [ ]
allow_credentials = false
allow_private_network = false
Expand Down
2 changes: 1 addition & 1 deletion iggy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.0.26"
version = "0.0.30"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
24 changes: 16 additions & 8 deletions iggy/src/binary/mapper.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::bytes_serializable::BytesSerializable;
use crate::error::Error;
use crate::models::client_info::{ClientInfo, ClientInfoDetails, ConsumerGroupInfo};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
Expand All @@ -7,6 +8,7 @@ use crate::models::partition::Partition;
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
use std::collections::HashMap;
use std::str::from_utf8;

const EMPTY_MESSAGES: Vec<Message> = vec![];
Expand Down Expand Up @@ -144,35 +146,41 @@ pub fn map_messages(payload: &[u8]) -> Result<Vec<Message>, Error> {
return Ok(EMPTY_MESSAGES);
}

const PROPERTIES_SIZE: usize = 36;
let length = payload.len();
let mut position = 4;
let mut messages = Vec::new();
while position < length {
let offset = u64::from_le_bytes(payload[position..position + 8].try_into()?);
let timestamp = u64::from_le_bytes(payload[position + 8..position + 16].try_into()?);
let id = u128::from_le_bytes(payload[position + 16..position + 32].try_into()?);
let message_length =
u32::from_le_bytes(payload[position + 32..position + PROPERTIES_SIZE].try_into()?);

let payload_range =
position + PROPERTIES_SIZE..position + PROPERTIES_SIZE + message_length as usize;
let headers_length = u32::from_le_bytes(payload[position + 32..position + 36].try_into()?);
let headers = if headers_length > 0 {
let headers_payload = &payload[position + 36..position + 36 + headers_length as usize];
Some(HashMap::from_bytes(headers_payload)?)
} else {
None
};
position += headers_length as usize;
let message_length = u32::from_le_bytes(payload[position + 36..position + 40].try_into()?);

let payload_range = position + 40..position + 40 + message_length as usize;
if payload_range.start > length || payload_range.end > length {
break;
}

let payload = payload[payload_range].to_vec();
let total_size = PROPERTIES_SIZE + message_length as usize;
let total_size = 40 + message_length as usize;
position += total_size;
messages.push(Message {
offset,
timestamp,
id,
headers,
length: message_length,
payload,
});

if position + PROPERTIES_SIZE >= length {
if position + 40 >= length {
break;
}
}
Expand Down
1 change: 1 addition & 0 deletions iggy/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ impl MessageClient for IggyClient {
offset: message.offset,
timestamp: message.timestamp,
length: payload.len() as u32,
headers: message.headers,
payload,
});
}
Expand Down
55 changes: 35 additions & 20 deletions iggy/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ pub enum Error {
CannotReadMessageChecksum,
#[error("Invalid message checksum: {0}, expected: {1}, for offset: {2}")]
InvalidMessageChecksum(u32, u32, u64),
#[error("Cannot read headers length")]
CannotReadHeadersLength,
#[error("Cannot read headers payload")]
CannotReadHeadersPayload,
#[error("Cannot read message length")]
CannotReadMessageLength,
#[error("Cannot read message payload")]
Expand All @@ -204,6 +208,8 @@ pub enum Error {
InvalidHeaderKey,
#[error("Invalid header value")]
InvalidHeaderValue,
#[error("Too big headers payload")]
TooBigHeadersPayload,
#[error("Invalid offset: {0}")]
InvalidOffset(u64),
#[error("Failed to read consumers offsets for partition with ID: {0}")]
Expand Down Expand Up @@ -318,17 +324,20 @@ impl Error {
Error::CannotReadMessage => 4011,
Error::CannotReadMessageTimestamp => 4012,
Error::CannotReadMessageId => 4013,
Error::CannotReadMessageLength => 4014,
Error::CannotReadMessagePayload => 4015,
Error::TooBigMessagePayload => 4016,
Error::CannotReadHeadersLength => 4014,
Error::CannotReadHeadersPayload => 4015,
Error::TooBigHeadersPayload => 4016,
Error::InvalidHeaderKey => 4017,
Error::InvalidHeaderValue => 4018,
Error::TooManyMessages => 4019,
Error::EmptyMessagePayload => 4020,
Error::InvalidMessagePayloadLength => 4021,
Error::CannotReadMessageChecksum => 4022,
Error::InvalidMessageChecksum(_, _, _) => 4023,
Error::InvalidKeyValueLength => 4024,
Error::CannotReadMessageLength => 4019,
Error::CannotReadMessagePayload => 4020,
Error::TooBigMessagePayload => 4021,
Error::TooManyMessages => 4022,
Error::EmptyMessagePayload => 4023,
Error::InvalidMessagePayloadLength => 4024,
Error::CannotReadMessageChecksum => 4025,
Error::InvalidMessageChecksum(_, _, _) => 4026,
Error::InvalidKeyValueLength => 4027,
Error::InvalidOffset(_) => 4100,
Error::CannotReadConsumerOffsets(_) => 4101,
Error::ConsumerGroupNotFound(_, _) => 5000,
Expand Down Expand Up @@ -429,17 +438,20 @@ impl Error {
4011 => "cannot_read_message",
4012 => "cannot_read_message_timestamp",
4013 => "cannot_read_message_id",
4014 => "cannot_read_message_length",
4015 => "cannot_read_message_payload",
4016 => "too_big_message_payload",
4017 => "invalid_header_key",
4014 => "cannot_read_headers_length",
4015 => "cannot_read_headers_payload",
4016 => "invalid_header_key",
4017 => "too_big_headers_payload",
4018 => "invalid_header_value",
4019 => "too_many_messages",
4020 => "empty_message_payload",
4021 => "invalid_message_payload_length",
4022 => "cannot_read_message_checksum",
4023 => "invalid_message_checksum",
4024 => "invalid_key_value_length",
4019 => "cannot_read_message_length",
4020 => "cannot_read_message_payload",
4021 => "too_big_message_payload",
4022 => "too_many_messages",
4023 => "empty_message_payload",
4024 => "invalid_message_payload_length",
4025 => "cannot_read_message_checksum",
4026 => "invalid_message_checksum",
4027 => "invalid_key_value_length",
4100 => "invalid_offset",
4101 => "cannot_read_consumer_offsets",
5000 => "consumer_group_not_found",
Expand Down Expand Up @@ -513,6 +525,8 @@ impl Error {
Error::CannotReadMessage => "cannot_read_message",
Error::CannotReadMessageTimestamp => "cannot_read_message_timestamp",
Error::CannotReadMessageId => "cannot_read_message_id",
Error::CannotReadHeadersLength => "cannot_read_headers_length",
Error::CannotReadHeadersPayload => "cannot_read_headers_payload",
Error::CannotReadMessageLength => "cannot_read_message_length",
Error::CannotReadMessagePayload => "cannot_read_message_payload",
Error::CannotSaveMessagesToSegment => "cannot_save_messages_to_segment",
Expand All @@ -521,7 +535,7 @@ impl Error {
Error::CannotParseUtf8(_) => "cannot_parse_utf8",
Error::CannotParseInt(_) => "cannot_parse_int",
Error::CannotParseSlice(_) => "cannot_parse_slice",
Error::TooBigMessagePayload => "too_big_message_payload",
Error::TooBigHeadersPayload => "too_big_headers_payload",
Error::InvalidHeaderKey => "invalid_header_key",
Error::InvalidHeaderValue => "invalid_header_value",
Error::TooManyMessages => "too_many_messages",
Expand All @@ -531,6 +545,7 @@ impl Error {
Error::CannotDeletePartition(_, _, _) => "cannot_delete_partition",
Error::CannotDeletePartitionDirectory(_, _, _) => "cannot_delete_partition_directory",
Error::InvalidMessagePayloadLength => "invalid_message_payload_length",
Error::TooBigMessagePayload => "too_big_message_payload",
Error::EmptyMessagePayload => "empty_message_payload",
Error::CannotReadStreams => "cannot_read_streams",
Error::CannotReadTopics(_) => "cannot_read_topics",
Expand Down

0 comments on commit 7c73bd3

Please sign in to comment.