Skip to content

Commit

Permalink
[ENH] Add write segments operator
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishiihara committed Apr 12, 2024
1 parent ceed4d8 commit 058d62f
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 1 deletion.
22 changes: 22 additions & 0 deletions rust/worker/src/execution/operators/flush_s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::execution::operator::Operator;
use async_trait::async_trait;

/// Stateless operator type. Its `Operator` implementation in this file is a
/// stub that returns an empty `FlushS3Output` without doing any work.
// NOTE(review): presumably meant to flush written segments to S3 (per the
// name) — not implemented yet; confirm the intended behavior.
#[derive(Debug)]
pub struct FlushS3Operator {}

/// Input passed to `FlushS3Operator::run`. Currently carries no fields.
#[derive(Debug)]
pub struct FlushS3Input {}

/// Value returned by `FlushS3Operator::run` on success. Currently carries no fields.
#[derive(Debug)]
pub struct FlushS3Output {}

/// Result of running the flush-to-S3 operator: an empty `FlushS3Output` on
/// success, or the unit error `()` on failure.
pub type FlushS3Result = Result<FlushS3Output, ()>;

/// Legacy name for [`FlushS3Result`].
///
/// This module originally exported its result type under the name used by the
/// write-segments operator. The old name is kept so existing references keep
/// compiling; new code should use [`FlushS3Result`].
pub type WriteSegmentsResult = FlushS3Result;

// Stub `Operator` implementation: the input is ignored and an empty output is
// always returned.
#[async_trait]
impl Operator<FlushS3Input, FlushS3Output> for FlushS3Operator {
    type Error = ();

    /// Runs the operator.
    ///
    /// Always returns `Ok` with an empty `FlushS3Output`; `input` is not read.
    async fn run(&self, input: &FlushS3Input) -> Result<FlushS3Output, ()> {
        // `FlushS3Input` has no fields yet, so there is nothing to consume.
        let _ = input;
        let output = FlushS3Output {};
        Ok(output)
    }
}
2 changes: 2 additions & 0 deletions rust/worker/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Operator submodules. `pub(super)` limits each module's visibility to the
// parent of this module.
pub(super) mod brute_force_knn;
pub(super) mod flush_s3;
pub(super) mod flush_sysdb;
pub(super) mod normalize_vectors;
pub(super) mod partition;
pub(super) mod pull_log;
pub(super) mod write_segments;
22 changes: 22 additions & 0 deletions rust/worker/src/execution/operators/write_segments.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::execution::operator::Operator;
use async_trait::async_trait;

/// Stateless operator type. Its `Operator` implementation in this file is a
/// stub that returns an empty `WriteSegmentsOutput` without doing any work.
// NOTE(review): presumably meant to write segment data (per the name) — not
// implemented yet; confirm the intended behavior.
#[derive(Debug)]
pub struct WriteSegmentsOperator {}

/// Input passed to `WriteSegmentsOperator::run`. Currently carries no fields.
#[derive(Debug)]
pub struct WriteSegmentsInput {}

/// Value returned by `WriteSegmentsOperator::run` on success. Currently carries no fields.
#[derive(Debug)]
pub struct WriteSegmentsOutput {}

/// Result of running `WriteSegmentsOperator`: an empty `WriteSegmentsOutput`
/// on success, or the unit error `()` on failure.
pub type WriteSegmentsResult = Result<WriteSegmentsOutput, ()>;

// Stub `Operator` implementation: the input is ignored and an empty output is
// always returned.
#[async_trait]
impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator {
    type Error = ();

    /// Runs the operator.
    ///
    /// Always returns `Ok` with an empty `WriteSegmentsOutput`; `input` is not read.
    async fn run(&self, input: &WriteSegmentsInput) -> Result<WriteSegmentsOutput, ()> {
        // `WriteSegmentsInput` has no fields yet, so there is nothing to consume.
        let _ = input;
        let output = WriteSegmentsOutput {};
        Ok(output)
    }
}
34 changes: 33 additions & 1 deletion rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ enum ExecutionState {
Partition,
Write,
Flush,
Register,
Finished,
}

