Skip to content

Commit

Permalink
[BUG] Metadata should handle uninit'ed record segments. (#2194)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- Metadata merge should handle uninit'ed record segments by not reading
the segment.
	 - Add tracing on demand for debugging in metadata reader
 - New functionality
	 - None

## Test plan
*How are these changes tested?*
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
HammadB committed May 14, 2024
1 parent 9289bc4 commit 7de7f2d
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 119 deletions.
3 changes: 3 additions & 0 deletions chromadb/segment/impl/metadata/grpc_segment.py
Expand Up @@ -7,6 +7,7 @@
OpenTelemetryGranularity,
trace_method,
)
from chromadb.telemetry.opentelemetry.grpc import OtelInterceptor
from chromadb.types import (
Where,
WhereDocument,
Expand Down Expand Up @@ -36,6 +37,8 @@ def start(self) -> None:
raise Exception("Missing grpc_url in segment metadata")

channel = grpc.insecure_channel(self._segment["metadata"]["grpc_url"])
interceptors = [OtelInterceptor()]
channel = grpc.intercept_channel(channel, *interceptors)
self._metadata_reader_stub = MetadataReaderStub(channel) # type: ignore

@override
Expand Down
87 changes: 56 additions & 31 deletions rust/worker/src/execution/operators/merge_metadata_results.rs
Expand Up @@ -2,14 +2,14 @@ use crate::{
blockstore::provider::BlockfileProvider,
errors::{ChromaError, ErrorCodes},
execution::{data::data_chunk::Chunk, operator::Operator},
segment::record_segment::RecordSegmentReader,
segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
types::{
update_metdata_to_metdata, LogRecord, Metadata, MetadataValueConversionError, Segment,
},
};
use async_trait::async_trait;
use std::f64::consts::E;
use thiserror::Error;
use tracing::{error, trace};

#[derive(Debug)]
pub struct MergeMetadataResultsOperator {}
Expand Down Expand Up @@ -61,7 +61,7 @@ pub struct MergeMetadataResultsOperatorOutput {
#[derive(Error, Debug)]
pub enum MergeMetadataResultsOperatorError {
#[error("Error creating Record Segment")]
RecordSegmentError,
RecordSegmentCreationError(#[from] RecordSegmentReaderCreationError),
#[error("Error reading Record Segment")]
RecordSegmentReadError,
#[error("Error converting metadata")]
Expand All @@ -71,7 +71,7 @@ pub enum MergeMetadataResultsOperatorError {
impl ChromaError for MergeMetadataResultsOperatorError {
fn code(&self) -> ErrorCodes {
match self {
MergeMetadataResultsOperatorError::RecordSegmentError => ErrorCodes::Internal,
MergeMetadataResultsOperatorError::RecordSegmentCreationError(e) => e.code(),
MergeMetadataResultsOperatorError::RecordSegmentReadError => ErrorCodes::Internal,
MergeMetadataResultsOperatorError::MetadataConversionError(e) => e.code(),
}
Expand All @@ -91,6 +91,36 @@ impl Operator<MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorOut
&self,
input: &MergeMetadataResultsOperatorInput,
) -> MergeMetadataResultsOperatorResult {
trace!(
"[MergeMetadataResultsOperator] segment id: {}",
input.record_segment_definition.id.to_string()
);

let mut ids: Vec<String> = Vec::new();
let mut metadata = Vec::new();
let mut documents = Vec::new();
// Add the data from the brute force results
for (log_entry, index) in input.filtered_log.iter() {
ids.push(log_entry.record.id.to_string());
let output_metadata = match &log_entry.record.metadata {
Some(log_metadata) => match update_metdata_to_metdata(log_metadata) {
Ok(metadata) => Some(metadata),
Err(e) => {
println!("Error converting log metadata: {:?}", e);
return Err(MergeMetadataResultsOperatorError::MetadataConversionError(
e,
));
}
},
None => {
println!("No metadata found for log entry");
None
}
};
metadata.push(output_metadata);
documents.push(log_entry.record.document.clone());
}

let record_segment_reader = match RecordSegmentReader::from_segment(
&input.record_segment_definition,
&input.blockfile_provider,
Expand All @@ -99,14 +129,31 @@ impl Operator<MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorOut
{
Ok(reader) => reader,
Err(e) => {
return Err(MergeMetadataResultsOperatorError::RecordSegmentError);
match *e {
RecordSegmentReaderCreationError::UninitializedSegment => {
// This means no compaction has occured, so we can just return whats on the log.
return Ok(MergeMetadataResultsOperatorOutput {
ids,
metadata,
documents,
});
}
RecordSegmentReaderCreationError::BlockfileOpenError(_) => {
error!("Error creating Record Segment: {:?}", e);
return Err(
MergeMetadataResultsOperatorError::RecordSegmentCreationError(*e),
);
}
RecordSegmentReaderCreationError::InvalidNumberOfFiles => {
error!("Error creating Record Segment: {:?}", e);
return Err(
MergeMetadataResultsOperatorError::RecordSegmentCreationError(*e),
);
}
}
}
};

let mut ids: Vec<String> = Vec::new();
let mut metadata = Vec::new();
let mut documents = Vec::new();

// Hydrate the data from the record segment for filtered data
for index_offset_id in input.filtered_index_offset_ids.iter() {
let record = match record_segment_reader
Expand Down Expand Up @@ -171,28 +218,6 @@ impl Operator<MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorOut
}
}

// Merge the data from the brute force results
for (log_entry, index) in input.filtered_log.iter() {
ids.push(log_entry.record.id.to_string());
let output_metadata = match &log_entry.record.metadata {
Some(log_metadata) => match update_metdata_to_metdata(log_metadata) {
Ok(metadata) => Some(metadata),
Err(e) => {
println!("Error converting log metadata: {:?}", e);
return Err(MergeMetadataResultsOperatorError::MetadataConversionError(
e,
));
}
},
None => {
println!("No metadata found for log entry");
None
}
};
metadata.push(output_metadata);
documents.push(log_entry.record.document.clone());
}

Ok(MergeMetadataResultsOperatorOutput {
ids,
metadata,
Expand Down
185 changes: 97 additions & 88 deletions rust/worker/src/server.rs
Expand Up @@ -212,6 +212,98 @@ impl WorkerServer {

return Ok(Response::new(resp));
}

async fn query_metadata_instrumented(
&self,
request: Request<QueryMetadataRequest>,
) -> Result<Response<QueryMetadataResponse>, Status> {
let request = request.into_inner();
let segment_uuid = match Uuid::parse_str(&request.segment_id) {
Ok(uuid) => uuid,
Err(_) => {
return Err(Status::invalid_argument("Invalid Segment UUID"));
}
};

let dispatcher = match self.dispatcher {
Some(ref dispatcher) => dispatcher,
None => {
return Err(Status::internal("No dispatcher found"));
}
};

let system = match self.system {
Some(ref system) => system,
None => {
return Err(Status::internal("No system found"));
}
};

// For now we don't support limit/offset/where/where document
if request.limit.is_some() || request.offset.is_some() {
return Err(Status::unimplemented("Limit and offset not supported"));
}
if request.where_document.is_some() {
return Err(Status::unimplemented("Where document not supported"));
}
if request.r#where.is_some() {
return Err(Status::unimplemented("Where not supported"));
}

let query_ids = request.ids;

let orchestrator = MetadataQueryOrchestrator::new(
system.clone(),
&segment_uuid,
query_ids,
self.log.clone(),
self.sysdb.clone(),
dispatcher.clone(),
self.blockfile_provider.clone(),
);

let result = orchestrator.run().await;
let result = match result {
Ok(result) => result,
Err(e) => {
return Err(Status::internal(format!(
"Error running orchestrator: {}",
e
)))
}
};

let mut output = Vec::new();
let (ids, metadatas, documents) = result;
for ((id, metadata), document) in ids
.into_iter()
.zip(metadatas.into_iter())
.zip(documents.into_iter())
{
// The transport layer assumes the document exists in the metadata
// with the special key "chroma:document"
let mut output_metadata = match metadata {
Some(metadata) => metadata,
None => HashMap::new(),
};
match document {
Some(document) => {
output_metadata
.insert("chroma:document".to_string(), MetadataValue::Str(document));
}
None => {}
}
let record = chroma_proto::MetadataEmbeddingRecord {
id,
metadata: Some(chroma_proto::UpdateMetadata::from(output_metadata)),
};
output.push(record);
}

// This is an implementation stub
let response = chroma_proto::QueryMetadataResponse { records: output };
Ok(Response::new(response))
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -308,93 +400,10 @@ impl chroma_proto::metadata_reader_server::MetadataReader for WorkerServer {
&self,
request: Request<QueryMetadataRequest>,
) -> Result<Response<QueryMetadataResponse>, Status> {
let request = request.into_inner();
let segment_uuid = match Uuid::parse_str(&request.segment_id) {
Ok(uuid) => uuid,
Err(_) => {
return Err(Status::invalid_argument("Invalid Segment UUID"));
}
};

println!("Querying metadata for segment {}", segment_uuid);

let dispatcher = match self.dispatcher {
Some(ref dispatcher) => dispatcher,
None => {
return Err(Status::internal("No dispatcher found"));
}
};

let system = match self.system {
Some(ref system) => system,
None => {
return Err(Status::internal("No system found"));
}
};

// For now we don't support limit/offset/where/where document
if request.limit.is_some() || request.offset.is_some() {
return Err(Status::unimplemented("Limit and offset not supported"));
}
if request.where_document.is_some() {
return Err(Status::unimplemented("Where document not supported"));
}
if request.r#where.is_some() {
return Err(Status::unimplemented("Where not supported"));
}

let query_ids = request.ids;

let orchestrator = MetadataQueryOrchestrator::new(
system.clone(),
&segment_uuid,
query_ids,
self.log.clone(),
self.sysdb.clone(),
dispatcher.clone(),
self.blockfile_provider.clone(),
);

let result = orchestrator.run().await;
let result = match result {
Ok(result) => result,
Err(e) => {
return Err(Status::internal(format!(
"Error running orchestrator: {}",
e
)))
}
};

let mut output = Vec::new();
let (ids, metadatas, documents) = result;
for ((id, metadata), document) in ids
.into_iter()
.zip(metadatas.into_iter())
.zip(documents.into_iter())
{
// The transport layer assumes the document exists in the metadata
// with the special key "chroma:document"
let mut output_metadata = match metadata {
Some(metadata) => metadata,
None => HashMap::new(),
};
match document {
Some(document) => {
output_metadata
.insert("chroma:document".to_string(), MetadataValue::Str(document));
}
None => {}
}
let record = chroma_proto::MetadataEmbeddingRecord {
id,
metadata: Some(chroma_proto::UpdateMetadata::from(output_metadata)),
};
output.push(record);
}

// This is an implementation stub
let response = chroma_proto::QueryMetadataResponse { records: output };
Ok(Response::new(response))
let query_span = trace_span!("Query metadata", segment_id = request.get_ref().segment_id);
let instrumented_span = wrap_span_with_parent_context(query_span, request.metadata());
self.query_metadata_instrumented(request)
.instrument(instrumented_span)
.await
}
}

0 comments on commit 7de7f2d

Please sign in to comment.