Skip to content

Commit

Permalink
Implement new server storage
Browse files Browse the repository at this point in the history
BREAKING_CHANGES for API and SDK but comes with a built-in data migration for existing Sled DB
  • Loading branch information
spetz committed Jun 25, 2024
1 parent a6230e8 commit 3733b3e
Show file tree
Hide file tree
Showing 218 changed files with 5,461 additions and 3,369 deletions.
25 changes: 17 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bench/src/benchmarks/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::error::IggyError;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use integration::test_server::{login_root, ClientFactory};
use std::{pin::Pin, sync::Arc};
use tracing::info;
Expand Down Expand Up @@ -90,7 +91,7 @@ pub trait Benchmarkable {
None,
None,
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cli"
version = "0.20.1"
version = "0.3.0"
edition = "2021"
authors = ["bartosz.ciesla@gmail.com"]
repository = "https://github.com/iggy-rs/iggy"
Expand Down
6 changes: 3 additions & 3 deletions cli/src/args/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::args::common::ListMode;
use clap::{Args, Subcommand};
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::identifier::Identifier;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;

#[derive(Debug, Clone, Subcommand)]
pub(crate) enum TopicAction {
Expand Down Expand Up @@ -102,7 +102,7 @@ pub(crate) struct TopicCreateArgs {
/// ("unlimited" or skipping parameter disables max topic size functionality in topic)
/// Can't be lower than segment size in the config.
#[arg(short, long, default_value = "unlimited", verbatim_doc_comment)]
pub(crate) max_topic_size: IggyByteSize,
pub(crate) max_topic_size: MaxTopicSize,
/// Replication factor for the topic
#[arg(short, long, default_value = "1")]
pub(crate) replication_factor: u8,
Expand Down Expand Up @@ -149,7 +149,7 @@ pub(crate) struct TopicUpdateArgs {
/// ("unlimited" or skipping parameter causes removal of max topic size parameter in topic)
/// Can't be lower than segment size in the config.
#[arg(short, long, default_value = "unlimited", verbatim_doc_comment)]
pub(crate) max_topic_size: IggyByteSize,
pub(crate) max_topic_size: MaxTopicSize,
#[arg(short, long, default_value = "1")]
/// New replication factor for the topic
pub(crate) replication_factor: u8,
Expand Down
6 changes: 1 addition & 5 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
"iggy.rs"
],
"access_token_expiry": "1 h",
"refresh_token_expiry": "1 d",
"clock_skew": "5 s",
"not_before": "0 s",
"encoding_secret": "top_secret$iggy.rs$_jwt_HS256_key#!",
Expand Down Expand Up @@ -94,9 +93,6 @@
},
"system": {
"path": "local_data",
"database": {
"path": "database"
},
"backup": {
"path": "backup",
"compatibility": {
Expand All @@ -118,7 +114,7 @@
"size": "4 GB"
},
"retention_policy": {
"message_expiry": "disabled",
"message_expiry": "none",
"max_topic_size": "10 GB"
},
"encryption": {
Expand Down
17 changes: 7 additions & 10 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ valid_audiences = ["iggy.rs"]
# Expiry time for access tokens.
access_token_expiry = "1 h"

# Expiry time for refresh tokens.
refresh_token_expiry = "1 d"

# Tolerance for timing discrepancies during token validation.
clock_skew = "5 s"

Expand Down Expand Up @@ -233,11 +230,11 @@ path = "backup"
# Subpath of the backup directory where converted segment data is stored after compatibility conversion.
path = "compatibility"

# Database configuration.
[system.database]
# Path for storing database files.
# Specifies the directory where database files are stored, relative to `system.path`.
path = "database"
# Legacy database configuration - used only for the migration purposes.
#[system.database]
## Path for storing database files.
## Specifies the directory where database files are stored, relative to `system.path`.
#path = "database"

# Runtime configuration.
[system.runtime]
Expand Down Expand Up @@ -275,10 +272,10 @@ size = "4 GB"
# Data retention policy configuration.
[system.retention_policy]
# Configures the message time-based expiry setting.
# "disabled" means messages are kept indefinitely.
# "none" means messages are kept indefinitely.
# A time value in human-readable format determines the lifespan of messages.
# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration.
message_expiry = "disabled"
message_expiry = "none"

# Configures the topic size-based expiry setting.
# "unlimited" or "0" means topics are kept indefinitely.
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy_examples"
version = "0.0.3"
version = "0.0.4"
edition = "2021"

[[example]]
Expand Down
3 changes: 2 additions & 1 deletion examples/src/getting-started/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::messages::send_messages::{Message, Partitioning};
use iggy::users::defaults::*;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use std::env;
use std::error::Error;
use std::str::FromStr;
Expand Down Expand Up @@ -53,7 +54,7 @@ async fn init_system(client: &IggyClient) {
None,
Some(TOPIC_ID),
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await
{
Expand Down
23 changes: 17 additions & 6 deletions examples/src/shared/messages.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use iggy::utils::timestamp::IggyTimestamp;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug};

Expand Down Expand Up @@ -41,7 +42,7 @@ pub struct OrderCreated {
pub price: f64,
pub quantity: f64,
pub side: String,
pub timestamp: u64,
pub timestamp: IggyTimestamp,
}

impl Debug for OrderCreated {
Expand All @@ -52,7 +53,7 @@ impl Debug for OrderCreated {
.field("price", &format!("{:.2}", self.price))
.field("quantity", &format!("{:.2}", self.quantity))
.field("side", &self.side)
.field("timestamp", &self.timestamp)
.field("timestamp", &self.timestamp.to_micros())
.finish()
}
}
Expand All @@ -61,26 +62,36 @@ impl Debug for OrderCreated {
pub struct OrderConfirmed {
pub order_id: u64,
pub price: f64,
pub timestamp: u64,
pub timestamp: IggyTimestamp,
}

impl Debug for OrderConfirmed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OrderConfirmed")
.field("order_id", &self.order_id)
.field("price", &format!("{:.2}", self.price))
.field("timestamp", &self.timestamp)
.field("timestamp", &self.timestamp.to_micros())
.finish()
}
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Deserialize, Serialize)]
pub struct OrderRejected {
pub order_id: u64,
pub timestamp: u64,
pub timestamp: IggyTimestamp,
pub reason: String,
}

impl Debug for OrderRejected {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OrderRejected")
.field("order_id", &self.order_id)
.field("timestamp", &self.timestamp.to_micros())
.field("reason", &self.reason)
.finish()
}
}

impl SerializableMessage for OrderCreated {
fn get_message_type(&self) -> &str {
ORDER_CREATED_TYPE
Expand Down
8 changes: 4 additions & 4 deletions examples/src/shared/messages_generator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::shared::messages::{OrderConfirmed, OrderCreated, OrderRejected, SerializableMessage};
use crate::shared::utils;
use iggy::utils::timestamp::IggyTimestamp;
use rand::rngs::ThreadRng;
use rand::Rng;

Expand Down Expand Up @@ -32,7 +32,7 @@ impl MessagesGenerator {
self.order_id += 1;
Box::new(OrderCreated {
order_id: self.order_id,
timestamp: utils::timestamp(),
timestamp: IggyTimestamp::now(),
currency_pair: CURRENCY_PAIRS[self.rng.gen_range(0..CURRENCY_PAIRS.len())].to_string(),
price: self.rng.gen_range(10.0..=1000.0),
quantity: self.rng.gen_range(0.1..=1.0),
Expand All @@ -47,15 +47,15 @@ impl MessagesGenerator {
fn generate_order_confirmed(&mut self) -> Box<dyn SerializableMessage> {
Box::new(OrderConfirmed {
order_id: self.order_id,
timestamp: utils::timestamp(),
timestamp: IggyTimestamp::now(),
price: self.rng.gen_range(10.0..=1000.0),
})
}

fn generate_order_rejected(&mut self) -> Box<dyn SerializableMessage> {
Box::new(OrderRejected {
order_id: self.order_id,
timestamp: utils::timestamp(),
timestamp: IggyTimestamp::now(),
reason: match self.rng.gen_range(0..=1) {
0 => "cancelled_by_user",
_ => "other",
Expand Down
1 change: 0 additions & 1 deletion examples/src/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ pub mod args;
pub mod messages;
pub mod messages_generator;
pub mod system;
pub mod utils;
3 changes: 2 additions & 1 deletion examples/src/shared/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use iggy::messages::poll_messages::PollingStrategy;
use iggy::models::messages::PolledMessage;
use iggy::users::defaults::*;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use tracing::info;

type MessageHandler = dyn Fn(&PolledMessage) -> Result<(), Box<dyn std::error::Error>>;
Expand Down Expand Up @@ -74,7 +75,7 @@ pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), Ig
None,
Some(args.topic_id),
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await?;
Ok(())
Expand Down
8 changes: 0 additions & 8 deletions examples/src/shared/utils.rs

This file was deleted.

1 change: 0 additions & 1 deletion integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ predicates = "3.1.0"
regex = "1.10.4"
serial_test = "3.1.1"
server = { path = "../server" }
sled = "0.34.7"
tempfile = "3.10.1"
tokio = { version = "1.38.0", features = ["full"] }
tracing-subscriber = "0.3.18"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use assert_cmd::assert::Assert;
use async_trait::async_trait;
use iggy::client::Client;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use predicates::str::diff;
use serial_test::parallel;

Expand Down Expand Up @@ -83,7 +84,7 @@ impl IggyCmdTestCase for TestConsumerGroupCreateCmd {
None,
Some(self.topic_id),
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await;
assert!(topic.is_ok());
Expand Down
Loading

0 comments on commit 3733b3e

Please sign in to comment.