Skip to content

Commit

Permalink
[ENH] Rust write segments and flush S3 operator skeleton (#2009)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- This PR adds skeleton code for the write-segments and flush-S3
operators, in preparation for their integration with Blockfile.
 - New functionality
	 - ...

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
Ishiihara committed Apr 12, 2024
1 parent 7e330ba commit 663a02d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 3 deletions.
22 changes: 22 additions & 0 deletions rust/worker/src/execution/operators/flush_s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::execution::operator::Operator;
use async_trait::async_trait;

/// Operator that flushes written segment data to S3 object storage.
/// Skeleton for now: fields will be added when Blockfile integration lands.
#[derive(Debug)]
pub struct FlushS3Operator {}

/// Input to [`FlushS3Operator`]. Currently empty; presumably it will carry
/// the segment data/handles to flush — TODO confirm when implemented.
#[derive(Debug)]
pub struct FlushS3Input {}

/// Output of a successful S3 flush. Currently empty; likely to carry flush
/// metadata (paths/ids) once the operator is implemented.
#[derive(Debug)]
pub struct FlushS3Output {}

/// Result of running the flush-S3 operator.
///
/// NOTE(review): this alias was originally named `WriteSegmentsResult`, an
/// apparent copy-paste from `write_segments.rs`. `FlushS3Result` is the
/// accurate name; the old name is kept below for backward compatibility.
pub type FlushS3Result = Result<FlushS3Output, ()>;

/// Backward-compatible alias for [`FlushS3Result`]; prefer the new name.
pub type WriteSegmentsResult = FlushS3Result;

#[async_trait]
impl Operator<FlushS3Input, FlushS3Output> for FlushS3Operator {
    // Placeholder error type until a real flush error enum exists.
    type Error = ();

    /// Flush segment data to S3.
    ///
    /// Skeleton implementation: ignores `_input`, performs no I/O, and
    /// always succeeds with an empty output.
    async fn run(&self, _input: &FlushS3Input) -> FlushS3Result {
        Ok(FlushS3Output {})
    }
}
2 changes: 2 additions & 0 deletions rust/worker/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Execution operators. Each submodule defines one `Operator` implementation
// that the orchestration layer (see `execution::orchestration`) dispatches
// as a task.
pub(super) mod brute_force_knn;
pub(super) mod flush_s3;
pub(super) mod flush_sysdb;
pub(super) mod normalize_vectors;
pub(super) mod partition;
pub(super) mod pull_log;
pub(super) mod write_segments;
22 changes: 22 additions & 0 deletions rust/worker/src/execution/operators/write_segments.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::execution::operator::Operator;
use async_trait::async_trait;

/// Operator that applies compacted log records to the collection's segments.
/// Skeleton for now: fields will be added when Blockfile integration lands.
#[derive(Debug)]
pub struct WriteSegmentsOperator {}

/// Input to [`WriteSegmentsOperator`]. Currently empty; presumably it will
/// carry one partition of log records to write — TODO confirm when implemented.
#[derive(Debug)]
pub struct WriteSegmentsInput {}

/// Output of a successful segment write. Currently empty; likely to carry
/// per-segment write results once the operator is implemented.
#[derive(Debug)]
pub struct WriteSegmentsOutput {}

/// Outcome of running the write-segments operator.
pub type WriteSegmentsResult = Result<WriteSegmentsOutput, ()>;

#[async_trait]
impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator {
    // Placeholder error type until a real write error enum exists.
    type Error = ();

    /// Apply the writes described by the input to the segments.
    ///
    /// Skeleton implementation: performs no work and always succeeds.
    async fn run(&self, _input: &WriteSegmentsInput) -> WriteSegmentsResult {
        let output = WriteSegmentsOutput {};
        Ok(output)
    }
}
40 changes: 37 additions & 3 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ use crate::execution::operators::partition::PartitionResult;
use crate::execution::operators::pull_log::PullLogsInput;
use crate::execution::operators::pull_log::PullLogsOperator;
use crate::execution::operators::pull_log::PullLogsResult;
use crate::execution::operators::write_segments::WriteSegmentsResult;
use crate::log::log::Log;
use crate::sysdb::sysdb::SysDb;
use crate::system::Component;
use crate::system::Handler;
use crate::system::Receiver;
use crate::system::System;
use crate::types::SegmentFlushInfo;
use arrow::compute::kernels::partition;
use async_trait::async_trait;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
Expand Down Expand Up @@ -45,6 +47,7 @@ enum ExecutionState {
Partition,
Write,
Flush,
Register,
Finished,
}

Expand All @@ -61,6 +64,8 @@ pub struct CompactOrchestrator {
sysdb: Box<dyn SysDb>,
// Dispatcher
dispatcher: Box<dyn Receiver<TaskMessage>>,
// number of write segments tasks
num_write_tasks: i32,
// Result Channel
result_channel:
Option<tokio::sync::oneshot::Sender<Result<CompactionResponse, Box<dyn ChromaError>>>>,
Expand Down Expand Up @@ -95,6 +100,7 @@ impl CompactOrchestrator {
log,
sysdb,
dispatcher,
num_write_tasks: 0,
result_channel,
}
}
Expand Down Expand Up @@ -141,21 +147,27 @@ impl CompactOrchestrator {
}
}

async fn write(&mut self, records: Vec<DataChunk>) {
async fn write(&mut self, partitions: Vec<DataChunk>) {
self.state = ExecutionState::Write;

for record in records {
self.num_write_tasks = partitions.len() as i32;
for partition in partitions {
// TODO: implement write
}
}

/// Transition into the Flush phase and (eventually) dispatch the S3 flush.
///
/// `self_address` is the receiver the flush task will reply to.
/// NOTE(review): skeleton — no task is dispatched yet, so `self_address`
/// is currently unused.
async fn flush_s3(&mut self, self_address: Box<dyn Receiver<WriteSegmentsResult>>) {
self.state = ExecutionState::Flush;
// TODO: implement flush to s3
}

async fn flush_sysdb(
&mut self,
log_position: i64,
segment_flush_info: Vec<SegmentFlushInfo>,
self_address: Box<dyn Receiver<FlushSysDbResult>>,
) {
self.state = ExecutionState::Flush;
self.state = ExecutionState::Register;
let operator = FlushSysDbOperator::new();
let input = FlushSysDbInput::new(
self.compaction_job.tenant_id.clone(),
Expand Down Expand Up @@ -264,3 +276,25 @@ impl Handler<PartitionResult> for CompactOrchestrator {
let _ = result_channel.send(Ok(response));
}
}

#[async_trait]
impl Handler<WriteSegmentsResult> for CompactOrchestrator {
    /// Collect the result of one write-segments task. Once every outstanding
    /// task has reported back, advance to the S3 flush phase.
    async fn handle(
        &mut self,
        message: WriteSegmentsResult,
        ctx: &crate::system::ComponentContext<CompactOrchestrator>,
    ) {
        // A task has completed regardless of outcome. The original code only
        // decremented on Ok, so a single failed write left the counter stuck
        // above zero and the orchestrator waiting forever.
        self.num_write_tasks -= 1;
        match message {
            Ok(_result) => {
                // TODO: accumulate per-segment write results for the flush.
            }
            Err(_e) => {
                // TODO: log the error and decide whether the compaction
                // should be aborted instead of proceeding to flush.
            }
        }
        if self.num_write_tasks == 0 {
            self.flush_s3(ctx.sender.as_receiver()).await;
        }
    }
}

0 comments on commit 663a02d

Please sign in to comment.