Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config deserializer crate #234

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -4,5 +4,6 @@ members = [
"springql",
"springql-core",
"foreign-service",
"springql-deconfig",
kazuk marked this conversation as resolved.
Show resolved Hide resolved
"test-logger",
]
58 changes: 58 additions & 0 deletions default_config.toml
@@ -0,0 +1,58 @@
[worker]
# Number of generic worker threads. Generic worker threads deal with internal and sink tasks.
# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
n_generic_worker_threads = 1

# Number of source worker threads. Source worker threads collect rows from foreign source.
# Too many number may may cause row fraud in runtime.
# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
n_source_worker_threads = 1

# How long a generic worker or a source worker sleeps if it does not receive any row from the upstream.
# Small number will improve the initial row's E2E latency but increase the CPU usage.
sleep_msec_no_row = 100

[memory]
# How much memory is allowed to be used in SpringQL streaming runtime.
upper_limit_bytes = 10_000_000

# Percentage over `upper_limit_bytes` to transit from Moderate state to Severe.
# In Severe state, internal scheduler is changed to exhibit memory-resilience.
moderate_to_severe_percent = 60

# Percentage over `upper_limit_bytes` to transit from Severe state to Critical.
# In Critical state, all intermediate rows are purged to release memory.
severe_to_critical_percent = 95

critical_to_severe_percent = 80
severe_to_moderate_percent = 40

# Interval for MemoryStateMachineWorker to publish TransitPerformanceMetricsSummary event.
memory_state_transition_interval_msec = 10

# Interval for PerformanceMonitorWorker to publish ReportMetricsSummary event.
performance_metrics_summary_report_interval_msec = 10

[web_console]
# Whether to enable POST API request to web console.
enable_report_post = false

report_interval_msec = 3_000

host = "127.0.0.1"
port = 8050

timeout_msec = 3_000

[source_reader]
net_connect_timeout_msec = 1_000
net_read_timeout_msec = 100

can_read_timeout_msec = 100

[sink_writer]
net_connect_timeout_msec = 1_000
net_write_timeout_msec = 100

http_connect_timeout_msec = 1_000
http_timeout_msec = 100
2 changes: 1 addition & 1 deletion springql-core/Cargo.toml
Expand Up @@ -23,7 +23,6 @@ anyhow = "1.0"
thiserror = "1.0"
serde = {version = "1.0", features = ["derive"], default-features = false}
serde_json = "1.0"
config = {version = "0.13", features = ["toml"], default-features = false}
derive-new = "0.5"
ordered-float = "3.0"
fastrand = "1.5"
Expand All @@ -49,3 +48,4 @@ regex = "1.5"
float-cmp = "0.9"
tempfile = "3.3"
serde_derive = "1.0"
springql-deconfig = { path = "../springql-deconfig" }
135 changes: 6 additions & 129 deletions springql-core/src/api/spring_config.rs
@@ -1,76 +1,8 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use serde::Deserialize;

use crate::api::error::{Result, SpringError};

/// Default configuration.
///
/// Default key-values are overwritten by `overwrite_config_toml` parameter in `SpringConfig::new()`.
const SPRING_CONFIG_DEFAULT: &str = r#"
[worker]
# Number of generic worker threads. Generic worker threads deal with internal and sink tasks.
# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
n_generic_worker_threads = 1

# Number of source worker threads. Source worker threads collect rows from foreign source.
# Too many number may may cause row fraud in runtime.
# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
n_source_worker_threads = 1

# How long a generic worker or a source worker sleeps if it does not receive any row from the upstream.
# Small number will improve the initial row's E2E latency but increase the CPU usage.
sleep_msec_no_row = 100

[memory]
# How much memory is allowed to be used in SpringQL streaming runtime.
upper_limit_bytes = 10_000_000

# Percentage over `upper_limit_bytes` to transit from Moderate state to Severe.
# In Severe state, internal scheduler is changed to exhibit memory-resilience.
moderate_to_severe_percent = 60

# Percentage over `upper_limit_bytes` to transit from Severe state to Critical.
# In Critical state, all intermediate rows are purged to release memory.
severe_to_critical_percent = 95

