Skip to content

Commit

Permalink
Consumer identifier instead of u32 closes #94 (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz committed Oct 2, 2023
1 parent 0bf4cc5 commit 655f9b6
Show file tree
Hide file tree
Showing 17 changed files with 75 additions and 61 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bench/src/benchmarks/poll_messages_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn run(
);

let mut poll_messages = PollMessages {
consumer: Consumer::new(consumer_id),
consumer: Consumer::new(Identifier::numeric(consumer_id).unwrap()),
stream_id: Identifier::numeric(stream_id).unwrap(),
topic_id: Identifier::numeric(topic_id).unwrap(),
partition_id: Some(partition_id),
Expand Down
2 changes: 1 addition & 1 deletion examples/src/basic/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn consume_messages(args: &Args, client: &dyn Client) -> Result<(), Box<dy
.poll_messages(&PollMessages {
consumer: Consumer {
kind: ConsumerKind::from_code(args.consumer_kind)?,
id: args.consumer_id,
id: Identifier::numeric(args.consumer_id).unwrap(),
},
stream_id: Identifier::numeric(args.stream_id)?,
topic_id: Identifier::numeric(args.topic_id)?,
Expand Down
2 changes: 1 addition & 1 deletion examples/src/message-envelope/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn consume_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dy
PollMessages {
consumer: Consumer {
kind: ConsumerKind::from_code(args.consumer_kind)?,
id: args.consumer_id,
id: Identifier::numeric(args.consumer_id).unwrap(),
},
stream_id: Identifier::numeric(args.stream_id)?,
topic_id: Identifier::numeric(args.topic_id)?,
Expand Down
2 changes: 1 addition & 1 deletion examples/src/message-headers/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn consume_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dy
PollMessages {
consumer: Consumer {
kind: ConsumerKind::from_code(args.consumer_kind)?,
id: args.consumer_id,
id: Identifier::numeric(args.consumer_id).unwrap(),
},
stream_id: Identifier::numeric(args.stream_id)?,
topic_id: Identifier::numeric(args.topic_id)?,
Expand Down
2 changes: 1 addition & 1 deletion iggy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.0.91"
version = "0.0.92"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
24 changes: 13 additions & 11 deletions iggy/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::bytes_serializable::BytesSerializable;
use crate::error::Error;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use bytes::BufMut;
use serde::{Deserialize, Serialize};
Expand All @@ -14,7 +15,7 @@ pub struct Consumer {
pub kind: ConsumerKind,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "default_id")]
pub id: u32,
pub id: Identifier,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Copy, Clone)]
Expand All @@ -25,8 +26,8 @@ pub enum ConsumerKind {
ConsumerGroup,
}

fn default_id() -> u32 {
0
fn default_id() -> Identifier {
Identifier::numeric(1).unwrap()
}

impl Validatable<Error> for Consumer {
Expand All @@ -39,18 +40,18 @@ impl Consumer {
pub fn from_consumer(consumer: &Consumer) -> Self {
Self {
kind: consumer.kind,
id: consumer.id,
id: consumer.id.clone(),
}
}

pub fn new(id: u32) -> Self {
pub fn new(id: Identifier) -> Self {
Self {
kind: ConsumerKind::Consumer,
id,
}
}

pub fn group(id: u32) -> Self {
pub fn group(id: Identifier) -> Self {
Self {
kind: ConsumerKind::ConsumerGroup,
id,
Expand All @@ -60,22 +61,23 @@ impl Consumer {

impl BytesSerializable for Consumer {
fn as_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(5);
let id_bytes = self.id.as_bytes();
let mut bytes = Vec::with_capacity(1 + id_bytes.len());
bytes.put_u8(self.kind.as_code());
bytes.put_u32_le(self.id);
bytes.extend(id_bytes);
bytes
}

fn from_bytes(bytes: &[u8]) -> Result<Self, Error>
where
Self: Sized,
{
if bytes.len() != 5 {
if bytes.len() < 4 {
return Err(Error::InvalidCommand);
}

let kind = ConsumerKind::from_code(bytes[0])?;
let id = u32::from_le_bytes(bytes[1..5].try_into()?);
let id = Identifier::from_bytes(&bytes[1..])?;
let consumer = Consumer { kind, id };
consumer.validate()?;
Ok(consumer)
Expand Down Expand Up @@ -108,7 +110,7 @@ impl FromStr for Consumer {
}

let kind = parts[0].parse::<ConsumerKind>()?;
let id = parts[1].parse::<u32>()?;
let id = parts[1].parse::<Identifier>()?;
let consumer = Consumer { kind, id };
consumer.validate()?;
Ok(consumer)
Expand Down
16 changes: 8 additions & 8 deletions iggy/src/consumer_offsets/get_consumer_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl FromStr for GetConsumerOffset {
}

let consumer_kind = ConsumerKind::from_str(parts[0])?;
let consumer_id = parts[1].parse::<u32>()?;
let consumer_id = parts[1].parse::<Identifier>()?;
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
Expand Down Expand Up @@ -98,12 +98,12 @@ impl BytesSerializable for GetConsumerOffset {

let mut position = 0;
let consumer_kind = ConsumerKind::from_code(bytes[0])?;
let consumer_id = u32::from_le_bytes(bytes[1..5].try_into()?);
let consumer_id = Identifier::from_bytes(&bytes[1..])?;
position += 1 + consumer_id.get_size_bytes() as usize;
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
};
position += 5;
let stream_id = Identifier::from_bytes(&bytes[position..])?;
position += stream_id.get_size_bytes() as usize;
let topic_id = Identifier::from_bytes(&bytes[position..])?;
Expand Down Expand Up @@ -145,7 +145,7 @@ mod tests {
#[test]
fn should_be_serialized_as_bytes() {
let command = GetConsumerOffset {
consumer: Consumer::new(1),
consumer: Consumer::new(Identifier::numeric(1).unwrap()),
stream_id: Identifier::numeric(2).unwrap(),
topic_id: Identifier::numeric(3).unwrap(),
partition_id: Some(4),
Expand All @@ -154,12 +154,12 @@ mod tests {
let bytes = command.as_bytes();
let mut position = 0;
let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap();
let consumer_id = u32::from_le_bytes(bytes[1..5].try_into().unwrap());
let consumer_id = Identifier::from_bytes(&bytes[1..]).unwrap();
position += 1 + consumer_id.get_size_bytes() as usize;
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
};
position += 5;
let stream_id = Identifier::from_bytes(&bytes[position..]).unwrap();
position += stream_id.get_size_bytes() as usize;
let topic_id = Identifier::from_bytes(&bytes[position..]).unwrap();
Expand All @@ -175,7 +175,7 @@ mod tests {

#[test]
fn should_be_deserialized_from_bytes() {
let consumer = Consumer::new(1);
let consumer = Consumer::new(Identifier::numeric(1).unwrap());
let stream_id = Identifier::numeric(2).unwrap();
let topic_id = Identifier::numeric(3).unwrap();
let partition_id = 4u32;
Expand Down Expand Up @@ -203,7 +203,7 @@ mod tests {

#[test]
fn should_be_read_from_string() {
let consumer = Consumer::new(1);
let consumer = Consumer::new(Identifier::numeric(1).unwrap());
let stream_id = Identifier::numeric(2).unwrap();
let topic_id = Identifier::numeric(3).unwrap();
let partition_id = 4u32;
Expand Down
16 changes: 8 additions & 8 deletions iggy/src/consumer_offsets/store_consumer_offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl FromStr for StoreConsumerOffset {
}

let consumer_kind = ConsumerKind::from_str(parts[0])?;
let consumer_id = parts[1].parse::<u32>()?;
let consumer_id = parts[1].parse::<Identifier>()?;
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
Expand Down Expand Up @@ -98,12 +98,12 @@ impl BytesSerializable for StoreConsumerOffset {

let mut position = 0;
let consumer_kind = ConsumerKind::from_code(bytes[0])?;
let consumer_id = u32::from_le_bytes(bytes[1..5].try_into()?);
let consumer_id = Identifier::from_bytes(&bytes[1..])?;
position += 1 + consumer_id.get_size_bytes() as usize;
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
};
position += 5;
let stream_id = Identifier::from_bytes(&bytes[position..])?;
position += stream_id.get_size_bytes() as usize;
let topic_id = Identifier::from_bytes(&bytes[position..])?;
Expand Down Expand Up @@ -148,7 +148,7 @@ mod tests {
#[test]
fn should_be_serialized_as_bytes() {
let command = StoreConsumerOffset {
consumer: Consumer::new(1),
consumer: Consumer::new(Identifier::numeric(1).unwrap()),
stream_id: Identifier::numeric(2).unwrap(),
topic_id: Identifier::numeric(3).unwrap(),
partition_id: Some(4),
Expand All @@ -158,12 +158,12 @@ mod tests {
let bytes = command.as_bytes();
let mut position = 0;
let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap();
let consumer_id = u32::from_le_bytes(bytes[1..5].try_into().unwrap());
let consumer_id = Identifier::from_bytes(&bytes[1..]).unwrap();
position += 1 + consumer_id.get_size_bytes() as usize;
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
};
position += 5;
let stream_id = Identifier::from_bytes(&bytes[position..]).unwrap();
position += stream_id.get_size_bytes() as usize;
let topic_id = Identifier::from_bytes(&bytes[position..]).unwrap();
Expand All @@ -181,7 +181,7 @@ mod tests {

#[test]
fn should_be_deserialized_from_bytes() {
let consumer = Consumer::new(1);
let consumer = Consumer::new(Identifier::numeric(1).unwrap());
let stream_id = Identifier::numeric(2).unwrap();
let topic_id = Identifier::numeric(3).unwrap();
let partition_id = 4u32;
Expand Down Expand Up @@ -212,7 +212,7 @@ mod tests {

#[test]
fn should_be_read_from_string() {
let consumer = Consumer::new(1);
let consumer = Consumer::new(Identifier::numeric(1).unwrap());
let stream_id = Identifier::numeric(2).unwrap();
let topic_id = Identifier::numeric(3).unwrap();
let partition_id = 4u32;
Expand Down
16 changes: 8 additions & 8 deletions iggy/src/messages/poll_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl FromStr for PollMessages {
}

let consumer_kind = ConsumerKind::from_str(parts[0])?;
let consumer_id = parts[1].parse::<u32>()?;
let consumer_id = parts[1].parse::<Identifier>()?;
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
Expand Down Expand Up @@ -272,12 +272,12 @@ impl BytesSerializable for PollMessages {

let mut position = 0;
let consumer_kind = ConsumerKind::from_code(bytes[0])?;
let consumer_id = u32::from_le_bytes(bytes[1..5].try_into()?);
let consumer_id = Identifier::from_bytes(&bytes[1..])?;
position += 1 + consumer_id.get_size_bytes() as usize;
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
};
position += 5;
let stream_id = Identifier::from_bytes(&bytes[position..])?;
position += stream_id.get_size_bytes() as usize;
let topic_id = Identifier::from_bytes(&bytes[position..])?;
Expand Down Expand Up @@ -368,7 +368,7 @@ mod tests {
#[test]
fn should_be_serialized_as_bytes() {
let command = PollMessages {
consumer: Consumer::new(1),
consumer: Consumer::new(Identifier::numeric(1).unwrap()),
stream_id: Identifier::numeric(2).unwrap(),
topic_id: Identifier::numeric(3).unwrap(),
partition_id: Some(4),
Expand All @@ -380,12 +380,12 @@ mod tests {
let bytes = command.as_bytes();
let mut position = 0;
let consumer_kind = ConsumerKind::from_code(bytes[0]).unwrap();
let consumer_id = u32::from_le_bytes(bytes[1..5].try_into().unwrap());
let consumer_id = Identifier::from_bytes(&bytes[1..]).unwrap();
position += 1 + consumer_id.get_size_bytes() as usize;
let consumer = Consumer {
kind: consumer_kind,
id: consumer_id,
};
position += 5;
let stream_id = Identifier::from_bytes(&bytes[position..]).unwrap();
position += stream_id.get_size_bytes() as usize;
let topic_id = Identifier::from_bytes(&bytes[position..]).unwrap();
Expand Down Expand Up @@ -414,7 +414,7 @@ mod tests {

#[test]
fn should_be_deserialized_from_bytes() {
let consumer = Consumer::new(1);
let consumer = Consumer::new(Identifier::numeric(1).unwrap());
let stream_id = Identifier::numeric(2).unwrap();
let topic_id = Identifier::numeric(3).unwrap();
let partition_id = 4u32;
Expand Down Expand Up @@ -457,7 +457,7 @@ mod tests {

#[test]
fn should_be_read_from_string() {
let consumer = Consumer::new(1);
let consumer = Consumer::new(Identifier::numeric(1).unwrap());
let stream_id = Identifier::numeric(2).unwrap();
let topic_id = Identifier::numeric(3).unwrap();
let partition_id = 4u32;
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.0.32"
version = "0.0.33"
edition = "2021"
build = "src/build.rs"

Expand Down
7 changes: 4 additions & 3 deletions server/src/http/consumer_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ async fn get_consumer_offset(
query.stream_id = Identifier::from_str_value(&stream_id)?;
query.topic_id = Identifier::from_str_value(&topic_id)?;
query.validate()?;
let consumer = PollingConsumer::Consumer(query.consumer.id, query.partition_id.unwrap_or(0));
let consumer_id = PollingConsumer::resolve_consumer_id(&query.consumer.id);
let consumer = PollingConsumer::Consumer(consumer_id, query.partition_id.unwrap_or(0));
let system = state.system.read().await;
let offset = system
.get_consumer_offset(
Expand All @@ -51,8 +52,8 @@ async fn store_consumer_offset(
command.stream_id = Identifier::from_str_value(&stream_id)?;
command.topic_id = Identifier::from_str_value(&topic_id)?;
command.validate()?;
let consumer =
PollingConsumer::Consumer(command.consumer.id, command.partition_id.unwrap_or(0));
let consumer_id = PollingConsumer::resolve_consumer_id(&command.consumer.id);
let consumer = PollingConsumer::Consumer(consumer_id, command.partition_id.unwrap_or(0));
let system = state.system.read().await;
system
.store_consumer_offset(
Expand Down
3 changes: 2 additions & 1 deletion server/src/http/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ async fn poll_messages(
query.validate()?;

let partition_id = query.partition_id.unwrap_or(0);
let consumer = PollingConsumer::Consumer(query.consumer.id, partition_id);
let consumer_id = PollingConsumer::resolve_consumer_id(&query.consumer.id);
let consumer = PollingConsumer::Consumer(consumer_id, partition_id);
let system = state.system.read().await;
let polled_messages = system
.poll_messages(
Expand Down

0 comments on commit 655f9b6

Please sign in to comment.