Skip to content

Commit

Permalink
[ENH] Fetch last compaction time from sysdb
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishiihara committed Apr 10, 2024
1 parent 6fc18be commit 2e03e28
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 28 deletions.
11 changes: 9 additions & 2 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,30 +308,37 @@ mod tests {

let mut sysdb = Box::new(TestSysDb::new());

let tenant_1 = "tenant_1".to_string();
let collection_1 = Collection {
id: collection_uuid_1,
name: "collection_1".to_string(),
metadata: None,
dimension: Some(1),
tenant: "tenant_1".to_string(),
tenant: tenant_1.clone(),
database: "database_1".to_string(),
log_position: 0,
version: 0,
};

let tenant_2 = "tenant_2".to_string();
let collection_2 = Collection {
id: collection_uuid_2,
name: "collection_2".to_string(),
metadata: None,
dimension: Some(1),
tenant: "tenant_2".to_string(),
tenant: tenant_2.clone(),
database: "database_2".to_string(),
log_position: 0,
version: 0,
};
sysdb.add_collection(collection_1);
sysdb.add_collection(collection_2);

let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);
let last_compaction_time_2 = 1;
sysdb.add_tenant_last_compaction_time(tenant_2, last_compaction_time_2);

let my_ip = "127.0.0.1".to_string();
let compaction_manager_queue_size = 1000;
let max_concurrent_jobs = 10;
Expand Down
66 changes: 48 additions & 18 deletions rust/worker/src/compactor/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,26 @@ impl Scheduler {
println!("Collection not found: {:?}", collection_info.collection_id);
continue;
}

// TODO: query the last compaction times for all tenants in a single batch
let tenant_ids = vec![collection[0].tenant.clone()];
let tenant = self.sysdb.get_last_compaction_time(tenant_ids).await;

let last_compaction_time = match tenant {
Ok(tenant) => tenant[0].last_compaction_time,
Err(e) => {
// TODO: Log error
println!("Error: {:?}", e);
// Ignore this collection id for this compaction iteration
println!("Ignoring collection: {:?}", collection_info.collection_id);
continue;
}
};

collection_records.push(CollectionRecord {
id: collection[0].id.to_string(),
tenant_id: collection[0].tenant.clone(),
// TODO: get the last compaction time from the sysdb
last_compaction_time: 0,
last_compaction_time,
first_record_time: collection_info.first_log_ts,
offset: collection_info.first_log_offset,
});
Expand Down Expand Up @@ -125,16 +140,14 @@ impl Scheduler {
let jobs = self
.policy
.determine(collection_records, self.max_concurrent_jobs as i32);
{
self.job_queue.clear();
self.job_queue.extend(jobs);
}
self.job_queue.clear();
self.job_queue.extend(jobs);
}