Expand All @@ -62,6 +63,8 @@ pub struct CompactOrchestrator {
sysdb: Box<dyn SysDb>,
// Dispatcher
dispatcher: Box<dyn Receiver<TaskMessage>>,
// number of write segments tasks
num_write_tasks: i32,
// Result Channel
result_channel:
Option<tokio::sync::oneshot::Sender<Result<CompactionResponse, Box<dyn ChromaError>>>>,
Expand Down Expand Up @@ -96,6 +99,7 @@ impl CompactOrchestrator {
log,
sysdb,
dispatcher,
num_write_tasks: 0,
result_channel,
}
}
Expand Down Expand Up @@ -145,18 +149,24 @@ impl CompactOrchestrator {
/// Enters the `Write` state and records one outstanding write task per chunk
/// in `records`.
///
/// The `Handler<WriteSegmentsResult>` implementation decrements
/// `num_write_tasks` as results arrive. No write work is dispatched yet.
async fn write(&mut self, records: Vec<DataChunk>) {
    self.state = ExecutionState::Write;

    // NOTE(review): `as i32` truncates silently if the length exceeds
    // `i32::MAX`; the counter field is declared as `i32`.
    let pending_tasks = records.len() as i32;
    self.num_write_tasks = pending_tasks;

    for _chunk in records {
        // TODO: implement write
    }
}

/// Moves the orchestrator into the `Flush` state.
///
/// `self_address` is accepted but not used yet; the actual flush is still a
/// TODO.
// NOTE(review): presumably `self_address` is where the flush result should be
// delivered once the operator is wired up — confirm.
async fn flush_s3(&mut self, self_address: Box<dyn Receiver<WriteSegmentsResult>>) {
self.state = ExecutionState::Flush;
// TODO: implement flush to s3
}

async fn flush_sysdb(
&mut self,
log_position: i64,
segment_flush_info: Vec<SegmentFlushInfo>,
self_address: Box<dyn Receiver<FlushSysDbResult>>,
) {
self.state = ExecutionState::Flush;
self.state = ExecutionState::Register;
let operator = FlushSysDbOperator::new();
let input = FlushSysDbInput::new(
self.compaction_job.tenant_id.clone(),
Expand Down Expand Up @@ -265,3 +275,25 @@ impl Handler<PartitionResult> for CompactOrchestrator {
let _ = result_channel.send(Ok(response));
}
}

// Counts down outstanding write-segment tasks and starts the S3 flush once
// the last one has reported success.
#[async_trait]
impl Handler<WriteSegmentsResult> for CompactOrchestrator {
    /// Handles the result of one write-segments task.
    ///
    /// * `Ok`: decrements `num_write_tasks`; if the counter reaches zero,
    ///   calls `flush_s3` with this component's own address.
    /// * `Err`: leaves the counter untouched and never starts a flush.
    async fn handle(
        &mut self,
        message: WriteSegmentsResult,
        _ctx: &crate::system::ComponentContext<CompactOrchestrator>,
    ) {
        match message {
            Ok(_) => {
                self.num_write_tasks -= 1;
                // Only a successful completion may trigger the flush. The
                // check lives in this arm so that a failed result can never
                // start a flush, even if the counter is already at zero.
                if self.num_write_tasks == 0 {
                    self.flush_s3(_ctx.sender.as_receiver()).await;
                }
            }
            Err(_) => {
                // A failed write is not counted as completed, so the flush is
                // not started with partially written data.
                // TODO: log the failure and report it to the caller.
                // NOTE(review): as far as this handler is concerned the
                // counter can then never reach zero, so the compaction does
                // not progress after a failure — confirm the intended
                // error path.
            }
        }
    }
}

0 comments on commit 058d62f

Please sign in to comment.