diff --git a/Cargo.lock b/Cargo.lock index e556030f..66a6670d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3585,6 +3585,7 @@ version = "0.1.0" dependencies = [ "anyhow", "arrow", + "asap_planner", "asap_sketchlib", "async-trait", "axum", diff --git a/asap-common/dependencies/rs/sketch_db_common/src/lib.rs b/asap-common/dependencies/rs/sketch_db_common/src/lib.rs index ee7cdc0c..2ae3471e 100644 --- a/asap-common/dependencies/rs/sketch_db_common/src/lib.rs +++ b/asap-common/dependencies/rs/sketch_db_common/src/lib.rs @@ -6,6 +6,7 @@ pub mod inference_config; pub mod promql_schema; pub mod query_config; pub mod query_requirements; +pub mod streaming_config; pub mod traits; pub mod utils; @@ -17,3 +18,4 @@ pub use inference_config::*; pub use promql_schema::*; pub use query_config::*; pub use query_requirements::*; +pub use streaming_config::*; diff --git a/asap-common/dependencies/rs/sketch_db_common/src/streaming_config.rs b/asap-common/dependencies/rs/sketch_db_common/src/streaming_config.rs new file mode 100644 index 00000000..3532eb94 --- /dev/null +++ b/asap-common/dependencies/rs/sketch_db_common/src/streaming_config.rs @@ -0,0 +1,127 @@ +use anyhow::Result; +use core::panic; +use serde::{Deserialize, Serialize}; +use serde_yaml::Value; +use std::collections::HashMap; +use std::fs::File; +use std::io::BufReader; +use std::ops::Index; + +use crate::aggregation_config::{AggregationConfig, AggregationIdInfo}; +use crate::capability_matching::find_compatible_aggregation as common_find_compatible; +use crate::enums::QueryLanguage; +use crate::inference_config::{InferenceConfig, SchemaConfig}; +use crate::query_requirements::QueryRequirements; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StreamingConfig { + pub aggregation_configs: HashMap, +} + +impl StreamingConfig { + pub fn new(aggregation_configs: HashMap) -> Self { + Self { + aggregation_configs, + } + } + + pub fn get_aggregation_config(&self, aggregation_id: u64) -> Option<&AggregationConfig> { + self.aggregation_configs.get(&aggregation_id) + } + + pub fn get_all_aggregation_configs(&self) -> &HashMap { + &self.aggregation_configs + } + + pub fn contains(&self, aggregation_id: u64) -> bool { + self.aggregation_configs.contains_key(&aggregation_id) + } + + pub fn from_yaml_file(yaml_file: &str) -> Result { + let file = File::open(yaml_file)?; + let reader = BufReader::new(file); + let data: Value = serde_yaml::from_reader(reader)?; + + Self::from_yaml_data(&data, None) + } + + pub fn from_yaml_data( + data: &Value, + inference_config: Option<&InferenceConfig>, + ) -> Result { + let mut retention_map: HashMap = HashMap::new(); + let mut read_count_threshold_map: HashMap = HashMap::new(); + + if let Some(inference_config) = inference_config { + for query_config in &inference_config.query_configs { + for aggregation in &query_config.aggregations { + let aggregation_id = aggregation.aggregation_id; + if let Some(num_aggregates) = aggregation.num_aggregates_to_retain { + // OLD: Keep last value only (for backwards compatibility) + retention_map.insert(aggregation_id, num_aggregates); + + // NEW: Sum up num_aggregates_to_retain across all queries + *read_count_threshold_map.entry(aggregation_id).or_insert(0) += + num_aggregates; + } + } + } + } + + // Derive query_language from inference_config schema + let query_language = inference_config + .map(|ic| match &ic.schema { + SchemaConfig::PromQL(_) => QueryLanguage::promql, + SchemaConfig::SQL(_) => QueryLanguage::sql, + SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl, + SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql, + }) + .unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config + + let mut aggregation_configs: HashMap = HashMap::new(); + + if let Some(aggregations) = data.get("aggregations").and_then(|v| v.as_sequence()) { + for aggregation_data in aggregations { + if let Some(aggregation_id) = aggregation_data.get("aggregationId") { + let aggregation_id_u64 = aggregation_id.as_u64().or_else(|| panic!()).unwrap(); + let num_aggregates_to_retain = retention_map.get(&aggregation_id_u64); + let read_count_threshold = read_count_threshold_map.get(&aggregation_id_u64); + let config = AggregationConfig::from_yaml_data( + aggregation_data, + num_aggregates_to_retain.copied(), + read_count_threshold.copied(), + query_language, + )?; + aggregation_configs.insert(aggregation_id_u64, config); + } + } + } + + Ok(Self::new(aggregation_configs)) + } +} + +impl StreamingConfig { + /// Find a compatible aggregation for the given requirements using capability-based matching. + /// Delegates to `sketch_db_common::find_compatible_aggregation`. + pub fn find_compatible_aggregation( + &self, + requirements: &QueryRequirements, + ) -> Option { + common_find_compatible(&self.aggregation_configs, requirements) + } +} + +impl Index for StreamingConfig { + type Output = AggregationConfig; + + fn index(&self, aggregation_id: u64) -> &Self::Output { + &self.aggregation_configs[&aggregation_id] + } +} + +impl Default for StreamingConfig { + fn default() -> Self { + Self::new(HashMap::new()) + } +} diff --git a/asap-planner-rs/src/lib.rs b/asap-planner-rs/src/lib.rs index 744e7548..996e155c 100644 --- a/asap-planner-rs/src/lib.rs +++ b/asap-planner-rs/src/lib.rs @@ -5,6 +5,9 @@ pub mod planner; pub mod query_log; use serde_yaml::Value as YamlValue; +use sketch_db_common::enums::QueryLanguage; +use sketch_db_common::inference_config::InferenceConfig; +use sketch_db_common::streaming_config::StreamingConfig; use std::path::Path; pub use config::input::ControllerConfig; @@ -17,6 +20,7 @@ pub use output::sql_generator::SQLRuntimeOptions; pub enum StreamingEngine { Arroyo, Flink, + Precompute, } #[derive(Debug, Clone)] @@ -166,6 +170,21 @@ impl PlannerOutput { Ok(serde_yaml::to_string(&self.inference_yaml)?) } + pub fn to_streaming_config( + &self, + query_language: QueryLanguage, + ) -> Result { + let inference_config = self.to_inference_config(query_language)?; + StreamingConfig::from_yaml_data(&self.streaming_yaml, Some(&inference_config)) + } + + pub fn to_inference_config( + &self, + query_language: QueryLanguage, + ) -> Result { + InferenceConfig::from_yaml_data(&self.inference_yaml, query_language) + } + /// Returns the table_name field of the first aggregation matching agg_type. pub fn aggregation_table_name(&self, agg_type: &str) -> Option { if let YamlValue::Mapping(root) = &self.streaming_yaml { @@ -243,6 +262,10 @@ pub struct SQLController { } impl SQLController { + pub fn new(config: SQLControllerConfig, options: SQLRuntimeOptions) -> Self { + Self { config, options } + } + pub fn from_file(path: &Path, opts: SQLRuntimeOptions) -> Result { let yaml_str = std::fs::read_to_string(path)?; Self::from_yaml(&yaml_str, opts) @@ -279,6 +302,10 @@ impl SQLController { } impl Controller { + pub fn new(config: ControllerConfig, options: RuntimeOptions) -> Self { + Self { config, options } + } + pub fn from_file(path: &Path, opts: RuntimeOptions) -> Result { let yaml_str = std::fs::read_to_string(path)?; Self::from_yaml(&yaml_str, opts) diff --git a/asap-planner-rs/src/main.rs b/asap-planner-rs/src/main.rs index 79a54c0f..b6c234d1 100644 --- a/asap-planner-rs/src/main.rs +++ b/asap-planner-rs/src/main.rs @@ -50,6 +50,7 @@ struct Args { enum EngineArg { Arroyo, Flink, + Precompute, } fn main() -> anyhow::Result<()> { @@ -66,6 +67,7 @@ fn main() -> anyhow::Result<()> { let engine = match args.streaming_engine { EngineArg::Arroyo => StreamingEngine::Arroyo, EngineArg::Flink => StreamingEngine::Flink, + EngineArg::Precompute => StreamingEngine::Precompute, }; match args.query_language { diff --git a/asap-query-engine/Cargo.toml b/asap-query-engine/Cargo.toml index d42d24ed..1d703a71 100644 --- a/asap-query-engine/Cargo.toml +++ b/asap-query-engine/Cargo.toml @@ -10,6 +10,7 @@ promql_utilities.workspace = true sql_utilities.workspace = true sketch_db_common.workspace = true datafusion_summary_library.workspace = true +asap_planner.workspace = true # Shared external (workspace) serde.workspace = true diff --git a/asap-query-engine/Dockerfile b/asap-query-engine/Dockerfile index 1d307974..c036e1ad 100644 --- a/asap-query-engine/Dockerfile +++ b/asap-query-engine/Dockerfile @@ -41,9 +41,10 @@ RUN cargo build --release && rm -rf src/ # Copy source code COPY asap-query-engine/src ./src +COPY asap-planner-rs/src /code/asap-planner-rs/src # Build the actual application -RUN touch src/main.rs && cargo build --release +RUN touch src/main.rs && touch /code/asap-planner-rs/src/lib.rs && cargo build --release # Runtime stage with Ubuntu 24.04 (has newer glibc/libstdc++) FROM ubuntu:24.04 diff --git a/asap-query-engine/src/data_model/streaming_config.rs b/asap-query-engine/src/data_model/streaming_config.rs index e3c44501..818623c5 100644 --- a/asap-query-engine/src/data_model/streaming_config.rs +++ b/asap-query-engine/src/data_model/streaming_config.rs @@ -1,127 +1 @@ -use anyhow::Result; -use core::panic; -use serde::{Deserialize, Serialize}; -use serde_yaml::Value; -use std::collections::HashMap; -use std::fs::File; -use std::io::BufReader; -use std::ops::Index; - -use crate::data_model::aggregation_config::{AggregationConfig, AggregationIdInfo}; -use crate::data_model::enums::QueryLanguage; -use crate::data_model::inference_config::{InferenceConfig, SchemaConfig}; -use sketch_db_common::capability_matching::find_compatible_aggregation as common_find_compatible; -use sketch_db_common::query_requirements::QueryRequirements; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct StreamingConfig { - pub aggregation_configs: HashMap, -} - -impl StreamingConfig { - pub fn new(aggregation_configs: HashMap) -> Self { - Self { - aggregation_configs, - } - } - - pub fn get_aggregation_config(&self, aggregation_id: u64) -> Option<&AggregationConfig> { - self.aggregation_configs.get(&aggregation_id) - } - - pub fn get_all_aggregation_configs(&self) -> &HashMap { - &self.aggregation_configs - } - - pub fn contains(&self, aggregation_id: u64) -> bool { - self.aggregation_configs.contains_key(&aggregation_id) - } - - pub fn from_yaml_file(yaml_file: &str) -> Result { - let file = File::open(yaml_file)?; - let reader = BufReader::new(file); - let data: Value = serde_yaml::from_reader(reader)?; - - Self::from_yaml_data(&data, None) - } - - pub fn from_yaml_data( - data: &Value, - inference_config: Option<&InferenceConfig>, - ) -> Result { - let mut retention_map: HashMap = HashMap::new(); - let mut read_count_threshold_map: HashMap = HashMap::new(); - - if let Some(inference_config) = inference_config { - for query_config in &inference_config.query_configs { - for aggregation in &query_config.aggregations { - let aggregation_id = aggregation.aggregation_id; - if let Some(num_aggregates) = aggregation.num_aggregates_to_retain { - // OLD: Keep last value only (for backwards compatibility) - retention_map.insert(aggregation_id, num_aggregates); - - // NEW: Sum up num_aggregates_to_retain across all queries - *read_count_threshold_map.entry(aggregation_id).or_insert(0) += - num_aggregates; - } - } - } - } - - // Derive query_language from inference_config schema - let query_language = inference_config - .map(|ic| match &ic.schema { - SchemaConfig::PromQL(_) => QueryLanguage::promql, - SchemaConfig::SQL(_) => QueryLanguage::sql, - SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl, - SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql, - }) - .unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config - - let mut aggregation_configs: HashMap = HashMap::new(); - - if let Some(aggregations) = data.get("aggregations").and_then(|v| v.as_sequence()) { - for aggregation_data in aggregations { - if let Some(aggregation_id) = aggregation_data.get("aggregationId") { - let aggregation_id_u64 = aggregation_id.as_u64().or_else(|| panic!()).unwrap(); - let num_aggregates_to_retain = retention_map.get(&aggregation_id_u64); - let read_count_threshold = read_count_threshold_map.get(&aggregation_id_u64); - let config = AggregationConfig::from_yaml_data( - aggregation_data, - num_aggregates_to_retain.copied(), - read_count_threshold.copied(), - query_language, - )?; - aggregation_configs.insert(aggregation_id_u64, config); - } - } - } - - Ok(Self::new(aggregation_configs)) - } -} - -impl StreamingConfig { - /// Find a compatible aggregation for the given requirements using capability-based matching. - /// Delegates to `sketch_db_common::find_compatible_aggregation`. - pub fn find_compatible_aggregation( - &self, - requirements: &QueryRequirements, - ) -> Option { - common_find_compatible(&self.aggregation_configs, requirements) - } -} - -impl Index for StreamingConfig { - type Output = AggregationConfig; - - fn index(&self, aggregation_id: u64) -> &Self::Output { - &self.aggregation_configs[&aggregation_id] - } -} - -impl Default for StreamingConfig { - fn default() -> Self { - Self::new(HashMap::new()) - } -} +pub use sketch_db_common::streaming_config::*; diff --git a/asap-query-engine/src/lib.rs b/asap-query-engine/src/lib.rs index e27a799a..72d78416 100644 --- a/asap-query-engine/src/lib.rs +++ b/asap-query-engine/src/lib.rs @@ -14,6 +14,7 @@ fn init_sketch_backend_for_tests() { pub mod data_model; pub mod drivers; pub mod engines; +pub mod planner_client; pub mod precompute_engine; pub mod precompute_operators; pub mod stores; diff --git a/asap-query-engine/src/planner_client.rs b/asap-query-engine/src/planner_client.rs new file mode 100644 index 00000000..2d812ded --- /dev/null +++ b/asap-query-engine/src/planner_client.rs @@ -0,0 +1,136 @@ +use anyhow::Result; +use asap_planner::{Controller, ControllerConfig, PlannerOutput, RuntimeOptions}; +use sketch_db_common::enums::QueryLanguage; +use sketch_db_common::inference_config::InferenceConfig; +use sketch_db_common::streaming_config::StreamingConfig; + +pub struct PlannerResult { + pub streaming_config: StreamingConfig, + pub inference_config: InferenceConfig, + pub punted_queries: Vec, +} + +#[async_trait::async_trait] +pub trait PlannerClient: Send + Sync { + async fn plan(&self, config: ControllerConfig) -> Result; +} + +pub struct LocalPlannerClient { + runtime_options: RuntimeOptions, + query_language: QueryLanguage, +} + +impl LocalPlannerClient { + pub fn new(runtime_options: RuntimeOptions, query_language: QueryLanguage) -> Self { + Self { + runtime_options, + query_language, + } + } +} + +#[async_trait::async_trait] +impl PlannerClient for LocalPlannerClient { + async fn plan(&self, config: ControllerConfig) -> Result { + let opts = self.runtime_options.clone(); + let query_language = self.query_language; + + let output: PlannerOutput = tokio::task::spawn_blocking(move || { + let controller = Controller::new(config, opts); + controller.generate() + }) + .await??; + + let inference_config = output.to_inference_config(query_language)?; + let streaming_config = output.to_streaming_config(query_language)?; + let punted_queries = output + .punted_queries + .iter() + .map(|p| p.query.clone()) + .collect(); + + Ok(PlannerResult { + streaming_config, + inference_config, + punted_queries, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use asap_planner::config::input::{ControllerOptions, MetricDefinition, QueryGroup}; + use asap_planner::ControllerConfig; + use asap_planner::{RuntimeOptions, StreamingEngine}; + + fn sample_controller_config() -> ControllerConfig { + ControllerConfig { + query_groups: vec![QueryGroup { + id: Some(1), + queries: vec!["sum(rate(http_requests_total[5m]))".to_string()], + repetition_delay: 60, + controller_options: ControllerOptions::default(), + step: None, + range_duration: None, + }], + metrics: vec![MetricDefinition { + metric: "http_requests_total".to_string(), + labels: vec!["method".to_string(), "status".to_string()], + }], + sketch_parameters: None, + aggregate_cleanup: None, + } + } + + fn sample_runtime_options() -> RuntimeOptions { + RuntimeOptions { + prometheus_scrape_interval: 15, + streaming_engine: StreamingEngine::Precompute, + enable_punting: false, + range_duration: 300, + step: 15, + } + } + + #[test] + fn test_controller_new_generate() { + let config = sample_controller_config(); + let opts = sample_runtime_options(); + let controller = Controller::new(config, opts); + let output = controller.generate().expect("generate should succeed"); + + assert!(output.streaming_aggregation_count() > 0); + assert!(output.inference_query_count() > 0); + } + + #[test] + fn test_planner_output_struct_accessors() { + let config = sample_controller_config(); + let opts = sample_runtime_options(); + let controller = Controller::new(config, opts); + let output = controller.generate().expect("generate should succeed"); + + let inference = output + .to_inference_config(QueryLanguage::promql) + .expect("to_inference_config should succeed"); + assert!(!inference.query_configs.is_empty()); + + let streaming = output + .to_streaming_config(QueryLanguage::promql) + .expect("to_streaming_config should succeed"); + assert!(!streaming.aggregation_configs.is_empty()); + } + + #[tokio::test] + async fn test_local_planner_client() { + let client = LocalPlannerClient::new(sample_runtime_options(), QueryLanguage::promql); + let config = sample_controller_config(); + + let result = client.plan(config).await.expect("plan should succeed"); + + assert!(!result.streaming_config.aggregation_configs.is_empty()); + assert!(!result.inference_config.query_configs.is_empty()); + assert!(result.punted_queries.is_empty()); + } +}