diff --git a/rust/worker/src/execution/operators/flush_s3.rs b/rust/worker/src/execution/operators/flush_s3.rs new file mode 100644 index 0000000000..99942203b1 --- /dev/null +++ b/rust/worker/src/execution/operators/flush_s3.rs @@ -0,0 +1,22 @@ +use crate::execution::operator::Operator; +use async_trait::async_trait; + +#[derive(Debug)] +pub struct FlushS3Operator {} + +#[derive(Debug)] +pub struct FlushS3Input {} + +#[derive(Debug)] +pub struct FlushS3Output {} + +pub type WriteSegmentsResult = Result; + +#[async_trait] +impl Operator for FlushS3Operator { + type Error = (); + + async fn run(&self, input: &FlushS3Input) -> WriteSegmentsResult { + Ok(FlushS3Output {}) + } +} diff --git a/rust/worker/src/execution/operators/mod.rs b/rust/worker/src/execution/operators/mod.rs index 312ae6125f..87c0a602d3 100644 --- a/rust/worker/src/execution/operators/mod.rs +++ b/rust/worker/src/execution/operators/mod.rs @@ -1,5 +1,7 @@ pub(super) mod brute_force_knn; +pub(super) mod flush_s3; pub(super) mod flush_sysdb; pub(super) mod normalize_vectors; pub(super) mod partition; pub(super) mod pull_log; +pub(super) mod write_segments; diff --git a/rust/worker/src/execution/operators/write_segments.rs b/rust/worker/src/execution/operators/write_segments.rs new file mode 100644 index 0000000000..dbd8b69ae5 --- /dev/null +++ b/rust/worker/src/execution/operators/write_segments.rs @@ -0,0 +1,22 @@ +use crate::execution::operator::Operator; +use async_trait::async_trait; + +#[derive(Debug)] +pub struct WriteSegmentsOperator {} + +#[derive(Debug)] +pub struct WriteSegmentsInput {} + +#[derive(Debug)] +pub struct WriteSegmentsOutput {} + +pub type WriteSegmentsResult = Result; + +#[async_trait] +impl Operator for WriteSegmentsOperator { + type Error = (); + + async fn run(&self, input: &WriteSegmentsInput) -> WriteSegmentsResult { + Ok(WriteSegmentsOutput {}) + } +} diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index f7ac7155ad..193c0fbe0a 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -11,6 +11,7 @@ use crate::execution::operators::partition::PartitionResult; use crate::execution::operators::pull_log::PullLogsInput; use crate::execution::operators::pull_log::PullLogsOperator; use crate::execution::operators::pull_log::PullLogsResult; +use crate::execution::operators::write_segments::WriteSegmentsResult; use crate::log::log::Log; use crate::sysdb::sysdb::SysDb; use crate::system::Component; @@ -18,6 +19,7 @@ use crate::system::Handler; use crate::system::Receiver; use crate::system::System; use crate::types::SegmentFlushInfo; +use arrow::compute::kernels::partition; use async_trait::async_trait; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -45,6 +47,7 @@ enum ExecutionState { Partition, Write, Flush, + Register, Finished, } @@ -61,6 +64,8 @@ pub struct CompactOrchestrator { sysdb: Box, // Dispatcher dispatcher: Box>, + // number of write segments tasks + num_write_tasks: i32, // Result Channel result_channel: Option>>>, @@ -95,6 +100,7 @@ impl CompactOrchestrator { log, sysdb, dispatcher, + num_write_tasks: 0, result_channel, } } @@ -141,21 +147,27 @@ impl CompactOrchestrator { } } - async fn write(&mut self, records: Vec) { + async fn write(&mut self, partitions: Vec) { self.state = ExecutionState::Write; - for record in records { + self.num_write_tasks = partitions.len() as i32; + for partition in partitions { // TODO: implement write } } + async fn flush_s3(&mut self, self_address: Box>) { + self.state = ExecutionState::Flush; + // TODO: implement flush to s3 + } + async fn flush_sysdb( &mut self, log_position: i64, segment_flush_info: Vec, self_address: Box>, ) { - self.state = ExecutionState::Flush; + self.state = ExecutionState::Register; let operator = FlushSysDbOperator::new(); let input = FlushSysDbInput::new( self.compaction_job.tenant_id.clone(), @@ -264,3 +276,25 @@ impl Handler for CompactOrchestrator { let _ = result_channel.send(Ok(response)); } } + +#[async_trait] +impl Handler for CompactOrchestrator { + async fn handle( + &mut self, + message: WriteSegmentsResult, + _ctx: &crate::system::ComponentContext, + ) { + match message { + Ok(result) => { + // Log an error + self.num_write_tasks -= 1; + } + Err(e) => { + // Log an error + } + } + if self.num_write_tasks == 0 { + self.flush_s3(_ctx.sender.as_receiver()).await; + } + } +}