critical_to_severe_percent = 80
severe_to_moderate_percent = 40

# Interval for MemoryStateMachineWorker to publish TransitPerformanceMetricsSummary event.
memory_state_transition_interval_msec = 10

# Interval for PerformanceMonitorWorker to publish ReportMetricsSummary event.
performance_metrics_summary_report_interval_msec = 10

[web_console]
# Whether to enable POST API request to web console.
enable_report_post = false

report_interval_msec = 3_000

host = "127.0.0.1"
port = 8050

timeout_msec = 3_000

[source_reader]
net_connect_timeout_msec = 1_000
net_read_timeout_msec = 100

can_read_timeout_msec = 100

[sink_writer]
net_connect_timeout_msec = 1_000
net_write_timeout_msec = 100

http_connect_timeout_msec = 1_000
http_timeout_msec = 100
"#;

/// Top-level config.
#[allow(missing_docs)]
#[derive(Clone, Eq, PartialEq, Debug, Deserialize)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct SpringConfig {
pub worker: SpringWorkerConfig,
pub memory: SpringMemoryConfig,
Expand All @@ -79,64 +11,9 @@ pub struct SpringConfig {
pub sink_writer: SpringSinkWriterConfig,
}

impl Default for SpringConfig {
fn default() -> Self {
Self::new("").expect("default configuration must be valid")
}
}

impl SpringConfig {
/// # Failures
///
/// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when:
/// - `overwrite_config_toml` includes invalid key and/or value.
/// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when:
/// - `overwrite_config_toml` is not valid as TOML.
pub fn new(overwrite_config_toml: &str) -> Result<Self> {
let default_conf = config::Config::builder()
.add_source(config::File::from_str(
SPRING_CONFIG_DEFAULT,
config::FileFormat::Toml,
))
.build()
.expect("SPRING_CONFIG_DEFAULT is in wrong format");

let c = config::Config::builder()
.add_source(default_conf)
.add_source(config::File::from_str(
overwrite_config_toml,
config::FileFormat::Toml,
))
.build()
.map_err(|e| SpringError::InvalidFormat {
s: overwrite_config_toml.to_string(),
source: e.into(),
})?;

c.try_deserialize()
.map_err(|e| SpringError::InvalidConfig { source: e.into() })
}

/// Configuration by TOML format string.
///
/// # Parameters
///
/// - `overwrite_config_toml`: TOML format configuration to overwrite default. See `SPRING_CONFIG_DEFAULT` in [spring_config.rs](https://github.com/SpringQL/SpringQL/tree/main/springql-core/src/api/spring_config.rs) for full-set default configuration.
///
/// # Failures
///
/// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when:
/// - `overwrite_config_toml` includes invalid key and/or value.
/// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when:
/// - `overwrite_config_toml` is not valid as TOML.
pub fn from_toml(overwrite_config_toml: &str) -> Result<SpringConfig> {
SpringConfig::new(overwrite_config_toml)
}
}

