diff --git a/CHANGELOG.md b/CHANGELOG.md index b21744e0..c3d4ba35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ All other sections are for end-users. ## [Unreleased] +### Changed + +- move Configuration toml support from `springql-core`, into new crate `springql-configloader` ([#234](https://github.com/SpringQL/SpringQL/pull/234)) + ## [v0.17.2] - 2022-08-03 ### Fixed diff --git a/Cargo.toml b/Cargo.toml index 8c9f85c8..6ad9ad87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,5 +4,6 @@ members = [ "springql", "springql-core", "foreign-service", + "springql-configloader", "test-logger", ] diff --git a/default_config.toml b/default_config.toml new file mode 100644 index 00000000..55337787 --- /dev/null +++ b/default_config.toml @@ -0,0 +1,58 @@ +[worker] +# Number of generic worker threads. Generic worker threads deal with internal and sink tasks. +# Setting this to > 1 may improve throughput but lead to out-of-order stream processing. +n_generic_worker_threads = 1 + +# Number of source worker threads. Source worker threads collect rows from foreign source. +# Too many number may may cause row fraud in runtime. +# Setting this to > 1 may improve throughput but lead to out-of-order stream processing. +n_source_worker_threads = 1 + +# How long a generic worker or a source worker sleeps if it does not receive any row from the upstream. +# Small number will improve the initial row's E2E latency but increase the CPU usage. +sleep_msec_no_row = 100 + +[memory] +# How much memory is allowed to be used in SpringQL streaming runtime. +upper_limit_bytes = 10_000_000 + +# Percentage over `upper_limit_bytes` to transit from Moderate state to Severe. +# In Severe state, internal scheduler is changed to exhibit memory-resilience. +moderate_to_severe_percent = 60 + +# Percentage over `upper_limit_bytes` to transit from Severe state to Critical. +# In Critical state, all intermediate rows are purged to release memory. +severe_to_critical_percent = 95 + +critical_to_severe_percent = 80 +severe_to_moderate_percent = 40 + +# Interval for MemoryStateMachineWorker to publish TransitPerformanceMetricsSummary event. +memory_state_transition_interval_msec = 10 + +# Interval for PerformanceMonitorWorker to publish ReportMetricsSummary event. +performance_metrics_summary_report_interval_msec = 10 + +[web_console] +# Whether to enable POST API request to web console. +enable_report_post = false + +report_interval_msec = 3_000 + +host = "127.0.0.1" +port = 8050 + +timeout_msec = 3_000 + +[source_reader] +net_connect_timeout_msec = 1_000 +net_read_timeout_msec = 100 + +can_read_timeout_msec = 100 + +[sink_writer] +net_connect_timeout_msec = 1_000 +net_write_timeout_msec = 100 + +http_connect_timeout_msec = 1_000 +http_timeout_msec = 100 diff --git a/springql-configloader/Cargo.toml b/springql-configloader/Cargo.toml new file mode 100644 index 00000000..d7d01547 --- /dev/null +++ b/springql-configloader/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "springql-configloader" +version = "0.1.0" +edition = "2021" +authors = ["Sho Nakatani "] +license = "MIT OR Apache-2.0" +repository = "https://github.com/SpringQL/SpringQL" +keywords = ["springql", "stream-processing"] # up to 5 keywords, each keyword should have <= 20 chars + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +springql-core = { version = "0.17.1", path = "../springql-core" } +serde = {version = "1.0", features = ["derive"], default-features = false} +config = {version = "0.13", features = ["toml"], default-features = false} diff --git a/springql-configloader/README.md b/springql-configloader/README.md new file mode 100644 index 00000000..13038c32 --- /dev/null +++ b/springql-configloader/README.md @@ -0,0 +1,8 @@ +# springql DEserialize CONFIGulations + + This crate supports configuration deserialize from file. + + Supported file formats. + + - TOML + \ No newline at end of file diff --git a/springql-configloader/src/lib.rs b/springql-configloader/src/lib.rs new file mode 100644 index 00000000..46ddc431 --- /dev/null +++ b/springql-configloader/src/lib.rs @@ -0,0 +1,149 @@ +// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. + +use serde::Deserialize; +pub use springql_core::api::{ + Result, SpringConfig, SpringError, SpringMemoryConfig, SpringSinkWriterConfig, + SpringSourceReaderConfig, SpringWebConsoleConfig, SpringWorkerConfig, +}; + +const SPRING_CONFIG_DEFAULT: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/../default_config.toml" +)); + +#[allow(missing_docs)] +#[derive(Clone, Eq, PartialEq, Debug, Deserialize)] +struct SpringConfigDeserialize { + #[serde(with = "SpringWorkerConfigDeserialize")] + pub worker: SpringWorkerConfig, + #[serde(with = "SpringMemoryConfigDeserialize")] + pub memory: SpringMemoryConfig, + #[serde(with = "SpringWebConsoleConfigDeserialize")] + pub web_console: SpringWebConsoleConfig, + #[serde(with = "SpringSourceReaderConfigDeserialize")] + pub source_reader: SpringSourceReaderConfig, + #[serde(with = "SpringSinkWriterConfigDeserialize")] + pub sink_writer: SpringSinkWriterConfig, +} + +#[allow(missing_docs)] +#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] +#[serde(remote = "SpringWorkerConfig")] +struct SpringWorkerConfigDeserialize { + pub n_generic_worker_threads: u16, + pub n_source_worker_threads: u16, + pub sleep_msec_no_row: u64, +} + +#[allow(missing_docs)] +#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] +#[serde(remote = "SpringMemoryConfig")] +struct SpringMemoryConfigDeserialize { + pub upper_limit_bytes: u64, + + pub moderate_to_severe_percent: u8, + pub severe_to_critical_percent: u8, + + pub critical_to_severe_percent: u8, + pub severe_to_moderate_percent: u8, + + pub memory_state_transition_interval_msec: u32, + pub performance_metrics_summary_report_interval_msec: u32, +} + +/// Config related to web console. +#[allow(missing_docs)] +#[derive(Clone, Eq, PartialEq, Debug, Deserialize)] +#[serde(remote = "SpringWebConsoleConfig")] +struct SpringWebConsoleConfigDeserialize { + pub enable_report_post: bool, + + pub report_interval_msec: u32, + + pub host: String, + pub port: u16, + + pub timeout_msec: u32, +} + +#[allow(missing_docs)] +#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] +#[serde(remote = "SpringSourceReaderConfig")] +pub struct SpringSourceReaderConfigDeserialize { + pub net_connect_timeout_msec: u32, + pub net_read_timeout_msec: u32, + + pub can_read_timeout_msec: u32, +} + +/// Config related to sink writer. +#[allow(missing_docs)] +#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] +#[serde(remote = "SpringSinkWriterConfig")] +pub struct SpringSinkWriterConfigDeserialize { + pub net_connect_timeout_msec: u32, + pub net_write_timeout_msec: u32, + + pub http_timeout_msec: u32, + pub http_connect_timeout_msec: u32, +} + +impl SpringConfigDeserialize { + /// Configuration by TOML format string. + /// + /// # Parameters + /// + /// - `overwrite_config_toml`: TOML format configuration to overwrite default. See `SPRING_CONFIG_DEFAULT` in [spring_config.rs](https://github.com/SpringQL/SpringQL/tree/main/springql-core/src/api/spring_config.rs) for full-set default configuration. + /// + /// # Failures + /// + /// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when: + /// - `overwrite_config_toml` includes invalid key and/or value. + /// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when: + /// - `overwrite_config_toml` is not valid as TOML. + pub fn from_toml(overwrite_config_toml: &str) -> Result { + let default_conf = config::Config::builder() + .add_source(config::File::from_str( + SPRING_CONFIG_DEFAULT, + config::FileFormat::Toml, + )) + .build() + .expect("SPRING_CONFIG_DEFAULT is in wrong format"); + + let c = config::Config::builder() + .add_source(default_conf) + .add_source(config::File::from_str( + overwrite_config_toml, + config::FileFormat::Toml, + )) + .build() + .map_err(|e| SpringError::InvalidFormat { + s: overwrite_config_toml.to_string(), + source: e.into(), + })?; + + let de = c + .try_deserialize::<'_, Self>() + .map_err(|e| SpringError::InvalidConfig { source: e.into() })?; + + Ok(SpringConfig { + worker: de.worker, + memory: de.memory, + web_console: de.web_console, + source_reader: de.source_reader, + sink_writer: de.sink_writer, + }) + } +} + +/// trait for deserialize configuration from file +pub trait SpringConfigExt { + /// Create StringConfig from toml + fn from_toml(toml: &str) -> Result; +} + +impl SpringConfigExt for SpringConfig { + fn from_toml(toml: &str) -> Result { + SpringConfigDeserialize::from_toml(toml) + } +} diff --git a/springql-core/Cargo.toml b/springql-core/Cargo.toml index 9605e8f6..2be26006 100644 --- a/springql-core/Cargo.toml +++ b/springql-core/Cargo.toml @@ -23,7 +23,6 @@ anyhow = "1.0" thiserror = "1.0" serde = {version = "1.0", features = ["derive"], default-features = false} serde_json = "1.0" -config = {version = "0.13", features = ["toml"], default-features = false} derive-new = "0.5" ordered-float = "3.0" fastrand = "1.5" @@ -49,3 +48,4 @@ regex = "1.5" float-cmp = "0.9" tempfile = "3.3" serde_derive = "1.0" +springql-configloader = { path = "../springql-configloader" } diff --git a/springql-core/src/api/spring_config.rs b/springql-core/src/api/spring_config.rs index 77b5da17..85320e3d 100644 --- a/springql-core/src/api/spring_config.rs +++ b/springql-core/src/api/spring_config.rs @@ -1,76 +1,8 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. -use serde::Deserialize; - -use crate::api::error::{Result, SpringError}; - -/// Default configuration. -/// -/// Default key-values are overwritten by `overwrite_config_toml` parameter in `SpringConfig::new()`. -const SPRING_CONFIG_DEFAULT: &str = r#" -[worker] -# Number of generic worker threads. Generic worker threads deal with internal and sink tasks. -# Setting this to > 1 may improve throughput but lead to out-of-order stream processing. -n_generic_worker_threads = 1 - -# Number of source worker threads. Source worker threads collect rows from foreign source. -# Too many number may may cause row fraud in runtime. -# Setting this to > 1 may improve throughput but lead to out-of-order stream processing. -n_source_worker_threads = 1 - -# How long a generic worker or a source worker sleeps if it does not receive any row from the upstream. -# Small number will improve the initial row's E2E latency but increase the CPU usage. -sleep_msec_no_row = 100 - -[memory] -# How much memory is allowed to be used in SpringQL streaming runtime. -upper_limit_bytes = 10_000_000 - -# Percentage over `upper_limit_bytes` to transit from Moderate state to Severe. -# In Severe state, internal scheduler is changed to exhibit memory-resilience. -moderate_to_severe_percent = 60 - -# Percentage over `upper_limit_bytes` to transit from Severe state to Critical. -# In Critical state, all intermediate rows are purged to release memory. -severe_to_critical_percent = 95 - -critical_to_severe_percent = 80 -severe_to_moderate_percent = 40 - -# Interval for MemoryStateMachineWorker to publish TransitPerformanceMetricsSummary event. -memory_state_transition_interval_msec = 10 - -# Interval for PerformanceMonitorWorker to publish ReportMetricsSummary event. -performance_metrics_summary_report_interval_msec = 10 - -[web_console] -# Whether to enable POST API request to web console. -enable_report_post = false - -report_interval_msec = 3_000 - -host = "127.0.0.1" -port = 8050 - -timeout_msec = 3_000 - -[source_reader] -net_connect_timeout_msec = 1_000 -net_read_timeout_msec = 100 - -can_read_timeout_msec = 100 - -[sink_writer] -net_connect_timeout_msec = 1_000 -net_write_timeout_msec = 100 - -http_connect_timeout_msec = 1_000 -http_timeout_msec = 100 -"#; - /// Top-level config. #[allow(missing_docs)] -#[derive(Clone, Eq, PartialEq, Debug, Deserialize)] +#[derive(Clone, Eq, PartialEq, Debug)] pub struct SpringConfig { pub worker: SpringWorkerConfig, pub memory: SpringMemoryConfig, @@ -79,64 +11,9 @@ pub struct SpringConfig { pub sink_writer: SpringSinkWriterConfig, } -impl Default for SpringConfig { - fn default() -> Self { - Self::new("").expect("default configuration must be valid") - } -} - -impl SpringConfig { - /// # Failures - /// - /// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when: - /// - `overwrite_config_toml` includes invalid key and/or value. - /// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when: - /// - `overwrite_config_toml` is not valid as TOML. - pub fn new(overwrite_config_toml: &str) -> Result { - let default_conf = config::Config::builder() - .add_source(config::File::from_str( - SPRING_CONFIG_DEFAULT, - config::FileFormat::Toml, - )) - .build() - .expect("SPRING_CONFIG_DEFAULT is in wrong format"); - - let c = config::Config::builder() - .add_source(default_conf) - .add_source(config::File::from_str( - overwrite_config_toml, - config::FileFormat::Toml, - )) - .build() - .map_err(|e| SpringError::InvalidFormat { - s: overwrite_config_toml.to_string(), - source: e.into(), - })?; - - c.try_deserialize() - .map_err(|e| SpringError::InvalidConfig { source: e.into() }) - } - - /// Configuration by TOML format string. - /// - /// # Parameters - /// - /// - `overwrite_config_toml`: TOML format configuration to overwrite default. See `SPRING_CONFIG_DEFAULT` in [spring_config.rs](https://github.com/SpringQL/SpringQL/tree/main/springql-core/src/api/spring_config.rs) for full-set default configuration. - /// - /// # Failures - /// - /// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when: - /// - `overwrite_config_toml` includes invalid key and/or value. - /// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when: - /// - `overwrite_config_toml` is not valid as TOML. - pub fn from_toml(overwrite_config_toml: &str) -> Result { - SpringConfig::new(overwrite_config_toml) - } -} - /// Config related to worker threads. #[allow(missing_docs)] -#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] pub struct SpringWorkerConfig { pub n_generic_worker_threads: u16, pub n_source_worker_threads: u16, @@ -145,7 +22,7 @@ pub struct SpringWorkerConfig { /// Config related to memory management. #[allow(missing_docs)] -#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] pub struct SpringMemoryConfig { pub upper_limit_bytes: u64, @@ -161,7 +38,7 @@ pub struct SpringMemoryConfig { /// Config related to web console. #[allow(missing_docs)] -#[derive(Clone, Eq, PartialEq, Debug, Deserialize)] +#[derive(Clone, Eq, PartialEq, Debug)] pub struct SpringWebConsoleConfig { pub enable_report_post: bool, @@ -175,7 +52,7 @@ pub struct SpringWebConsoleConfig { /// Config related to source reader #[allow(missing_docs)] -#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] pub struct SpringSourceReaderConfig { pub net_connect_timeout_msec: u32, pub net_read_timeout_msec: u32, @@ -185,7 +62,7 @@ pub struct SpringSourceReaderConfig { /// Config related to sink writer. #[allow(missing_docs)] -#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] pub struct SpringSinkWriterConfig { pub net_connect_timeout_msec: u32, pub net_write_timeout_msec: u32, diff --git a/springql-core/src/pipeline/test_support/fixture.rs b/springql-core/src/pipeline/test_support/fixture.rs index 521ae038..979280ce 100644 --- a/springql-core/src/pipeline/test_support/fixture.rs +++ b/springql-core/src/pipeline/test_support/fixture.rs @@ -3,7 +3,10 @@ use std::{net::IpAddr, sync::Arc}; use crate::{ - api::{SpringConfig, SpringSinkWriterConfig, SpringSourceReaderConfig}, + api::{ + SpringConfig, SpringMemoryConfig, SpringSinkWriterConfig, SpringSourceReaderConfig, + SpringWebConsoleConfig, SpringWorkerConfig, + }, pipeline::{ field::ColumnReference, name::{ColumnName, SinkWriterName, SourceReaderName, StreamName}, @@ -16,10 +19,64 @@ use crate::{ Pipeline, }, }; +use springql_configloader::SpringConfigExt; + +fn default_config() -> SpringConfig { + let deconfig = springql_configloader::SpringConfig::from_toml("").unwrap(); + + /* + can not direct return deconfig, + + return deconfig; + 23 | return deconfig; + | ^^^^^^^^ expected struct `spring_config::SpringConfig`, found struct `springql_deconfig::SpringConfig` + + because no gurantee for same as SpringConfig defined and springql_deconfig export SpringConfig + */ + + SpringConfig { + worker: SpringWorkerConfig { + n_generic_worker_threads: deconfig.worker.n_generic_worker_threads, + n_source_worker_threads: deconfig.worker.n_source_worker_threads, + sleep_msec_no_row: deconfig.worker.sleep_msec_no_row, + }, + memory: SpringMemoryConfig { + upper_limit_bytes: deconfig.memory.upper_limit_bytes, + moderate_to_severe_percent: deconfig.memory.moderate_to_severe_percent, + severe_to_critical_percent: deconfig.memory.severe_to_critical_percent, + critical_to_severe_percent: deconfig.memory.critical_to_severe_percent, + severe_to_moderate_percent: deconfig.memory.severe_to_moderate_percent, + memory_state_transition_interval_msec: deconfig + .memory + .memory_state_transition_interval_msec, + performance_metrics_summary_report_interval_msec: deconfig + .memory + .performance_metrics_summary_report_interval_msec, + }, + web_console: SpringWebConsoleConfig { + enable_report_post: deconfig.web_console.enable_report_post, + report_interval_msec: deconfig.web_console.report_interval_msec, + host: deconfig.web_console.host, + port: deconfig.web_console.port, + timeout_msec: deconfig.web_console.timeout_msec, + }, + source_reader: SpringSourceReaderConfig { + net_connect_timeout_msec: deconfig.source_reader.net_connect_timeout_msec, + net_read_timeout_msec: deconfig.source_reader.net_read_timeout_msec, + can_read_timeout_msec: deconfig.source_reader.can_read_timeout_msec, + }, + sink_writer: SpringSinkWriterConfig { + net_connect_timeout_msec: deconfig.sink_writer.net_connect_timeout_msec, + net_write_timeout_msec: deconfig.sink_writer.net_write_timeout_msec, + http_timeout_msec: deconfig.sink_writer.http_timeout_msec, + http_connect_timeout_msec: deconfig.sink_writer.http_connect_timeout_msec, + }, + } +} impl SpringConfig { pub fn fx_default() -> Self { - Self::new("").unwrap() + default_config() } } diff --git a/springql-core/src/stream_engine/autonomous_executor/task/sink_task/sink_writer.rs b/springql-core/src/stream_engine/autonomous_executor/task/sink_task/sink_writer.rs index 3edbb7c6..c4abd3c4 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/sink_task/sink_writer.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/sink_task/sink_writer.rs @@ -1,8 +1,8 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. +mod http_client; mod in_memory_queue; mod net; -mod http_client; mod sink_writer_factory; mod sink_writer_repository; diff --git a/springql/Cargo.toml b/springql/Cargo.toml index 940327ec..f2f6f04e 100644 --- a/springql/Cargo.toml +++ b/springql/Cargo.toml @@ -16,6 +16,7 @@ repository = "https://github.com/SpringQL/SpringQL" [dependencies] springql-core = {version="0.17.1", path="../springql-core"} +springql-configloader = { version = "0.1.0", path = "../springql-configloader" } [dev-dependencies] springql-foreign-service = {path = "../foreign-service"} diff --git a/springql/examples/can_source_reader.rs b/springql/examples/can_source_reader.rs index b4895c6d..c29652df 100644 --- a/springql/examples/can_source_reader.rs +++ b/springql/examples/can_source_reader.rs @@ -10,6 +10,7 @@ use std::env; +use springql_configloader::SpringConfigExt; use springql_core::api::{SpringConfig, SpringPipeline}; fn parse_can_interface_arg() -> String { @@ -25,7 +26,7 @@ fn parse_can_interface_arg() -> String { fn main() { let can_interface = parse_can_interface_arg(); - let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap(); + let pipeline = SpringPipeline::new(&SpringConfig::from_toml("").unwrap()).unwrap(); pipeline .command( diff --git a/springql/examples/doc_app1.rs b/springql/examples/doc_app1.rs index 849280a7..accaf5a6 100644 --- a/springql/examples/doc_app1.rs +++ b/springql/examples/doc_app1.rs @@ -13,6 +13,7 @@ //! ``` use springql::{SpringConfig, SpringPipeline}; +use springql_configloader::SpringConfigExt; use std::process::Command; fn send_data_to_pipeline() { @@ -26,7 +27,7 @@ fn send_data_to_pipeline() { fn main() { const SOURCE_PORT: u16 = 54300; - let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap(); + let pipeline = SpringPipeline::new(&SpringConfig::from_toml("").unwrap()).unwrap(); pipeline .command( diff --git a/springql/examples/doc_app2.rs b/springql/examples/doc_app2.rs index 499286f6..ee915518 100644 --- a/springql/examples/doc_app2.rs +++ b/springql/examples/doc_app2.rs @@ -25,6 +25,7 @@ use std::{ }; use springql::{SpringConfig, SpringPipeline}; +use springql_configloader::SpringConfigExt; fn send_data_to_pipeline() { fn send_row(json: &str) { @@ -48,7 +49,7 @@ fn main() { const SOURCE_PORT: u16 = 54300; // Using Arc to share the reference between threads feeding sink rows. - let pipeline = Arc::new(SpringPipeline::new(&SpringConfig::default()).unwrap()); + let pipeline = Arc::new(SpringPipeline::new(&SpringConfig::from_toml("").unwrap()).unwrap()); pipeline .command( diff --git a/springql/examples/http_client_sink_writer.rs b/springql/examples/http_client_sink_writer.rs index 9bd60b10..7f416fd9 100644 --- a/springql/examples/http_client_sink_writer.rs +++ b/springql/examples/http_client_sink_writer.rs @@ -9,6 +9,7 @@ use std::{env, process::Command, thread, time::Duration}; use springql::SpringSourceRowBuilder; +use springql_configloader::SpringConfigExt; use springql_core::api::{SpringConfig, SpringPipeline}; use springql_test_logger::setup_test_logger; @@ -38,7 +39,7 @@ fn main() { launch_http_server(sink_port); - let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap(); + let pipeline = SpringPipeline::new(&SpringConfig::from_toml("").unwrap()).unwrap(); pipeline .command( diff --git a/springql/examples/in_memory.rs b/springql/examples/in_memory.rs index a3dcade2..340be03e 100644 --- a/springql/examples/in_memory.rs +++ b/springql/examples/in_memory.rs @@ -9,6 +9,7 @@ //! ``` use springql::{SpringConfig, SpringPipeline, SpringSourceRowBuilder}; +use springql_configloader::SpringConfigExt; fn push_row_to_pipeline(pipeline: &SpringPipeline, queue_name: &str) { let row = SpringSourceRowBuilder::default() @@ -22,7 +23,7 @@ fn push_row_to_pipeline(pipeline: &SpringPipeline, queue_name: &str) { } fn main() { - let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap(); + let pipeline = SpringPipeline::new(&SpringConfig::from_toml("").unwrap()).unwrap(); pipeline .command( diff --git a/springql/examples/in_vehicle_pipeline.rs b/springql/examples/in_vehicle_pipeline.rs index 44ed4aec..75a3104f 100644 --- a/springql/examples/in_vehicle_pipeline.rs +++ b/springql/examples/in_vehicle_pipeline.rs @@ -8,6 +8,7 @@ use std::{ }; use springql::{SpringConfig, SpringPipeline}; +use springql_configloader::SpringConfigExt; use springql_foreign_service::sink::ForeignSink; use tempfile::NamedTempFile; @@ -78,7 +79,7 @@ fn main() { let sink_engine_wheel_speed = ForeignSink::start().unwrap(); let sink_vehicle_speed = ForeignSink::start().unwrap(); - let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap(); + let pipeline = SpringPipeline::new(&SpringConfig::from_toml("").unwrap()).unwrap(); pipeline .command( " diff --git a/springql/src/lib.md b/springql/src/lib.md index 158bb0d7..6b6c3226 100644 --- a/springql/src/lib.md +++ b/springql/src/lib.md @@ -15,12 +15,13 @@ ```rust use springql::{SpringPipeline, SpringConfig}; +use springql_configloader::SpringConfigExt; fn main() { const SOURCE_PORT: u16 = 54300; // create pipeline instans - let pipeline = SpringPipeline::new(&SpringConfig::default()).unwrap(); + let pipeline = SpringPipeline::new(&SpringConfig::from_toml("").unwrap()).unwrap(); // execute DDLs for build pipeline @@ -89,12 +90,13 @@ echo '{"ts": "2022-01-01 13:00:00.000000000", "temperature": 5.3}' | nc localhos ```rust use std::{sync::Arc, thread, time::Duration}; use springql::{SpringPipeline, SpringConfig}; +use springql_configloader::SpringConfigExt; fn main() { const SOURCE_PORT: u16 = 54300; // Using Arc to share the reference between threads feeding sink rows. - let pipeline = Arc::new(SpringPipeline::new(&SpringConfig::default()).unwrap()); + let pipeline = Arc::new(SpringPipeline::new(&SpringConfig::from_toml("").unwrap()).unwrap()); pipeline.command( "CREATE SOURCE STREAM source_trade ( diff --git a/springql/tests/e2e_connect_2_pipelines.rs b/springql/tests/e2e_connect_2_pipelines.rs index 009fa839..c654412e 100644 --- a/springql/tests/e2e_connect_2_pipelines.rs +++ b/springql/tests/e2e_connect_2_pipelines.rs @@ -3,7 +3,7 @@ mod test_support; use serde_json::json; -use springql::{SpringConfig, SpringPipeline}; +use springql::SpringPipeline; use springql_foreign_service::{ sink::ForeignSink, source::{ForeignSource, ForeignSourceInput}, @@ -65,7 +65,7 @@ fn pipeline1(test_source: &ForeignSource) -> SpringPipeline { ), ]; - apply_ddls(&ddls, SpringConfig::default()) + apply_ddls(&ddls, default_config()) } fn pipeline2(test_sink: &ForeignSink) -> SpringPipeline { @@ -112,7 +112,7 @@ fn pipeline2(test_sink: &ForeignSink) -> SpringPipeline { ), ]; - apply_ddls(&ddls, SpringConfig::default()) + apply_ddls(&ddls, default_config()) } #[test] diff --git a/springql/tests/e2e_high_level_rs.rs b/springql/tests/e2e_high_level_rs.rs index 2a3da426..7346858e 100644 --- a/springql/tests/e2e_high_level_rs.rs +++ b/springql/tests/e2e_high_level_rs.rs @@ -5,7 +5,7 @@ mod test_support; use pretty_assertions::assert_eq; use serde_json::json; -use springql::{Result, SpringConfig}; +use springql::Result; use springql_foreign_service::{ sink::ForeignSink, source::{ForeignSource, ForeignSourceInput}, @@ -87,7 +87,7 @@ fn test_e2e_source_sink() -> Result<()> { ), ]; - let _pipeline = apply_ddls(&ddls, SpringConfig::default()); + let _pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch(source_input.clone())); let sink_received = drain_from_sink(&test_sink); @@ -160,7 +160,7 @@ fn test_e2e_projection() -> Result<()> { ), ]; - let _pipeline = apply_ddls(&ddls, SpringConfig::default()); + let _pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch(vec![json_oracle])); let sink_received = drain_from_sink(&test_sink); @@ -238,7 +238,7 @@ fn test_e2e_pop_from_in_memory_queue() { ), ]; - let pipeline = apply_ddls(&ddls, SpringConfig::default()); + let pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch( (0..trade_times) .into_iter() @@ -316,7 +316,7 @@ fn test_e2e_pop_non_blocking_from_in_memory_queue() { ), ]; - let pipeline = apply_ddls(&ddls, SpringConfig::default()); + let pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch( (0..trade_times) .into_iter() diff --git a/springql/tests/e2e_sampling.rs b/springql/tests/e2e_sampling.rs index 7e39c92e..96e470f9 100644 --- a/springql/tests/e2e_sampling.rs +++ b/springql/tests/e2e_sampling.rs @@ -4,7 +4,7 @@ mod test_support; use pretty_assertions::assert_eq; use serde_json::json; -use springql_core::api::{error::Result, *}; +use springql_core::api::error::Result; use springql_foreign_service::{ sink::ForeignSink, source::{ForeignSource, ForeignSourceInput}, @@ -44,7 +44,7 @@ fn run_and_drain( test_source: ForeignSource, test_sink: &ForeignSink, ) -> Vec { - let _pipeline = apply_ddls(ddls, SpringConfig::default()); + let _pipeline = apply_ddls(ddls, default_config()); test_source.start(source_input); let mut sink_received = drain_from_sink(test_sink); sink_received.sort_by_key(|r| { diff --git a/springql/tests/feat_aggregation.rs b/springql/tests/feat_aggregation.rs index 9091a01d..4db6a851 100644 --- a/springql/tests/feat_aggregation.rs +++ b/springql/tests/feat_aggregation.rs @@ -4,7 +4,7 @@ mod test_support; use pretty_assertions::assert_eq; use serde_json::json; -use springql_core::api::{error::Result, *}; +use springql_core::api::error::Result; use springql_foreign_service::{ sink::ForeignSink, source::{ForeignSource, ForeignSourceInput}, @@ -44,7 +44,7 @@ fn run_and_drain( test_source: ForeignSource, test_sink: &ForeignSink, ) -> Vec { - let _pipeline = apply_ddls(ddls, SpringConfig::default()); + let _pipeline = apply_ddls(ddls, default_config()); test_source.start(source_input); drain_from_sink(test_sink) } diff --git a/springql/tests/feat_join.rs b/springql/tests/feat_join.rs index bc286760..c60b8101 100644 --- a/springql/tests/feat_join.rs +++ b/springql/tests/feat_join.rs @@ -4,7 +4,6 @@ mod test_support; use pretty_assertions::assert_eq; use serde_json::json; -use springql_core::api::*; use springql_foreign_service::{ sink::ForeignSink, source::{ForeignSource, ForeignSourceInput}, @@ -56,7 +55,7 @@ fn run_and_drain( test_source_city_temperature: ForeignSource, test_sink: &ForeignSink, ) -> Vec { - let _pipeline = apply_ddls(ddls, SpringConfig::default()); + let _pipeline = apply_ddls(ddls, default_config()); test_source_trade.start(test_source_trade_input); test_source_city_temperature.start(test_source_city_temperature_input); diff --git a/springql/tests/feat_logical_ops.rs b/springql/tests/feat_logical_ops.rs index 2ab0e35b..2b11474a 100644 --- a/springql/tests/feat_logical_ops.rs +++ b/springql/tests/feat_logical_ops.rs @@ -4,14 +4,13 @@ mod test_support; use pretty_assertions::assert_eq; use serde_json::json; -use springql::*; use springql_foreign_service::{ sink::ForeignSink, source::{ForeignSource, ForeignSourceInput}, }; use springql_test_logger::setup_test_logger; -use crate::test_support::{apply_ddls, drain_from_sink}; +use crate::test_support::{apply_ddls, default_config, drain_from_sink}; #[test] fn test_feat_and() { @@ -73,7 +72,7 @@ fn test_feat_and() { ), ]; - let _pipeline = apply_ddls(&ddls, SpringConfig::default()); + let _pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch(source_input)); let sink_received = drain_from_sink(&test_sink); let r = sink_received.get(0).unwrap(); diff --git a/springql/tests/feat_numerical_ops.rs b/springql/tests/feat_numerical_ops.rs index f2f8f95b..fc55c69c 100644 --- a/springql/tests/feat_numerical_ops.rs +++ b/springql/tests/feat_numerical_ops.rs @@ -5,14 +5,13 @@ mod test_support; use float_cmp::approx_eq; use pretty_assertions::assert_eq; use serde_json::json; -use springql::*; use springql_foreign_service::{ sink::ForeignSink, source::{ForeignSource, ForeignSourceInput}, }; use springql_test_logger::setup_test_logger; -use crate::test_support::{apply_ddls, drain_from_sink}; +use crate::test_support::{apply_ddls, default_config, drain_from_sink}; #[test] fn test_feat_add_mul_integer() { @@ -73,7 +72,7 @@ fn test_feat_add_mul_integer() { ), ]; - let _pipeline = apply_ddls(&ddls, SpringConfig::default()); + let _pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch(source_input)); let sink_received = drain_from_sink(&test_sink); let r = sink_received.get(0).unwrap(); @@ -141,7 +140,7 @@ fn test_feat_add_mul_float() { ), ]; - let _pipeline = apply_ddls(&ddls, SpringConfig::default()); + let _pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch(source_input)); let sink_received = drain_from_sink(&test_sink); let r = sink_received.get(0).unwrap(); @@ -208,7 +207,7 @@ fn test_feat_unsigned_integer() { ), ]; - let _pipeline = apply_ddls(&ddls, SpringConfig::default()); + let _pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch(source_input)); let sink_received = drain_from_sink(&test_sink); let r = sink_received.get(0).unwrap(); diff --git a/springql/tests/feat_pipeline.rs b/springql/tests/feat_pipeline.rs index bb1c6e46..2c0196a2 100644 --- a/springql/tests/feat_pipeline.rs +++ b/springql/tests/feat_pipeline.rs @@ -4,11 +4,10 @@ mod test_support; use pretty_assertions::assert_eq; use serde_json::json; -use springql::*; use springql_foreign_service::source::{ForeignSource, ForeignSourceInput}; use springql_test_logger::setup_test_logger; -use crate::test_support::apply_ddls; +use crate::test_support::{apply_ddls, default_config}; /// See: #[test] @@ -87,7 +86,7 @@ fn test_feat_split_from_source() { ), ]; - let pipeline = apply_ddls(&ddls, SpringConfig::default()); + let pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch(source_input)); let row = pipeline.pop("q1").unwrap(); diff --git a/springql/tests/feat_processing_time.rs b/springql/tests/feat_processing_time.rs index fa7e3905..5be877a0 100644 --- a/springql/tests/feat_processing_time.rs +++ b/springql/tests/feat_processing_time.rs @@ -41,7 +41,7 @@ fn run_and_drain( test_source: ForeignSource, test_sink: &ForeignSink, ) -> Vec { - let _pipeline = apply_ddls(ddls, SpringConfig::default()); + let _pipeline = apply_ddls(ddls, default_config()); test_source.start(source_input); drain_from_sink(test_sink) } diff --git a/springql/tests/feat_projection.rs b/springql/tests/feat_projection.rs index 987fd13e..9d3c5b4b 100644 --- a/springql/tests/feat_projection.rs +++ b/springql/tests/feat_projection.rs @@ -4,7 +4,6 @@ mod test_support; use pretty_assertions::assert_eq; use serde_json::json; -use springql_core::api::*; use springql_foreign_service::source::{ForeignSource, ForeignSourceInput}; use springql_test_logger::setup_test_logger; @@ -91,7 +90,7 @@ fn test_select_list_order_with_aggr() { ), ]; - let pipeline = apply_ddls(&ddls, SpringConfig::default()); + let pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch(source_input)); let row = pipeline.pop(QUEUE_NAME).unwrap(); diff --git a/springql/tests/feat_purger.rs b/springql/tests/feat_purger.rs index 844a6ec9..4de10980 100644 --- a/springql/tests/feat_purger.rs +++ b/springql/tests/feat_purger.rs @@ -7,7 +7,7 @@ use std::time::Duration; use log::LevelFilter; use serde_json::json; -use springql_core::api::*; + use springql_foreign_service::{ sink::ForeignSink, source::{ForeignSource, ForeignSourceInput}, @@ -117,7 +117,7 @@ fn t(n_in_rows: u64, upper_limit_bytes: u64) { ), ]; - let mut config = SpringConfig::default(); + let mut config = default_config(); config.memory.upper_limit_bytes = upper_limit_bytes; config.memory.severe_to_critical_percent = 60; config.memory.moderate_to_severe_percent = 30; diff --git a/springql/tests/feat_source_row.rs b/springql/tests/feat_source_row.rs index aa9ad639..465fe0b2 100644 --- a/springql/tests/feat_source_row.rs +++ b/springql/tests/feat_source_row.rs @@ -5,8 +5,7 @@ mod test_support; use std::str::FromStr; use springql::{ - SpringConfig, SpringError, SpringPipeline, SpringSourceRow, SpringSourceRowBuilder, - SpringTimestamp, + SpringError, SpringPipeline, SpringSourceRow, SpringSourceRowBuilder, SpringTimestamp, }; use crate::test_support::*; @@ -54,7 +53,7 @@ fn pipeline(source_queue_name: &str, sink_queue_name: &str) -> SpringPipeline { ), ]; - apply_ddls(&ddls, SpringConfig::default()) + apply_ddls(&ddls, default_config()) } #[test] diff --git a/springql/tests/feat_spring_open_twice.rs b/springql/tests/feat_spring_open_twice.rs index d04aea31..9c2b6aea 100644 --- a/springql/tests/feat_spring_open_twice.rs +++ b/springql/tests/feat_spring_open_twice.rs @@ -1,10 +1,12 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. -use springql::{SpringConfig, SpringPipeline}; +mod test_support; +use crate::test_support::*; +use springql::SpringPipeline; #[test] fn test_spring_open_twice() { - let config = SpringConfig::default(); + let config = default_config(); SpringPipeline::new(&config).unwrap(); SpringPipeline::new(&config).unwrap(); } diff --git a/springql/tests/feat_timestamp_ops.rs b/springql/tests/feat_timestamp_ops.rs index 77256010..3076273e 100644 --- a/springql/tests/feat_timestamp_ops.rs +++ b/springql/tests/feat_timestamp_ops.rs @@ -4,14 +4,13 @@ mod test_support; use pretty_assertions::assert_eq; use serde_json::json; -use springql::*; use springql_foreign_service::{ sink::ForeignSink, source::{ForeignSource, ForeignSourceInput}, }; use springql_test_logger::setup_test_logger; -use crate::test_support::{apply_ddls, drain_from_sink}; +use crate::test_support::{apply_ddls, default_config, drain_from_sink}; #[test] fn test_feat_floor_time() { @@ -76,7 +75,7 @@ fn test_feat_floor_time() { ), ]; - let _pipeline = apply_ddls(&ddls, SpringConfig::default()); + let _pipeline = apply_ddls(&ddls, default_config()); test_source.start(ForeignSourceInput::new_fifo_batch(source_input)); let sink_received = drain_from_sink(&test_sink); let r = sink_received.get(0).unwrap(); diff --git a/springql/tests/feat_worker_config.rs b/springql/tests/feat_worker_config.rs index 5e3840ee..60613ed9 100644 --- a/springql/tests/feat_worker_config.rs +++ b/springql/tests/feat_worker_config.rs @@ -2,7 +2,7 @@ mod test_support; -use crate::test_support::{apply_ddls, drain_from_sink}; +use crate::test_support::{apply_ddls, default_config, drain_from_sink}; use serde_json::json; use springql::*; use springql_foreign_service::{ @@ -89,10 +89,8 @@ fn t(n_generic_worker_threads: u16, n_source_worker_threads: u16) { ), ]; - let config = SpringConfig { - worker: worker_config, - ..Default::default() - }; + let mut config = default_config(); + config.worker = worker_config; let _pipeline = apply_ddls(&ddls, config); test_source.start(ForeignSourceInput::new_fifo_batch(source_input.clone())); diff --git a/springql/tests/test_support/mod.rs b/springql/tests/test_support/mod.rs index 5cae06be..94ea2e78 100644 --- a/springql/tests/test_support/mod.rs +++ b/springql/tests/test_support/mod.rs @@ -3,6 +3,7 @@ use std::time::Duration; use springql::{SpringConfig, SpringPipeline}; +use springql_configloader::SpringConfigExt; use springql_foreign_service::sink::ForeignSink; pub mod request_body; @@ -24,3 +25,8 @@ pub fn drain_from_sink(sink: &ForeignSink) -> Vec { } received } + +#[allow(dead_code)] +pub fn default_config() -> SpringConfig { + SpringConfig::from_toml("").unwrap() +}