From 09fde09fd3c1c4a718be4768e3d7f0c63cf4346f Mon Sep 17 00:00:00 2001 From: Alexandr Romanenko Date: Fri, 14 Jul 2023 22:39:22 +0300 Subject: [PATCH 1/2] feat(cubestore): Tracking data amount using in processing --- rust/cubestore/cubestore/src/cluster/mod.rs | 108 ++++- .../cubestore/src/cluster/rate_limiter.rs | 416 ++++++++++++++++++ rust/cubestore/cubestore/src/config/mod.rs | 18 + rust/cubestore/cubestore/src/import/mod.rs | 30 +- .../cubestore/src/queryplanner/mod.rs | 1 + .../src/queryplanner/optimizations/mod.rs | 21 +- .../optimizations/trace_data_loaded.rs | 19 + .../src/queryplanner/query_executor.rs | 28 +- .../src/queryplanner/trace_data_loaded.rs | 137 ++++++ rust/cubestore/cubestore/src/sql/mod.rs | 2 +- .../cubestore/src/store/compaction.rs | 67 ++- rust/cubestore/cubestore/src/store/mod.rs | 16 +- .../cubestore/src/util/batch_memory.rs | 11 + rust/cubestore/cubestore/src/util/mod.rs | 1 + 14 files changed, 826 insertions(+), 49 deletions(-) create mode 100644 rust/cubestore/cubestore/src/cluster/rate_limiter.rs create mode 100644 rust/cubestore/cubestore/src/queryplanner/optimizations/trace_data_loaded.rs create mode 100644 rust/cubestore/cubestore/src/queryplanner/trace_data_loaded.rs create mode 100644 rust/cubestore/cubestore/src/util/batch_memory.rs diff --git a/rust/cubestore/cubestore/src/cluster/mod.rs b/rust/cubestore/cubestore/src/cluster/mod.rs index 2c48ac9808fbd..4c60415f861f5 100644 --- a/rust/cubestore/cubestore/src/cluster/mod.rs +++ b/rust/cubestore/cubestore/src/cluster/mod.rs @@ -4,11 +4,14 @@ pub mod transport; #[cfg(not(target_os = "windows"))] pub mod worker_pool; +pub mod rate_limiter; + #[cfg(not(target_os = "windows"))] use crate::cluster::worker_pool::{worker_main, MessageProcessor, WorkerPool}; use crate::ack_error; use crate::cluster::message::NetworkMessage; +use crate::cluster::rate_limiter::ProcessRateLimiter; use crate::cluster::transport::{ClusterTransport, MetaStoreTransport, WorkerConnection}; use crate::config::injection::{DIService, Injector}; use crate::config::{is_router, WorkerServices}; @@ -28,6 +31,7 @@ use crate::metastore::{ }; use crate::queryplanner::query_executor::{QueryExecutor, SerializedRecordBatchStream}; use crate::queryplanner::serialized_plan::SerializedPlan; +use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::remotefs::RemoteFs; use crate::store::compaction::CompactionService; use crate::store::ChunkDataStore; @@ -183,7 +187,7 @@ pub struct ClusterImpl { Arc< WorkerPool< WorkerMessage, - (SchemaRef, Vec), + (SchemaRef, Vec, usize), WorkerProcessor, >, >, @@ -195,6 +199,7 @@ pub struct ClusterImpl { close_worker_socket_tx: watch::Sender, close_worker_socket_rx: RwLock>, tracing_helper: Arc, + process_rate_limiter: Arc, } crate::di_service!(ClusterImpl, [Cluster]); @@ -213,13 +218,13 @@ pub struct WorkerProcessor; #[cfg(not(target_os = "windows"))] #[async_trait] -impl MessageProcessor)> +impl MessageProcessor, usize)> for WorkerProcessor { async fn process( services: &WorkerServices, args: WorkerMessage, - ) -> Result<(SchemaRef, Vec), CubeError> { + ) -> Result<(SchemaRef, Vec, usize), CubeError> { match args { WorkerMessage::Select( plan_node, @@ -256,9 +261,9 @@ impl MessageProcessor), WorkerProcessor>, + worker_main::< + WorkerMessage, + (SchemaRef, Vec, usize), + WorkerProcessor, + >, ); } @@ -291,6 +300,7 @@ struct JobRunner { chunk_store: Arc, compaction_service: Arc, import_service: Arc, + process_rate_limiter: Arc, server_name: String, notify: Arc, stop_token: CancellationToken, @@ -903,8 
+913,19 @@ impl JobRunner { if let RowKey::Table(TableId::Partitions, partition_id) = job.row_reference() { let compaction_service = self.compaction_service.clone(); let partition_id = *partition_id; + let process_rate_limiter = self.process_rate_limiter.clone(); Ok(cube_ext::spawn(async move { - compaction_service.compact(partition_id).await + process_rate_limiter + .wait_for_allow(Some(Duration::from_secs(120))) + .await?; //TODO config, may be same ad orphaned timeout + let data_loaded_size = DataLoadedSize::new(); + let res = compaction_service + .compact(partition_id, data_loaded_size.clone()) + .await; + process_rate_limiter + .commit_task_usage(data_loaded_size.get() as i64) + .await; + res })) } else { Self::fail_job_row_key(job) @@ -984,11 +1005,29 @@ impl JobRunner { let table_id = *table_id; let import_service = self.import_service.clone(); let location = location.to_string(); + let process_rate_limiter = self.process_rate_limiter.clone(); Ok(cube_ext::spawn(async move { - import_service + let is_streaming = Table::is_stream_location(&location); + let data_loaded_size = if is_streaming { + None + } else { + Some(DataLoadedSize::new()) + }; + if !is_streaming { + process_rate_limiter + .wait_for_allow(Some(Duration::from_secs(120))) + .await?; //TODO config, may be same ad orphaned timeout + } + let res = import_service .clone() - .import_table_part(table_id, &location) - .await + .import_table_part(table_id, &location, data_loaded_size.clone()) + .await; + if let Some(data_loaded) = &data_loaded_size { + process_rate_limiter + .commit_task_usage(data_loaded.get() as i64) + .await; + } + res })) } else { Self::fail_job_row_key(job) @@ -998,8 +1037,19 @@ impl JobRunner { if let RowKey::Table(TableId::Chunks, chunk_id) = job.row_reference() { let chunk_store = self.chunk_store.clone(); let chunk_id = *chunk_id; + let process_rate_limiter = self.process_rate_limiter.clone(); Ok(cube_ext::spawn(async move { - chunk_store.repartition_chunk(chunk_id).await + process_rate_limiter + .wait_for_allow(Some(Duration::from_secs(120))) + .await?; //TODO config, may be same ad orphaned timeout + let data_loaded_size = DataLoadedSize::new(); + let res = chunk_store + .repartition_chunk(chunk_id, data_loaded_size.clone()) + .await; + process_rate_limiter + .commit_task_usage(data_loaded_size.get() as i64) + .await; + res })) } else { Self::fail_job_row_key(job) @@ -1030,6 +1080,7 @@ impl ClusterImpl { meta_store_sender: Sender, cluster_transport: Arc, tracing_helper: Arc, + process_rate_limiter: Arc, ) -> Arc { let (close_worker_socket_tx, close_worker_socket_rx) = watch::channel(false); Arc::new_cyclic(|this| ClusterImpl { @@ -1052,6 +1103,7 @@ impl ClusterImpl { close_worker_socket_tx, close_worker_socket_rx: RwLock::new(close_worker_socket_rx), tracing_helper, + process_rate_limiter, }) } @@ -1100,6 +1152,7 @@ impl ClusterImpl { chunk_store: self.injector.upgrade().unwrap().get_service_typed().await, compaction_service: self.injector.upgrade().unwrap().get_service_typed().await, import_service: self.injector.upgrade().unwrap().get_service_typed().await, + process_rate_limiter: self.process_rate_limiter.clone(), server_name: self.server_name.clone(), notify: if is_long_running { self.long_running_job_notify.clone() @@ -1113,6 +1166,10 @@ impl ClusterImpl { job_runner.processing_loop().await; })); } + let process_rate_limiter = self.process_rate_limiter.clone(); + futures.push(cube_ext::spawn(async move { + process_rate_limiter.wait_processing_loop().await; + })); let stop_token = 
self.stop_token.clone(); let long_running_job_notify = self.long_running_job_notify.clone(); @@ -1147,6 +1204,8 @@ impl ClusterImpl { pool.stop_workers().await?; } + self.process_rate_limiter.stop_processing_loops(); + self.close_worker_socket_tx.send(true)?; Ok(()) } @@ -1313,11 +1372,32 @@ impl ClusterImpl { .await } - #[instrument(level = "trace", skip(self, plan_node))] async fn run_local_select_worker( &self, plan_node: SerializedPlan, ) -> Result<(SchemaRef, Vec), CubeError> { + self.process_rate_limiter + .wait_for_allow(Some(Duration::from_secs(self.config_obj.query_timeout()))) + .await?; + let res = self.run_local_select_worker_impl(plan_node).await; + match res { + Ok((schema, records, data_loaded_size)) => { + self.process_rate_limiter + .commit_task_usage(data_loaded_size as i64) + .await; + Ok((schema, records)) + } + Err(e) => { + self.process_rate_limiter.commit_task_usage(0).await; + Err(e) + } + } + } + #[instrument(level = "trace", skip(self, plan_node))] + async fn run_local_select_worker_impl( + &self, + plan_node: SerializedPlan, + ) -> Result<(SchemaRef, Vec, usize), CubeError> { let start = SystemTime::now(); debug!("Running select"); let remote_to_local_names = self.warmup_select_worker_files(&plan_node).await?; @@ -1411,7 +1491,7 @@ impl ClusterImpl { if res.is_none() { // TODO optimize for no double conversion - let (schema, records) = self + let (schema, records, data_loaded_size) = self .query_executor .execute_worker_plan( plan_node.clone(), @@ -1420,7 +1500,7 @@ impl ClusterImpl { ) .await?; let records = SerializedRecordBatchStream::write(schema.as_ref(), records); - res = Some(Ok((schema, records?))) + res = Some(Ok((schema, records?, data_loaded_size))) } info!("Running select completed ({:?})", start.elapsed()?); diff --git a/rust/cubestore/cubestore/src/cluster/rate_limiter.rs b/rust/cubestore/cubestore/src/cluster/rate_limiter.rs new file mode 100644 index 0000000000000..3e57f8402d40d --- /dev/null +++ b/rust/cubestore/cubestore/src/cluster/rate_limiter.rs @@ -0,0 +1,416 @@ +use crate::config::injection::DIService; +use crate::util::WorkerLoop; +use crate::CubeError; +use async_trait::async_trait; +use futures_timer::Delay; +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tokio::sync::{Notify, RwLock}; +use tokio_util::sync::CancellationToken; + +#[async_trait] +pub trait ProcessRateLimiter: DIService + Send + Sync { + async fn commit_task_usage(&self, size: i64); + + async fn current_budget(&self) -> i64; + + async fn current_budget_f64(&self) -> f64; + + async fn wait_for_allow(&self, timeout: Option) -> Result<(), CubeError>; + + async fn wait_processing_loop(self: Arc); + + async fn pending_size(&self) -> usize; + + fn stop_processing_loops(&self); +} + +crate::di_service!(ProcessRateLimiterImpl, [ProcessRateLimiter]); + +const MS_MUL: i64 = 1000; +const MAX_PENDING_ITEMS: usize = 100000; + +struct PendingItem { + notify: Notify, + timeout_at: SystemTime, +} + +impl PendingItem { + pub fn new(timeout: Duration) -> Arc { + Arc::new(Self { + notify: Notify::new(), + timeout_at: SystemTime::now() + timeout, + }) + } + + pub fn notify(&self) { + self.notify.notify_one() + } + + pub async fn wait(&self) { + self.notify.notified().await + } + + pub fn is_timeout(&self) -> bool { + self.timeout_at <= SystemTime::now() + } +} + +struct Budget { + rate: i64, + burst: i64, + deposit: i64, + value: i64, + last_refill: SystemTime, + pending: VecDeque>, +} + +impl Budget { + pub fn new(rate: i64, burst: i64, 
deposit: i64) -> Self { + Self { + rate, + burst, + deposit, + value: burst, + last_refill: SystemTime::now(), + pending: VecDeque::with_capacity(10000), + } + } + + pub fn value(&self) -> i64 { + self.value + } + + pub fn commit_task_usage(&mut self, value: i64) { + self.value -= value - self.deposit; + } + + pub fn refill(&mut self) { + let now = SystemTime::now(); + let res = now.duration_since(self.last_refill); + let duration = if let Ok(dur) = res { + dur.as_millis() + } else { + 0 + }; + if duration > 0 { + self.value = (self.value + duration as i64 * self.rate).min(self.burst); + self.last_refill = now; + } + self.process_pending(); + } + + pub fn try_allow( + &mut self, + timeout: Option, + ) -> Result>, CubeError> { + self.refill(); + if self.pending.is_empty() && self.value >= self.deposit { + self.value -= self.deposit; + Ok(None) + } else if let Some(timeout) = timeout { + if self.pending_size() >= MAX_PENDING_ITEMS { + Err(CubeError::internal("Too many pending items".to_string())) + } else { + let pending_item = PendingItem::new(timeout); + self.pending.push_back(pending_item.clone()); + Ok(Some(pending_item)) + } + } else { + Err(CubeError::internal( + "Process can not be started due to rate limit".to_string(), + )) + } + } + + fn process_pending(&mut self) { + if self.pending.is_empty() { + return; + } + + loop { + if let Some(item) = self.pending.front() { + if item.is_timeout() { + item.notify(); + self.pending.pop_front(); + } else if self.value >= self.deposit { + self.value -= self.deposit; + item.notify(); + self.pending.pop_front(); + } else { + break; + } + } else { + break; + } + } + } + + pub fn pending_size(&self) -> usize { + self.pending.len() + } +} + +pub struct ProcessRateLimiterImpl { + budget: RwLock, + cancel_token: CancellationToken, + pending_process_loop: WorkerLoop, +} + +impl ProcessRateLimiterImpl { + /// Crates new limitter for rate of data processing + /// per_second - the amount of available for processing data renewable per second + /// burst - the maximum amount of available for processing data + /// deposit_size - the fixed amount substracted form available stock at start of task processing + pub fn new(per_second: i64, burst: i64, deposit_size: i64) -> Arc { + Arc::new(Self { + budget: RwLock::new(Budget::new( + per_second, + burst * MS_MUL, + deposit_size * MS_MUL, + )), + cancel_token: CancellationToken::new(), + pending_process_loop: WorkerLoop::new("RateLimiterPendingProcessing"), + }) + } + + async fn refill_budget(&self) { + self.budget.write().await.refill(); + } +} + +#[async_trait] +impl ProcessRateLimiter for ProcessRateLimiterImpl { + async fn commit_task_usage(&self, size: i64) { + self.budget.write().await.commit_task_usage(size * MS_MUL); + } + + async fn current_budget(&self) -> i64 { + self.budget.read().await.value() / MS_MUL + } + + async fn current_budget_f64(&self) -> f64 { + self.budget.read().await.value() as f64 / MS_MUL as f64 + } + + async fn wait_for_allow(&self, timeout: Option) -> Result<(), CubeError> { + let result = self.budget.write().await.try_allow(timeout); + match result { + Ok(None) => Ok(()), + Ok(Some(pending)) => { + let timeout = if let Some(t) = timeout { + t + } else { + Duration::from_millis(0) + }; + tokio::select! 
{ + _ = self.cancel_token.cancelled() => { + Ok(()) + } + _ = pending.wait() => { + if pending.is_timeout() { + Err(CubeError::internal("Process can not be started due aaa to rate limit".to_string())) + } else { + Ok(()) + } + } + _ = Delay::new(timeout) => { + Err(CubeError::internal("Process can not be started due !!! to rate limit".to_string())) + } + } + } + Err(e) => Err(e), + } + } + + async fn wait_processing_loop(self: Arc) { + let scheduler = self.clone(); + scheduler + .pending_process_loop + .process( + scheduler.clone(), + async move |_| Ok(Delay::new(Duration::from_millis(10)).await), + async move |s, _| { + s.refill_budget().await; + Ok(()) + }, + ) + .await; + } + + fn stop_processing_loops(&self) { + self.cancel_token.cancel(); + self.pending_process_loop.stop(); + } + + async fn pending_size(&self) -> usize { + self.budget.read().await.pending_size() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::cube_ext; + use futures_util::future::join_all; + use tokio::time::sleep; + + #[tokio::test] + async fn rate_limiter_without_refill_test() { + let rate_limiter = ProcessRateLimiterImpl::new(0, 100, 10); + let r = rate_limiter.wait_for_allow(None).await; + assert!(r.is_ok()); + assert_eq!(rate_limiter.current_budget().await, 90); + rate_limiter.commit_task_usage(50).await; + assert_eq!(rate_limiter.current_budget().await, 50); + + let r = rate_limiter.wait_for_allow(None).await; + assert!(r.is_ok()); + assert_eq!(rate_limiter.current_budget().await, 40); + rate_limiter.commit_task_usage(45).await; + assert_eq!(rate_limiter.current_budget().await, 5); + + assert!(rate_limiter.wait_for_allow(None).await.is_err()); + + rate_limiter.commit_task_usage(20).await; + assert_eq!(rate_limiter.current_budget().await, -5); + + assert!(rate_limiter.wait_for_allow(None).await.is_err()); + } + + #[tokio::test] + async fn rate_limiter_base_refill_test() { + let rate_limiter = ProcessRateLimiterImpl::new(10, 10, 0); + rate_limiter.commit_task_usage(3).await; + assert_eq!(rate_limiter.current_budget().await, 7); + sleep(Duration::from_millis(200)).await; + let r = rate_limiter.wait_for_allow(None).await; + assert!(r.is_ok()); + assert_eq!(rate_limiter.current_budget().await, 9); + sleep(Duration::from_millis(300)).await; + let r = rate_limiter.wait_for_allow(None).await; + assert!(r.is_ok()); + assert_eq!(rate_limiter.current_budget().await, 10); + + rate_limiter.commit_task_usage(12).await; + let r = rate_limiter.wait_for_allow(None).await; + assert!(r.is_err()); + sleep(Duration::from_millis(200)).await; + let r = rate_limiter.wait_for_allow(None).await; + assert!(r.is_ok()); + } + #[tokio::test] + async fn rate_limiter_pending_test() { + let rate_limiter = ProcessRateLimiterImpl::new(10, 10, 2); + let rl = rate_limiter.clone(); + let proc = cube_ext::spawn(async move { rl.wait_processing_loop().await }); + let mut futures = Vec::new(); + for _ in 0..10 { + let now = SystemTime::now(); + let limiter_to_move = rate_limiter.clone(); + futures.push(cube_ext::spawn(async move { + let res = limiter_to_move + .wait_for_allow(Some(Duration::from_millis(1100))) + .await; + match res { + Ok(_) => Some(now.elapsed().unwrap().as_millis() / 100), + Err(_) => None, + } + })); + //Delay::new(Duration::from_millis(5)).await; + } + let r = join_all(futures) + .await + .into_iter() + .collect::, _>>() + .unwrap(); + assert_eq!( + r, + vec![ + Some(0), + Some(0), + Some(0), + Some(0), + Some(0), + Some(2), + Some(4), + Some(6), + Some(8), + Some(10) + ] + ); + 
assert_eq!(rate_limiter.pending_size().await, 0); + + Delay::new(Duration::from_millis(1000)).await; + + let mut futures = Vec::new(); + for _ in 0..10 { + let now = SystemTime::now(); + let limiter_to_move = rate_limiter.clone(); + futures.push(cube_ext::spawn(async move { + let res = limiter_to_move + .wait_for_allow(Some(Duration::from_millis(500))) + .await; + match res { + Ok(_) => Some(now.elapsed().unwrap().as_millis() / 100), + Err(_) => None, + } + })); + //Delay::new(Duration::from_millis(5)).await; + } + let r = join_all(futures) + .await + .into_iter() + .collect::, _>>() + .unwrap(); + assert_eq!( + r, + vec![ + Some(0), + Some(0), + Some(0), + Some(0), + Some(0), + Some(2), + Some(4), + None, + None, + None + ] + ); + + Delay::new(Duration::from_millis(1050)).await; + assert_eq!(rate_limiter.current_budget().await, 10); + rate_limiter.commit_task_usage(12).await; + assert_eq!(rate_limiter.current_budget().await, 0); + + let mut futures = Vec::new(); + for _ in 0..2 { + let now = SystemTime::now(); + let limiter_to_move = rate_limiter.clone(); + futures.push(cube_ext::spawn(async move { + let res = limiter_to_move + .wait_for_allow(Some(Duration::from_millis(100))) + .await; + match res { + Ok(_) => Some(now.elapsed().unwrap().as_millis() / 100), + Err(_) => None, + } + })); + Delay::new(Duration::from_millis(300)).await; + } + let r = join_all(futures) + .await + .into_iter() + .collect::, _>>() + .unwrap(); + assert_eq!(r, vec![None, Some(0)]); + + Delay::new(Duration::from_millis(15)).await; + assert_eq!(rate_limiter.pending_size().await, 0); + + rate_limiter.stop_processing_loops(); + proc.await.unwrap(); + } +} diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 6964e54a868d5..ca815af2c7ac1 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -5,6 +5,7 @@ pub mod processing_loop; use crate::cachestore::{ CacheStore, CacheStoreSchedulerImpl, ClusterCacheStoreClient, LazyRocksCacheStore, }; +use crate::cluster::rate_limiter::{ProcessRateLimiter, ProcessRateLimiterImpl}; use crate::cluster::transport::{ ClusterTransport, ClusterTransportImpl, MetaStoreTransport, MetaStoreTransportImpl, }; @@ -1755,6 +1756,22 @@ impl Config { }) .await; + self.injector + .register_typed::(async move |_| { + ProcessRateLimiterImpl::new( + env_parse( + "CUBESTORE_DATA_PROCESSING_RATE_LIMIT", + 1 * 1024 * 1024 * 1024, + ), + env_parse( + "CUBESTORE_DATA_PROCESSING_RATE_BURST", + 50 * 1024 * 1024 * 1024, + ), + env_parse("CUBESTORE_DATA_PROCESSING_RATE_DEPOSIT", 500 * 1024), + ) + }) + .await; + let cluster_meta_store_sender = metastore_event_sender_to_move.clone(); self.injector @@ -1774,6 +1791,7 @@ impl Config { cluster_meta_store_sender, i.get_service_typed().await, i.get_service_typed().await, + i.get_service_typed().await, ) }) .await; diff --git a/rust/cubestore/cubestore/src/import/mod.rs b/rust/cubestore/cubestore/src/import/mod.rs index 86ccc04f616ac..4cc362ddf470c 100644 --- a/rust/cubestore/cubestore/src/import/mod.rs +++ b/rust/cubestore/cubestore/src/import/mod.rs @@ -31,12 +31,14 @@ use crate::import::limits::ConcurrencyLimits; use crate::metastore::table::Table; use crate::metastore::{is_valid_plain_binary_hll, HllFlavour, IdRow}; use crate::metastore::{Column, ColumnType, ImportFormat, MetaStore}; +use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::remotefs::RemoteFs; use crate::sql::timestamp_from_string; use crate::store::ChunkDataStore; use 
crate::streaming::StreamingService; use crate::table::data::{append_row, create_array_builders}; use crate::table::{Row, TableValue}; +use crate::util::batch_memory::columns_vec_buffer_size; use crate::util::decimal::Decimal; use crate::util::maybe_owned::MaybeOwnedStr; use crate::CubeError; @@ -442,7 +444,12 @@ impl Stream for CsvLineStream { #[async_trait] pub trait ImportService: DIService + Send + Sync { async fn import_table(&self, table_id: u64) -> Result<(), CubeError>; - async fn import_table_part(&self, table_id: u64, location: &str) -> Result<(), CubeError>; + async fn import_table_part( + &self, + table_id: u64, + location: &str, + data_loaded_size: Option>, + ) -> Result<(), CubeError>; async fn validate_table_location(&self, table_id: u64, location: &str) -> Result<(), CubeError>; async fn estimate_location_row_count(&self, location: &str) -> Result; @@ -609,6 +616,7 @@ impl ImportServiceImpl { table: &IdRow, format: ImportFormat, location: &str, + data_loaded_size: Option>, ) -> Result<(), CubeError> { let temp_dir = self.config_obj.data_dir().join("tmp"); tokio::fs::create_dir_all(temp_dir.clone()) @@ -656,7 +664,13 @@ impl ImportServiceImpl { mem::swap(&mut builders, &mut to_add); num_rows = 0; - ingestion.queue_data_frame(finish(to_add)).await?; + let builded_rows = finish(to_add); + + if let Some(data_loaded_size) = &data_loaded_size { + data_loaded_size.add(columns_vec_buffer_size(&builded_rows)); + } + + ingestion.queue_data_frame(builded_rows).await?; } } } @@ -702,7 +716,7 @@ impl ImportService for ImportServiceImpl { table )))?; for location in locations.iter() { - self.do_import(&table, *format, location).await?; + self.do_import(&table, *format, location, None).await?; } for location in locations.iter() { @@ -712,7 +726,12 @@ impl ImportService for ImportServiceImpl { Ok(()) } - async fn import_table_part(&self, table_id: u64, location: &str) -> Result<(), CubeError> { + async fn import_table_part( + &self, + table_id: u64, + location: &str, + data_loaded_size: Option>, + ) -> Result<(), CubeError> { let table = self.meta_store.get_table_by_id(table_id).await?; let format = table .get_row() @@ -739,7 +758,8 @@ impl ImportService for ImportServiceImpl { if Table::is_stream_location(location) { self.streaming_service.stream_table(table, location).await?; } else { - self.do_import(&table, *format, location).await?; + self.do_import(&table, *format, location, data_loaded_size.clone()) + .await?; self.drop_temp_uploads(&location).await?; } diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index 0399e7d9fd070..4e0934a0071d1 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -10,6 +10,7 @@ pub mod query_executor; pub mod serialized_plan; mod tail_limit; mod topk; +pub mod trace_data_loaded; pub use topk::MIN_TOPK_STREAM_ROWS; mod coalesce; mod filter_by_key_range; diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs index 8981147e793ca..e33f2c62a272b 100644 --- a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs @@ -2,6 +2,7 @@ mod check_memory; mod distributed_partial_aggregate; mod prefer_inplace_aggregates; pub mod rewrite_plan; +mod trace_data_loaded; use crate::cluster::Cluster; use crate::queryplanner::optimizations::distributed_partial_aggregate::{ @@ -10,6 +11,7 @@ 
use crate::queryplanner::optimizations::distributed_partial_aggregate::{ use crate::queryplanner::optimizations::prefer_inplace_aggregates::try_switch_to_inplace_aggregates; use crate::queryplanner::planning::CubeExtensionPlanner; use crate::queryplanner::serialized_plan::SerializedPlan; +use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::util::memory::MemoryHandler; use check_memory::add_check_memory_exec; use datafusion::error::DataFusionError; @@ -19,11 +21,13 @@ use datafusion::physical_plan::planner::DefaultPhysicalPlanner; use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner}; use rewrite_plan::rewrite_physical_plan; use std::sync::Arc; +use trace_data_loaded::add_trace_data_loaded_exec; pub struct CubeQueryPlanner { cluster: Option>, serialized_plan: Arc, memory_handler: Arc, + data_loaded_size: Option>, } impl CubeQueryPlanner { @@ -36,17 +40,20 @@ impl CubeQueryPlanner { cluster: Some(cluster), serialized_plan, memory_handler, + data_loaded_size: None, } } pub fn new_on_worker( serialized_plan: Arc, memory_handler: Arc, + data_loaded_size: Option>, ) -> CubeQueryPlanner { CubeQueryPlanner { serialized_plan, cluster: None, memory_handler, + data_loaded_size, } } } @@ -64,18 +71,30 @@ impl QueryPlanner for CubeQueryPlanner { })]) .create_physical_plan(logical_plan, ctx_state)?; // TODO: assert there is only a single ClusterSendExec in the plan. - finalize_physical_plan(p, self.memory_handler.clone()) + finalize_physical_plan( + p, + self.memory_handler.clone(), + self.data_loaded_size.clone(), + ) } } fn finalize_physical_plan( p: Arc, memory_handler: Arc, + data_loaded_size: Option>, ) -> Result, DataFusionError> { let p = rewrite_physical_plan(p.as_ref(), &mut |p| try_switch_to_inplace_aggregates(p))?; let p = rewrite_physical_plan(p.as_ref(), &mut |p| push_aggregate_to_workers(p))?; let p = rewrite_physical_plan(p.as_ref(), &mut |p| { add_check_memory_exec(p, memory_handler.clone()) })?; + let p = if let Some(data_loaded_size) = data_loaded_size { + rewrite_physical_plan(p.as_ref(), &mut |p| { + add_trace_data_loaded_exec(p, data_loaded_size.clone()) + })? + } else { + p + }; rewrite_physical_plan(p.as_ref(), &mut |p| add_limit_to_workers(p)) } diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/trace_data_loaded.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/trace_data_loaded.rs new file mode 100644 index 0000000000000..03f16a0a2ebe7 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/trace_data_loaded.rs @@ -0,0 +1,19 @@ +use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec}; +use datafusion::error::DataFusionError; +use datafusion::physical_plan::parquet::ParquetExec; +use datafusion::physical_plan::ExecutionPlan; +use std::sync::Arc; + +/// Add `TraceDataLoadedExec` behind ParquetExec nodes. 
+pub fn add_trace_data_loaded_exec( + p: Arc, + data_loaded_size: Arc, +) -> Result, DataFusionError> { + let p_any = p.as_any(); + if p_any.is::() { + let trace_data_loaded = Arc::new(TraceDataLoadedExec::new(p, data_loaded_size.clone())); + Ok(trace_data_loaded) + } else { + Ok(p) + } +} diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 2d9d36c13dc5d..321cba21557a9 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -9,6 +9,7 @@ use crate::queryplanner::optimizations::CubeQueryPlanner; use crate::queryplanner::planning::{get_worker_plan, Snapshot, Snapshots}; use crate::queryplanner::pretty_printers::{pp_phys_plan, pp_plan}; use crate::queryplanner::serialized_plan::{IndexSnapshot, RowFilter, RowRange, SerializedPlan}; +use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::store::DataFrame; use crate::table::data::rows_to_columns; use crate::table::parquet::CubestoreParquetMetadataCache; @@ -73,7 +74,7 @@ pub trait QueryExecutor: DIService + Send + Sync { plan: SerializedPlan, remote_to_local_names: HashMap, chunk_id_to_record_batches: HashMap>, - ) -> Result<(SchemaRef, Vec), CubeError>; + ) -> Result<(SchemaRef, Vec, usize), CubeError>; async fn router_plan( &self, @@ -86,6 +87,7 @@ pub trait QueryExecutor: DIService + Send + Sync { plan: SerializedPlan, remote_to_local_names: HashMap, chunk_id_to_record_batches: HashMap>, + data_loaded_size: Option>, ) -> Result<(Arc, LogicalPlan), CubeError>; async fn pp_worker_plan( @@ -161,9 +163,15 @@ impl QueryExecutor for QueryExecutorImpl { plan: SerializedPlan, remote_to_local_names: HashMap, chunk_id_to_record_batches: HashMap>, - ) -> Result<(SchemaRef, Vec), CubeError> { + ) -> Result<(SchemaRef, Vec, usize), CubeError> { + let data_loaded_size = DataLoadedSize::new(); let (physical_plan, logical_plan) = self - .worker_plan(plan, remote_to_local_names, chunk_id_to_record_batches) + .worker_plan( + plan, + remote_to_local_names, + chunk_id_to_record_batches, + Some(data_loaded_size.clone()), + ) .await?; let worker_plan; let max_batch_rows; @@ -219,7 +227,7 @@ impl QueryExecutor for QueryExecutorImpl { } // TODO: stream results as they become available. 
let results = regroup_batches(results?, max_batch_rows)?; - Ok((worker_plan.schema(), results)) + Ok((worker_plan.schema(), results, data_loaded_size.get())) } async fn router_plan( @@ -245,6 +253,7 @@ impl QueryExecutor for QueryExecutorImpl { plan: SerializedPlan, remote_to_local_names: HashMap, chunk_id_to_record_batches: HashMap>, + data_loaded_size: Option>, ) -> Result<(Arc, LogicalPlan), CubeError> { let plan_to_move = plan.logical_plan( remote_to_local_names, @@ -252,7 +261,7 @@ impl QueryExecutor for QueryExecutorImpl { self.parquet_metadata_cache.cache().clone(), )?; let plan = Arc::new(plan); - let ctx = self.worker_context(plan.clone())?; + let ctx = self.worker_context(plan.clone(), data_loaded_size)?; let plan_ctx = ctx.clone(); Ok(( plan_ctx.create_physical_plan(&plan_to_move.clone())?, @@ -267,7 +276,12 @@ impl QueryExecutor for QueryExecutorImpl { chunk_id_to_record_batches: HashMap>, ) -> Result { let (physical_plan, _) = self - .worker_plan(plan, remote_to_local_names, chunk_id_to_record_batches) + .worker_plan( + plan, + remote_to_local_names, + chunk_id_to_record_batches, + None, + ) .await?; let worker_plan; @@ -315,6 +329,7 @@ impl QueryExecutorImpl { fn worker_context( &self, serialized_plan: Arc, + data_loaded_size: Option>, ) -> Result, CubeError> { Ok(Arc::new(ExecutionContext::with_config( ExecutionConfig::new() @@ -323,6 +338,7 @@ impl QueryExecutorImpl { .with_query_planner(Arc::new(CubeQueryPlanner::new_on_worker( serialized_plan, self.memory_handler.clone(), + data_loaded_size, ))), ))) } diff --git a/rust/cubestore/cubestore/src/queryplanner/trace_data_loaded.rs b/rust/cubestore/cubestore/src/queryplanner/trace_data_loaded.rs new file mode 100644 index 0000000000000..712b9a3546271 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/trace_data_loaded.rs @@ -0,0 +1,137 @@ +use crate::util::batch_memory::record_batch_buffer_size; +use arrow::datatypes::SchemaRef; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use datafusion::error::DataFusionError; +use datafusion::physical_plan::{ + ExecutionPlan, OptimizerHints, Partitioning, RecordBatchStream, SendableRecordBatchStream, +}; +use flatbuffers::bitflags::_core::any::Any; +use futures::stream::Stream; +use futures::StreamExt; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; + +#[derive(Debug)] +pub struct DataLoadedSize { + size: AtomicUsize, +} + +impl DataLoadedSize { + pub fn new() -> Arc { + Arc::new(Self { + size: AtomicUsize::new(0), + }) + } + + pub fn add(&self, size: usize) { + self.size.fetch_add(size, Ordering::SeqCst); + } + + pub fn get(&self) -> usize { + self.size.load(Ordering::SeqCst) + } +} + +#[derive(Debug)] +pub struct TraceDataLoadedExec { + pub input: Arc, + pub data_loaded_size: Arc, +} + +impl TraceDataLoadedExec { + pub fn new(input: Arc, data_loaded_size: Arc) -> Self { + Self { + input, + data_loaded_size, + } + } +} + +#[async_trait] +impl ExecutionPlan for TraceDataLoadedExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn output_partitioning(&self) -> Partitioning { + self.input.output_partitioning() + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result, DataFusionError> { + assert_eq!(children.len(), 1); + Ok(Arc::new(Self { + input: children.into_iter().next().unwrap(), + 
data_loaded_size: self.data_loaded_size.clone(), + })) + } + + fn output_hints(&self) -> OptimizerHints { + self.input.output_hints() + } + + async fn execute( + &self, + partition: usize, + ) -> Result { + if partition >= self.input.output_partitioning().partition_count() { + return Err(DataFusionError::Internal(format!( + "ExecutionPlanExec invalid partition {}", + partition + ))); + } + + let input = self.input.execute(partition).await?; + Ok(Box::pin(TraceDataLoadedStream { + schema: self.schema(), + data_loaded_size: self.data_loaded_size.clone(), + input, + })) + } +} + +struct TraceDataLoadedStream { + schema: SchemaRef, + data_loaded_size: Arc, + input: SendableRecordBatchStream, +} + +impl Stream for TraceDataLoadedStream { + type Item = ArrowResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.input.poll_next_unpin(cx).map(|x| match x { + Some(Ok(batch)) => { + self.data_loaded_size.add(record_batch_buffer_size(&batch)); + Some(Ok(batch)) + } + other => other, + }) + } + + fn size_hint(&self) -> (usize, Option) { + // same number of record batches + self.input.size_hint() + } +} + +impl RecordBatchStream for TraceDataLoadedStream { + /// Get the schema + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index ae1c7bb3c55f2..d9a6bb930d52a 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -1408,7 +1408,7 @@ impl SqlService for SqlServiceImpl { .0, worker: self .query_executor - .worker_plan(worker_plan, mocked_names, chunk_ids_to_batches) + .worker_plan(worker_plan, mocked_names, chunk_ids_to_batches, None) .await? .0, }); diff --git a/rust/cubestore/cubestore/src/store/compaction.rs b/rust/cubestore/cubestore/src/store/compaction.rs index bca3aae8f8c5b..791ae891a09bb 100644 --- a/rust/cubestore/cubestore/src/store/compaction.rs +++ b/rust/cubestore/cubestore/src/store/compaction.rs @@ -8,12 +8,14 @@ use crate::metastore::{ deactivate_table_on_corrupt_data, table::Table, Chunk, IdRow, Index, IndexType, MetaStore, Partition, PartitionData, }; +use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec}; use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs}; use crate::store::{min_max_values_from_data, ChunkDataStore, ChunkStore, ROW_GROUP_SIZE}; use crate::table::data::{cmp_min_rows, cmp_partition_key}; use crate::table::parquet::{arrow_schema, ParquetTableStore}; use crate::table::redistribute::redistribute; use crate::table::{Row, TableValue}; +use crate::util::batch_memory::record_batch_buffer_size; use crate::CubeError; use arrow::array::{ArrayRef, UInt64Array}; use arrow::compute::{lexsort_to_indices, SortColumn, SortOptions}; @@ -50,7 +52,11 @@ use tokio::task::JoinHandle; #[async_trait] pub trait CompactionService: DIService + Send + Sync { - async fn compact(&self, partition_id: u64) -> Result<(), CubeError>; + async fn compact( + &self, + partition_id: u64, + data_loaded_size: Arc, + ) -> Result<(), CubeError>; async fn compact_in_memory_chunks(&self, partition_id: u64) -> Result<(), CubeError>; async fn compact_node_in_memory_chunks(&self, node: String) -> Result<(), CubeError>; /// Split multi-partition that has too many rows. Figures out the keys based on stored data. 
@@ -415,7 +421,11 @@ impl CompactionServiceImpl { } #[async_trait] impl CompactionService for CompactionServiceImpl { - async fn compact(&self, partition_id: u64) -> Result<(), CubeError> { + async fn compact( + &self, + partition_id: u64, + data_loaded_size: Arc, + ) -> Result<(), CubeError> { let (partition, index, table, multi_part) = self .meta_store .get_partition_for_compaction(partition_id) @@ -466,7 +476,7 @@ impl CompactionService for CompactionServiceImpl { let mut data = Vec::new(); let mut chunks_to_use = Vec::new(); - let mut total_size = 0; + let mut chunks_total_size = 0; let num_columns = index.get_row().columns().len(); for chunk in chunks.iter() { @@ -486,17 +496,18 @@ impl CompactionService for CompactionServiceImpl { index, chunk ); - for col in b.columns() { - total_size += col.get_array_memory_size(); - } + chunks_total_size += record_batch_buffer_size(&b); data.push(b); } chunks_to_use.push(chunk.clone()); - if total_size > self.config.compaction_chunks_in_memory_size_threshold() as usize { + if chunks_total_size > self.config.compaction_chunks_in_memory_size_threshold() as usize + { break; } } + data_loaded_size.add(chunks_total_size); + let chunks = chunks_to_use; let chunks_row_count = chunks @@ -629,14 +640,21 @@ impl CompactionService for CompactionServiceImpl { // Merge and write rows. let schema = Arc::new(arrow_schema(index.get_row())); let main_table: Arc = match old_partition_local { - Some(file) => Arc::new(ParquetExec::try_from_path( - file.as_str(), - None, - None, - ROW_GROUP_SIZE, - 1, - None, - )?), + Some(file) => { + let parquet_exec = Arc::new(ParquetExec::try_from_path( + file.as_str(), + None, + None, + ROW_GROUP_SIZE, + 1, + None, + )?); + + Arc::new(TraceDataLoadedExec::new( + parquet_exec, + data_loaded_size.clone(), + )) + } None => Arc::new(EmptyExec::new(false, schema.clone())), }; @@ -1496,7 +1514,10 @@ mod tests { remote_fs, Arc::new(config), ); - compaction_service.compact(1).await.unwrap(); + compaction_service + .compact(1, DataLoadedSize::new()) + .await + .unwrap(); fn sort_fn( a: &(u64, Option, Option), @@ -1566,7 +1587,10 @@ mod tests { .unwrap(); metastore.chunk_uploaded(4).await.unwrap(); - compaction_service.compact(next_partition_id).await.unwrap(); + compaction_service + .compact(next_partition_id, DataLoadedSize::new()) + .await + .unwrap(); let active_partitions = metastore .get_active_partitions_by_index_id(1) @@ -1896,7 +1920,7 @@ mod tests { config.config_obj(), ); compaction_service - .compact(partition.get_id()) + .compact(partition.get_id(), DataLoadedSize::new()) .await .unwrap(); @@ -1967,7 +1991,10 @@ mod tests { .join(", "); let query = format!("insert into test.a (a, b) values {}", values); service.exec_query(&query).await.unwrap(); - compaction_service.compact(1).await.unwrap(); + compaction_service + .compact(1, DataLoadedSize::new()) + .await + .unwrap(); let partitions = services .meta_store .get_active_partitions_by_index_id(1) @@ -1982,7 +2009,7 @@ mod tests { service.exec_query(&query).await.unwrap(); compaction_service - .compact(partitions[0].get_id()) + .compact(partitions[0].get_id(), DataLoadedSize::new()) .await .unwrap(); let partitions = services diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 30c1caeae585e..ed4d72e506634 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -22,6 +22,7 @@ use crate::metastore::{ }; use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs}; use 
crate::table::{Row, TableValue}; +use crate::util::batch_memory::columns_vec_buffer_size; use crate::CubeError; use arrow::datatypes::{Schema, SchemaRef}; use std::{ @@ -35,6 +36,7 @@ use crate::cluster::{node_name_by_partition, Cluster}; use crate::config::injection::DIService; use crate::config::ConfigObj; use crate::metastore::chunks::chunk_file_name; +use crate::queryplanner::trace_data_loaded::DataLoadedSize; use crate::table::data::cmp_partition_key; use crate::table::parquet::{arrow_schema, ParquetTableStore}; use arrow::array::{Array, ArrayRef, Int64Builder, StringBuilder, UInt64Array}; @@ -222,7 +224,11 @@ pub trait ChunkDataStore: DIService + Send + Sync { in_memory: bool, ) -> Result, CubeError>; async fn repartition(&self, partition_id: u64) -> Result<(), CubeError>; - async fn repartition_chunk(&self, chunk_id: u64) -> Result<(), CubeError>; + async fn repartition_chunk( + &self, + chunk_id: u64, + data_loaded_size: Arc, + ) -> Result<(), CubeError>; async fn get_chunk_columns(&self, chunk: IdRow) -> Result, CubeError>; async fn has_in_memory_chunk( &self, @@ -466,7 +472,11 @@ impl ChunkDataStore for ChunkStore { Ok(()) } - async fn repartition_chunk(&self, chunk_id: u64) -> Result<(), CubeError> { + async fn repartition_chunk( + &self, + chunk_id: u64, + data_loaded_size: Arc, + ) -> Result<(), CubeError> { let chunk = self.meta_store.get_chunk(chunk_id).await?; if !chunk.get_row().active() { log::debug!("Skipping repartition of inactive chunk: {:?}", chunk); @@ -501,6 +511,8 @@ impl ChunkDataStore for ChunkStore { )?) } + data_loaded_size.add(columns_vec_buffer_size(&columns)); + //There is no data in the chunk, so we just deactivate it if columns.len() == 0 || columns[0].data().len() == 0 { self.meta_store.deactivate_chunk(chunk_id).await?; diff --git a/rust/cubestore/cubestore/src/util/batch_memory.rs b/rust/cubestore/cubestore/src/util/batch_memory.rs new file mode 100644 index 0000000000000..d80297554cb59 --- /dev/null +++ b/rust/cubestore/cubestore/src/util/batch_memory.rs @@ -0,0 +1,11 @@ +use arrow::array::ArrayRef; +use arrow::record_batch::RecordBatch; + +pub fn record_batch_buffer_size(batch: &RecordBatch) -> usize { + columns_vec_buffer_size(batch.columns()) +} +pub fn columns_vec_buffer_size(columns: &[ArrayRef]) -> usize { + columns + .iter() + .fold(0, |size, col| size + col.get_buffer_memory_size()) +} diff --git a/rust/cubestore/cubestore/src/util/mod.rs b/rust/cubestore/cubestore/src/util/mod.rs index 2244f5ebbc74c..ce6a6480eed0a 100644 --- a/rust/cubestore/cubestore/src/util/mod.rs +++ b/rust/cubestore/cubestore/src/util/mod.rs @@ -1,4 +1,5 @@ pub mod aborting_join_handle; +pub mod batch_memory; pub mod decimal; pub mod error; pub mod lock; From 7f9e14a32658c715a4c9e263697f137fad4971e3 Mon Sep 17 00:00:00 2001 From: Alexandr Romanenko Date: Wed, 19 Jul 2023 00:16:33 +0300 Subject: [PATCH 2/2] update --- rust/cubestore/cubestore/src/cluster/mod.rs | 28 +- .../cubestore/src/cluster/rate_limiter.rs | 421 ++---------------- rust/cubestore/cubestore/src/config/mod.rs | 16 +- 3 files changed, 60 insertions(+), 405 deletions(-) diff --git a/rust/cubestore/cubestore/src/cluster/mod.rs b/rust/cubestore/cubestore/src/cluster/mod.rs index 4c60415f861f5..2a4d9ffa8d4bc 100644 --- a/rust/cubestore/cubestore/src/cluster/mod.rs +++ b/rust/cubestore/cubestore/src/cluster/mod.rs @@ -11,7 +11,7 @@ use crate::cluster::worker_pool::{worker_main, MessageProcessor, WorkerPool}; use crate::ack_error; use crate::cluster::message::NetworkMessage; -use 
crate::cluster::rate_limiter::ProcessRateLimiter; +use crate::cluster::rate_limiter::{ProcessRateLimiter, TaskType}; use crate::cluster::transport::{ClusterTransport, MetaStoreTransport, WorkerConnection}; use crate::config::injection::{DIService, Injector}; use crate::config::{is_router, WorkerServices}; @@ -914,16 +914,17 @@ impl JobRunner { let compaction_service = self.compaction_service.clone(); let partition_id = *partition_id; let process_rate_limiter = self.process_rate_limiter.clone(); + let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout())); Ok(cube_ext::spawn(async move { process_rate_limiter - .wait_for_allow(Some(Duration::from_secs(120))) + .wait_for_allow(TaskType::Job, timeout) .await?; //TODO config, may be same ad orphaned timeout let data_loaded_size = DataLoadedSize::new(); let res = compaction_service .compact(partition_id, data_loaded_size.clone()) .await; process_rate_limiter - .commit_task_usage(data_loaded_size.get() as i64) + .commit_task_usage(TaskType::Job, data_loaded_size.get() as i64) .await; res })) @@ -1006,6 +1007,7 @@ impl JobRunner { let import_service = self.import_service.clone(); let location = location.to_string(); let process_rate_limiter = self.process_rate_limiter.clone(); + let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout())); Ok(cube_ext::spawn(async move { let is_streaming = Table::is_stream_location(&location); let data_loaded_size = if is_streaming { @@ -1015,7 +1017,7 @@ impl JobRunner { }; if !is_streaming { process_rate_limiter - .wait_for_allow(Some(Duration::from_secs(120))) + .wait_for_allow(TaskType::Job, timeout) .await?; //TODO config, may be same ad orphaned timeout } let res = import_service @@ -1024,7 +1026,7 @@ impl JobRunner { .await; if let Some(data_loaded) = &data_loaded_size { process_rate_limiter - .commit_task_usage(data_loaded.get() as i64) + .commit_task_usage(TaskType::Job, data_loaded.get() as i64) .await; } res @@ -1038,16 +1040,17 @@ impl JobRunner { let chunk_store = self.chunk_store.clone(); let chunk_id = *chunk_id; let process_rate_limiter = self.process_rate_limiter.clone(); + let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout())); Ok(cube_ext::spawn(async move { process_rate_limiter - .wait_for_allow(Some(Duration::from_secs(120))) + .wait_for_allow(TaskType::Job, timeout) .await?; //TODO config, may be same ad orphaned timeout let data_loaded_size = DataLoadedSize::new(); let res = chunk_store .repartition_chunk(chunk_id, data_loaded_size.clone()) .await; process_rate_limiter - .commit_task_usage(data_loaded_size.get() as i64) + .commit_task_usage(TaskType::Job, data_loaded_size.get() as i64) .await; res })) @@ -1377,18 +1380,23 @@ impl ClusterImpl { plan_node: SerializedPlan, ) -> Result<(SchemaRef, Vec), CubeError> { self.process_rate_limiter - .wait_for_allow(Some(Duration::from_secs(self.config_obj.query_timeout()))) + .wait_for_allow( + TaskType::Job, + Some(Duration::from_secs(self.config_obj.query_timeout())), + ) .await?; let res = self.run_local_select_worker_impl(plan_node).await; match res { Ok((schema, records, data_loaded_size)) => { self.process_rate_limiter - .commit_task_usage(data_loaded_size as i64) + .commit_task_usage(TaskType::Select, data_loaded_size as i64) .await; Ok((schema, records)) } Err(e) => { - self.process_rate_limiter.commit_task_usage(0).await; + self.process_rate_limiter + .commit_task_usage(TaskType::Select, 0) + .await; Err(e) } } diff --git a/rust/cubestore/cubestore/src/cluster/rate_limiter.rs 
b/rust/cubestore/cubestore/src/cluster/rate_limiter.rs index 3e57f8402d40d..297817d096950 100644 --- a/rust/cubestore/cubestore/src/cluster/rate_limiter.rs +++ b/rust/cubestore/cubestore/src/cluster/rate_limiter.rs @@ -1,416 +1,73 @@ use crate::config::injection::DIService; -use crate::util::WorkerLoop; use crate::CubeError; use async_trait::async_trait; -use futures_timer::Delay; -use std::collections::VecDeque; +use std::cmp::PartialEq; +use std::hash::Hash; use std::sync::Arc; -use std::time::{Duration, SystemTime}; -use tokio::sync::{Notify, RwLock}; -use tokio_util::sync::CancellationToken; +use std::time::Duration; + +#[derive(Eq, PartialEq, Hash)] +pub enum TaskType { + Select, + Job, +} #[async_trait] pub trait ProcessRateLimiter: DIService + Send + Sync { - async fn commit_task_usage(&self, size: i64); + async fn commit_task_usage(&self, task_type: TaskType, size: i64); - async fn current_budget(&self) -> i64; + async fn current_budget(&self, task_type: TaskType) -> Option; - async fn current_budget_f64(&self) -> f64; + async fn current_budget_f64(&self, task_type: TaskType) -> Option; - async fn wait_for_allow(&self, timeout: Option) -> Result<(), CubeError>; + async fn wait_for_allow( + &self, + task_type: TaskType, + timeout: Option, + ) -> Result<(), CubeError>; async fn wait_processing_loop(self: Arc); - async fn pending_size(&self) -> usize; + async fn pending_size(&self, task_type: TaskType) -> Option; fn stop_processing_loops(&self); } -crate::di_service!(ProcessRateLimiterImpl, [ProcessRateLimiter]); - -const MS_MUL: i64 = 1000; -const MAX_PENDING_ITEMS: usize = 100000; - -struct PendingItem { - notify: Notify, - timeout_at: SystemTime, -} - -impl PendingItem { - pub fn new(timeout: Duration) -> Arc { - Arc::new(Self { - notify: Notify::new(), - timeout_at: SystemTime::now() + timeout, - }) - } - - pub fn notify(&self) { - self.notify.notify_one() - } - - pub async fn wait(&self) { - self.notify.notified().await - } - - pub fn is_timeout(&self) -> bool { - self.timeout_at <= SystemTime::now() - } -} - -struct Budget { - rate: i64, - burst: i64, - deposit: i64, - value: i64, - last_refill: SystemTime, - pending: VecDeque>, -} - -impl Budget { - pub fn new(rate: i64, burst: i64, deposit: i64) -> Self { - Self { - rate, - burst, - deposit, - value: burst, - last_refill: SystemTime::now(), - pending: VecDeque::with_capacity(10000), - } - } - - pub fn value(&self) -> i64 { - self.value - } - - pub fn commit_task_usage(&mut self, value: i64) { - self.value -= value - self.deposit; - } - - pub fn refill(&mut self) { - let now = SystemTime::now(); - let res = now.duration_since(self.last_refill); - let duration = if let Ok(dur) = res { - dur.as_millis() - } else { - 0 - }; - if duration > 0 { - self.value = (self.value + duration as i64 * self.rate).min(self.burst); - self.last_refill = now; - } - self.process_pending(); - } - - pub fn try_allow( - &mut self, - timeout: Option, - ) -> Result>, CubeError> { - self.refill(); - if self.pending.is_empty() && self.value >= self.deposit { - self.value -= self.deposit; - Ok(None) - } else if let Some(timeout) = timeout { - if self.pending_size() >= MAX_PENDING_ITEMS { - Err(CubeError::internal("Too many pending items".to_string())) - } else { - let pending_item = PendingItem::new(timeout); - self.pending.push_back(pending_item.clone()); - Ok(Some(pending_item)) - } - } else { - Err(CubeError::internal( - "Process can not be started due to rate limit".to_string(), - )) - } - } - - fn process_pending(&mut self) { - if 
self.pending.is_empty() { - return; - } - - loop { - if let Some(item) = self.pending.front() { - if item.is_timeout() { - item.notify(); - self.pending.pop_front(); - } else if self.value >= self.deposit { - self.value -= self.deposit; - item.notify(); - self.pending.pop_front(); - } else { - break; - } - } else { - break; - } - } - } - - pub fn pending_size(&self) -> usize { - self.pending.len() - } -} - -pub struct ProcessRateLimiterImpl { - budget: RwLock, - cancel_token: CancellationToken, - pending_process_loop: WorkerLoop, -} +crate::di_service!(BasicProcessRateLimiter, [ProcessRateLimiter]); -impl ProcessRateLimiterImpl { - /// Crates new limitter for rate of data processing - /// per_second - the amount of available for processing data renewable per second - /// burst - the maximum amount of available for processing data - /// deposit_size - the fixed amount substracted form available stock at start of task processing - pub fn new(per_second: i64, burst: i64, deposit_size: i64) -> Arc { - Arc::new(Self { - budget: RwLock::new(Budget::new( - per_second, - burst * MS_MUL, - deposit_size * MS_MUL, - )), - cancel_token: CancellationToken::new(), - pending_process_loop: WorkerLoop::new("RateLimiterPendingProcessing"), - }) - } +pub struct BasicProcessRateLimiter; - async fn refill_budget(&self) { - self.budget.write().await.refill(); +impl BasicProcessRateLimiter { + pub fn new() -> Arc { + Arc::new(Self {}) } } #[async_trait] -impl ProcessRateLimiter for ProcessRateLimiterImpl { - async fn commit_task_usage(&self, size: i64) { - self.budget.write().await.commit_task_usage(size * MS_MUL); - } - - async fn current_budget(&self) -> i64 { - self.budget.read().await.value() / MS_MUL - } +impl ProcessRateLimiter for BasicProcessRateLimiter { + async fn commit_task_usage(&self, _task_type: TaskType, _size: i64) {} - async fn current_budget_f64(&self) -> f64 { - self.budget.read().await.value() as f64 / MS_MUL as f64 + async fn current_budget(&self, _task_type: TaskType) -> Option { + None } - async fn wait_for_allow(&self, timeout: Option) -> Result<(), CubeError> { - let result = self.budget.write().await.try_allow(timeout); - match result { - Ok(None) => Ok(()), - Ok(Some(pending)) => { - let timeout = if let Some(t) = timeout { - t - } else { - Duration::from_millis(0) - }; - tokio::select! { - _ = self.cancel_token.cancelled() => { - Ok(()) - } - _ = pending.wait() => { - if pending.is_timeout() { - Err(CubeError::internal("Process can not be started due aaa to rate limit".to_string())) - } else { - Ok(()) - } - } - _ = Delay::new(timeout) => { - Err(CubeError::internal("Process can not be started due !!! 
to rate limit".to_string())) - } - } - } - Err(e) => Err(e), - } + async fn current_budget_f64(&self, _task_type: TaskType) -> Option { + None } - async fn wait_processing_loop(self: Arc) { - let scheduler = self.clone(); - scheduler - .pending_process_loop - .process( - scheduler.clone(), - async move |_| Ok(Delay::new(Duration::from_millis(10)).await), - async move |s, _| { - s.refill_budget().await; - Ok(()) - }, - ) - .await; + async fn wait_for_allow( + &self, + _task_type: TaskType, + _timeout: Option, + ) -> Result<(), CubeError> { + Ok(()) } - fn stop_processing_loops(&self) { - self.cancel_token.cancel(); - self.pending_process_loop.stop(); - } + async fn wait_processing_loop(self: Arc) {} - async fn pending_size(&self) -> usize { - self.budget.read().await.pending_size() + async fn pending_size(&self, _task_type: TaskType) -> Option { + None } -} - -#[cfg(test)] -mod tests { - use super::*; - use datafusion::cube_ext; - use futures_util::future::join_all; - use tokio::time::sleep; - - #[tokio::test] - async fn rate_limiter_without_refill_test() { - let rate_limiter = ProcessRateLimiterImpl::new(0, 100, 10); - let r = rate_limiter.wait_for_allow(None).await; - assert!(r.is_ok()); - assert_eq!(rate_limiter.current_budget().await, 90); - rate_limiter.commit_task_usage(50).await; - assert_eq!(rate_limiter.current_budget().await, 50); - let r = rate_limiter.wait_for_allow(None).await; - assert!(r.is_ok()); - assert_eq!(rate_limiter.current_budget().await, 40); - rate_limiter.commit_task_usage(45).await; - assert_eq!(rate_limiter.current_budget().await, 5); - - assert!(rate_limiter.wait_for_allow(None).await.is_err()); - - rate_limiter.commit_task_usage(20).await; - assert_eq!(rate_limiter.current_budget().await, -5); - - assert!(rate_limiter.wait_for_allow(None).await.is_err()); - } - - #[tokio::test] - async fn rate_limiter_base_refill_test() { - let rate_limiter = ProcessRateLimiterImpl::new(10, 10, 0); - rate_limiter.commit_task_usage(3).await; - assert_eq!(rate_limiter.current_budget().await, 7); - sleep(Duration::from_millis(200)).await; - let r = rate_limiter.wait_for_allow(None).await; - assert!(r.is_ok()); - assert_eq!(rate_limiter.current_budget().await, 9); - sleep(Duration::from_millis(300)).await; - let r = rate_limiter.wait_for_allow(None).await; - assert!(r.is_ok()); - assert_eq!(rate_limiter.current_budget().await, 10); - - rate_limiter.commit_task_usage(12).await; - let r = rate_limiter.wait_for_allow(None).await; - assert!(r.is_err()); - sleep(Duration::from_millis(200)).await; - let r = rate_limiter.wait_for_allow(None).await; - assert!(r.is_ok()); - } - #[tokio::test] - async fn rate_limiter_pending_test() { - let rate_limiter = ProcessRateLimiterImpl::new(10, 10, 2); - let rl = rate_limiter.clone(); - let proc = cube_ext::spawn(async move { rl.wait_processing_loop().await }); - let mut futures = Vec::new(); - for _ in 0..10 { - let now = SystemTime::now(); - let limiter_to_move = rate_limiter.clone(); - futures.push(cube_ext::spawn(async move { - let res = limiter_to_move - .wait_for_allow(Some(Duration::from_millis(1100))) - .await; - match res { - Ok(_) => Some(now.elapsed().unwrap().as_millis() / 100), - Err(_) => None, - } - })); - //Delay::new(Duration::from_millis(5)).await; - } - let r = join_all(futures) - .await - .into_iter() - .collect::, _>>() - .unwrap(); - assert_eq!( - r, - vec![ - Some(0), - Some(0), - Some(0), - Some(0), - Some(0), - Some(2), - Some(4), - Some(6), - Some(8), - Some(10) - ] - ); - assert_eq!(rate_limiter.pending_size().await, 0); - 
- Delay::new(Duration::from_millis(1000)).await; - - let mut futures = Vec::new(); - for _ in 0..10 { - let now = SystemTime::now(); - let limiter_to_move = rate_limiter.clone(); - futures.push(cube_ext::spawn(async move { - let res = limiter_to_move - .wait_for_allow(Some(Duration::from_millis(500))) - .await; - match res { - Ok(_) => Some(now.elapsed().unwrap().as_millis() / 100), - Err(_) => None, - } - })); - //Delay::new(Duration::from_millis(5)).await; - } - let r = join_all(futures) - .await - .into_iter() - .collect::, _>>() - .unwrap(); - assert_eq!( - r, - vec![ - Some(0), - Some(0), - Some(0), - Some(0), - Some(0), - Some(2), - Some(4), - None, - None, - None - ] - ); - - Delay::new(Duration::from_millis(1050)).await; - assert_eq!(rate_limiter.current_budget().await, 10); - rate_limiter.commit_task_usage(12).await; - assert_eq!(rate_limiter.current_budget().await, 0); - - let mut futures = Vec::new(); - for _ in 0..2 { - let now = SystemTime::now(); - let limiter_to_move = rate_limiter.clone(); - futures.push(cube_ext::spawn(async move { - let res = limiter_to_move - .wait_for_allow(Some(Duration::from_millis(100))) - .await; - match res { - Ok(_) => Some(now.elapsed().unwrap().as_millis() / 100), - Err(_) => None, - } - })); - Delay::new(Duration::from_millis(300)).await; - } - let r = join_all(futures) - .await - .into_iter() - .collect::, _>>() - .unwrap(); - assert_eq!(r, vec![None, Some(0)]); - - Delay::new(Duration::from_millis(15)).await; - assert_eq!(rate_limiter.pending_size().await, 0); - - rate_limiter.stop_processing_loops(); - proc.await.unwrap(); - } + fn stop_processing_loops(&self) {} } diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index ca815af2c7ac1..48ea5bc13cf83 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -1,11 +1,11 @@ -#![allow(deprecated)] // 'vtable' and 'TraitObject' are deprecated. +#![allow(deprecated)] // 'vtable' and 'TraitObject' are deprecated.confi pub mod injection; pub mod processing_loop; use crate::cachestore::{ CacheStore, CacheStoreSchedulerImpl, ClusterCacheStoreClient, LazyRocksCacheStore, }; -use crate::cluster::rate_limiter::{ProcessRateLimiter, ProcessRateLimiterImpl}; +use crate::cluster::rate_limiter::{BasicProcessRateLimiter, ProcessRateLimiter}; use crate::cluster::transport::{ ClusterTransport, ClusterTransportImpl, MetaStoreTransport, MetaStoreTransportImpl, }; @@ -1758,17 +1758,7 @@ impl Config { self.injector .register_typed::(async move |_| { - ProcessRateLimiterImpl::new( - env_parse( - "CUBESTORE_DATA_PROCESSING_RATE_LIMIT", - 1 * 1024 * 1024 * 1024, - ), - env_parse( - "CUBESTORE_DATA_PROCESSING_RATE_BURST", - 50 * 1024 * 1024 * 1024, - ), - env_parse("CUBESTORE_DATA_PROCESSING_RATE_DEPOSIT", 500 * 1024), - ) + BasicProcessRateLimiter::new() }) .await;
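
For context on the budget arithmetic introduced in PATCH 1/2 (and stubbed out as the no-op BasicProcessRateLimiter in PATCH 2/2), here is a minimal standalone sketch of the same token-bucket idea: a budget refilled at `rate` units per millisecond, capped at `burst`, with a fixed `deposit` reserved when a task is admitted and reconciled against the task's real usage on commit. All names and values below are illustrative assumptions and are not code from the patch.

// Illustrative sketch only: a simplified, synchronous token-bucket budget
// in the spirit of ProcessRateLimiterImpl's Budget from PATCH 1/2.
use std::time::{Duration, Instant};

struct Budget {
    rate: i64,    // units replenished per millisecond
    burst: i64,   // maximum stored budget
    deposit: i64, // amount reserved up front when a task is admitted
    value: i64,
    last_refill: Instant,
}

impl Budget {
    fn new(rate: i64, burst: i64, deposit: i64) -> Self {
        Self { rate, burst, deposit, value: burst, last_refill: Instant::now() }
    }

    // Top the budget up for the time elapsed since the last refill, capped at burst.
    fn refill(&mut self) {
        let elapsed_ms = self.last_refill.elapsed().as_millis() as i64;
        if elapsed_ms > 0 {
            self.value = (self.value + elapsed_ms * self.rate).min(self.burst);
            self.last_refill = Instant::now();
        }
    }

    // Admit a task if the deposit fits; the deposit is subtracted immediately.
    fn try_allow(&mut self) -> bool {
        self.refill();
        if self.value >= self.deposit {
            self.value -= self.deposit;
            true
        } else {
            false
        }
    }

    // On completion, return the deposit and charge the task's actual usage.
    fn commit(&mut self, used: i64) {
        self.value -= used - self.deposit;
    }
}

fn main() {
    let mut budget = Budget::new(1, 100, 10);
    assert!(budget.try_allow());               // 100 -> 90
    budget.commit(50);                         // 90 - (50 - 10) = 50
    std::thread::sleep(Duration::from_millis(20));
    budget.refill();                           // roughly 50 + 20, capped at 100
    println!("remaining budget ~ {}", budget.value);
}

In the patch itself the same reconciliation happens asynchronously: wait_for_allow takes the deposit (queueing the caller in `pending` when the budget is exhausted), and commit_task_usage charges the measured DataLoadedSize once the compaction, import, repartition, or select task finishes.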