pub(crate) async fn schedule(&mut self) {
if self.memberlist.is_none() {
if self.memberlist.is_none() || self.memberlist.as_ref().unwrap().is_empty() {
// TODO: Log error
println!("Memberlist is not set");
println!("Memberlist is not set or empty. Cannot schedule compaction jobs.");
return;
}
let collections = self.get_collections_with_new_data().await;
Expand All @@ -152,6 +165,11 @@ impl Scheduler {
/// Stores `memberlist` on the scheduler, replacing any previously set value.
pub(crate) fn set_memberlist(&mut self, memberlist: Memberlist) {
self.memberlist = Some(memberlist);
}

/// Replaces the scheduler's sysdb handle with `sysdb`. Intended for testing.
pub(crate) fn set_sysdb(&mut self, sysdb: Box<dyn SysDb>) {
self.sysdb = sysdb;
}
}

#[cfg(test)]
Expand Down Expand Up @@ -217,30 +235,35 @@ mod tests {

let mut sysdb = Box::new(TestSysDb::new());

let tenant_1 = "tenant_1".to_string();
let collection_1 = Collection {
id: collection_uuid_1,
name: "collection_1".to_string(),
metadata: None,
dimension: Some(1),
tenant: "tenant_1".to_string(),
tenant: tenant_1.clone(),
database: "database_1".to_string(),
log_position: 0,
version: 0,
};

let tenant_2 = "tenant_2".to_string();
let collection_2 = Collection {
id: collection_uuid_2,
name: "collection_2".to_string(),
metadata: None,
dimension: Some(1),
tenant: "tenant_2".to_string(),
tenant: tenant_2.clone(),
database: "database_2".to_string(),
log_position: 0,
version: 0,
};
sysdb.add_collection(collection_1);
sysdb.add_collection(collection_2);

let last_compaction_time_1 = 2;
sysdb.add_tenant_last_compaction_time(tenant_1, last_compaction_time_1);

let my_ip = "0.0.0.1".to_string();
let scheduler_policy = Box::new(LasCompactionTimeSchedulerPolicy {});
let max_concurrent_jobs = 1000;
Expand All @@ -252,7 +275,7 @@ mod tests {
let mut scheduler = Scheduler::new(
my_ip.clone(),
log,
sysdb,
sysdb.clone(),
scheduler_policy,
max_concurrent_jobs,
assignment_policy,
Expand All @@ -266,14 +289,21 @@ mod tests {
scheduler.set_memberlist(vec![my_ip.clone()]);
scheduler.schedule().await;
let jobs = scheduler.get_jobs();
let jobs = jobs.collect::<Vec<&CompactionJob>>();
// Scheduler ignores collection that failed to fetch last compaction time
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].collection_id, collection_id_1,);

// TODO: 3/9 Tasks may be out of order since we have not yet implemented SysDB Get last compaction time. Use contains instead of equal.
let job_ids = jobs
.map(|t| t.collection_id.clone())
.collect::<Vec<String>>();
assert_eq!(job_ids.len(), 2);
assert!(job_ids.contains(&collection_id_1));
assert!(job_ids.contains(&collection_id_2));
let last_compaction_time_2 = 1;
sysdb.add_tenant_last_compaction_time(tenant_2, last_compaction_time_2);
scheduler.set_sysdb(sysdb.clone());
scheduler.schedule().await;
let jobs = scheduler.get_jobs();
let jobs = jobs.collect::<Vec<&CompactionJob>>();
// Scheduler schedules collections based on last compaction time
assert_eq!(jobs.len(), 2);
assert_eq!(jobs[0].collection_id, collection_id_2,);
assert_eq!(jobs[1].collection_id, collection_id_1,);

// Test filter_collections
let member_1 = "0.0.0.1".to_string();
Expand Down
1 change: 0 additions & 1 deletion rust/worker/src/distance/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ impl Into<String> for DistanceFunction {
#[cfg(test)]
mod tests {
use super::*;
use std::convert::TryInto;

#[test]
fn test_distance_function_try_from() {
Expand Down
67 changes: 61 additions & 6 deletions rust/worker/src/sysdb/sysdb.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use super::config::SysDbConfig;
use crate::chroma_proto;
use crate::chroma_proto::sys_db_client;
use crate::config::Configurable;
use crate::types::{CollectionConversionError, SegmentConversionError};
use crate::{
chroma_proto::sys_db_client,
errors::{ChromaError, ErrorCodes},
types::{Collection, Segment, SegmentScope},
};
use crate::errors::ChromaError;
use crate::errors::ErrorCodes;
use crate::types::Collection;
use crate::types::CollectionConversionError;
use crate::types::Segment;
use crate::types::SegmentConversionError;
use crate::types::SegmentScope;
use crate::types::Tenant;
use async_trait::async_trait;
use std::fmt::Debug;
use thiserror::Error;
Expand All @@ -32,6 +35,11 @@ pub(crate) trait SysDb: Send + Sync + SysDbClone + Debug {
scope: Option<SegmentScope>,
collection: Option<Uuid>,
) -> Result<Vec<Segment>, GetSegmentsError>;

/// Fetches the last compaction time for the given tenants from the sysdb.
///
/// Returns the `Tenant` records reported for `tenant_ids`, or a
/// `GetLastCompactionTimeError` when the lookup fails.
async fn get_last_compaction_time(
    &mut self,
    tenant_ids: Vec<String>,
) -> Result<Vec<Tenant>, GetLastCompactionTimeError>;
}

// We'd like to be able to clone the trait object, so we need to use the
Expand Down Expand Up @@ -213,6 +221,33 @@ impl SysDb for GrpcSysDb {
}
}
}

async fn get_last_compaction_time(
&mut self,
tenant_ids: Vec<String>,
) -> Result<Vec<Tenant>, GetLastCompactionTimeError> {
let res = self
.client
.get_last_compaction_time_for_tenant(
chroma_proto::GetLastCompactionTimeForTenantRequest {
tenant_id: tenant_ids,
},
)
.await;
match res {
Ok(res) => {
let last_compaction_times = res.into_inner().tenant_last_compaction_time;
let last_compaction_times = last_compaction_times
.into_iter()
.map(|proto_tenant| proto_tenant.try_into())
.collect::<Result<Vec<Tenant>, ()>>();
return Ok(last_compaction_times.unwrap());
}
Err(e) => {
return Err(GetLastCompactionTimeError::FailedToGetLastCompactionTime(e));
}
}
}
}

#[derive(Error, Debug)]
Expand All @@ -235,6 +270,8 @@ impl ChromaError for GetCollectionsError {
}

#[derive(Error, Debug)]
// TODO: This should use our sysdb errors from the proto definition
// We will have to do an error uniformization pass at some point
pub(crate) enum GetSegmentsError {
#[error("Failed to fetch")]
FailedToGetSegments(#[from] tonic::Status),
Expand All @@ -250,3 +287,21 @@ impl ChromaError for GetSegmentsError {
}
}
}

/// Errors returned when fetching tenants' last compaction times from the sysdb.
#[derive(Error, Debug)]
pub(crate) enum GetLastCompactionTimeError {
/// The request to the sysdb failed; wraps the `tonic::Status` that was returned.
#[error("Failed to fetch")]
FailedToGetLastCompactionTime(#[from] tonic::Status),

/// No last compaction time is known for a requested tenant.
#[error("Tenant not found in sysdb")]
TenantNotFound,
}

impl ChromaError for GetLastCompactionTimeError {
    /// Reports every variant of this error as `ErrorCodes::Internal`.
    fn code(&self) -> ErrorCodes {
        // Listed explicitly (no wildcard) so that adding a variant forces a decision here.
        match self {
            Self::FailedToGetLastCompactionTime(_) | Self::TenantNotFound => ErrorCodes::Internal,
        }
    }
}
35 changes: 35 additions & 0 deletions rust/worker/src/sysdb/test_sysdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,40 @@ use crate::sysdb::sysdb::SysDb;
use crate::types::Collection;
use crate::types::Segment;
use crate::types::SegmentScope;
use crate::types::Tenant;
use async_trait::async_trait;
use std::collections::HashMap;
use uuid::Uuid;

use super::sysdb::GetLastCompactionTimeError;

/// In-memory `SysDb` implementation backed by hash maps, used by the tests.
#[derive(Clone, Debug)]
pub(crate) struct TestSysDb {
// Collections registered through `add_collection`, keyed by collection id.
collections: HashMap<Uuid, Collection>,
// Last compaction time per tenant id, registered through `add_tenant_last_compaction_time`.
tenant_last_compaction_time: HashMap<String, i64>,
}

impl TestSysDb {
pub(crate) fn new() -> Self {
TestSysDb {
collections: HashMap::new(),
tenant_last_compaction_time: HashMap::new(),
}
}

/// Registers `collection` under its id; an entry already stored for that id is replaced.
pub(crate) fn add_collection(&mut self, collection: Collection) {
    let key = collection.id;
    self.collections.insert(key, collection);
}

/// Records `last_compaction_time` for `tenant`, overwriting any value already
/// stored for that tenant.
pub(crate) fn add_tenant_last_compaction_time(
&mut self,
tenant: String,
last_compaction_time: i64,
) {
self.tenant_last_compaction_time
.insert(tenant, last_compaction_time);
}

fn filter_collections(
collection: &Collection,
collection_id: Option<Uuid>,
Expand Down Expand Up @@ -81,4 +95,25 @@ impl SysDb for TestSysDb {
) -> Result<Vec<Segment>, GetSegmentsError> {
Ok(Vec::new())
}

/// Looks up the stored last compaction time for each id in `tenant_ids`.
///
/// Returns the tenants in the same order as the requested ids. Fails with
/// `GetLastCompactionTimeError::TenantNotFound` at the first id that has no
/// recorded compaction time.
async fn get_last_compaction_time(
    &mut self,
    tenant_ids: Vec<String>,
) -> Result<Vec<Tenant>, GetLastCompactionTimeError> {
    // Collecting into `Result` stops at the first missing tenant.
    // TODO: Log an error when a tenant is missing
    tenant_ids
        .into_iter()
        .map(|id| {
            self.tenant_last_compaction_time
                .get(&id)
                .copied()
                .ok_or(GetLastCompactionTimeError::TenantNotFound)
                .map(|last_compaction_time| Tenant {
                    id,
                    last_compaction_time,
                })
        })
        .collect()
}
}
2 changes: 2 additions & 0 deletions rust/worker/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod record;
mod scalar_encoding;
mod segment;
mod segment_scope;
mod tenant;

// Re-export the types module, so that we can use it as a single import in other modules.
pub(crate) use collection::*;
Expand All @@ -16,4 +17,5 @@ pub(crate) use record::*;
pub(crate) use scalar_encoding::*;
pub(crate) use segment::*;
pub(crate) use segment_scope::*;
pub(crate) use tenant::*;
pub(crate) use types::*;
1 change: 0 additions & 1 deletion rust/worker/src/types/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
errors::{ChromaError, ErrorCodes},
};
use std::collections::HashMap;
use std::vec::Vec;
use thiserror::Error;
use uuid::Uuid;

Expand Down
17 changes: 17 additions & 0 deletions rust/worker/src/types/tenant.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::chroma_proto::TenantLastCompactionTime;

/// A tenant together with the time of its last compaction.
pub(crate) struct Tenant {
/// Tenant identifier.
pub(crate) id: String,
/// Last compaction time for this tenant (units are not visible from this file).
pub(crate) last_compaction_time: i64,
}

impl TryFrom<TenantLastCompactionTime> for Tenant {
type Error = ();

fn try_from(proto_tenant: TenantLastCompactionTime) -> Result<Self, Self::Error> {
Ok(Tenant {
id: proto_tenant.tenant_id,
last_compaction_time: proto_tenant.last_compaction_time,
})
}
}

0 comments on commit 2e03e28

Please sign in to comment.