Skip to content

Commit

Permalink
PolledMessage response model, added created_at to stream, topic, part…
Browse files Browse the repository at this point in the history
…ition, improved handlers and mappers, tests
  • Loading branch information
spetz committed Aug 31, 2023
1 parent a540296 commit 670a8ba
Show file tree
Hide file tree
Showing 55 changed files with 251 additions and 172 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
lto = true

[workspace]
resolver = "2"

members = [
"bench",
Expand Down
16 changes: 8 additions & 8 deletions bench/src/benchmarks/poll_messages_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,32 @@ pub async fn run(
poll_messages.strategy.value = offset;

let latency_start = Instant::now();
let messages = client.poll_messages(&poll_messages).await;
let polled_messages = client.poll_messages(&poll_messages).await;
let latency_end = latency_start.elapsed();
if messages.is_err() {
if polled_messages.is_err() {
trace!("Offset: {} is not available yet, retrying...", offset);
continue;
}

let messages = messages.unwrap();
if messages.is_empty() {
let polled_messages = polled_messages.unwrap();
if polled_messages.messages.is_empty() {
trace!("Messages are empty for offset: {}, retrying...", offset);
continue;
}

if messages.len() != args.messages_per_batch as usize {
if polled_messages.messages.len() != args.messages_per_batch as usize {
trace!(
"Consumer #{} → expected {} messages, but got {} messages, retrying...",
consumer_id,
args.messages_per_batch,
messages.len()
polled_messages.messages.len()
);
continue;
}

latencies.push(latency_end);
received_messages += messages.len() as u64;
for message in messages {
received_messages += polled_messages.messages.len() as u64;
for message in polled_messages.messages {
total_size_bytes += message.get_size_bytes() as u64;
}
current_iteration += 1;
Expand Down
8 changes: 4 additions & 4 deletions cli/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ pub async fn poll_messages(
client: &dyn Client,
) -> Result<(), ClientError> {
let format = command.format;
let messages = client.poll_messages(&command.payload).await?;
if messages.is_empty() {
let polled_messages = client.poll_messages(&command.payload).await?;
if polled_messages.messages.is_empty() {
info!("No messages found");
return Ok(());
}

let mut text = format!("Received {} messages.", messages.len());
let mut text = format!("Received {} messages.", polled_messages.messages.len());
if format == Format::None {
info!("{}", text);
return Ok(());
}

for message in messages {
for message in polled_messages.messages {
text += &format!(
"\noffset: {}, timestamp: {}, ID: {}, length: {}, payload: ",
message.offset, message.timestamp, message.id, message.length
Expand Down
2 changes: 1 addition & 1 deletion iggy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.0.53"
version = "0.0.60"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion iggy/src/binary/consumer_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ pub async fn get_consumer_offset(
let response = client
.send_with_response(GET_CONSUMER_OFFSET_CODE, &command.as_bytes())
.await?;
mapper::map_offset(&response)
mapper::map_consumer_offset(&response)
}
68 changes: 44 additions & 24 deletions iggy/src/binary/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::error::Error;
use crate::models::client_info::{ClientInfo, ClientInfoDetails, ConsumerGroupInfo};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::message::{Message, MessageState};
use crate::models::messages::{Message, MessageState, PolledMessages};
use crate::models::partition::Partition;
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
Expand Down Expand Up @@ -85,7 +85,7 @@ pub fn map_stats(payload: &[u8]) -> Result<Stats, Error> {
})
}

pub fn map_offset(payload: &[u8]) -> Result<ConsumerOffsetInfo, Error> {
pub fn map_consumer_offset(payload: &[u8]) -> Result<ConsumerOffsetInfo, Error> {
let partition_id = u32::from_le_bytes(payload[..4].try_into()?);
let current_offset = u64::from_le_bytes(payload[4..12].try_into()?);
let stored_offset = u64::from_le_bytes(payload[12..20].try_into()?);
Expand Down Expand Up @@ -144,13 +144,21 @@ pub fn map_clients(payload: &[u8]) -> Result<Vec<ClientInfo>, Error> {
Ok(clients)
}

pub fn map_messages(payload: &[u8]) -> Result<Vec<Message>, Error> {
pub fn map_polled_messages(payload: &[u8]) -> Result<PolledMessages, Error> {
if payload.is_empty() {
return Ok(EMPTY_MESSAGES);
return Ok(PolledMessages {
messages: EMPTY_MESSAGES,
partition_id: 0,
current_offset: 0,
});
}

let length = payload.len();
let mut position = 4;
let partition_id = u32::from_le_bytes(payload[..4].try_into()?);
let current_offset = u64::from_le_bytes(payload[4..12].try_into()?);
// Currently ignored
let _messages_count = u32::from_le_bytes(payload[12..16].try_into()?);
let mut position = 16;
let mut messages = Vec::new();
while position < length {
let offset = u64::from_le_bytes(payload[position..position + 8].try_into()?);
Expand Down Expand Up @@ -192,7 +200,11 @@ pub fn map_messages(payload: &[u8]) -> Result<Vec<Message>, Error> {
}

messages.sort_by(|x, y| x.offset.cmp(&y.offset));
Ok(messages)
Ok(PolledMessages {
messages,
partition_id,
current_offset,
})
}

pub fn map_streams(payload: &[u8]) -> Result<Vec<Stream>, Error> {
Expand Down Expand Up @@ -225,6 +237,7 @@ pub fn map_stream(payload: &[u8]) -> Result<StreamDetails, Error> {
topics.sort_by(|x, y| x.id.cmp(&y.id));
let stream = StreamDetails {
id: stream.id,
created_at: stream.created_at,
topics_count: stream.topics_count,
size_bytes: stream.size_bytes,
messages_count: stream.messages_count,
Expand All @@ -236,16 +249,18 @@ pub fn map_stream(payload: &[u8]) -> Result<StreamDetails, Error> {

fn map_to_stream(payload: &[u8], position: usize) -> Result<(Stream, usize), Error> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let topics_count = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let size_bytes = u64::from_le_bytes(payload[position + 8..position + 16].try_into()?);
let messages_count = u64::from_le_bytes(payload[position + 16..position + 24].try_into()?);
let name_length = payload[position + 24];
let created_at = u64::from_le_bytes(payload[position + 4..position + 12].try_into()?);
let topics_count = u32::from_le_bytes(payload[position + 12..position + 16].try_into()?);
let size_bytes = u64::from_le_bytes(payload[position + 16..position + 24].try_into()?);
let messages_count = u64::from_le_bytes(payload[position + 24..position + 32].try_into()?);
let name_length = payload[position + 32];
let name =
from_utf8(&payload[position + 25..position + 25 + name_length as usize])?.to_string();
let read_bytes = 4 + 4 + 8 + 8 + 1 + name_length as usize;
from_utf8(&payload[position + 33..position + 33 + name_length as usize])?.to_string();
let read_bytes = 4 + 8 + 4 + 8 + 8 + 1 + name_length as usize;
Ok((
Stream {
id,
created_at,
size_bytes,
messages_count,
topics_count,
Expand Down Expand Up @@ -285,6 +300,7 @@ pub fn map_topic(payload: &[u8]) -> Result<TopicDetails, Error> {
partitions.sort_by(|x, y| x.id.cmp(&y.id));
let topic = TopicDetails {
id: topic.id,
created_at: topic.created_at,
name: topic.name,
size_bytes: topic.size_bytes,
messages_count: topic.messages_count,
Expand All @@ -297,21 +313,23 @@ pub fn map_topic(payload: &[u8]) -> Result<TopicDetails, Error> {

fn map_to_topic(payload: &[u8], position: usize) -> Result<(Topic, usize), Error> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let partitions_count = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let message_expiry = u32::from_le_bytes(payload[position + 8..position + 12].try_into()?);
let created_at = u64::from_le_bytes(payload[position + 4..position + 12].try_into()?);
let partitions_count = u32::from_le_bytes(payload[position + 12..position + 16].try_into()?);
let message_expiry = u32::from_le_bytes(payload[position + 16..position + 20].try_into()?);
let message_expiry = match message_expiry {
0 => None,
_ => Some(message_expiry),
};
let size_bytes = u64::from_le_bytes(payload[position + 12..position + 20].try_into()?);
let messages_count = u64::from_le_bytes(payload[position + 20..position + 28].try_into()?);
let name_length = payload[position + 28];
let size_bytes = u64::from_le_bytes(payload[position + 20..position + 28].try_into()?);
let messages_count = u64::from_le_bytes(payload[position + 28..position + 36].try_into()?);
let name_length = payload[position + 36];
let name =
from_utf8(&payload[position + 29..position + 29 + name_length as usize])?.to_string();
let read_bytes = 4 + 4 + 4 + 8 + 8 + 1 + name_length as usize;
from_utf8(&payload[position + 37..position + 37 + name_length as usize])?.to_string();
let read_bytes = 4 + 8 + 4 + 4 + 8 + 8 + 1 + name_length as usize;
Ok((
Topic {
id,
created_at,
partitions_count,
size_bytes,
messages_count,
Expand All @@ -324,14 +342,16 @@ fn map_to_topic(payload: &[u8], position: usize) -> Result<(Topic, usize), Error

fn map_to_partition(payload: &[u8], position: usize) -> Result<(Partition, usize), Error> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let segments_count = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let current_offset = u64::from_le_bytes(payload[position + 8..position + 16].try_into()?);
let size_bytes = u64::from_le_bytes(payload[position + 16..position + 24].try_into()?);
let messages_count = u64::from_le_bytes(payload[position + 24..position + 32].try_into()?);
let read_bytes = 4 + 4 + 8 + 8 + 8;
let created_at = u64::from_le_bytes(payload[position + 4..position + 12].try_into()?);
let segments_count = u32::from_le_bytes(payload[position + 12..position + 16].try_into()?);
let current_offset = u64::from_le_bytes(payload[position + 16..position + 24].try_into()?);
let size_bytes = u64::from_le_bytes(payload[position + 24..position + 32].try_into()?);
let messages_count = u64::from_le_bytes(payload[position + 32..position + 40].try_into()?);
let read_bytes = 4 + 8 + 4 + 8 + 8 + 8;
Ok((
Partition {
id,
created_at,
segments_count,
current_offset,
size_bytes,
Expand Down
6 changes: 3 additions & 3 deletions iggy/src/binary/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use crate::command::{POLL_MESSAGES_CODE, SEND_MESSAGES_CODE};
use crate::error::Error;
use crate::messages::poll_messages::PollMessages;
use crate::messages::send_messages::SendMessages;
use crate::models::message::Message;
use crate::models::messages::PolledMessages;

pub async fn poll_messages(
client: &dyn BinaryClient,
command: &PollMessages,
) -> Result<Vec<Message>, Error> {
) -> Result<PolledMessages, Error> {
let response = client
.send_with_response(POLL_MESSAGES_CODE, &command.as_bytes())
.await?;
mapper::map_messages(&response)
mapper::map_polled_messages(&response)
}

pub async fn send_messages(client: &dyn BinaryClient, command: &SendMessages) -> Result<(), Error> {
Expand Down
4 changes: 2 additions & 2 deletions iggy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::messages::send_messages::SendMessages;
use crate::models::client_info::{ClientInfo, ClientInfoDetails};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::message::Message;
use crate::models::messages::PolledMessages;
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
Expand Down Expand Up @@ -84,7 +84,7 @@ pub trait PartitionClient {

#[async_trait]
pub trait MessageClient {
async fn poll_messages(&self, command: &PollMessages) -> Result<Vec<Message>, Error>;
async fn poll_messages(&self, command: &PollMessages) -> Result<PolledMessages, Error>;
async fn send_messages(&self, command: &mut SendMessages) -> Result<(), Error>;
}

Expand Down
30 changes: 9 additions & 21 deletions iggy/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::messages::send_messages::{Partitioning, PartitioningKind, SendMessage
use crate::models::client_info::{ClientInfo, ClientInfoDetails};
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails};
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
use crate::models::message::Message;
use crate::models::messages::{Message, PolledMessages};
use crate::models::stats::Stats;
use crate::models::stream::{Stream, StreamDetails};
use crate::models::topic::{Topic, TopicDetails};
Expand Down Expand Up @@ -182,13 +182,13 @@ impl IggyClient {
loop {
sleep(interval).await;
let client = client.read().await;
let messages = client.poll_messages(&poll_messages).await;
if let Err(error) = messages {
let polled_messages = client.poll_messages(&poll_messages).await;
if let Err(error) = polled_messages {
error!("There was an error while polling messages: {:?}", error);
continue;
}

let messages = messages.unwrap();
let messages = polled_messages.unwrap().messages;
if messages.is_empty() {
continue;
}
Expand Down Expand Up @@ -429,27 +429,15 @@ impl PartitionClient for IggyClient {

#[async_trait]
impl MessageClient for IggyClient {
async fn poll_messages(&self, command: &PollMessages) -> Result<Vec<Message>, Error> {
let messages = self.client.read().await.poll_messages(command).await?;
async fn poll_messages(&self, command: &PollMessages) -> Result<PolledMessages, Error> {
let mut polled_messages = self.client.read().await.poll_messages(command).await?;
if let Some(ref encryptor) = self.encryptor {
let mut decrypted_messages = Vec::with_capacity(messages.len());
for message in messages {
for message in polled_messages.messages.iter_mut() {
let payload = encryptor.decrypt(&message.payload)?;
decrypted_messages.push(Message {
id: message.id,
state: message.state,
checksum: message.checksum,
offset: message.offset,
timestamp: message.timestamp,
length: payload.len() as u32,
headers: message.headers,
payload: Bytes::from(payload),
});
message.payload = Bytes::from(payload);
}
Ok(decrypted_messages)
} else {
Ok(messages)
}
Ok(polled_messages)
}

async fn send_messages(&self, command: &mut SendMessages) -> Result<(), Error> {
Expand Down
4 changes: 2 additions & 2 deletions iggy/src/http/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use crate::error::Error;
use crate::http::client::HttpClient;
use crate::messages::poll_messages::PollMessages;
use crate::messages::send_messages::SendMessages;
use crate::models::message::Message;
use crate::models::messages::PolledMessages;
use async_trait::async_trait;

#[async_trait]
impl MessageClient for HttpClient {
async fn poll_messages(&self, command: &PollMessages) -> Result<Vec<Message>, Error> {
async fn poll_messages(&self, command: &PollMessages) -> Result<PolledMessages, Error> {
let response = self
.get_with_query(
&get_path(
Expand Down
2 changes: 1 addition & 1 deletion iggy/src/messages/send_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ mod tests {
let message_1 = Message::from_str("hello 1").unwrap();
let message_2 = Message::from_str("2|hello 2").unwrap();
let message_3 = Message::from_str("3|hello 3").unwrap();
let messages = vec![
let messages = [
message_1.as_bytes(),
message_2.as_bytes(),
message_3.as_bytes(),
Expand Down
7 changes: 7 additions & 0 deletions iggy/src/models/message.rs → iggy/src/models/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::str::FromStr;

#[derive(Debug, Serialize, Deserialize)]
pub struct PolledMessages {
pub partition_id: u32,
pub current_offset: u64,
pub messages: Vec<Message>,
}

#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct Message {
Expand Down
2 changes: 1 addition & 1 deletion iggy/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod client_info;
pub mod consumer_group;
pub mod consumer_offset_info;
pub mod header;
pub mod message;
pub mod messages;
pub mod partition;
pub mod stats;
pub mod stream;
Expand Down
1 change: 1 addition & 0 deletions iggy/src/models/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Partition {
pub id: u32,
pub created_at: u64,
pub segments_count: u32,
pub current_offset: u64,
pub size_bytes: u64,
Expand Down

0 comments on commit 670a8ba

Please sign in to comment.