Skip to content

Commit

Permalink
Implement new server storage (#1016)
Browse files Browse the repository at this point in the history
This is a new, custom implementation of the server storage based on a
"command-sourcing" log, which is required for the future replication
purposes. The existing streams/topic/partitions/segments storage remains
unchanged. This implementation removes the usage of Sled embedded DB and
comes with a built-in migration.
  • Loading branch information
spetz committed Jul 1, 2024
1 parent 0982a7a commit faf7f9f
Show file tree
Hide file tree
Showing 294 changed files with 6,825 additions and 4,704 deletions.
26 changes: 18 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bench/src/args/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use iggy::utils::duration::IggyDuration;
use integration::test_server::Transport;
use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand All @@ -32,7 +33,7 @@ pub struct IggyBenchArgs {
pub verbose: bool,

/// Warmup time
#[arg(long, short = 'w', default_value_t = IggyDuration::from(DEFAULT_WARMUP_TIME_SECONDS))]
#[arg(long, short = 'w', default_value_t = IggyDuration::from_str(DEFAULT_WARMUP_TIME).unwrap())]
pub warmup_time: IggyDuration,

/// Skip server start
Expand Down
2 changes: 1 addition & 1 deletion bench/src/args/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ pub const DEFAULT_PERFORM_CLEANUP: bool = false;
pub const DEFAULT_SERVER_SYSTEM_PATH: &str = "local_data";
pub const DEFAULT_SERVER_STDOUT_VISIBILITY: bool = false;

pub const DEFAULT_WARMUP_TIME_SECONDS: u64 = 1;
pub const DEFAULT_WARMUP_TIME: &str = "1 s";
pub const DEFAULT_SKIP_SERVER_START: bool = false;
3 changes: 2 additions & 1 deletion bench/src/benchmarks/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::error::IggyError;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use integration::test_server::{login_root, ClientFactory};
use std::{pin::Pin, sync::Arc};
use tracing::info;
Expand Down Expand Up @@ -90,7 +91,7 @@ pub trait Benchmarkable {
None,
None,
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cli"
version = "0.20.1"
version = "0.3.0"
edition = "2021"
authors = ["bartosz.ciesla@gmail.com"]
repository = "https://github.com/iggy-rs/iggy"
Expand Down
6 changes: 3 additions & 3 deletions cli/src/args/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::args::common::ListMode;
use clap::{Args, Subcommand};
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::identifier::Identifier;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;

#[derive(Debug, Clone, Subcommand)]
pub(crate) enum TopicAction {
Expand Down Expand Up @@ -102,7 +102,7 @@ pub(crate) struct TopicCreateArgs {
/// ("unlimited" or skipping parameter disables max topic size functionality in topic)
/// Can't be lower than segment size in the config.
#[arg(short, long, default_value = "unlimited", verbatim_doc_comment)]
pub(crate) max_topic_size: IggyByteSize,
pub(crate) max_topic_size: MaxTopicSize,
/// Replication factor for the topic
#[arg(short, long, default_value = "1")]
pub(crate) replication_factor: u8,
Expand Down Expand Up @@ -149,7 +149,7 @@ pub(crate) struct TopicUpdateArgs {
/// ("unlimited" or skipping parameter causes removal of max topic size parameter in topic)
/// Can't be lower than segment size in the config.
#[arg(short, long, default_value = "unlimited", verbatim_doc_comment)]
pub(crate) max_topic_size: IggyByteSize,
pub(crate) max_topic_size: MaxTopicSize,
#[arg(short, long, default_value = "1")]
/// New replication factor for the topic
pub(crate) replication_factor: u8,
Expand Down
6 changes: 1 addition & 5 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
"iggy.rs"
],
"access_token_expiry": "1 h",
"refresh_token_expiry": "1 d",
"clock_skew": "5 s",
"not_before": "0 s",
"encoding_secret": "top_secret$iggy.rs$_jwt_HS256_key#!",
Expand Down Expand Up @@ -94,9 +93,6 @@
},
"system": {
"path": "local_data",
"database": {
"path": "database"
},
"backup": {
"path": "backup",
"compatibility": {
Expand All @@ -118,7 +114,7 @@
"size": "4 GB"
},
"retention_policy": {
"message_expiry": "disabled",
"message_expiry": "none",
"max_topic_size": "10 GB"
},
"encryption": {
Expand Down
17 changes: 7 additions & 10 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ valid_audiences = ["iggy.rs"]
# Expiry time for access tokens.
access_token_expiry = "1 h"

# Expiry time for refresh tokens.
refresh_token_expiry = "1 d"

# Tolerance for timing discrepancies during token validation.
clock_skew = "5 s"

Expand Down Expand Up @@ -233,11 +230,11 @@ path = "backup"
# Subpath of the backup directory where converted segment data is stored after compatibility conversion.
path = "compatibility"

# Database configuration.
[system.database]
# Path for storing database files.
# Specifies the directory where database files are stored, relative to `system.path`.
path = "database"
# Legacy database configuration - used only for the migration purposes.
#[system.database]
## Path for storing database files.
## Specifies the directory where database files are stored, relative to `system.path`.
#path = "database"

# Runtime configuration.
[system.runtime]
Expand Down Expand Up @@ -275,10 +272,10 @@ size = "4 GB"
# Data retention policy configuration.
[system.retention_policy]
# Configures the message time-based expiry setting.
# "disabled" means messages are kept indefinitely.
# "none" means messages are kept indefinitely.
# A time value in human-readable format determines the lifespan of messages.
# Example: `message_expiry = "2 days 4 hours 15 minutes"` means messages will expire after that duration.
message_expiry = "disabled"
message_expiry = "none"

# Configures the topic size-based expiry setting.
# "unlimited" or "0" means topics are kept indefinitely.
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy_examples"
version = "0.0.3"
version = "0.0.4"
edition = "2021"

[[example]]
Expand Down
3 changes: 2 additions & 1 deletion examples/src/getting-started/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::messages::send_messages::{Message, Partitioning};
use iggy::users::defaults::*;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use std::env;
use std::error::Error;
use std::str::FromStr;
Expand Down Expand Up @@ -53,7 +54,7 @@ async fn init_system(client: &IggyClient) {
None,
Some(TOPIC_ID),
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await
{
Expand Down
23 changes: 17 additions & 6 deletions examples/src/shared/messages.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use iggy::utils::timestamp::IggyTimestamp;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug};

Expand Down Expand Up @@ -41,7 +42,7 @@ pub struct OrderCreated {
pub price: f64,
pub quantity: f64,
pub side: String,
pub timestamp: u64,
pub timestamp: IggyTimestamp,
}

impl Debug for OrderCreated {
Expand All @@ -52,7 +53,7 @@ impl Debug for OrderCreated {
.field("price", &format!("{:.2}", self.price))
.field("quantity", &format!("{:.2}", self.quantity))
.field("side", &self.side)
.field("timestamp", &self.timestamp)
.field("timestamp", &self.timestamp.as_micros())
.finish()
}
}
Expand All @@ -61,26 +62,36 @@ impl Debug for OrderCreated {
pub struct OrderConfirmed {
pub order_id: u64,
pub price: f64,
pub timestamp: u64,
pub timestamp: IggyTimestamp,
}

impl Debug for OrderConfirmed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OrderConfirmed")
.field("order_id", &self.order_id)
.field("price", &format!("{:.2}", self.price))
.field("timestamp", &self.timestamp)
.field("timestamp", &self.timestamp.as_micros())
.finish()
}
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Deserialize, Serialize)]
pub struct OrderRejected {
pub order_id: u64,
pub timestamp: u64,
pub timestamp: IggyTimestamp,
pub reason: String,
}

impl Debug for OrderRejected {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OrderRejected")
.field("order_id", &self.order_id)
.field("timestamp", &self.timestamp.as_micros())
.field("reason", &self.reason)
.finish()
}
}

impl SerializableMessage for OrderCreated {
fn get_message_type(&self) -> &str {
ORDER_CREATED_TYPE
Expand Down
8 changes: 4 additions & 4 deletions examples/src/shared/messages_generator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::shared::messages::{OrderConfirmed, OrderCreated, OrderRejected, SerializableMessage};
use crate::shared::utils;
use iggy::utils::timestamp::IggyTimestamp;
use rand::rngs::ThreadRng;
use rand::Rng;

Expand Down Expand Up @@ -32,7 +32,7 @@ impl MessagesGenerator {
self.order_id += 1;
Box::new(OrderCreated {
order_id: self.order_id,
timestamp: utils::timestamp(),
timestamp: IggyTimestamp::now(),
currency_pair: CURRENCY_PAIRS[self.rng.gen_range(0..CURRENCY_PAIRS.len())].to_string(),
price: self.rng.gen_range(10.0..=1000.0),
quantity: self.rng.gen_range(0.1..=1.0),
Expand All @@ -47,15 +47,15 @@ impl MessagesGenerator {
fn generate_order_confirmed(&mut self) -> Box<dyn SerializableMessage> {
Box::new(OrderConfirmed {
order_id: self.order_id,
timestamp: utils::timestamp(),
timestamp: IggyTimestamp::now(),
price: self.rng.gen_range(10.0..=1000.0),
})
}

fn generate_order_rejected(&mut self) -> Box<dyn SerializableMessage> {
Box::new(OrderRejected {
order_id: self.order_id,
timestamp: utils::timestamp(),
timestamp: IggyTimestamp::now(),
reason: match self.rng.gen_range(0..=1) {
0 => "cancelled_by_user",
_ => "other",
Expand Down
1 change: 0 additions & 1 deletion examples/src/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ pub mod args;
pub mod messages;
pub mod messages_generator;
pub mod system;
pub mod utils;
3 changes: 2 additions & 1 deletion examples/src/shared/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use iggy::messages::poll_messages::PollingStrategy;
use iggy::models::messages::PolledMessage;
use iggy::users::defaults::*;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::topic_size::MaxTopicSize;
use tracing::info;

type MessageHandler = dyn Fn(&PolledMessage) -> Result<(), Box<dyn std::error::Error>>;
Expand Down Expand Up @@ -74,7 +75,7 @@ pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), Ig
None,
Some(args.topic_id),
IggyExpiry::NeverExpire,
None,
MaxTopicSize::ServerDefault,
)
.await?;
Ok(())
Expand Down
Loading

0 comments on commit faf7f9f

Please sign in to comment.