/// Config related to worker threads.
#[allow(missing_docs)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct SpringWorkerConfig {
pub n_generic_worker_threads: u16,
pub n_source_worker_threads: u16,
Expand All @@ -145,7 +22,7 @@ pub struct SpringWorkerConfig {

/// Config related to memory management.
#[allow(missing_docs)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct SpringMemoryConfig {
pub upper_limit_bytes: u64,

Expand All @@ -161,7 +38,7 @@ pub struct SpringMemoryConfig {

/// Config related to web console.
#[allow(missing_docs)]
#[derive(Clone, Eq, PartialEq, Debug, Deserialize)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct SpringWebConsoleConfig {
pub enable_report_post: bool,

Expand All @@ -175,7 +52,7 @@ pub struct SpringWebConsoleConfig {

/// Config related to source reader
#[allow(missing_docs)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct SpringSourceReaderConfig {
pub net_connect_timeout_msec: u32,
pub net_read_timeout_msec: u32,
Expand All @@ -185,7 +62,7 @@ pub struct SpringSourceReaderConfig {

/// Config related to sink writer.
#[allow(missing_docs)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct SpringSinkWriterConfig {
pub net_connect_timeout_msec: u32,
pub net_write_timeout_msec: u32,
Expand Down
61 changes: 59 additions & 2 deletions springql-core/src/pipeline/test_support/fixture.rs
Expand Up @@ -3,7 +3,10 @@
use std::{net::IpAddr, sync::Arc};

use crate::{
api::{SpringConfig, SpringSinkWriterConfig, SpringSourceReaderConfig},
api::{
SpringConfig, SpringMemoryConfig, SpringSinkWriterConfig, SpringSourceReaderConfig,
SpringWebConsoleConfig, SpringWorkerConfig,
},
pipeline::{
field::ColumnReference,
name::{ColumnName, SinkWriterName, SourceReaderName, StreamName},
Expand All @@ -16,10 +19,64 @@ use crate::{
Pipeline,
},
};
use springql_deconfig::SpringConfigExt;

fn default_config() -> SpringConfig {
let deconfig = springql_deconfig::SpringConfig::from_toml("").unwrap();

/*
can not direct return deconfig,

return deconfig;
23 | return deconfig;
| ^^^^^^^^ expected struct `spring_config::SpringConfig`, found struct `springql_deconfig::SpringConfig`

because no gurantee for same as SpringConfig defined and springql_deconfig export SpringConfig
*/

SpringConfig {
worker: SpringWorkerConfig {
n_generic_worker_threads: deconfig.worker.n_generic_worker_threads,
n_source_worker_threads: deconfig.worker.n_source_worker_threads,
sleep_msec_no_row: deconfig.worker.sleep_msec_no_row,
},
memory: SpringMemoryConfig {
upper_limit_bytes: deconfig.memory.upper_limit_bytes,
moderate_to_severe_percent: deconfig.memory.moderate_to_severe_percent,
severe_to_critical_percent: deconfig.memory.severe_to_critical_percent,
critical_to_severe_percent: deconfig.memory.critical_to_severe_percent,
severe_to_moderate_percent: deconfig.memory.severe_to_moderate_percent,
memory_state_transition_interval_msec: deconfig
.memory
.memory_state_transition_interval_msec,
performance_metrics_summary_report_interval_msec: deconfig
.memory
.performance_metrics_summary_report_interval_msec,
},
web_console: SpringWebConsoleConfig {
enable_report_post: deconfig.web_console.enable_report_post,
report_interval_msec: deconfig.web_console.report_interval_msec,
host: deconfig.web_console.host,
port: deconfig.web_console.port,
timeout_msec: deconfig.web_console.timeout_msec,
},
source_reader: SpringSourceReaderConfig {
net_connect_timeout_msec: deconfig.source_reader.net_connect_timeout_msec,
net_read_timeout_msec: deconfig.source_reader.net_read_timeout_msec,
can_read_timeout_msec: deconfig.source_reader.can_read_timeout_msec,
},
sink_writer: SpringSinkWriterConfig {
net_connect_timeout_msec: deconfig.sink_writer.net_connect_timeout_msec,
net_write_timeout_msec: deconfig.sink_writer.net_write_timeout_msec,
http_timeout_msec: deconfig.sink_writer.http_timeout_msec,
http_connect_timeout_msec: deconfig.sink_writer.http_connect_timeout_msec,
},
}
}

impl SpringConfig {
pub fn fx_default() -> Self {
Self::new("").unwrap()
default_config()
}
}

Expand Down
@@ -1,8 +1,8 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

mod http_client;
mod in_memory_queue;
mod net;
mod http_client;
mod sink_writer_factory;
mod sink_writer_repository;

Expand Down
11 changes: 11 additions & 0 deletions springql-deconfig/Cargo.toml
@@ -0,0 +1,11 @@
[package]
name = "springql-deconfig"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
springql-core = { version = "0.17.1", path = "../springql-core" }
serde = {version = "1.0", features = ["derive"], default-features = false}
config = {version = "0.13", features = ["toml"], default-features = false}
8 changes: 8 additions & 0 deletions springql-deconfig/README.md
@@ -0,0 +1,8 @@
# springql DEserialize CONFIGulations

This crate supports configuration deserialize from file.

Supported file formats.

- TOML