From d0eb6671a487a15a57e76933db60537c1d6b6ce6 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Mon, 13 May 2024 01:08:35 -0700 Subject: [PATCH 1/3] Handle deletes and updates for count --- rust/worker/src/blockstore/arrow/blockfile.rs | 16 ++++ .../src/blockstore/memory/reader_writer.rs | 4 + rust/worker/src/blockstore/memory/storage.rs | 52 ++++++++++++ rust/worker/src/blockstore/types.rs | 7 ++ .../src/execution/operators/count_records.rs | 85 ++++++++++++++++--- .../src/execution/orchestration/metadata.rs | 24 +++--- rust/worker/src/segment/record_segment.rs | 16 ++++ 7 files changed, 183 insertions(+), 21 deletions(-) diff --git a/rust/worker/src/blockstore/arrow/blockfile.rs b/rust/worker/src/blockstore/arrow/blockfile.rs index 3c5fe8f1c03..4ad035a7160 100644 --- a/rust/worker/src/blockstore/arrow/blockfile.rs +++ b/rust/worker/src/blockstore/arrow/blockfile.rs @@ -275,6 +275,22 @@ impl<'me, K: ArrowReadableKey<'me>, V: ArrowReadableValue<'me>> ArrowBlockfileRe } } + pub(crate) async fn key_exists(&'me self, prefix: &str, key: K) -> bool { + let search_key = CompositeKey::new(prefix.to_string(), key.clone()); + let target_block_id = self.sparse_index.get_target_block_id(&search_key); + let block = self.get_block(target_block_id).await; + let res: Option = match block { + Some(block) => block.get(prefix, key), + None => { + return false; + } + }; + match res { + Some(_) => true, + None => false, + } + } + // Count the total number of records. pub(crate) async fn count(&self) -> Result> { let mut block_ids: Vec = vec![]; diff --git a/rust/worker/src/blockstore/memory/reader_writer.rs b/rust/worker/src/blockstore/memory/reader_writer.rs index 6542eae7043..ac199c77a23 100644 --- a/rust/worker/src/blockstore/memory/reader_writer.rs +++ b/rust/worker/src/blockstore/memory/reader_writer.rs @@ -171,6 +171,10 @@ impl< V::count(&self.storage) } + pub(crate) fn key_exists(&'storage self, prefix: &str, key: K) -> bool { + V::key_exists(prefix, key.into(), &self.storage) + } + pub(crate) fn id(&self) -> uuid::Uuid { self.storage.id } diff --git a/rust/worker/src/blockstore/memory/storage.rs b/rust/worker/src/blockstore/memory/storage.rs index 482726b776e..c50510e966e 100644 --- a/rust/worker/src/blockstore/memory/storage.rs +++ b/rust/worker/src/blockstore/memory/storage.rs @@ -50,6 +50,8 @@ pub(crate) trait Readable<'referred_data>: Sized { ) -> Vec<(&'referred_data CompositeKey, Self)>; fn count(storage: &Storage) -> Result>; + + fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool; } impl Writeable for &str { @@ -163,6 +165,16 @@ impl<'referred_data> Readable<'referred_data> for &'referred_data str { fn count(storage: &Storage) -> Result> { Ok(storage.string_value_storage.iter().len()) } + + fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + storage + .string_value_storage + .get(&CompositeKey { + prefix: prefix.to_string(), + key, + }) + .is_some() + } } // TODO: remove this and make this all use a unified storage so we don't have two impls @@ -273,6 +285,16 @@ impl<'referred_data> Readable<'referred_data> for Int32Array { fn count(storage: &Storage) -> Result> { Ok(storage.int32_array_storage.iter().len()) } + + fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + storage + .int32_array_storage + .get(&CompositeKey { + prefix: prefix.to_string(), + key, + }) + .is_some() + } } impl Writeable for &RoaringBitmap { @@ -382,6 +404,16 @@ impl<'referred_data> Readable<'referred_data> for RoaringBitmap { fn count(storage: &Storage) -> Result> { Ok(storage.roaring_bitmap_storage.iter().len()) } + + fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + storage + .roaring_bitmap_storage + .get(&CompositeKey { + prefix: prefix.to_string(), + key, + }) + .is_some() + } } impl Writeable for u32 { @@ -486,6 +518,16 @@ impl<'referred_data> Readable<'referred_data> for u32 { fn count(storage: &Storage) -> Result> { Ok(storage.u32_storage.iter().len()) } + + fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + storage + .u32_storage + .get(&CompositeKey { + prefix: prefix.to_string(), + key, + }) + .is_some() + } } impl Writeable for &DataRecord<'_> { @@ -688,6 +730,16 @@ impl<'referred_data> Readable<'referred_data> for DataRecord<'referred_data> { fn count(storage: &Storage) -> Result> { Ok(storage.data_record_id_storage.iter().len()) } + + fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + storage + .data_record_id_storage + .get(&CompositeKey { + prefix: prefix.to_string(), + key, + }) + .is_some() + } } #[derive(Clone)] diff --git a/rust/worker/src/blockstore/types.rs b/rust/worker/src/blockstore/types.rs index 2b166673622..abe7139eeed 100644 --- a/rust/worker/src/blockstore/types.rs +++ b/rust/worker/src/blockstore/types.rs @@ -297,6 +297,13 @@ impl< } } + pub(crate) async fn key_exists(&'referred_data self, prefix: &str, key: K) -> bool { + match self { + BlockfileReader::ArrowBlockfileReader(reader) => reader.key_exists(prefix, key).await, + BlockfileReader::MemoryBlockfileReader(reader) => todo!(), + } + } + pub(crate) async fn count(&'referred_data self) -> Result> { match self { BlockfileReader::MemoryBlockfileReader(reader) => reader.count(), diff --git a/rust/worker/src/execution/operators/count_records.rs b/rust/worker/src/execution/operators/count_records.rs index 1712a999099..726d4cca68a 100644 --- a/rust/worker/src/execution/operators/count_records.rs +++ b/rust/worker/src/execution/operators/count_records.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use thiserror::Error; use tonic::async_trait; @@ -7,7 +9,7 @@ use crate::{ errors::{ChromaError, ErrorCodes}, execution::operator::Operator, segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - types::Segment, + types::{Operation, Segment}, }; #[derive(Debug)] @@ -23,16 +25,21 @@ impl CountRecordsOperator { pub(crate) struct CountRecordsInput { record_segment_definition: Segment, blockfile_provider: BlockfileProvider, + // Note: this vector needs to be in the same order as the log + // for the counting logic to be correct. + log_operation_and_id: Vec<(Operation, String)>, } impl CountRecordsInput { pub(crate) fn new( record_segment_definition: Segment, blockfile_provider: BlockfileProvider, + log_operation_and_id: Vec<(Operation, String)>, ) -> Self { Self { record_segment_definition, blockfile_provider, + log_operation_and_id, } } } @@ -53,7 +60,7 @@ pub(crate) enum CountRecordsError { impl ChromaError for CountRecordsError { fn code(&self) -> ErrorCodes { match self { - CountRecordsError::RecordSegmentError(_) => ErrorCodes::Internal, + CountRecordsError::RecordSegmentError(e) => e.code(), CountRecordsError::RecordSegmentReadError => ErrorCodes::Internal, } } @@ -71,20 +78,78 @@ impl Operator for CountRecordsOperator { &input.blockfile_provider, ) .await; - match segment_reader { - Ok(reader) => match reader.count().await { - Ok(val) => { - return Ok(CountRecordsOutput { count: val }); + let reader = match segment_reader { + Ok(r) => r, + Err(e) => { + println!("Error opening record segment"); + return Err(CountRecordsError::RecordSegmentError(*e)); + } + }; + // Reconcile adds, updates and deletes. + let mut present_id_set: HashSet = HashSet::new(); + let mut res_count: i32 = 0; + for (_, id) in &input.log_operation_and_id { + // In theory, we can sort all the ids here + // and send them to the reader so that the reader + // can process all in one iteration of the sparse index. + // In practice the blocks + // will get cached so overall performance benefits + // should not be significant. + match reader.data_exists_for_user_id(id).await { + Ok(exists) => { + if exists { + present_id_set.insert(id.clone()); + } } Err(_) => { println!("Error reading record segment"); return Err(CountRecordsError::RecordSegmentReadError); } - }, - Err(e) => { - println!("Error opening record segment"); - return Err(CountRecordsError::RecordSegmentError(*e)); } } + let mut present_set_unique: HashSet = present_id_set.clone(); + let mut absent_set_unique: HashSet = HashSet::new(); + for (op, id) in &input.log_operation_and_id { + if present_id_set.contains(id) { + match op { + Operation::Add | Operation::Upsert => { + present_set_unique.insert(id.clone()); + } + Operation::Delete => { + present_set_unique.remove(id); + } + Operation::Update => {} + } + } else { + match op { + Operation::Add | Operation::Upsert => { + absent_set_unique.insert(id.clone()); + } + Operation::Delete => { + absent_set_unique.remove(id); + } + Operation::Update => {} + } + } + } + // These are the records that are present in the record segment but have + // been deleted more recently in the log. + res_count -= (present_id_set.len() - present_set_unique.len()) as i32; + // These are the records that are absent in the record segment but + // have been inserted more recently in the log. + res_count += absent_set_unique.len() as i32; + // Finally, add the count from the record segment. + match reader.count().await { + Ok(val) => { + res_count += val as i32; + } + Err(_) => { + println!("Error reading record segment"); + return Err(CountRecordsError::RecordSegmentReadError); + } + }; + Ok(CountRecordsOutput { + count: res_count as usize, + }) } } diff --git a/rust/worker/src/execution/orchestration/metadata.rs b/rust/worker/src/execution/orchestration/metadata.rs index 68967817747..85e813cae1e 100644 --- a/rust/worker/src/execution/orchestration/metadata.rs +++ b/rust/worker/src/execution/orchestration/metadata.rs @@ -5,13 +5,13 @@ use crate::execution::operators::count_records::{ CountRecordsError, CountRecordsInput, CountRecordsOperator, CountRecordsOutput, }; use crate::execution::operators::merge_metadata_results::{ - MergeMetadataResultsOperator, MergeMetadataResultsOperatorError, - MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorResult, + MergeMetadataResultsOperator, MergeMetadataResultsOperatorInput, + MergeMetadataResultsOperatorResult, }; use crate::execution::operators::pull_log::{PullLogsInput, PullLogsOperator, PullLogsResult}; use crate::sysdb::sysdb::{GetCollectionsError, GetSegmentsError}; use crate::system::{Component, ComponentContext, Handler}; -use crate::types::{Collection, LogRecord, Metadata, SegmentType}; +use crate::types::{Collection, LogRecord, Metadata, Operation, SegmentType}; use crate::{ blockstore::provider::BlockfileProvider, execution::operator::TaskMessage, @@ -79,8 +79,6 @@ pub(crate) struct CountQueryOrchestrator { blockfile_provider: BlockfileProvider, // Result channel result_channel: Option>>>, - // Count of records in the log - log_record_count: usize, } #[derive(Error, Debug)] @@ -136,7 +134,6 @@ impl CountQueryOrchestrator { dispatcher, blockfile_provider, result_channel: None, - log_record_count: 0, } } @@ -227,7 +224,7 @@ impl CountQueryOrchestrator { Err(e) => { // Log an error - this implies the dispatcher was dropped somehow // and is likely fatal - println!("Error sending Metadata Query task: {:?}", e); + println!("Error sending Count Query task: {:?}", e); } } } @@ -373,9 +370,13 @@ impl Handler for CountQueryOrchestrator { async fn handle(&mut self, message: PullLogsResult, ctx: &ComponentContext) { match message { Ok(logs) => { - let logs = logs.logs(); - self.log_record_count = logs.total_len(); - // TODO: Add logic for merging logs with count from record segment. + let mut operation_and_id: Vec<(Operation, String)> = Vec::new(); + for (log_item, _) in logs.logs().iter() { + operation_and_id.push(( + log_item.record.operation.clone(), + log_item.record.id.clone(), + )); + } let operator = CountRecordsOperator::new(); let input = CountRecordsInput::new( self.record_segment @@ -383,6 +384,7 @@ impl Handler for CountQueryOrchestrator { .expect("Expect segment") .clone(), self.blockfile_provider.clone(), + operation_and_id, ); let msg = wrap(operator, input, ctx.sender.as_receiver()); match self.dispatcher.send(msg, None).await { @@ -418,7 +420,7 @@ impl Handler> for CountQueryOrches .result_channel .take() .expect("Expect channel to be present"); - match channel.send(Ok(msg.count + self.log_record_count)) { + match channel.send(Ok(msg.count)) { Ok(_) => (), Err(e) => { // Log an error - this implied the listener was dropped diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs index bfdd38ee65c..a99a85b3e1e 100644 --- a/rust/worker/src/segment/record_segment.rs +++ b/rust/worker/src/segment/record_segment.rs @@ -599,6 +599,22 @@ impl RecordSegmentReader<'_> { self.id_to_data.get("", offset_id).await } + pub(crate) async fn data_exists_for_user_id( + &self, + user_id: &str, + ) -> Result> { + if !self.user_id_to_id.key_exists("", user_id).await { + return Ok(false); + } + let offset_id = match self.user_id_to_id.get("", user_id).await { + Ok(id) => id, + Err(e) => { + return Err(e); + } + }; + Ok(self.id_to_data.key_exists("", offset_id).await) + } + pub(crate) async fn count(&self) -> Result> { self.id_to_data.count().await } From 9bd59b6e885006f5350ab6faf1eb40d47aeb822d Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Mon, 13 May 2024 17:23:23 -0700 Subject: [PATCH 2/3] Review comments --- rust/worker/src/blockstore/arrow/blockfile.rs | 2 +- .../src/blockstore/memory/reader_writer.rs | 18 +- rust/worker/src/blockstore/memory/storage.rs | 12 +- rust/worker/src/blockstore/types.rs | 16 +- .../src/execution/operators/count_records.rs | 199 +++++++++++++++--- .../src/execution/orchestration/metadata.rs | 9 +- rust/worker/src/segment/mod.rs | 2 +- rust/worker/src/segment/record_segment.rs | 4 +- 8 files changed, 202 insertions(+), 60 deletions(-) diff --git a/rust/worker/src/blockstore/arrow/blockfile.rs b/rust/worker/src/blockstore/arrow/blockfile.rs index 4ad035a7160..3f42a328e4b 100644 --- a/rust/worker/src/blockstore/arrow/blockfile.rs +++ b/rust/worker/src/blockstore/arrow/blockfile.rs @@ -275,7 +275,7 @@ impl<'me, K: ArrowReadableKey<'me>, V: ArrowReadableValue<'me>> ArrowBlockfileRe } } - pub(crate) async fn key_exists(&'me self, prefix: &str, key: K) -> bool { + pub(crate) async fn contains(&'me self, prefix: &str, key: K) -> bool { let search_key = CompositeKey::new(prefix.to_string(), key.clone()); let target_block_id = self.sparse_index.get_target_block_id(&search_key); let block = self.get_block(target_block_id).await; diff --git a/rust/worker/src/blockstore/memory/reader_writer.rs b/rust/worker/src/blockstore/memory/reader_writer.rs index ac199c77a23..49c3ae2e693 100644 --- a/rust/worker/src/blockstore/memory/reader_writer.rs +++ b/rust/worker/src/blockstore/memory/reader_writer.rs @@ -11,6 +11,16 @@ pub(crate) struct MemoryBlockfileWriter { id: uuid::Uuid, } +pub(crate) struct MemoryBlockfileFlusher { + id: uuid::Uuid, +} + +impl MemoryBlockfileFlusher { + pub(crate) fn id(&self) -> uuid::Uuid { + self.id + } +} + impl MemoryBlockfileWriter { pub(super) fn new(storage_manager: StorageManager) -> Self { let builder = storage_manager.create(); @@ -22,9 +32,9 @@ impl MemoryBlockfileWriter { } } - pub(crate) fn commit(&self) -> Result<(), Box> { + pub(crate) fn commit(&self) -> Result> { self.storage_manager.commit(self.builder.id); - Ok(()) + Ok(MemoryBlockfileFlusher { id: self.id }) } pub(crate) fn set, V: Value + Writeable>( @@ -171,8 +181,8 @@ impl< V::count(&self.storage) } - pub(crate) fn key_exists(&'storage self, prefix: &str, key: K) -> bool { - V::key_exists(prefix, key.into(), &self.storage) + pub(crate) fn contains(&'storage self, prefix: &str, key: K) -> bool { + V::contains(prefix, key.into(), &self.storage) } pub(crate) fn id(&self) -> uuid::Uuid { diff --git a/rust/worker/src/blockstore/memory/storage.rs b/rust/worker/src/blockstore/memory/storage.rs index c50510e966e..715bc38fc1a 100644 --- a/rust/worker/src/blockstore/memory/storage.rs +++ b/rust/worker/src/blockstore/memory/storage.rs @@ -51,7 +51,7 @@ pub(crate) trait Readable<'referred_data>: Sized { fn count(storage: &Storage) -> Result>; - fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool; + fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool; } impl Writeable for &str { @@ -166,7 +166,7 @@ impl<'referred_data> Readable<'referred_data> for &'referred_data str { Ok(storage.string_value_storage.iter().len()) } - fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { storage .string_value_storage .get(&CompositeKey { @@ -286,7 +286,7 @@ impl<'referred_data> Readable<'referred_data> for Int32Array { Ok(storage.int32_array_storage.iter().len()) } - fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { storage .int32_array_storage .get(&CompositeKey { @@ -405,7 +405,7 @@ impl<'referred_data> Readable<'referred_data> for RoaringBitmap { Ok(storage.roaring_bitmap_storage.iter().len()) } - fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { storage .roaring_bitmap_storage .get(&CompositeKey { @@ -519,7 +519,7 @@ impl<'referred_data> Readable<'referred_data> for u32 { Ok(storage.u32_storage.iter().len()) } - fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { storage .u32_storage .get(&CompositeKey { @@ -731,7 +731,7 @@ impl<'referred_data> Readable<'referred_data> for DataRecord<'referred_data> { Ok(storage.data_record_id_storage.iter().len()) } - fn key_exists(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { + fn contains(prefix: &str, key: KeyWrapper, storage: &'referred_data Storage) -> bool { storage .data_record_id_storage .get(&CompositeKey { diff --git a/rust/worker/src/blockstore/types.rs b/rust/worker/src/blockstore/types.rs index abe7139eeed..c0e835262d5 100644 --- a/rust/worker/src/blockstore/types.rs +++ b/rust/worker/src/blockstore/types.rs @@ -4,7 +4,9 @@ use super::arrow::types::{ ArrowReadableKey, ArrowReadableValue, ArrowWriteableKey, ArrowWriteableValue, }; use super::key::KeyWrapper; -use super::memory::reader_writer::{MemoryBlockfileReader, MemoryBlockfileWriter}; +use super::memory::reader_writer::{ + MemoryBlockfileFlusher, MemoryBlockfileReader, MemoryBlockfileWriter, +}; use super::memory::storage::{Readable, Writeable}; use crate::errors::{ChromaError, ErrorCodes}; use crate::segment::DataRecord; @@ -192,7 +194,7 @@ impl BlockfileWriter { ) -> Result> { match self { BlockfileWriter::MemoryBlockfileWriter(writer) => match writer.commit() { - Ok(_) => Ok(BlockfileFlusher::MemoryBlockfileFlusher(())), + Ok(flusher) => Ok(BlockfileFlusher::MemoryBlockfileFlusher(flusher)), Err(e) => Err(e), }, BlockfileWriter::ArrowBlockfileWriter(writer) => match writer.commit::() { @@ -242,7 +244,7 @@ impl BlockfileWriter { } pub(crate) enum BlockfileFlusher { - MemoryBlockfileFlusher(()), + MemoryBlockfileFlusher(MemoryBlockfileFlusher), ArrowBlockfileFlusher(ArrowBlockfileFlusher), } @@ -262,7 +264,7 @@ impl BlockfileFlusher { pub(crate) fn id(&self) -> uuid::Uuid { match self { // TODO: should memory blockfiles have ids? - BlockfileFlusher::MemoryBlockfileFlusher(_) => uuid::Uuid::nil(), + BlockfileFlusher::MemoryBlockfileFlusher(flusher) => flusher.id(), BlockfileFlusher::ArrowBlockfileFlusher(flusher) => flusher.id(), } } @@ -297,10 +299,10 @@ impl< } } - pub(crate) async fn key_exists(&'referred_data self, prefix: &str, key: K) -> bool { + pub(crate) async fn contains(&'referred_data self, prefix: &str, key: K) -> bool { match self { - BlockfileReader::ArrowBlockfileReader(reader) => reader.key_exists(prefix, key).await, - BlockfileReader::MemoryBlockfileReader(reader) => todo!(), + BlockfileReader::ArrowBlockfileReader(reader) => reader.contains(prefix, key).await, + BlockfileReader::MemoryBlockfileReader(reader) => reader.contains(prefix, key), } } diff --git a/rust/worker/src/execution/operators/count_records.rs b/rust/worker/src/execution/operators/count_records.rs index 726d4cca68a..7ada35597d4 100644 --- a/rust/worker/src/execution/operators/count_records.rs +++ b/rust/worker/src/execution/operators/count_records.rs @@ -7,9 +7,9 @@ use tonic::async_trait; use crate::{ blockstore::provider::BlockfileProvider, errors::{ChromaError, ErrorCodes}, - execution::operator::Operator, + execution::{data::data_chunk::Chunk, operator::Operator}, segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, - types::{Operation, Segment}, + types::{LogRecord, Operation, Segment}, }; #[derive(Debug)] @@ -25,21 +25,19 @@ impl CountRecordsOperator { pub(crate) struct CountRecordsInput { record_segment_definition: Segment, blockfile_provider: BlockfileProvider, - // Note: this vector needs to be in the same order as the log - // for the counting logic to be correct. - log_operation_and_id: Vec<(Operation, String)>, + log_records: Chunk, } impl CountRecordsInput { pub(crate) fn new( record_segment_definition: Segment, blockfile_provider: BlockfileProvider, - log_operation_and_id: Vec<(Operation, String)>, + log_records: Chunk, ) -> Self { Self { record_segment_definition, blockfile_provider, - log_operation_and_id, + log_records, } } } @@ -86,19 +84,25 @@ impl Operator for CountRecordsOperator { } }; // Reconcile adds, updates and deletes. - let mut present_id_set: HashSet = HashSet::new(); + // Ids that exist in both the log and the segment (can be + // in both deleted and not deleted state). + let mut deleted_and_non_deleted_present_in_segment: HashSet = HashSet::new(); let mut res_count: i32 = 0; - for (_, id) in &input.log_operation_and_id { - // In theory, we can sort all the ids here - // and send them to the reader so that the reader - // can process all in one iteration of the sparse index. - // In practice the blocks - // will get cached so overall performance benefits - // should not be significant. - match reader.data_exists_for_user_id(id).await { + // In theory, we can sort all the ids here + // and send them to the reader so that the reader + // can process all in one iteration of the sparse index. + // In practice, the blocks + // will get cached so overall performance benefits + // should not be significant. + for (log_record, _) in input.log_records.iter() { + match reader + .data_exists_for_user_id(log_record.record.id.as_str()) + .await + { Ok(exists) => { if exists { - present_id_set.insert(id.clone()); + deleted_and_non_deleted_present_in_segment + .insert(log_record.record.id.clone()); } } Err(_) => { @@ -107,37 +111,41 @@ impl Operator for CountRecordsOperator { } } } - let mut present_set_unique: HashSet = present_id_set.clone(); - let mut absent_set_unique: HashSet = HashSet::new(); - for (op, id) in &input.log_operation_and_id { - if present_id_set.contains(id) { - match op { + // Ids that are present in the log and segment and their end state is not deleted. + let mut non_deleted_present_in_segment: HashSet = + deleted_and_non_deleted_present_in_segment.clone(); + // Ids that are absent in the segment but present in log in non deleted state. + let mut non_deleted_absent_in_segment: HashSet = HashSet::new(); + for (log_record, _) in input.log_records.iter() { + if deleted_and_non_deleted_present_in_segment.contains(log_record.record.id.as_str()) { + match log_record.record.operation { Operation::Add | Operation::Upsert => { - present_set_unique.insert(id.clone()); + non_deleted_present_in_segment.insert(log_record.record.id.clone()); } Operation::Delete => { - present_set_unique.remove(id); + non_deleted_present_in_segment.remove(log_record.record.id.as_str()); } Operation::Update => {} } } else { - match op { + match log_record.record.operation { Operation::Add | Operation::Upsert => { - absent_set_unique.insert(id.clone()); + non_deleted_absent_in_segment.insert(log_record.record.id.clone()); } Operation::Delete => { - absent_set_unique.remove(id); + non_deleted_absent_in_segment.remove(log_record.record.id.as_str()); } Operation::Update => {} } } } - // These are the records that are present in the record segment but have + // Discount the records that are present in the record segment but have // been deleted more recently in the log. - res_count -= (present_id_set.len() - present_set_unique.len()) as i32; - // These are the records that are absent in the record segment but + res_count -= (deleted_and_non_deleted_present_in_segment.len() + - non_deleted_present_in_segment.len()) as i32; + // Add the records that are absent in the record segment but // have been inserted more recently in the log. - res_count += absent_set_unique.len() as i32; + res_count += non_deleted_absent_in_segment.len() as i32; // Finally, add the count from the record segment. match reader.count().await { Ok(val) => { @@ -153,3 +161,132 @@ impl Operator for CountRecordsOperator { }) } } + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, str::FromStr}; + + use uuid::Uuid; + + use crate::{ + blockstore::provider::BlockfileProvider, + execution::{ + data::data_chunk::Chunk, + operator::Operator, + operators::count_records::{CountRecordsInput, CountRecordsOperator}, + }, + segment::{record_segment::RecordSegmentWriter, LogMaterializer, SegmentWriter}, + types::{LogRecord, Operation, OperationRecord}, + }; + + use crate::segment::types::SegmentFlusher; + + #[tokio::test] + async fn test_merge_log_and_storage() { + let in_memory_provider = BlockfileProvider::new_memory(); + let mut record_segment = crate::types::Segment { + id: Uuid::from_str("00000000-0000-0000-0000-000000000000").expect("parse error"), + r#type: crate::types::SegmentType::Record, + scope: crate::types::SegmentScope::RECORD, + collection: Some( + Uuid::from_str("00000000-0000-0000-0000-000000000000").expect("parse error"), + ), + metadata: None, + file_path: HashMap::new(), + }; + { + let segment_writer = + RecordSegmentWriter::from_segment(&record_segment, &in_memory_provider) + .await + .expect("Error creating segment writer"); + let data = vec![ + LogRecord { + log_offset: 1, + record: OperationRecord { + id: "embedding_id_1".to_string(), + embedding: Some(vec![1.0, 2.0, 3.0]), + encoding: None, + metadata: None, + document: None, + operation: Operation::Add, + }, + }, + LogRecord { + log_offset: 2, + record: OperationRecord { + id: "embedding_id_2".to_string(), + embedding: Some(vec![4.0, 5.0, 6.0]), + encoding: None, + metadata: None, + document: None, + operation: Operation::Add, + }, + }, + LogRecord { + log_offset: 3, + record: OperationRecord { + id: "embedding_id_1".to_string(), + embedding: None, + encoding: None, + metadata: None, + document: None, + operation: Operation::Delete, + }, + }, + ]; + let data: Chunk = Chunk::new(data.into()); + segment_writer.materialize(&data).await; + let flusher = segment_writer + .commit() + .expect("Commit for segment writer failed"); + record_segment.file_path = flusher.flush().await.expect("Flush segment writer failed"); + } + let data = vec![ + LogRecord { + log_offset: 4, + record: OperationRecord { + id: "embedding_id_1".to_string(), + embedding: Some(vec![1.0, 2.0, 3.0]), + encoding: None, + metadata: None, + document: None, + operation: Operation::Add, + }, + }, + LogRecord { + log_offset: 5, + record: OperationRecord { + id: "embedding_id_4".to_string(), + embedding: Some(vec![4.0, 5.0, 6.0]), + encoding: None, + metadata: None, + document: None, + operation: Operation::Add, + }, + }, + LogRecord { + log_offset: 6, + record: OperationRecord { + id: "embedding_id_2".to_string(), + embedding: None, + encoding: None, + metadata: None, + document: None, + operation: Operation::Update, + }, + }, + ]; + let data: Chunk = Chunk::new(data.into()); + let input = CountRecordsInput { + record_segment_definition: record_segment, + blockfile_provider: in_memory_provider, + log_records: data, + }; + let operator = CountRecordsOperator {}; + let count = operator + .run(&input) + .await + .expect("Count operator run failed"); + assert_eq!(3, count.count); + } +} diff --git a/rust/worker/src/execution/orchestration/metadata.rs b/rust/worker/src/execution/orchestration/metadata.rs index 85e813cae1e..95d7beb8254 100644 --- a/rust/worker/src/execution/orchestration/metadata.rs +++ b/rust/worker/src/execution/orchestration/metadata.rs @@ -370,13 +370,6 @@ impl Handler for CountQueryOrchestrator { async fn handle(&mut self, message: PullLogsResult, ctx: &ComponentContext) { match message { Ok(logs) => { - let mut operation_and_id: Vec<(Operation, String)> = Vec::new(); - for (log_item, _) in logs.logs().iter() { - operation_and_id.push(( - log_item.record.operation.clone(), - log_item.record.id.clone(), - )); - } let operator = CountRecordsOperator::new(); let input = CountRecordsInput::new( self.record_segment @@ -384,7 +377,7 @@ impl Handler for CountQueryOrchestrator { .expect("Expect segment") .clone(), self.blockfile_provider.clone(), - operation_and_id, + logs.logs(), ); let msg = wrap(operator, input, ctx.sender.as_receiver()); match self.dispatcher.send(msg, None).await { diff --git a/rust/worker/src/segment/mod.rs b/rust/worker/src/segment/mod.rs index dff3236bc60..509a0b7147c 100644 --- a/rust/worker/src/segment/mod.rs +++ b/rust/worker/src/segment/mod.rs @@ -1,6 +1,6 @@ pub(crate) mod config; pub(crate) mod distributed_hnsw_segment; pub(crate) mod record_segment; -mod types; +pub(crate) mod types; pub(crate) use types::*; diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs index a99a85b3e1e..a778c4f0a22 100644 --- a/rust/worker/src/segment/record_segment.rs +++ b/rust/worker/src/segment/record_segment.rs @@ -603,7 +603,7 @@ impl RecordSegmentReader<'_> { &self, user_id: &str, ) -> Result> { - if !self.user_id_to_id.key_exists("", user_id).await { + if !self.user_id_to_id.contains("", user_id).await { return Ok(false); } let offset_id = match self.user_id_to_id.get("", user_id).await { @@ -612,7 +612,7 @@ impl RecordSegmentReader<'_> { return Err(e); } }; - Ok(self.id_to_data.key_exists("", offset_id).await) + Ok(self.id_to_data.contains("", offset_id).await) } pub(crate) async fn count(&self) -> Result> { From 85b244c71212c2c7f4c427a794748e63d8eef17d Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Mon, 13 May 2024 21:48:11 -0700 Subject: [PATCH 3/3] Review comments --- rust/worker/src/blockstore/types.rs | 1 - .../src/execution/operators/count_records.rs | 18 ++++++------------ 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/rust/worker/src/blockstore/types.rs b/rust/worker/src/blockstore/types.rs index c0e835262d5..d9898fc84c6 100644 --- a/rust/worker/src/blockstore/types.rs +++ b/rust/worker/src/blockstore/types.rs @@ -263,7 +263,6 @@ impl BlockfileFlusher { pub(crate) fn id(&self) -> uuid::Uuid { match self { - // TODO: should memory blockfiles have ids? BlockfileFlusher::MemoryBlockfileFlusher(flusher) => flusher.id(), BlockfileFlusher::ArrowBlockfileFlusher(flusher) => flusher.id(), } diff --git a/rust/worker/src/execution/operators/count_records.rs b/rust/worker/src/execution/operators/count_records.rs index 7ada35597d4..13d429ae37e 100644 --- a/rust/worker/src/execution/operators/count_records.rs +++ b/rust/worker/src/execution/operators/count_records.rs @@ -1,9 +1,3 @@ -use std::collections::HashSet; - -use thiserror::Error; - -use tonic::async_trait; - use crate::{ blockstore::provider::BlockfileProvider, errors::{ChromaError, ErrorCodes}, @@ -11,6 +5,9 @@ use crate::{ segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, types::{LogRecord, Operation, Segment}, }; +use std::collections::HashSet; +use thiserror::Error; +use tonic::async_trait; #[derive(Debug)] pub(crate) struct CountRecordsOperator {} @@ -164,10 +161,7 @@ impl Operator for CountRecordsOperator { #[cfg(test)] mod tests { - use std::{collections::HashMap, str::FromStr}; - - use uuid::Uuid; - + use crate::segment::types::SegmentFlusher; use crate::{ blockstore::provider::BlockfileProvider, execution::{ @@ -178,8 +172,8 @@ mod tests { segment::{record_segment::RecordSegmentWriter, LogMaterializer, SegmentWriter}, types::{LogRecord, Operation, OperationRecord}, }; - - use crate::segment::types::SegmentFlusher; + use std::{collections::HashMap, str::FromStr}; + use uuid::Uuid; #[tokio::test] async fn test_merge_log_and_storage() {