Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new server storage #1016

Merged
merged 13 commits into from
Jul 1, 2024
25 changes: 17 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bench/src/benchmarks/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::error::IggyError;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use integration::test_server::{login_root, ClientFactory};
use std::{pin::Pin, sync::Arc};
use tracing::info;
Expand Down Expand Up @@ -90,7 +91,7 @@ pub trait Benchmarkable {
None,
None,
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cli"
version = "0.20.1"
version = "0.3.0"
edition = "2021"
authors = ["bartosz.ciesla@gmail.com"]
repository = "https://github.com/iggy-rs/iggy"
Expand Down
6 changes: 3 additions & 3 deletions cli/src/args/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::args::common::ListMode;
use clap::{Args, Subcommand};
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::identifier::Identifier;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;

#[derive(Debug, Clone, Subcommand)]
pub(crate) enum TopicAction {
Expand Down Expand Up @@ -102,7 +102,7 @@ pub(crate) struct TopicCreateArgs {
/// ("unlimited" or skipping parameter disables max topic size functionality in topic)
/// Can't be lower than segment size in the config.
#[arg(short, long, default_value = "unlimited", verbatim_doc_comment)]
pub(crate) max_topic_size: IggyByteSize,
pub(crate) max_topic_size: MaxTopicSize,
/// Replication factor for the topic
#[arg(short, long, default_value = "1")]
pub(crate) replication_factor: u8,
Expand Down Expand Up @@ -149,7 +149,7 @@ pub(crate) struct TopicUpdateArgs {
/// ("unlimited" or skipping parameter causes removal of max topic size parameter in topic)
/// Can't be lower than segment size in the config.
#[arg(short, long, default_value = "unlimited", verbatim_doc_comment)]
pub(crate) max_topic_size: IggyByteSize,
pub(crate) max_topic_size: MaxTopicSize,
#[arg(short, long, default_value = "1")]
/// New replication factor for the topic
pub(crate) replication_factor: u8,
Expand Down
6 changes: 1 addition & 5 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
"iggy.rs"
],
"access_token_expiry": "1 h",
"refresh_token_expiry": "1 d",
"clock_skew": "5 s",
"not_before": "0 s",
"encoding_secret": "top_secret$iggy.rs$_jwt_HS256_key#!",
Expand Down Expand Up @@ -94,9 +93,6 @@
},
"system": {
"path": "local_data",
"database": {
"path": "database"
},
"backup": {
"path": "backup",
"compatibility": {
Expand All @@ -118,7 +114,7 @@
"size": "4 GB"
},
"retention_policy": {
"message_expiry": "disabled",
"message_expiry": "none",
"max_topic_size": "10 GB"
},
"encryption": {
Expand Down
17 changes: 7 additions & 10 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ valid_audiences = ["iggy.rs"]
# Expiry time for access tokens.
access_token_expiry = "1 h"

# Expiry time for refresh tokens.
refresh_token_expiry = "1 d"

# Tolerance for timing discrepancies during token validation.
clock_skew = "5 s"

Expand Down Expand Up @@ -233,11 +230,11 @@ path = "backup"
# Subpath of the backup directory where converted segment data is stored after compatibility conversion.
path = "compatibility"

# Database configuration.
[system.database]
# Path for storing database files.
# Specifies the directory where database files are stored, relative to `system.path`.
path = "database"
# Legacy database configuration - used only for the migration purposes.
#[system.database]
## Path for storing database files.
## Specifies the directory where database files are stored, relative to `system.path`.
#path = "database"

# Runtime configuration.
[system.runtime]
Expand Down Expand Up @@ -275,10 +272,10 @@ size = "4 GB"
# Data retention policy configuration.
[system.retention_policy]
# Configures the message time-based expiry setting.
# "disabled" means messages are kept indefinitely.
# "none" means messages are kept indefinitely.
# A time value in human-readable format determines the lifespan of messages.
# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration.
message_expiry = "disabled"
message_expiry = "none"

# Configures the topic size-based expiry setting.
# "unlimited" or "0" means topics are kept indefinitely.
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy_examples"
version = "0.0.3"
version = "0.0.4"
edition = "2021"

[[example]]
Expand Down
3 changes: 2 additions & 1 deletion examples/src/getting-started/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::messages::send_messages::{Message, Partitioning};
use iggy::users::defaults::*;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use std::env;
use std::error::Error;
use std::str::FromStr;
Expand Down Expand Up @@ -53,7 +54,7 @@ async fn init_system(client: &IggyClient) {
None,
Some(TOPIC_ID),
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await
{
Expand Down
23 changes: 17 additions & 6 deletions examples/src/shared/messages.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use iggy::utils::timestamp::IggyTimestamp;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug};

Expand Down Expand Up @@ -41,7 +42,7 @@ pub struct OrderCreated {
pub price: f64,
pub quantity: f64,
pub side: String,
pub timestamp: u64,
pub timestamp: IggyTimestamp,
}

impl Debug for OrderCreated {
Expand All @@ -52,7 +53,7 @@ impl Debug for OrderCreated {
.field("price", &format!("{:.2}", self.price))
.field("quantity", &format!("{:.2}", self.quantity))
.field("side", &self.side)
.field("timestamp", &self.timestamp)
.field("timestamp", &self.timestamp.as_micros())
.finish()
}
}
Expand All @@ -61,26 +62,36 @@ impl Debug for OrderCreated {
pub struct OrderConfirmed {
pub order_id: u64,
pub price: f64,
pub timestamp: u64,
pub timestamp: IggyTimestamp,
}

impl Debug for OrderConfirmed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OrderConfirmed")
.field("order_id", &self.order_id)
.field("price", &format!("{:.2}", self.price))
.field("timestamp", &self.timestamp)
.field("timestamp", &self.timestamp.as_micros())
.finish()
}
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Deserialize, Serialize)]
pub struct OrderRejected {
pub order_id: u64,
pub timestamp: u64,
pub timestamp: IggyTimestamp,
pub reason: String,
}

impl Debug for OrderRejected {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OrderRejected")
.field("order_id", &self.order_id)
.field("timestamp", &self.timestamp.as_micros())
.field("reason", &self.reason)
.finish()
}
}

impl SerializableMessage for OrderCreated {
fn get_message_type(&self) -> &str {
ORDER_CREATED_TYPE
Expand Down
8 changes: 4 additions & 4 deletions examples/src/shared/messages_generator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::shared::messages::{OrderConfirmed, OrderCreated, OrderRejected, SerializableMessage};
use crate::shared::utils;
use iggy::utils::timestamp::IggyTimestamp;
use rand::rngs::ThreadRng;
use rand::Rng;

Expand Down Expand Up @@ -32,7 +32,7 @@ impl MessagesGenerator {
self.order_id += 1;
Box::new(OrderCreated {
order_id: self.order_id,
timestamp: utils::timestamp(),
timestamp: IggyTimestamp::now(),
currency_pair: CURRENCY_PAIRS[self.rng.gen_range(0..CURRENCY_PAIRS.len())].to_string(),
price: self.rng.gen_range(10.0..=1000.0),
quantity: self.rng.gen_range(0.1..=1.0),
Expand All @@ -47,15 +47,15 @@ impl MessagesGenerator {
fn generate_order_confirmed(&mut self) -> Box<dyn SerializableMessage> {
Box::new(OrderConfirmed {
order_id: self.order_id,
timestamp: utils::timestamp(),
timestamp: IggyTimestamp::now(),
price: self.rng.gen_range(10.0..=1000.0),
})
}

fn generate_order_rejected(&mut self) -> Box<dyn SerializableMessage> {
Box::new(OrderRejected {
order_id: self.order_id,
timestamp: utils::timestamp(),
timestamp: IggyTimestamp::now(),
reason: match self.rng.gen_range(0..=1) {
0 => "cancelled_by_user",
_ => "other",
Expand Down
1 change: 0 additions & 1 deletion examples/src/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ pub mod args;
pub mod messages;
pub mod messages_generator;
pub mod system;
pub mod utils;
3 changes: 2 additions & 1 deletion examples/src/shared/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use iggy::messages::poll_messages::PollingStrategy;
use iggy::models::messages::PolledMessage;
use iggy::users::defaults::*;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use tracing::info;

type MessageHandler = dyn Fn(&PolledMessage) -> Result<(), Box<dyn std::error::Error>>;
Expand Down Expand Up @@ -74,7 +75,7 @@ pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), Ig
None,
Some(args.topic_id),
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await?;
Ok(())
Expand Down
8 changes: 0 additions & 8 deletions examples/src/shared/utils.rs

This file was deleted.

1 change: 0 additions & 1 deletion integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ predicates = "3.1.0"
regex = "1.10.4"
serial_test = "3.1.1"
server = { path = "../server" }
sled = "0.34.7"
tempfile = "3.10.1"
tokio = { version = "1.38.0", features = ["full"] }
tracing-subscriber = "0.3.18"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use assert_cmd::assert::Assert;
use async_trait::async_trait;
use iggy::client::Client;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use predicates::str::diff;
use serial_test::parallel;

Expand Down Expand Up @@ -83,7 +84,7 @@ impl IggyCmdTestCase for TestConsumerGroupCreateCmd {
None,
Some(self.topic_id),
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await;
assert!(topic.is_ok());
Expand Down
Loading
Loading