Skip to content

Commit

Permalink
Added message state, unified message struct for streaming and client …
Browse files Browse the repository at this point in the history
…SDK, extended message with state and checksum
  • Loading branch information
spetz committed Aug 22, 2023
1 parent 1de8524 commit b07f23d
Show file tree
Hide file tree
Showing 48 changed files with 260 additions and 265 deletions.
3 changes: 2 additions & 1 deletion iggy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.0.30"
version = "0.0.40"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand All @@ -17,6 +17,7 @@ async-trait = "0.1.68"
base64 = "0.21.2"
bytes = "1.4.0"
clap = { version = "4.1.11", features = ["derive"] }
crc32fast = "1.3.2"
lazy_static = "1.4.0"
regex = "1.9.1"
reqwest = { version = "0.11.18", features = ["json"] }
Expand Down
26 changes: 15 additions & 11 deletions iggy/src/binary/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use crate::bytes_serializable::BytesSerializable;
use crate::error::Error;
use crate::models::client_info::{ClientInfo, ClientInfoDetails, ConsumerGroupInfo};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
use crate::models::message::Message;
use crate::models::message::{Message, MessageState};
use crate::models::offset::Offset;
use crate::models::partition::Partition;
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
use bytes::Bytes;
use std::collections::HashMap;
use std::str::from_utf8;

