116 changes: 102 additions & 14 deletions rust/cubestore/cubestore/src/cluster/mod.rs
@@ -4,11 +4,14 @@ pub mod transport;
#[cfg(not(target_os = "windows"))]
pub mod worker_pool;

pub mod rate_limiter;

#[cfg(not(target_os = "windows"))]
use crate::cluster::worker_pool::{worker_main, MessageProcessor, WorkerPool};

use crate::ack_error;
use crate::cluster::message::NetworkMessage;
use crate::cluster::rate_limiter::{ProcessRateLimiter, TaskType};
use crate::cluster::transport::{ClusterTransport, MetaStoreTransport, WorkerConnection};
use crate::config::injection::{DIService, Injector};
use crate::config::{is_router, WorkerServices};
@@ -28,6 +31,7 @@ use crate::metastore::{
};
use crate::queryplanner::query_executor::{QueryExecutor, SerializedRecordBatchStream};
use crate::queryplanner::serialized_plan::SerializedPlan;
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
use crate::remotefs::RemoteFs;
use crate::store::compaction::CompactionService;
use crate::store::ChunkDataStore;
@@ -183,7 +187,7 @@ pub struct ClusterImpl {
Arc<
WorkerPool<
WorkerMessage,
(SchemaRef, Vec<SerializedRecordBatchStream>),
(SchemaRef, Vec<SerializedRecordBatchStream>, usize),
WorkerProcessor,
>,
>,
@@ -195,6 +199,7 @@ pub struct ClusterImpl {
close_worker_socket_tx: watch::Sender<bool>,
close_worker_socket_rx: RwLock<watch::Receiver<bool>>,
tracing_helper: Arc<dyn TracingHelper>,
process_rate_limiter: Arc<dyn ProcessRateLimiter>,
}

crate::di_service!(ClusterImpl, [Cluster]);
@@ -213,13 +218,13 @@ pub struct WorkerProcessor;

#[cfg(not(target_os = "windows"))]
#[async_trait]
impl MessageProcessor<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream>)>
impl MessageProcessor<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream>, usize)>
for WorkerProcessor
{
async fn process(
services: &WorkerServices,
args: WorkerMessage,
) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>), CubeError> {
) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>, usize), CubeError> {
match args {
WorkerMessage::Select(
plan_node,
@@ -256,9 +261,9 @@ impl MessageProcessor<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream
"Running select in worker completed ({:?})",
time.elapsed().unwrap()
);
let (schema, records) = res?;
let (schema, records, data_loaded_size) = res?;
let records = SerializedRecordBatchStream::write(schema.as_ref(), records)?;
Ok((schema, records))
Ok((schema, records, data_loaded_size))
};
let span = trace_id_and_span_id.map(|(t, s)| {
tracing::info_span!(
@@ -281,7 +286,11 @@ impl MessageProcessor<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream
#[ctor::ctor]
fn proc_handler() {
crate::util::respawn::register_handler(
worker_main::<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream>), WorkerProcessor>,
worker_main::<
WorkerMessage,
(SchemaRef, Vec<SerializedRecordBatchStream>, usize),
WorkerProcessor,
>,
);
}
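The worker result type grows from (SchemaRef, Vec<SerializedRecordBatchStream>) to a triple whose extra usize carries how much data the select loaded, so the calling process can charge that amount to the rate limiter after the worker returns. A minimal sketch of the pattern, where Schema and Batch are hypothetical stand-ins for SchemaRef and SerializedRecordBatchStream rather than the real cubestore types:

// Simplified stand-ins: `Schema` and `Batch` replace SchemaRef and
// SerializedRecordBatchStream; `run_select` fakes the worker-side execution.
struct Schema(String);
struct Batch(Vec<u8>);

// The worker returns its results together with how many bytes it had to load,
// so the caller can commit that usage to its rate limiter.
fn run_select() -> Result<(Schema, Vec<Batch>, usize), String> {
    let batch = Batch(vec![0u8; 128]);
    let data_loaded_size = batch.0.len(); // pretend this was measured during the scan
    Ok((Schema("a: Int64".to_string()), vec![batch], data_loaded_size))
}

fn main() -> Result<(), String> {
    let (schema, batches, data_loaded_size) = run_select()?;
    println!(
        "schema [{}], {} batch(es), {} bytes loaded",
        schema.0,
        batches.len(),
        data_loaded_size
    );
    Ok(())
}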

@@ -291,6 +300,7 @@ struct JobRunner {
chunk_store: Arc<dyn ChunkDataStore>,
compaction_service: Arc<dyn CompactionService>,
import_service: Arc<dyn ImportService>,
process_rate_limiter: Arc<dyn ProcessRateLimiter>,
server_name: String,
notify: Arc<Notify>,
stop_token: CancellationToken,
@@ -903,8 +913,20 @@ impl JobRunner {
if let RowKey::Table(TableId::Partitions, partition_id) = job.row_reference() {
let compaction_service = self.compaction_service.clone();
let partition_id = *partition_id;
let process_rate_limiter = self.process_rate_limiter.clone();
let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
Ok(cube_ext::spawn(async move {
compaction_service.compact(partition_id).await
process_rate_limiter
.wait_for_allow(TaskType::Job, timeout)
.await?; // TODO config: may be the same as the orphaned timeout
let data_loaded_size = DataLoadedSize::new();
let res = compaction_service
.compact(partition_id, data_loaded_size.clone())
.await;
process_rate_limiter
.commit_task_usage(TaskType::Job, data_loaded_size.get() as i64)
.await;
res
}))
} else {
Self::fail_job_row_key(job)
@@ -984,11 +1006,30 @@ impl JobRunner {
let table_id = *table_id;
let import_service = self.import_service.clone();
let location = location.to_string();
let process_rate_limiter = self.process_rate_limiter.clone();
let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
Ok(cube_ext::spawn(async move {
import_service
let is_streaming = Table::is_stream_location(&location);
let data_loaded_size = if is_streaming {
None
} else {
Some(DataLoadedSize::new())
};
if !is_streaming {
process_rate_limiter
.wait_for_allow(TaskType::Job, timeout)
.await?; // TODO config: may be the same as the orphaned timeout
}
let res = import_service
.clone()
.import_table_part(table_id, &location)
.await
.import_table_part(table_id, &location, data_loaded_size.clone())
.await;
if let Some(data_loaded) = &data_loaded_size {
process_rate_limiter
.commit_task_usage(TaskType::Job, data_loaded.get() as i64)
.await;
}
res
}))
} else {
Self::fail_job_row_key(job)
@@ -998,8 +1039,20 @@ impl JobRunner {
if let RowKey::Table(TableId::Chunks, chunk_id) = job.row_reference() {
let chunk_store = self.chunk_store.clone();
let chunk_id = *chunk_id;
let process_rate_limiter = self.process_rate_limiter.clone();
let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
Ok(cube_ext::spawn(async move {
chunk_store.repartition_chunk(chunk_id).await
process_rate_limiter
.wait_for_allow(TaskType::Job, timeout)
.await?; // TODO config: may be the same as the orphaned timeout
let data_loaded_size = DataLoadedSize::new();
let res = chunk_store
.repartition_chunk(chunk_id, data_loaded_size.clone())
.await;
process_rate_limiter
.commit_task_usage(TaskType::Job, data_loaded_size.get() as i64)
.await;
res
}))
} else {
Self::fail_job_row_key(job)
@@ -1030,6 +1083,7 @@ impl ClusterImpl {
meta_store_sender: Sender<MetaStoreEvent>,
cluster_transport: Arc<dyn ClusterTransport>,
tracing_helper: Arc<dyn TracingHelper>,
process_rate_limiter: Arc<dyn ProcessRateLimiter>,
) -> Arc<ClusterImpl> {
let (close_worker_socket_tx, close_worker_socket_rx) = watch::channel(false);
Arc::new_cyclic(|this| ClusterImpl {
@@ -1052,6 +1106,7 @@ impl ClusterImpl {
close_worker_socket_tx,
close_worker_socket_rx: RwLock::new(close_worker_socket_rx),
tracing_helper,
process_rate_limiter,
})
}

@@ -1100,6 +1155,7 @@ impl ClusterImpl {
chunk_store: self.injector.upgrade().unwrap().get_service_typed().await,
compaction_service: self.injector.upgrade().unwrap().get_service_typed().await,
import_service: self.injector.upgrade().unwrap().get_service_typed().await,
process_rate_limiter: self.process_rate_limiter.clone(),
server_name: self.server_name.clone(),
notify: if is_long_running {
self.long_running_job_notify.clone()
@@ -1113,6 +1169,10 @@ impl ClusterImpl {
job_runner.processing_loop().await;
}));
}
let process_rate_limiter = self.process_rate_limiter.clone();
futures.push(cube_ext::spawn(async move {
process_rate_limiter.wait_processing_loop().await;
}));

let stop_token = self.stop_token.clone();
let long_running_job_notify = self.long_running_job_notify.clone();
@@ -1147,6 +1207,8 @@ impl ClusterImpl {
pool.stop_workers().await?;
}

self.process_rate_limiter.stop_processing_loops();

self.close_worker_socket_tx.send(true)?;
Ok(())
}
@@ -1313,11 +1375,37 @@ impl ClusterImpl {
.await
}

#[instrument(level = "trace", skip(self, plan_node))]
async fn run_local_select_worker(
&self,
plan_node: SerializedPlan,
) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>), CubeError> {
self.process_rate_limiter
.wait_for_allow(
TaskType::Job,
Some(Duration::from_secs(self.config_obj.query_timeout())),
)
.await?;
let res = self.run_local_select_worker_impl(plan_node).await;
match res {
Ok((schema, records, data_loaded_size)) => {
self.process_rate_limiter
.commit_task_usage(TaskType::Select, data_loaded_size as i64)
.await;
Ok((schema, records))
}
Err(e) => {
self.process_rate_limiter
.commit_task_usage(TaskType::Select, 0)
.await;
Err(e)
}
}
}
#[instrument(level = "trace", skip(self, plan_node))]
async fn run_local_select_worker_impl(
&self,
plan_node: SerializedPlan,
) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>, usize), CubeError> {
let start = SystemTime::now();
debug!("Running select");
let remote_to_local_names = self.warmup_select_worker_files(&plan_node).await?;
@@ -1411,7 +1499,7 @@ impl ClusterImpl {

if res.is_none() {
// TODO optimize for no double conversion
let (schema, records) = self
let (schema, records, data_loaded_size) = self
.query_executor
.execute_worker_plan(
plan_node.clone(),
Expand All @@ -1420,7 +1508,7 @@ impl ClusterImpl {
)
.await?;
let records = SerializedRecordBatchStream::write(schema.as_ref(), records);
res = Some(Ok((schema, records?)))
res = Some(Ok((schema, records?, data_loaded_size)))
}

info!("Running select completed ({:?})", start.elapsed()?);
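Every rate-limited call site above follows the same bracket: wait_for_allow before the task, run the task while a DataLoadedSize counter records how much data it reads, then commit_task_usage afterwards (the select wrapper commits 0 when the query fails). Below is a self-contained sketch of that bracket, assuming the tokio runtime; Budget and run_compaction are simplified stand-ins for ProcessRateLimiter and compaction_service.compact, not cubestore's real API.

// A self-contained sketch of the wait/commit bracket. `Budget` and
// `run_compaction` are simplified stand-ins, not cubestore's real types.
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;

struct Budget {
    used: AtomicI64,
}

impl Budget {
    fn new() -> Self {
        Self { used: AtomicI64::new(0) }
    }

    // Stand-in for `wait_for_allow`: this toy version always allows immediately.
    async fn wait_for_allow(&self, _timeout: Option<Duration>) -> Result<(), String> {
        Ok(())
    }

    // Stand-in for `commit_task_usage`: record how much data the finished task read.
    async fn commit_task_usage(&self, size: i64) {
        self.used.fetch_add(size, Ordering::Relaxed);
    }
}

// Hypothetical task; the real code calls `compaction_service.compact(...)` and
// measures the bytes read through `DataLoadedSize`.
async fn run_compaction() -> Result<i64, String> {
    Ok(42) // pretend 42 bytes were read during the compaction scan
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let limiter = Budget::new();
    // 1. Ask the limiter for permission before starting, bounded by a timeout.
    limiter.wait_for_allow(Some(Duration::from_secs(600))).await?;
    // 2. Run the task and capture how much data it loaded.
    let res = run_compaction().await;
    // 3. Always report usage back, committing 0 if the task failed.
    limiter.commit_task_usage(*res.as_ref().unwrap_or(&0)).await;
    res.map(|_| ())
}

Committing the measured usage after the task, rather than reserving it up front, keeps the accounting accurate even though the size is only known once the scan has finished.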
73 changes: 73 additions & 0 deletions rust/cubestore/cubestore/src/cluster/rate_limiter.rs
@@ -0,0 +1,73 @@
use crate::config::injection::DIService;
use crate::CubeError;
use async_trait::async_trait;
use std::cmp::PartialEq;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;

#[derive(Eq, PartialEq, Hash)]
pub enum TaskType {
Select,
Job,
}

#[async_trait]
pub trait ProcessRateLimiter: DIService + Send + Sync {
async fn commit_task_usage(&self, task_type: TaskType, size: i64);

async fn current_budget(&self, task_type: TaskType) -> Option<i64>;

async fn current_budget_f64(&self, task_type: TaskType) -> Option<f64>;

async fn wait_for_allow(
&self,
task_type: TaskType,
timeout: Option<Duration>,
) -> Result<(), CubeError>;

async fn wait_processing_loop(self: Arc<Self>);

async fn pending_size(&self, task_type: TaskType) -> Option<usize>;

fn stop_processing_loops(&self);
}

crate::di_service!(BasicProcessRateLimiter, [ProcessRateLimiter]);

pub struct BasicProcessRateLimiter;

impl BasicProcessRateLimiter {
pub fn new() -> Arc<Self> {
Arc::new(Self {})
}
}

#[async_trait]
impl ProcessRateLimiter for BasicProcessRateLimiter {
async fn commit_task_usage(&self, _task_type: TaskType, _size: i64) {}

async fn current_budget(&self, _task_type: TaskType) -> Option<i64> {
None
}

async fn current_budget_f64(&self, _task_type: TaskType) -> Option<f64> {
None
}

async fn wait_for_allow(
&self,
_task_type: TaskType,
_timeout: Option<Duration>,
) -> Result<(), CubeError> {
Ok(())
}

async fn wait_processing_loop(self: Arc<Self>) {}

async fn pending_size(&self, _task_type: TaskType) -> Option<usize> {
None
}

fn stop_processing_loops(&self) {}
}
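BasicProcessRateLimiter is the default no-op binding: every task is allowed immediately and reported usage is discarded. A deployment that wants real throttling could back the same wait/commit calls with a refilling byte budget. The sketch below is a hypothetical, standalone illustration of that idea, assuming tokio, its own error type, and made-up refill numbers; it is not the implementation this PR ships and does not use cubestore's DIService/CubeError plumbing.

// Hypothetical budget-based limiter: a standalone sketch assuming a fixed byte
// budget shared by all tasks and a steady refill rate. Not cubestore's code.
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::time::sleep;

struct BudgetState {
    remaining: i64,
    last_refill: Instant,
}

pub struct BudgetRateLimiter {
    state: Mutex<BudgetState>,
    capacity: i64,
    refill_per_sec: i64,
}

impl BudgetRateLimiter {
    pub fn new(capacity: i64, refill_per_sec: i64) -> Arc<Self> {
        Arc::new(Self {
            state: Mutex::new(BudgetState {
                remaining: capacity,
                last_refill: Instant::now(),
            }),
            capacity,
            refill_per_sec,
        })
    }

    // Top the budget back up in proportion to the time since the last refill.
    fn refill(&self, state: &mut BudgetState) {
        let elapsed = state.last_refill.elapsed().as_secs() as i64;
        if elapsed > 0 {
            state.remaining =
                (state.remaining + elapsed * self.refill_per_sec).min(self.capacity);
            state.last_refill = Instant::now();
        }
    }

    // Analogue of `wait_for_allow`: block until some budget is left or the timeout expires.
    pub async fn wait_for_allow(&self, timeout: Option<Duration>) -> Result<(), String> {
        let deadline = timeout.map(|t| Instant::now() + t);
        loop {
            {
                let mut state = self.state.lock().await;
                self.refill(&mut state);
                if state.remaining > 0 {
                    return Ok(());
                }
            } // drop the lock before sleeping
            if deadline.map_or(false, |d| Instant::now() >= d) {
                return Err("timed out waiting for rate limiter budget".to_string());
            }
            sleep(Duration::from_millis(100)).await;
        }
    }

    // Analogue of `commit_task_usage`: charge the bytes the finished task actually read.
    pub async fn commit_task_usage(&self, size: i64) {
        let mut state = self.state.lock().await;
        state.remaining -= size;
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // 1 MB budget refilling at 100 KB/s; the numbers are illustrative only.
    let limiter = BudgetRateLimiter::new(1_000_000, 100_000);
    limiter.wait_for_allow(Some(Duration::from_secs(5))).await?;
    // ... run the task, then charge what it consumed:
    limiter.commit_task_usage(250_000).await;
    Ok(())
}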
10 changes: 9 additions & 1 deletion rust/cubestore/cubestore/src/config/mod.rs
@@ -1,10 +1,11 @@
#![allow(deprecated)] // 'vtable' and 'TraitObject' are deprecated.
pub mod injection;
pub mod processing_loop;

use crate::cachestore::{
CacheStore, CacheStoreSchedulerImpl, ClusterCacheStoreClient, LazyRocksCacheStore,
};
use crate::cluster::rate_limiter::{BasicProcessRateLimiter, ProcessRateLimiter};
use crate::cluster::transport::{
ClusterTransport, ClusterTransportImpl, MetaStoreTransport, MetaStoreTransportImpl,
};
@@ -1755,6 +1756,12 @@ impl Config {
})
.await;

self.injector
.register_typed::<dyn ProcessRateLimiter, _, _, _>(async move |_| {
BasicProcessRateLimiter::new()
})
.await;

let cluster_meta_store_sender = metastore_event_sender_to_move.clone();

self.injector
@@ -1774,6 +1781,7 @@ impl Config {
cluster_meta_store_sender,
i.get_service_typed().await,
i.get_service_typed().await,
i.get_service_typed().await,
)
})
.await;
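The config wiring registers a single limiter instance, and both TaskType::Select and TaskType::Job usage is reported to it. The Eq and Hash derives on TaskType suggest an implementation may want to key per-task-type state on the enum, for example a map of budgets. A small hypothetical illustration of that bookkeeping, not cubestore code:

// Hypothetical per-task-type accounting; `TaskType` mirrors the enum added in
// rate_limiter.rs, while `UsageTracker` is purely illustrative.
use std::collections::HashMap;

#[derive(Eq, PartialEq, Hash, Clone, Copy)]
enum TaskType {
    Select,
    Job,
}

#[derive(Default)]
struct UsageTracker {
    // Bytes committed per task type; `Eq + Hash` on TaskType makes it usable as a key.
    used: HashMap<TaskType, i64>,
}

impl UsageTracker {
    fn commit_task_usage(&mut self, task_type: TaskType, size: i64) {
        *self.used.entry(task_type).or_insert(0) += size;
    }

    fn current_usage(&self, task_type: TaskType) -> i64 {
        self.used.get(&task_type).copied().unwrap_or(0)
    }
}

fn main() {
    let mut tracker = UsageTracker::default();
    tracker.commit_task_usage(TaskType::Select, 1024);
    tracker.commit_task_usage(TaskType::Job, 4096);
    assert_eq!(tracker.current_usage(TaskType::Select), 1024);
    println!("job bytes committed: {}", tracker.current_usage(TaskType::Job));
}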