Skip to content

Commit

Permalink
Extended CreateTopic command with optional message expiry, improved c…
Browse files Browse the repository at this point in the history
…reate topic and stream commands by adding name length to the binary serialization, improved stream and topic details structs
  • Loading branch information
spetz committed Aug 27, 2023
1 parent ee870f3 commit ea3bf9c
Show file tree
Hide file tree
Showing 23 changed files with 152 additions and 37 deletions.
4 changes: 2 additions & 2 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ Get stream details (ID 1):

`stream.get|1`

Create a topic named `sample` with ID 1 and 2 partitions (IDs 1 and 2) for stream `dev` (ID 1):
Create a topic named `sample` with ID 1, 2 partitions (IDs 1 and 2) and disabled message expiry (0 seconds) for stream `dev` (ID 1):

`topic.create|1|1|2|sample`
`topic.create|1|1|2|0|sample`

List available topics for stream `dev` (ID 1):

Expand Down
1 change: 1 addition & 0 deletions bench/src/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub async fn init_streams(
topic_id,
partitions_count,
name,
message_expiry: None,
})
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion iggy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.0.41"
version = "0.0.50"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
27 changes: 17 additions & 10 deletions iggy/src/binary/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ fn map_to_stream(payload: &[u8], position: usize) -> Result<(Stream, usize), Err
let topics_count = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let size_bytes = u64::from_le_bytes(payload[position + 8..position + 16].try_into()?);
let messages_count = u64::from_le_bytes(payload[position + 16..position + 24].try_into()?);
let name_length =
u32::from_le_bytes(payload[position + 24..position + 28].try_into()?) as usize;
let name = from_utf8(&payload[position + 28..position + 28 + name_length])?.to_string();
let read_bytes = 4 + 4 + 8 + 8 + 4 + name_length;
let name_length = payload[position + 24];
let name =
from_utf8(&payload[position + 25..position + 25 + name_length as usize])?.to_string();
let read_bytes = 4 + 4 + 8 + 8 + 1 + name_length as usize;
Ok((
Stream {
id,
Expand Down Expand Up @@ -286,6 +286,7 @@ pub fn map_topic(payload: &[u8]) -> Result<TopicDetails, Error> {
name: topic.name,
size_bytes: topic.size_bytes,
messages_count: topic.messages_count,
message_expiry: topic.message_expiry,
partitions_count: partitions.len() as u32,
partitions,
};
Expand All @@ -295,18 +296,24 @@ pub fn map_topic(payload: &[u8]) -> Result<TopicDetails, Error> {
fn map_to_topic(payload: &[u8], position: usize) -> Result<(Topic, usize), Error> {
let id = u32::from_le_bytes(payload[position..position + 4].try_into()?);
let partitions_count = u32::from_le_bytes(payload[position + 4..position + 8].try_into()?);
let size_bytes = u64::from_le_bytes(payload[position + 8..position + 16].try_into()?);
let messages_count = u64::from_le_bytes(payload[position + 16..position + 24].try_into()?);
let name_length =
u32::from_le_bytes(payload[position + 24..position + 28].try_into()?) as usize;
let name = from_utf8(&payload[position + 28..position + 28 + name_length])?.to_string();
let read_bytes = 4 + 4 + 8 + 8 + 4 + name_length;
let message_expiry = u32::from_le_bytes(payload[position + 8..position + 12].try_into()?);
let message_expiry = match message_expiry {
0 => None,
_ => Some(message_expiry),
};
let size_bytes = u64::from_le_bytes(payload[position + 12..position + 20].try_into()?);
let messages_count = u64::from_le_bytes(payload[position + 20..position + 28].try_into()?);
let name_length = payload[position + 28];
let name =
from_utf8(&payload[position + 29..position + 29 + name_length as usize])?.to_string();
let read_bytes = 4 + 4 + 4 + 8 + 8 + 1 + name_length as usize;
Ok((
Topic {
id,
partitions_count,
size_bytes,
messages_count,
message_expiry,
name,
},
read_bytes,
Expand Down
2 changes: 2 additions & 0 deletions iggy/src/models/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub struct Topic {
pub id: u32,
pub name: String,
pub size_bytes: u64,
pub message_expiry: Option<u32>,
pub messages_count: u64,
pub partitions_count: u32,
}
Expand All @@ -15,6 +16,7 @@ pub struct TopicDetails {
pub id: u32,
pub name: String,
pub size_bytes: u64,
pub message_expiry: Option<u32>,
pub messages_count: u64,
pub partitions_count: u32,
pub partitions: Vec<Partition>,
Expand Down
20 changes: 15 additions & 5 deletions iggy/src/streams/create_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,25 @@ impl FromStr for CreateStream {

impl BytesSerializable for CreateStream {
fn as_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(4 + self.name.len());
let mut bytes = Vec::with_capacity(5 + self.name.len());
bytes.put_u32_le(self.stream_id);
bytes.put_u8(self.name.len() as u8);
bytes.extend(self.name.as_bytes());
bytes
}

fn from_bytes(bytes: &[u8]) -> Result<CreateStream, Error> {
if bytes.len() < 5 {
if bytes.len() < 6 {
return Err(Error::InvalidCommand);
}

let stream_id = u32::from_le_bytes(bytes[..4].try_into()?);
let name = from_utf8(&bytes[4..])?.to_string();
let name_length = bytes[4];
let name = from_utf8(&bytes[5..5 + name_length as usize])?.to_string();
if name.len() != name_length as usize {
return Err(Error::InvalidCommand);
}

let command = CreateStream { stream_id, name };
command.validate()?;
Ok(command)
Expand All @@ -101,7 +107,8 @@ mod tests {

let bytes = command.as_bytes();
let stream_id = u32::from_le_bytes(bytes[..4].try_into().unwrap());
let name = from_utf8(&bytes[4..]).unwrap();
let name_length = bytes[4];
let name = from_utf8(&bytes[5..5 + name_length as usize]).unwrap();

assert!(!bytes.is_empty());
assert_eq!(stream_id, command.stream_id);
Expand All @@ -112,7 +119,10 @@ mod tests {
fn should_be_deserialized_from_bytes() {
let stream_id = 1u32;
let name = "test".to_string();
let bytes = [&stream_id.to_le_bytes(), name.as_bytes()].concat();
let mut bytes = Vec::new();
bytes.put_u32_le(stream_id);
bytes.put_u8(name.len() as u8);
bytes.extend(name.as_bytes());
let command = CreateStream::from_bytes(&bytes);
assert!(command.is_ok());

Expand Down
73 changes: 64 additions & 9 deletions iggy/src/topics/create_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct CreateTopic {
pub stream_id: Identifier,
pub topic_id: u32,
pub partitions_count: u32,
pub message_expiry: Option<u32>,
pub name: String,
}

Expand All @@ -29,6 +30,7 @@ impl Default for CreateTopic {
stream_id: Identifier::default(),
topic_id: 1,
partitions_count: 1,
message_expiry: None,
name: "topic".to_string(),
}
}
Expand Down Expand Up @@ -60,18 +62,27 @@ impl FromStr for CreateTopic {
type Err = Error;
fn from_str(input: &str) -> Result<Self, Self::Err> {
let parts = input.split('|').collect::<Vec<&str>>();
if parts.len() != 4 {
if parts.len() != 5 {
return Err(Error::InvalidCommand);
}

let stream_id = parts[0].parse::<Identifier>()?;
let topic_id = parts[1].parse::<u32>()?;
let partitions_count = parts[2].parse::<u32>()?;
let name = parts[3].to_string();
let message_expiry = parts[3].parse::<u32>();
let message_expiry = match message_expiry {
Ok(message_expiry) => match message_expiry {
0 => None,
_ => Some(message_expiry),
},
Err(_) => None,
};
let name = parts[4].to_string();
let command = CreateTopic {
stream_id,
topic_id,
partitions_count,
message_expiry,
name,
};
command.validate()?;
Expand All @@ -82,16 +93,21 @@ impl FromStr for CreateTopic {
impl BytesSerializable for CreateTopic {
fn as_bytes(&self) -> Vec<u8> {
let stream_id_bytes = self.stream_id.as_bytes();
let mut bytes = Vec::with_capacity(8 + stream_id_bytes.len() + self.name.len());
let mut bytes = Vec::with_capacity(13 + stream_id_bytes.len() + self.name.len());
bytes.extend(stream_id_bytes);
bytes.put_u32_le(self.topic_id);
bytes.put_u32_le(self.partitions_count);
match self.message_expiry {
Some(message_expiry) => bytes.put_u32_le(message_expiry),
None => bytes.put_u32_le(0),
}
bytes.put_u8(self.name.len() as u8);
bytes.extend(self.name.as_bytes());
bytes
}

fn from_bytes(bytes: &[u8]) -> Result<CreateTopic, Error> {
if bytes.len() < 13 {
if bytes.len() < 17 {
return Err(Error::InvalidCommand);
}

Expand All @@ -100,11 +116,22 @@ impl BytesSerializable for CreateTopic {
position += stream_id.get_size_bytes() as usize;
let topic_id = u32::from_le_bytes(bytes[position..position + 4].try_into()?);
let partitions_count = u32::from_le_bytes(bytes[position + 4..position + 8].try_into()?);
let name = from_utf8(&bytes[position + 8..])?.to_string();
let message_expiry = u32::from_le_bytes(bytes[position + 8..position + 12].try_into()?);
let message_expiry = match message_expiry {
0 => None,
_ => Some(message_expiry),
};
let name_length = bytes[position + 12];
let name =
from_utf8(&bytes[position + 13..position + 13 + name_length as usize])?.to_string();
if name.len() != name_length as usize {
return Err(Error::InvalidCommand);
}
let command = CreateTopic {
stream_id,
topic_id,
partitions_count,
message_expiry,
name,
};
command.validate()?;
Expand All @@ -116,8 +143,12 @@ impl Display for CreateTopic {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}|{}|{}|{}",
self.stream_id, self.topic_id, self.partitions_count, self.name
"{}|{}|{}|{}|{}",
self.stream_id,
self.topic_id,
self.partitions_count,
self.message_expiry.unwrap_or(0),
self.name
)
}
}
Expand All @@ -133,6 +164,7 @@ mod tests {
stream_id: Identifier::numeric(1).unwrap(),
topic_id: 2,
partitions_count: 3,
message_expiry: Some(10),
name: "test".to_string(),
};

Expand All @@ -143,12 +175,23 @@ mod tests {
let topic_id = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap());
let partitions_count =
u32::from_le_bytes(bytes[position + 4..position + 8].try_into().unwrap());
let name = from_utf8(&bytes[position + 8..]).unwrap().to_string();
let message_expiry =
u32::from_le_bytes(bytes[position + 8..position + 12].try_into().unwrap());
let message_expiry = match message_expiry {
0 => None,
_ => Some(message_expiry),
};
let name_length = bytes[position + 12];
let name = from_utf8(&bytes[position + 13..position + 13 + name_length as usize])
.unwrap()
.to_string();

assert!(!bytes.is_empty());
assert_eq!(stream_id, command.stream_id);
assert_eq!(topic_id, command.topic_id);
assert_eq!(partitions_count, command.partitions_count);
assert_eq!(message_expiry, command.message_expiry);
assert_eq!(name.len() as u8, command.name.len() as u8);
assert_eq!(name, command.name);
}

Expand All @@ -158,12 +201,15 @@ mod tests {
let topic_id = 2u32;
let partitions_count = 3u32;
let name = "test".to_string();
let message_expiry = Some(10);

let stream_id_bytes = stream_id.as_bytes();
let mut bytes = Vec::with_capacity(8 + stream_id_bytes.len() + name.len());
bytes.extend(stream_id_bytes);
bytes.put_u32_le(topic_id);
bytes.put_u32_le(partitions_count);
bytes.put_u32_le(message_expiry.unwrap());
bytes.put_u8(name.len() as u8);
bytes.extend(name.as_bytes());

let command = CreateTopic::from_bytes(&bytes);
Expand All @@ -181,15 +227,24 @@ mod tests {
let stream_id = Identifier::numeric(1).unwrap();
let topic_id = 2u32;
let partitions_count = 3u32;
let message_expiry = Some(10);
let name = "test".to_string();
let input = format!("{}|{}|{}|{}", stream_id, topic_id, partitions_count, name);
let input = format!(
"{}|{}|{}|{}|{}",
stream_id,
topic_id,
partitions_count,
message_expiry.unwrap(),
name
);
let command = CreateTopic::from_str(&input);
assert!(command.is_ok());

let command = command.unwrap();
assert_eq!(command.stream_id, stream_id);
assert_eq!(command.topic_id, topic_id);
assert_eq!(command.partitions_count, partitions_count);
assert_eq!(command.message_expiry, message_expiry);
assert_eq!(command.name, name);
}
}
1 change: 1 addition & 0 deletions samples/src/getting-started/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn init_system(client: &dyn Client) {
topic_id: TOPIC_ID,
partitions_count: 1,
name: "sample-topic".to_string(),
message_expiry: None,
})
.await
{
Expand Down
1 change: 1 addition & 0 deletions samples/src/shared/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), Er
topic_id: args.topic_id,
partitions_count: args.partition_id,
name: "orders".to_string(),
message_expiry: None,
})
.await?;
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion server/server.http
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ Content-Type: application/json
{
"topic_id": {{topic_id}},
"name": "topic1",
"partitions_count": 3
"partitions_count": 3,
"message_expiry": 0
}

###
Expand Down
7 changes: 6 additions & 1 deletion server/src/binary/handlers/topics/create_topic_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ pub async fn handle(
let mut system = system.write().await;
system
.get_stream_mut(&command.stream_id)?
.create_topic(command.topic_id, &command.name, command.partitions_count)
.create_topic(
command.topic_id,
&command.name,
command.partitions_count,
command.message_expiry,
)
.await?;
sender.send_empty_ok_response().await?;
Ok(())
Expand Down
8 changes: 6 additions & 2 deletions server/src/binary/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,20 @@ async fn extend_stream(stream: &Stream, bytes: &mut Vec<u8>) {
bytes.put_u32_le(stream.get_topics().len() as u32);
bytes.put_u64_le(stream.get_size_bytes().await);
bytes.put_u64_le(stream.get_messages_count().await);
bytes.put_u32_le(stream.name.len() as u32);
bytes.put_u8(stream.name.len() as u8);
bytes.extend(stream.name.as_bytes());
}

async fn extend_topic(topic: &Topic, bytes: &mut Vec<u8>) {
bytes.put_u32_le(topic.id);
bytes.put_u32_le(topic.get_partitions().len() as u32);
match topic.message_expiry {
Some(message_expiry) => bytes.put_u32_le(message_expiry),
None => bytes.put_u32_le(0),
};
bytes.put_u64_le(topic.get_size_bytes().await);
bytes.put_u64_le(topic.get_messages_count().await);
bytes.put_u32_le(topic.name.len() as u32);
bytes.put_u8(topic.name.len() as u8);
bytes.extend(topic.name.as_bytes());
}

Expand Down

0 comments on commit ea3bf9c

Please sign in to comment.