Expand Down Expand Up @@ -151,36 +152,39 @@ pub fn map_messages(payload: &[u8]) -> Result<Vec<Message>, Error> {
let mut messages = Vec::new();
while position < length {
let offset = u64::from_le_bytes(payload[position..position + 8].try_into()?);
let timestamp = u64::from_le_bytes(payload[position + 8..position + 16].try_into()?);
let id = u128::from_le_bytes(payload[position + 16..position + 32].try_into()?);
let headers_length = u32::from_le_bytes(payload[position + 32..position + 36].try_into()?);
let state = MessageState::from_code(payload[position + 8])?;
let timestamp = u64::from_le_bytes(payload[position + 9..position + 17].try_into()?);
let id = u128::from_le_bytes(payload[position + 17..position + 33].try_into()?);
let checksum = u32::from_le_bytes(payload[position + 33..position + 37].try_into()?);
let headers_length = u32::from_le_bytes(payload[position + 37..position + 41].try_into()?);
let headers = if headers_length > 0 {
let headers_payload = &payload[position + 36..position + 36 + headers_length as usize];
let headers_payload = &payload[position + 41..position + 41 + headers_length as usize];
Some(HashMap::from_bytes(headers_payload)?)
} else {
None
};
position += headers_length as usize;
let message_length = u32::from_le_bytes(payload[position + 36..position + 40].try_into()?);

let payload_range = position + 40..position + 40 + message_length as usize;
let message_length = u32::from_le_bytes(payload[position + 41..position + 45].try_into()?);
let payload_range = position + 45..position + 45 + message_length as usize;
if payload_range.start > length || payload_range.end > length {
break;
}

let payload = payload[payload_range].to_vec();
let total_size = 40 + message_length as usize;
let total_size = 45 + message_length as usize;
position += total_size;
messages.push(Message {
offset,
timestamp,
state,
checksum,
id,
headers,
length: message_length,
payload,
payload: Bytes::from(payload),
});

if position + 40 >= length {
if position + 45 >= length {
break;
}
}
Expand Down
4 changes: 3 additions & 1 deletion iggy/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,13 @@ impl MessageClient for IggyClient {
let payload = encryptor.decrypt(&message.payload)?;
decrypted_messages.push(Message {
id: message.id,
state: message.state,
checksum: message.checksum,
offset: message.offset,
timestamp: message.timestamp,
length: payload.len() as u32,
headers: message.headers,
payload,
payload: Bytes::from(payload),
});
}
Ok(decrypted_messages)
Expand Down
7 changes: 4 additions & 3 deletions iggy/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::topics::create_topic::CreateTopic;
use crate::topics::delete_topic::DeleteTopic;
use crate::topics::get_topic::GetTopic;
use crate::topics::get_topics::GetTopics;
use bytes::BufMut;
use std::fmt::{Display, Formatter};
use std::str::FromStr;

Expand Down Expand Up @@ -217,7 +218,7 @@ impl BytesSerializable for Command {

fn as_bytes(command: u32, payload: &[u8]) -> Vec<u8> {
let mut bytes = Vec::with_capacity(4 + payload.len());
bytes.extend(command.to_le_bytes());
bytes.put_u32_le(command);
bytes.extend(payload);
bytes
}
Expand Down Expand Up @@ -593,7 +594,7 @@ mod tests {
) {
let payload = payload.as_bytes();
let mut bytes = Vec::with_capacity(4 + payload.len());
bytes.extend(command_id.to_le_bytes());
bytes.put_u32_le(command_id);
bytes.extend(payload);
assert_eq!(command.as_bytes(), bytes);
}
Expand All @@ -605,7 +606,7 @@ mod tests {
) {
let payload = payload.as_bytes();
let mut bytes = Vec::with_capacity(4 + payload.len());
bytes.extend(command_id.to_le_bytes());
bytes.put_u32_le(command_id);
bytes.extend(payload);
assert_eq!(&Command::from_bytes(&bytes).unwrap(), command);
}
Expand Down
2 changes: 1 addition & 1 deletion iggy/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl BytesSerializable for Consumer {
fn as_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(5);
bytes.put_u8(self.kind.as_code());
bytes.extend(self.id.to_le_bytes());
bytes.put_u32_le(self.id);
bytes
}

Expand Down
5 changes: 3 additions & 2 deletions iggy/src/consumer_groups/create_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::command::CommandPayload;
use crate::error::Error;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::BufMut;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::FromStr;
Expand Down Expand Up @@ -66,7 +67,7 @@ impl BytesSerializable for CreateConsumerGroup {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(self.consumer_group_id.to_le_bytes());
bytes.put_u32_le(self.consumer_group_id);
bytes
}

Expand Down Expand Up @@ -138,7 +139,7 @@ mod tests {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(consumer_group_id.to_le_bytes());
bytes.put_u32_le(consumer_group_id);
let command = CreateConsumerGroup::from_bytes(&bytes);
assert!(command.is_ok());

Expand Down
5 changes: 3 additions & 2 deletions iggy/src/consumer_groups/delete_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::command::CommandPayload;
use crate::error::Error;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::BufMut;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::FromStr;
Expand Down Expand Up @@ -66,7 +67,7 @@ impl BytesSerializable for DeleteConsumerGroup {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(self.consumer_group_id.to_le_bytes());
bytes.put_u32_le(self.consumer_group_id);
bytes
}

Expand Down Expand Up @@ -138,7 +139,7 @@ mod tests {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(consumer_group_id.to_le_bytes());
bytes.put_u32_le(consumer_group_id);
let command = DeleteConsumerGroup::from_bytes(&bytes);
assert!(command.is_ok());

Expand Down
5 changes: 3 additions & 2 deletions iggy/src/consumer_groups/get_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::command::CommandPayload;
use crate::error::Error;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::BufMut;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::FromStr;
Expand Down Expand Up @@ -66,7 +67,7 @@ impl BytesSerializable for GetConsumerGroup {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(self.consumer_group_id.to_le_bytes());
bytes.put_u32_le(self.consumer_group_id);
bytes
}

Expand Down Expand Up @@ -138,7 +139,7 @@ mod tests {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(consumer_group_id.to_le_bytes());
bytes.put_u32_le(consumer_group_id);
let command = GetConsumerGroup::from_bytes(&bytes);
assert!(command.is_ok());

Expand Down
5 changes: 3 additions & 2 deletions iggy/src/consumer_groups/join_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::command::CommandPayload;
use crate::error::Error;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::BufMut;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::FromStr;
Expand Down Expand Up @@ -66,7 +67,7 @@ impl BytesSerializable for JoinConsumerGroup {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(self.consumer_group_id.to_le_bytes());
bytes.put_u32_le(self.consumer_group_id);
bytes
}

Expand Down Expand Up @@ -138,7 +139,7 @@ mod tests {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(consumer_group_id.to_le_bytes());
bytes.put_u32_le(consumer_group_id);
let command = JoinConsumerGroup::from_bytes(&bytes);
assert!(command.is_ok());

Expand Down
5 changes: 3 additions & 2 deletions iggy/src/consumer_groups/leave_consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::command::CommandPayload;
use crate::error::Error;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::BufMut;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::FromStr;
Expand Down Expand Up @@ -66,7 +67,7 @@ impl BytesSerializable for LeaveConsumerGroup {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(self.consumer_group_id.to_le_bytes());
bytes.put_u32_le(self.consumer_group_id);
bytes
}

Expand Down Expand Up @@ -138,7 +139,7 @@ mod tests {
let mut bytes = Vec::with_capacity(4 + stream_id_bytes.len() + topic_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(consumer_group_id.to_le_bytes());
bytes.put_u32_le(consumer_group_id);
let command = LeaveConsumerGroup::from_bytes(&bytes);
assert!(command.is_ok());

Expand Down
5 changes: 3 additions & 2 deletions iggy/src/consumer_offsets/get_consumer_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::consumer::{Consumer, ConsumerKind};
use crate::error::Error;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::BufMut;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::FromStr;
Expand Down Expand Up @@ -82,7 +83,7 @@ impl BytesSerializable for GetConsumerOffset {
bytes.extend(consumer_bytes);
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(self.partition_id.to_le_bytes());
bytes.put_u32_le(self.partition_id);
bytes
}

Expand Down Expand Up @@ -176,7 +177,7 @@ mod tests {
bytes.extend(consumer_bytes);
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(partition_id.to_le_bytes());
bytes.put_u32_le(partition_id);

let command = GetConsumerOffset::from_bytes(&bytes);
assert!(command.is_ok());
Expand Down
9 changes: 5 additions & 4 deletions iggy/src/consumer_offsets/store_consumer_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::consumer::{Consumer, ConsumerKind};
use crate::error::Error;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::BufMut;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::FromStr;
Expand Down Expand Up @@ -81,8 +82,8 @@ impl BytesSerializable for StoreConsumerOffset {
bytes.extend(consumer_bytes);
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(self.partition_id.to_le_bytes());
bytes.extend(self.offset.to_le_bytes());
bytes.put_u32_le(self.partition_id);
bytes.put_u64_le(self.offset);
bytes
}

Expand Down Expand Up @@ -182,8 +183,8 @@ mod tests {
bytes.extend(consumer_bytes);
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(partition_id.to_le_bytes());
bytes.extend(offset.to_le_bytes());
bytes.put_u32_le(partition_id);
bytes.put_u64_le(offset);

let command = StoreConsumerOffset::from_bytes(&bytes);
assert!(command.is_ok());
Expand Down
2 changes: 1 addition & 1 deletion iggy/src/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl BytesSerializable for Identifier {
fn as_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(2 + self.length as usize);
bytes.put_u8(self.kind.as_code());
bytes.extend(self.length.to_le_bytes());
bytes.put_u8(self.length);
bytes.extend(&self.value);
bytes
}
Expand Down
1 change: 0 additions & 1 deletion iggy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub mod consumer;
pub mod consumer_groups;
pub mod consumer_offsets;
pub mod error;
pub mod header;
pub mod http;
pub mod identifier;
pub mod messages;
Expand Down
16 changes: 8 additions & 8 deletions iggy/src/messages/poll_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ impl BytesSerializable for PollMessages {
bytes.extend(consumer_bytes);
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(self.partition_id.to_le_bytes());
bytes.put_u32_le(self.partition_id);
bytes.extend(strategy_bytes);
bytes.extend(self.count.to_le_bytes());
bytes.put_u32_le(self.count);
if self.auto_commit {
bytes.extend(1u8.to_le_bytes());
bytes.put_u8(1);
} else {
bytes.extend(0u8.to_le_bytes());
bytes.put_u8(0);
}

bytes
Expand Down Expand Up @@ -341,7 +341,7 @@ impl BytesSerializable for PollingStrategy {
fn as_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(9);
bytes.put_u8(self.kind.as_code());
bytes.extend(self.value.to_le_bytes());
bytes.put_u64_le(self.value);
bytes
}

Expand Down Expand Up @@ -435,10 +435,10 @@ mod tests {
bytes.extend(consumer_bytes);
bytes.extend(stream_id_bytes);
bytes.extend(topic_id_bytes);
bytes.extend(partition_id.to_le_bytes());
bytes.put_u32_le(partition_id);
bytes.extend(strategy_bytes);
bytes.extend(count.to_le_bytes());
bytes.extend(auto_commit.to_le_bytes());
bytes.put_u32_le(count);
bytes.put_u8(auto_commit);

let command = PollMessages::from_bytes(&bytes);
assert!(command.is_ok());
Expand Down

0 comments on commit b07f23d

Please sign in to comment.