Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add rust client to log service #1811

Merged
merged 4 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/worker/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
&[
"../../idl/chromadb/proto/chroma.proto",
"../../idl/chromadb/proto/coordinator.proto",
"../../idl/chromadb/proto/logservice.proto",
],
&["../../idl/"],
)?;
Expand Down
18 changes: 17 additions & 1 deletion rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub(crate) struct WorkerConfig {
pub(crate) sysdb: crate::sysdb::config::SysDbConfig,
pub(crate) segment_manager: crate::segment::config::SegmentManagerConfig,
pub(crate) storage: crate::storage::config::StorageConfig,
pub(crate) log: crate::log::config::LogConfig,
}

/// # Description
Expand Down Expand Up @@ -160,6 +161,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log:
Grpc:
host: "localhost"
port: 50052
"#,
);
let config = RootConfig::load();
Expand Down Expand Up @@ -204,6 +209,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log:
Grpc:
host: "localhost"
port: 50052

"#,
);
Expand Down Expand Up @@ -264,7 +273,10 @@ mod tests {
storage:
S3:
bucket: "chroma"

log:
Grpc:
host: "localhost"
port: 50052
"#,
);
let config = RootConfig::load();
Expand Down Expand Up @@ -305,6 +317,10 @@ mod tests {
storage:
S3:
bucket: "chroma"
log:
Grpc:
host: "localhost"
port: 50052
"#,
);
let config = RootConfig::load();
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod config;
mod errors;
mod index;
mod ingest;
mod log;
mod memberlist;
mod segment;
mod server;
Expand Down
12 changes: 12 additions & 0 deletions rust/worker/src/log/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use serde::Deserialize;

#[derive(Deserialize)]
pub(crate) struct GrpcLogConfig {
pub(crate) host: String,
pub(crate) port: u16,
}

#[derive(Deserialize)]
pub(crate) enum LogConfig {
Grpc(GrpcLogConfig),
}
204 changes: 204 additions & 0 deletions rust/worker/src/log/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
use crate::chroma_proto;
use crate::chroma_proto::log_service_client::LogServiceClient;
use crate::config::Configurable;
use crate::config::WorkerConfig;
use crate::errors::ChromaError;
use crate::errors::ErrorCodes;
use crate::log::config::LogConfig;
use crate::types::EmbeddingRecord;
use crate::types::EmbeddingRecordConversionError;
use async_trait::async_trait;
use thiserror::Error;

// CollectionInfo is a struct that contains information about a collection for the
// compacting process. It contains information about the
pub(crate) struct CollectionInfo {
pub(crate) collection_id: String,
pub(crate) first_log_id: i64,
Ishiihara marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) first_log_id_ts: i64,
}

