diff --git a/packages/cubejs-backend-native/Cargo.lock b/packages/cubejs-backend-native/Cargo.lock index adff1c4afa07a..2740ac228d8fd 100644 --- a/packages/cubejs-backend-native/Cargo.lock +++ b/packages/cubejs-backend-native/Cargo.lock @@ -136,12 +136,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" dependencies = [ "concurrent-queue", - "event-listener", - "event-listener-strategy", + "event-listener 4.0.0", + "event-listener-strategy 0.4.0", "futures-core", "pin-project-lite", ] +[[package]] +name = "async-lock" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +dependencies = [ + "event-listener 5.4.1", + "event-listener-strategy 0.5.4", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -620,6 +631,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -805,6 +825,7 @@ dependencies = [ "log", "lru", "minijinja", + "moka", "paste", "pg-srv", "postgres-types", @@ -1040,13 +1061,34 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + [[package]] name = "event-listener-strategy" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" dependencies = [ - "event-listener", + "event-listener 4.0.0", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener 5.4.1", "pin-project-lite", ] @@ -1996,6 +2038,27 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moka" +version = "0.12.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "event-listener 5.4.1", + "futures-util", + "parking_lot", + "portable-atomic", + "rustc_version", + "smallvec", + "tagptr", + "uuid 1.6.1", +] + [[package]] name = "multiversion" version = "0.6.1" @@ -2394,6 +2457,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + [[package]] name = "postgres" version = "0.19.7" @@ -3321,6 +3390,12 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" diff --git a/rust/cubesql/Cargo.lock b/rust/cubesql/Cargo.lock index 1bc909c039808..f6ed58cefe0b2 100644 --- a/rust/cubesql/Cargo.lock +++ b/rust/cubesql/Cargo.lock @@ -144,6 +144,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-lock" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -515,6 +526,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.0" @@ -587,11 +607,10 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.4" +version = "0.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" dependencies = [ - "cfg-if", "crossbeam-utils", ] @@ -608,27 +627,18 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.8" +version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" dependencies = [ - "autocfg", - "cfg-if", "crossbeam-utils", - "lazy_static", - "memoffset", - "scopeguard", ] [[package]] name = "crossbeam-utils" -version = "0.8.8" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" -dependencies = [ - "cfg-if", - "lazy_static", -] +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crossterm" @@ -750,6 +760,7 @@ dependencies = [ "log", "lru", "minijinja", + "moka", "paste", "pg-srv", "pg_interval", @@ -1015,6 +1026,27 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -1853,15 +1885,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aec276c09560ce4447087aaefc19eb0c18d97e31bd05ebac38881c4723400c40" -[[package]] -name = "memoffset" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" -dependencies = [ - "autocfg", -] - [[package]] name = "mime" version = "0.3.16" @@ -1911,6 +1934,27 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moka" +version = "0.12.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "event-listener", + "futures-util", + "parking_lot", + "portable-atomic", + "rustc_version", + "smallvec", + "tagptr", + "uuid 1.10.0", +] + [[package]] name = "multiversion" version = "0.6.1" @@ -2071,6 +2115,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.0" @@ -2347,6 +2397,12 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + [[package]] name = "portpicker" version = "0.1.1" @@ -2673,6 +2729,15 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.44" @@ -2775,6 +2840,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58bf37232d3bb9a2c4e641ca2a11d83b5062066f88df7fed36c28772046d65ba" +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "serde" version = "1.0.217" @@ -3073,6 +3144,12 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.17.1" diff --git a/rust/cubesql/cubesql/Cargo.toml b/rust/cubesql/cubesql/Cargo.toml index ecf0c898ddc89..dcb7d5ba238ba 100644 --- a/rust/cubesql/cubesql/Cargo.toml +++ b/rust/cubesql/cubesql/Cargo.toml @@ -54,6 +54,7 @@ sha1_smol = "1.0.0" tera = { version = "1", default-features = false } minijinja = { version = "1", features = ["json", "loader"] } lru = "0.13.0" +moka = { version = "0.12", features = ["future"] } sha2 = "0.10.8" bigdecimal = "0.4.2" indexmap = "1.9.3" diff --git a/rust/cubesql/cubesql/src/config/mod.rs b/rust/cubesql/cubesql/src/config/mod.rs index a3ed43d99077d..d7977a5d4feb7 100644 --- a/rust/cubesql/cubesql/src/config/mod.rs +++ b/rust/cubesql/cubesql/src/config/mod.rs @@ -100,12 +100,14 @@ pub trait ConfigObj: DIService + Debug { fn compiler_cache_size(&self) -> usize; - fn query_cache_size(&self) -> usize; + fn query_cache_size(&self) -> u64; fn enable_parameterized_rewrite_cache(&self) -> bool; fn enable_rewrite_cache(&self) -> bool; + fn query_cache_time_to_idle_secs(&self) -> u64; + fn push_down_pull_up_split(&self) -> bool; fn stream_mode(&self) -> bool; @@ -127,7 +129,8 @@ pub struct ConfigObjImpl { pub timezone: Option, pub disable_strict_agg_type_match: bool, pub compiler_cache_size: usize, - pub query_cache_size: usize, + pub query_cache_size: u64, + pub query_cache_time_to_idle_secs: u64, pub enable_parameterized_rewrite_cache: bool, pub enable_rewrite_cache: bool, pub push_down_pull_up_split: bool, @@ -163,6 +166,12 @@ impl ConfigObjImpl { auth_expire_secs: env_parse("CUBESQL_AUTH_EXPIRE_SECS", 300), compiler_cache_size: env_parse("CUBEJS_COMPILER_CACHE_SIZE", 100), query_cache_size: env_parse("CUBESQL_QUERY_CACHE_SIZE", 500), + query_cache_time_to_idle_secs: env_parse_duration( + "CUBESQL_QUERY_CACHE_TIME_TO_IDLE", + 60 * 60, + Some(60 * 60 * 24), + Some(60), + ), enable_parameterized_rewrite_cache: env_optparse("CUBESQL_PARAMETERIZED_REWRITE_CACHE") .unwrap_or(sql_push_down), enable_rewrite_cache: env_optparse("CUBESQL_REWRITE_CACHE").unwrap_or(sql_push_down), @@ -207,7 +216,7 @@ impl ConfigObj for ConfigObjImpl { self.compiler_cache_size } - fn query_cache_size(&self) -> usize { + fn query_cache_size(&self) -> u64 { self.query_cache_size } @@ -219,6 +228,10 @@ impl ConfigObj for ConfigObjImpl { self.enable_rewrite_cache } + fn query_cache_time_to_idle_secs(&self) -> u64 { + self.query_cache_time_to_idle_secs + } + fn push_down_pull_up_split(&self) -> bool { self.push_down_pull_up_split } @@ -262,9 +275,10 @@ impl Config { timezone, disable_strict_agg_type_match: false, compiler_cache_size: 100, - query_cache_size: 500, + query_cache_size: 500u64, enable_parameterized_rewrite_cache: false, enable_rewrite_cache: false, + query_cache_time_to_idle_secs: 1800, push_down_pull_up_split: true, stream_mode: false, non_streaming_query_max_row_limit: 50000, @@ -398,4 +412,47 @@ where }) } +pub fn env_parse_duration(name: &str, default: T, max: Option, min: Option) -> T +where + T: FromStr + PartialOrd + Display, + T::Err: Display, +{ + let v = match env::var(name).ok() { + None => { + return default; + } + Some(v) => v, + }; + + let n = match v.parse::() { + Ok(n) => n, + Err(e) => panic!( + "could not parse environment variable '{}' with '{}' value: {}", + name, v, e + ), + }; + + if let Some(max) = max { + if n > max { + panic!( + "wrong configuration for environment variable '{}' with '{}' value: greater then max size {}", + name, v, + max + ) + } + }; + + if let Some(min) = min { + if n < min { + panic!( + "wrong configuration for environment variable '{}' with '{}' value: lower then min size {}", + name, v, + min + ) + } + }; + + n +} + pub type LoopHandle = JoinHandle>; diff --git a/rust/cubesql/cubesql/src/sql/compiler_cache.rs b/rust/cubesql/cubesql/src/sql/compiler_cache.rs index 6444d2c89de24..15da9dba34dbc 100644 --- a/rust/cubesql/cubesql/src/sql/compiler_cache.rs +++ b/rust/cubesql/cubesql/src/sql/compiler_cache.rs @@ -14,7 +14,8 @@ use crate::{ use async_trait::async_trait; use datafusion::scalar::ScalarValue; use lru::LruCache; -use std::{collections::HashMap, fmt::Debug, num::NonZeroUsize, sync::Arc}; +use moka::future::Cache as MokaCache; +use std::{collections::HashMap, fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration}; use uuid::Uuid; #[async_trait] @@ -64,8 +65,8 @@ pub struct CompilerCacheImpl { pub struct CompilerCacheEntry { meta_context: Arc, rewrite_rules: RWLockAsync>>>, - parameterized_cache: MutexAsync>, - queries_cache: MutexAsync>, + parameterized_cache: MokaCache<[u8; 32], CubeEGraph>, + queries_cache: MokaCache<[u8; 32], CubeEGraph>, } crate::di_service!(CompilerCacheImpl, [CompilerCache]); @@ -120,18 +121,20 @@ impl CompilerCache for CompilerCacheImpl { ) -> Result { let graph_key = egraph_hash(¶meterized_graph, None); - let cache_entry_clone = Arc::clone(&cache_entry); - let mut rewrites_cache_lock = cache_entry.parameterized_cache.lock().await; - if let Some(rewrite_entry) = rewrites_cache_lock.get(&graph_key) { - Ok(rewrite_entry.clone()) - } else { - let mut rewriter = Rewriter::new(parameterized_graph, cube_context); - let rewrite_entry = rewriter - .run_rewrite_to_completion(cache_entry_clone, qtrace) - .await?; - rewrites_cache_lock.put(graph_key, rewrite_entry.clone()); - Ok(rewrite_entry) + if let Some(rewrite_entry) = cache_entry.parameterized_cache.get(&graph_key).await { + return Ok(rewrite_entry); } + + let cache_entry_clone = Arc::clone(&cache_entry); + let mut rewriter = Rewriter::new(parameterized_graph, cube_context); + let rewrite_entry = rewriter + .run_rewrite_to_completion(cache_entry_clone, qtrace) + .await?; + cache_entry + .parameterized_cache + .insert(graph_key, rewrite_entry.clone()) + .await; + Ok(rewrite_entry) } async fn rewrite( @@ -152,30 +155,35 @@ impl CompilerCache for CompilerCacheImpl { let graph_key = egraph_hash(&input_plan, Some(param_values)); + if let Some(plan) = cache_entry.queries_cache.get(&graph_key).await { + return Ok(plan); + } + let cache_entry_clone = Arc::clone(&cache_entry); - let mut rewrites_cache_lock = cache_entry.queries_cache.lock().await; - if let Some(plan) = rewrites_cache_lock.get(&graph_key) { - Ok(plan.clone()) + let graph = if self.config_obj.enable_parameterized_rewrite_cache() { + self.parameterized_rewrite( + Arc::clone(&cache_entry), + cube_context.clone(), + input_plan, + qtrace, + ) + .await? } else { - let graph = if self.config_obj.enable_parameterized_rewrite_cache() { - self.parameterized_rewrite( - Arc::clone(&cache_entry), - cube_context.clone(), - input_plan, - qtrace, - ) - .await? - } else { - input_plan - }; - let mut rewriter = Rewriter::new(graph, cube_context); - rewriter.add_param_values(param_values)?; - let final_plan = rewriter - .run_rewrite_to_completion(cache_entry_clone, qtrace) - .await?; - rewrites_cache_lock.put(graph_key, final_plan.clone()); - Ok(final_plan) - } + input_plan + }; + + let mut rewriter = Rewriter::new(graph, cube_context); + rewriter.add_param_values(param_values)?; + let final_plan = rewriter + .run_rewrite_to_completion(cache_entry_clone, qtrace) + .await?; + + cache_entry + .queries_cache + .insert(graph_key, final_plan.clone()) + .await; + + Ok(final_plan) } async fn get_cache_entry( @@ -202,15 +210,18 @@ impl CompilerCache for CompilerCacheImpl { .get(&(meta_context.compiler_id, protocol.clone())) .cloned() .unwrap_or_else(|| { + let ttl = Duration::from_secs(self.config_obj.query_cache_time_to_idle_secs()); let cache_entry = Arc::new(CompilerCacheEntry { meta_context: meta_context.clone(), rewrite_rules: RWLockAsync::new(HashMap::new()), - parameterized_cache: MutexAsync::new(LruCache::new( - NonZeroUsize::new(self.config_obj.query_cache_size()).unwrap(), - )), - queries_cache: MutexAsync::new(LruCache::new( - NonZeroUsize::new(self.config_obj.query_cache_size()).unwrap(), - )), + parameterized_cache: MokaCache::builder() + .max_capacity(self.config_obj.query_cache_size()) + .time_to_idle(ttl) + .build(), + queries_cache: MokaCache::builder() + .max_capacity(self.config_obj.query_cache_size()) + .time_to_idle(ttl) + .build(), }); compiler_id_to_entry.put( (meta_context.compiler_id, protocol.clone()),