Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Rust write segments and flush S3 operator skeleton #2009

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions rust/worker/src/execution/operators/flush_s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::execution::operator::Operator;
use async_trait::async_trait;

#[derive(Debug)]
pub struct FlushS3Operator {}

#[derive(Debug)]
pub struct FlushS3Input {}

#[derive(Debug)]
pub struct FlushS3Output {}

pub type WriteSegmentsResult = Result<FlushS3Output, ()>;

#[async_trait]
impl Operator<FlushS3Input, FlushS3Output> for FlushS3Operator {
type Error = ();

async fn run(&self, input: &FlushS3Input) -> WriteSegmentsResult {
Ok(FlushS3Output {})
}
}
2 changes: 2 additions & 0 deletions rust/worker/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub(super) mod brute_force_knn;
pub(super) mod flush_s3;
pub(super) mod flush_sysdb;
pub(super) mod normalize_vectors;
pub(super) mod partition;
pub(super) mod pull_log;
pub(super) mod write_segments;
22 changes: 22 additions & 0 deletions rust/worker/src/execution/operators/write_segments.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::execution::operator::Operator;
use async_trait::async_trait;

#[derive(Debug)]
pub struct WriteSegmentsOperator {}

#[derive(Debug)]
pub struct WriteSegmentsInput {}

#[derive(Debug)]
pub struct WriteSegmentsOutput {}

pub type WriteSegmentsResult = Result<WriteSegmentsOutput, ()>;

#[async_trait]
impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator {
type Error = ();

async fn run(&self, input: &WriteSegmentsInput) -> WriteSegmentsResult {
Ok(WriteSegmentsOutput {})
}
}
40 changes: 37 additions & 3 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ use crate::execution::operators::partition::PartitionResult;
use crate::execution::operators::pull_log::PullLogsInput;
use crate::execution::operators::pull_log::PullLogsOperator;
use crate::execution::operators::pull_log::PullLogsResult;
use crate::execution::operators::write_segments::WriteSegmentsResult;
use crate::log::log::Log;
use crate::sysdb::sysdb::SysDb;
use crate::system::Component;
use crate::system::Handler;
use crate::system::Receiver;
use crate::system::System;
use crate::types::SegmentFlushInfo;
use arrow::compute::kernels::partition;
use async_trait::async_trait;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
Expand Down Expand Up @@ -45,6 +47,7 @@ enum ExecutionState {
Partition,
Write,
Flush,
Register,
Finished,
}

Expand All @@ -61,6 +64,8 @@ pub struct CompactOrchestrator {
sysdb: Box<dyn SysDb>,
// Dispatcher
dispatcher: Box<dyn Receiver<TaskMessage>>,
// number of write segments tasks
num_write_tasks: i32,
// Result Channel
result_channel:
Option<tokio::sync::oneshot::Sender<Result<CompactionResponse, Box<dyn ChromaError>>>>,
Expand Down Expand Up @@ -95,6 +100,7 @@ impl CompactOrchestrator {
log,
sysdb,
dispatcher,
num_write_tasks: 0,
result_channel,
}
}
Expand Down Expand Up @@ -141,21 +147,27 @@ impl CompactOrchestrator {
}
}

async fn write(&mut self, records: Vec<DataChunk>) {
async fn write(&mut self, partitions: Vec<DataChunk>) {
self.state = ExecutionState::Write;

for record in records {
self.num_write_tasks = partitions.len() as i32;
for partition in partitions {
// TODO: implement write
}
}

async fn flush_s3(&mut self, self_address: Box<dyn Receiver<WriteSegmentsResult>>) {
self.state = ExecutionState::Flush;
// TODO: implement flush to s3
}

async fn flush_sysdb(
&mut self,
log_position: i64,
segment_flush_info: Vec<SegmentFlushInfo>,
self_address: Box<dyn Receiver<FlushSysDbResult>>,
) {
self.state = ExecutionState::Flush;
self.state = ExecutionState::Register;
let operator = FlushSysDbOperator::new();
let input = FlushSysDbInput::new(
self.compaction_job.tenant_id.clone(),
Expand Down Expand Up @@ -264,3 +276,25 @@ impl Handler<PartitionResult> for CompactOrchestrator {
let _ = result_channel.send(Ok(response));
}
}

#[async_trait]
impl Handler<WriteSegmentsResult> for CompactOrchestrator {
async fn handle(
&mut self,
message: WriteSegmentsResult,
_ctx: &crate::system::ComponentContext<CompactOrchestrator>,
) {
match message {
Ok(result) => {
// Log an error
self.num_write_tasks -= 1;
}
Err(e) => {
// Log an error
}
}
if self.num_write_tasks == 0 {
self.flush_s3(_ctx.sender.as_receiver()).await;
}
}
}
Loading