#[async_trait]
pub(crate) trait Log: Send + Sync + LogClone {
async fn read(
&mut self,
collection_id: String,
offset: i64,
batch_size: i32,
) -> Result<Vec<Box<EmbeddingRecord>>, PullLogsError>;

async fn get_collections_with_new_data(
&mut self,
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError>;
}

pub(crate) trait LogClone {
fn clone_box(&self) -> Box<dyn Log>;
}

impl<T> LogClone for T
where
T: 'static + Log + Clone,
{
fn clone_box(&self) -> Box<dyn Log> {
Box::new(self.clone())
}
}

impl Clone for Box<dyn Log> {
fn clone(&self) -> Box<dyn Log> {
self.clone_box()
}
}

#[derive(Clone)]
pub(crate) struct GrpcLog {
client: LogServiceClient<tonic::transport::Channel>,
}

impl GrpcLog {
pub(crate) fn new(client: LogServiceClient<tonic::transport::Channel>) -> Self {
Self { client }
}
}

#[derive(Error, Debug)]
pub(crate) enum GrpcLogError {
#[error("Failed to connect to log service")]
FailedToConnect(#[from] tonic::transport::Error),
}

impl ChromaError for GrpcLogError {
fn code(&self) -> ErrorCodes {
match self {
GrpcLogError::FailedToConnect(_) => ErrorCodes::Internal,
}
}
}

#[async_trait]
impl Configurable for GrpcLog {
async fn try_from_config(worker_config: &WorkerConfig) -> Result<Self, Box<dyn ChromaError>> {
Ishiihara marked this conversation as resolved.
Show resolved Hide resolved
match &worker_config.log {
LogConfig::Grpc(my_config) => {
let host = &my_config.host;
let port = &my_config.port;
// TODO: switch to logging when logging is implemented
println!("Connecting to log service at {}:{}", host, port);
Ishiihara marked this conversation as resolved.
Show resolved Hide resolved
let connection_string = format!("http://{}:{}", host, port);
let client = LogServiceClient::connect(connection_string).await;
match client {
Ok(client) => {
return Ok(GrpcLog::new(client));
}
Err(e) => {
return Err(Box::new(GrpcLogError::FailedToConnect(e)));
}
}
}
}
}
}

#[async_trait]
impl Log for GrpcLog {
async fn read(
&mut self,
collection_id: String,
offset: i64,
batch_size: i32,
) -> Result<Vec<Box<EmbeddingRecord>>, PullLogsError> {
let request = self.client.pull_logs(chroma_proto::PullLogsRequest {
Ishiihara marked this conversation as resolved.
Show resolved Hide resolved
collection_id: collection_id,
start_from_id: offset,
batch_size: batch_size,
});
let response = request.await;
match response {
Ok(response) => {
let logs = response.into_inner().records;
let mut result = Vec::new();
for log in logs {
let embedding_record: Result<EmbeddingRecord, EmbeddingRecordConversionError> =
log.try_into();
match embedding_record {
Ok(embedding_record) => {
result.push(Box::new(embedding_record));
Ishiihara marked this conversation as resolved.
Show resolved Hide resolved
}
Err(err) => {
return Err(PullLogsError::ConversionError(err));
}
}
}
Ok(result)
}
Err(e) => {
// TODO: switch to logging when logging is implemented
println!("Failed to pull logs: {}", e);
Err(PullLogsError::FailedToPullLogs(e))
}
}
}

async fn get_collections_with_new_data(
&mut self,
) -> Result<Vec<CollectionInfo>, GetCollectionsWithNewDataError> {
let request = self.client.get_all_collection_info_to_compact(
chroma_proto::GetAllCollectionInfoToCompactRequest {},
);
let response = request.await;

match response {
Ok(response) => {
let collections = response.into_inner().all_collection_info;
let mut result = Vec::new();
for collection in collections {
result.push(CollectionInfo {
collection_id: collection.collection_id,
first_log_id: collection.first_log_id,
first_log_id_ts: collection.first_log_id_ts,
});
}
Ok(result)
}
Err(e) => {
// TODO: switch to logging when logging is implemented
println!("Failed to get collections: {}", e);
Err(GetCollectionsWithNewDataError::FailedGetCollectionsWithNewData(e))
}
}
}
}

#[derive(Error, Debug)]
pub(crate) enum PullLogsError {
#[error("Failed to fetch")]
FailedToPullLogs(#[from] tonic::Status),
#[error("Failed to convert proto segment")]
ConversionError(#[from] EmbeddingRecordConversionError),
}

impl ChromaError for PullLogsError {
fn code(&self) -> ErrorCodes {
match self {
PullLogsError::FailedToPullLogs(_) => ErrorCodes::Internal,
PullLogsError::ConversionError(_) => ErrorCodes::Internal,
}
}
}

#[derive(Error, Debug)]
pub(crate) enum GetCollectionsWithNewDataError {
#[error("Failed to fetch")]
FailedGetCollectionsWithNewData(#[from] tonic::Status),
}

impl ChromaError for GetCollectionsWithNewDataError {
fn code(&self) -> ErrorCodes {
match self {
GetCollectionsWithNewDataError::FailedGetCollectionsWithNewData(_) => {
ErrorCodes::Internal
}
}
}
}
2 changes: 2 additions & 0 deletions rust/worker/src/log/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod config;
pub(crate) mod log;
Loading
Loading