-
Notifications
You must be signed in to change notification settings - Fork 2k
[CHORE] Move posthog to rust #4142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
a04b186 to
c06c921
Compare
chromadb/api/rust.py
Outdated
| ) | ||
|
|
||
| # Determine telemetry user ID (moved from __init__ attempt) | ||
| user_id_path = str(Path.home() / ".cache" / "chroma" / "telemetry_user_id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we handle this on the Rust side instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
accessing Path.home() via pyo3?
| use async_trait::async_trait; | ||
|
|
||
| #[async_trait] | ||
| pub trait TelemetryClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to be a trait?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think so right? so that way on batch we can take in the trait, and then any of the events can be passed. from my understanding, trait is the closest to python interfaces
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the reason to do this is to allow abstracting away the concept of posthog
rust/telemetry/src/events.rs
Outdated
| other: Box<dyn ProductTelemetryEvent + Send + Sync>, | ||
| ) -> Result<Box<dyn ProductTelemetryEvent + Send + Sync>, &'static str>; | ||
| } | ||
| impl_downcast!(sync ProductTelemetryEvent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does this do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since all events implement the ProductTelemetryEvent trait, we need a way to only batch the events that match the type. downcast lets you use the .as_any() function, so then on batching we can check if the two events match, and then combine them if they are
|
claiming a review slot |
68e41a2 to
6fbfa01
Compare
|
|
||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
|
||
| [dependencies] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please import from workspace
rust/telemetry/Cargo.toml
Outdated
|
|
||
| [dependencies] | ||
| async-trait = "0.1" | ||
| downcast-rs = "1.2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this crate can be implemented in a very simple way ourselves, can we not implement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kk will do
| } | ||
|
|
||
| async fn direct_capture(&self, event: Box<dyn ProductTelemetryEvent + Send + Sync>) { | ||
| let event_name = event.name(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: in rust there is a TryFrom trait.
You can bound ProductTelemetryEvent: TryInto.
Then this method can just do .tryFrom / into
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe the we can make ProductTelemetry Event -> Posthog Event into a TryFrom/Into, because it uses Posthog.user_id, chroma_version, is_server etc. We also can't populate that data in during direct capture, since to create a posthog event it needs a distnict id, which uses user_id in this case. So i believe this can't be simplified more.
| const UNKNOWN_USER_ID: &str = "UNKNOWN"; | ||
|
|
||
| pub struct Posthog { | ||
| client: Client, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This client doesn't implement batching
We need batching for performance here, and to move the requests out of the critical path
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think our implementation is supposed to batch. looking at the python code, it also does a single capture call. the idea is that it "batches" for example 500 requests -> fires once to signify that 500 events of that type have been fired. so the capture itself is singular. i don't think we need to implement batching in the posthog client, since we arent sending 500 capture calls. thoughts?
Can go over the python code to discuss as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The posthog client in python internally batches
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HammadB
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should make sure distributed chroma disables this by default
rust/telemetry/src/posthog.rs
Outdated
| let batch_key = event.batch_key(); | ||
| let max_batch_size = event.max_batch_size(); | ||
|
|
||
| let mut seen_types = self.seen_event_types.lock().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest a standard sync mutex here
https://docs.rs/tokio/latest/tokio/sync/struct.Mutex.html#which-kind-of-mutex-should-you-use
And to not hold the lock across direct_capture.await(). Reason being you can do the lock access and then bookkeep whether you need to direct capture. Races are tolerable.
rust/telemetry/src/posthog.rs
Outdated
| return; | ||
| } | ||
|
|
||
| drop(seen_types); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer guarding locks with a scope instead of dropping.
{
let x = seen_types.lock()
}
rust/telemetry/src/posthog.rs
Outdated
| is_server, | ||
| chroma_version, | ||
| batched_events: Arc::new(Mutex::new(HashMap::new())), | ||
| seen_event_types: Arc::new(Mutex::new(HashSet::new())), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RWLock maybe worth it here, since its read-mostly
rust/telemetry/src/events.rs
Outdated
| // ClientStartEvent | ||
| #[derive(Serialize, Debug, Clone)] | ||
| pub struct ClientStartEvent { | ||
| pub in_colab: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i would just pipe this from the python context
rust/telemetry/src/posthog.rs
Outdated
| user_id: String, | ||
| is_server: bool, | ||
| chroma_version: String, | ||
| batched_events: Arc<Mutex<HashMap<String, Box<dyn ProductTelemetryEvent + Send + Sync>>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make the "EventType" not a string, but strongly typed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not able to do this for the HashMap key type, since when inserting to batched_events here
https://github.com/chroma-core/chroma/pull/4142/files#diff-0b3932af99d02d8c0ed92c073152e03b59b9c62f1e438e9447e570378a0789ebR93
it uses the batch_key.
Some batch keys are not strongly typed, and vary based on the collection id. Like line 200 in events.rs
https://github.com/chroma-core/chroma/pull/4142/files#diff-0ca588a7c1ef2ba19cb18d0f33cf33873b37df89fe956ce0ef37ccd3fb00557eR200
rust/telemetry/src/events.rs
Outdated
| fn batch_size(&self) -> usize { | ||
| self.batch_count | ||
| } | ||
| // Include limit in batch key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added this to keep it 1:1 with python, not sure why it was implemented that way. Should I remove it?
chroma/chromadb/telemetry/product/events.py
Line 227 in 15a21ad
| return self.collection_uuid + self.name + str(self.limit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove it, not necessary
a3d5459 to
a5f5665
Compare
jairad26
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
propogate anonymize telemetry to server, look at how anonymize server telemetry works and add that
rust/telemetry/src/events.rs
Outdated
|
|
||
| pub async fn submit_event(event: Box<dyn ProductTelemetryEvent + Send + Sync>) { | ||
| // Disable telemetry capture when running tests | ||
| let in_pytest = std::env::var("CHROMA_IN_PYTEST").map_or(false, |val| val == "1"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add more tests in rust
rust/telemetry/src/events.rs
Outdated
| where | ||
| Self: Sized + Send + Sync + 'static, | ||
| { | ||
| submit_event(Box::new(self)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove boxing if possible (copies)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't because handler.send requires a known size at compile time (message is a known size). in the telemetry case, since each event is an unknonw/variant size, we have to box it. however, for clarity i made submit_event implement the generic T like submit, so now boxing only occurs when necessary, at handle.send
rust/telemetry/src/events.rs
Outdated
| if self.batch_key() != other_event.batch_key() { | ||
| return Err("Cannot batch events with different keys"); | ||
| } | ||
| Ok(Box::new(CollectionAddEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mutate self instead of creating new addevent (goes for all events)
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test larger batch sizes, and multiple sets of batches (diff ids, collections), err on diff batches.
rust/telemetry/src/posthog.rs
Outdated
| is_server: bool, | ||
| chroma_version: String, | ||
| last_flush_time: DateTime<Utc>, | ||
| batched_events: Arc<Mutex<HashMap<String, Box<dyn ProductTelemetryEvent + Send + Sync>>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove all Arc and mutexs, we're in a single thread
rust/python_bindings/src/bindings.rs
Outdated
| //////////////////////////// Frontend Setup //////////////////////////// | ||
|
|
||
| // Get version dynamically | ||
| let version = Python::with_gil(|py| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove python-isms, read cargo crate version
rust/python_bindings/src/bindings.rs
Outdated
| allow_reset: bool, | ||
| sqlite_db_config: SqliteDBConfig, | ||
| hnsw_cache_size: usize, | ||
| user_id: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to telemetry user id, remove the python fetching user id and move to underlying rust, so that can be reused
d071ff9 to
984df41
Compare
| // Capture lengths of ids, embeddings, documents, uris, and metadatas | ||
| let add_amount = ids.len(); | ||
| let with_documents = documents.as_ref().map_or(0, |_| add_amount); | ||
| let with_metadata = metadatas.as_ref().map_or(0, |_| add_amount); | ||
| let with_uris = uris.as_ref().map_or(0, |_| add_amount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
There's some repeated logic for preparing telemetry data across several methods (add, update, upsert, get, and query).
For instance, the calculation of with_documents, with_metadata, and with_uris is duplicated in add, update, and upsert.
Similarly, the logic to determine include_* counts based on the Include enum is duplicated in get and query.
Consider extracting these into small helper functions to reduce duplication and improve maintainability. This would centralize the telemetry data preparation logic.
rust/telemetry/src/posthog.rs
Outdated
| let should_capture_directly = | ||
| { max_batch_size == 1 || !self.seen_event_types.contains(&batch_key) }; | ||
|
|
||
| if should_capture_directly { | ||
| self.seen_event_types.insert(batch_key.clone()); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
This block of code involving should_capture_directly and seen_event_types appears to be unused in the rest of the function. The seen_event_types set is modified here but never read elsewhere, and should_capture_directly is not used to alter control flow. It seems like this might be leftover from a previous implementation. Consider removing it to simplify the logic.
| // --- Telemetry Capture --- | ||
| let event = CollectionAddEvent::new( | ||
| collection_id.to_string(), | ||
| add_amount, | ||
| with_documents, | ||
| with_metadata, | ||
| with_uris, | ||
| add_amount, // batch_size for this event instance | ||
| ); | ||
| event.submit().await; | ||
| // --- End Telemetry Capture --- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The telemetry event is currently submitted before checking if the operation was successful. This could lead to inaccurate telemetry data if the operation fails after retries. To ensure events are only sent for successful operations, the telemetry capture block should be moved inside the Ok(()) arm of the match res block. This also aligns with the TODO: Submit event after the response is sent comment.
This pattern appears in the add (lines 791-801), update (lines 894-904), and upsert (lines 1000-1010) methods.
chromadb/api/rust.py
Outdated
| user_id_path = USER_ID_PATH | ||
| try: | ||
| if os.path.exists(user_id_path): | ||
| with open(user_id_path, "r") as f: | ||
| telemetry_user_id = f.read() | ||
| else: | ||
| # Ensure the directory exists before writing | ||
| os.makedirs(os.path.dirname(user_id_path), exist_ok=True) | ||
| telemetry_user_id = str(uuid.uuid4()) | ||
| with open(user_id_path, "w") as f: | ||
| f.write(telemetry_user_id) | ||
| except Exception: | ||
| telemetry_user_id = UNKNOWN_USER_ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Missing error handling for telemetry user ID file operations. If the directory creation fails or the file write operation fails after directory creation succeeds, the code falls back to UNKNOWN_USER_ID but doesn't log the error. This could make debugging telemetry issues difficult.
Suggested Change
| user_id_path = USER_ID_PATH | |
| try: | |
| if os.path.exists(user_id_path): | |
| with open(user_id_path, "r") as f: | |
| telemetry_user_id = f.read() | |
| else: | |
| # Ensure the directory exists before writing | |
| os.makedirs(os.path.dirname(user_id_path), exist_ok=True) | |
| telemetry_user_id = str(uuid.uuid4()) | |
| with open(user_id_path, "w") as f: | |
| f.write(telemetry_user_id) | |
| except Exception: | |
| telemetry_user_id = UNKNOWN_USER_ID | |
| # Determine telemetry user ID (moved from __init__ attempt) | |
| user_id_path = USER_ID_PATH | |
| try: | |
| if os.path.exists(user_id_path): | |
| with open(user_id_path, "r") as f: | |
| telemetry_user_id = f.read().strip() | |
| else: | |
| # Ensure the directory exists before writing | |
| os.makedirs(os.path.dirname(user_id_path), exist_ok=True) | |
| telemetry_user_id = str(uuid.uuid4()) | |
| with open(user_id_path, "w") as f: | |
| f.write(telemetry_user_id) | |
| except Exception as e: | |
| # Log the error for debugging telemetry issues | |
| print(f"Warning: Failed to handle telemetry user ID: {e}") | |
| telemetry_user_id = UNKNOWN_USER_ID |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**BestPractice**]
Missing error handling for telemetry user ID file operations. If the directory creation fails or the file write operation fails after directory creation succeeds, the code falls back to `UNKNOWN_USER_ID` but doesn't log the error. This could make debugging telemetry issues difficult.
<details>
<summary>Suggested Change</summary>
```suggestion
# Determine telemetry user ID (moved from __init__ attempt)
user_id_path = USER_ID_PATH
try:
if os.path.exists(user_id_path):
with open(user_id_path, "r") as f:
telemetry_user_id = f.read().strip()
else:
# Ensure the directory exists before writing
os.makedirs(os.path.dirname(user_id_path), exist_ok=True)
telemetry_user_id = str(uuid.uuid4())
with open(user_id_path, "w") as f:
f.write(telemetry_user_id)
except Exception as e:
# Log the error for debugging telemetry issues
print(f"Warning: Failed to handle telemetry user ID: {e}")
telemetry_user_id = UNKNOWN_USER_ID
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
</details>
File: chromadb/api/rust.py
Line: 132| } else { | ||
| self.batched_events.insert(batch_key, existing_event); | ||
| None | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Potential data loss during batching: When existing_event.batch(event) fails, both the existing event and the new event are discarded. The existing event should be preserved and the new event should be processed individually.
Suggested Change
| } else { | |
| self.batched_events.insert(batch_key, existing_event); | |
| None | |
| } | |
| Err(e) => { | |
| warn!( | |
| "Error batching event type {}: {}. Processing new event individually.", | |
| batch_key, e | |
| ); | |
| // Keep the existing event in batched_events | |
| self.batched_events.insert(batch_key.clone(), existing_event); | |
| // Process the new event directly | |
| self.convert_to_posthog_event(event).await; | |
| } |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**CriticalError**]
Potential data loss during batching: When `existing_event.batch(event)` fails, both the existing event and the new event are discarded. The existing event should be preserved and the new event should be processed individually.
<details>
<summary>Suggested Change</summary>
```suggestion
Err(e) => {
warn!(
"Error batching event type {}: {}. Processing new event individually.",
batch_key, e
);
// Keep the existing event in batched_events
self.batched_events.insert(batch_key.clone(), existing_event);
// Process the new event directly
self.convert_to_posthog_event(event).await;
}
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
</details>
File: rust/telemetry/src/posthog.rs
Line: 205|
|
||
| self.last_flush_time = Utc::now(); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Memory leak in buffered events: The batched_events HashMap will grow indefinitely for events that never reach their max_batch_size(). Consider implementing a time-based flush for batched events or a maximum retention policy.
// Add to PosthogClient struct:
last_batch_cleanup: DateTime<Utc>,
// In batch_and_flush method, add periodic cleanup:
if (Utc::now() - self.last_batch_cleanup).to_std().unwrap() > Duration::from_secs(300) {
// Flush events that have been batched for too long
let old_events = std::mem::take(&mut self.batched_events);
for (_, event) in old_events {
self.convert_to_posthog_event(event).await;
}
self.last_batch_cleanup = Utc::now();
}Context for Agents
[**CriticalError**]
Memory leak in buffered events: The `batched_events` HashMap will grow indefinitely for events that never reach their `max_batch_size()`. Consider implementing a time-based flush for batched events or a maximum retention policy.
```rust
// Add to PosthogClient struct:
last_batch_cleanup: DateTime<Utc>,
// In batch_and_flush method, add periodic cleanup:
if (Utc::now() - self.last_batch_cleanup).to_std().unwrap() > Duration::from_secs(300) {
// Flush events that have been batched for too long
let old_events = std::mem::take(&mut self.batched_events);
for (_, event) in old_events {
self.convert_to_posthog_event(event).await;
}
self.last_batch_cleanup = Utc::now();
}
```
File: rust/telemetry/src/posthog.rs
Line: 238There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is intentional, otherwise it will flush too often. also the batch sizes are large enough that this should be fine
| // --- Telemetry Capture --- | ||
| let event = CollectionAddEvent::new( | ||
| collection_id.to_string(), | ||
| add_amount, | ||
| with_documents, | ||
| with_metadata, | ||
| with_uris, | ||
| add_amount, // batch_size for this event instance | ||
| ); | ||
| event.submit().await; | ||
| // --- End Telemetry Capture --- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The upsert operation is currently sending a CollectionAddEvent for telemetry. This seems incorrect as upsert can behave as an update, not just an add. The previous Python implementation did not have telemetry for upsert operations.
To avoid sending potentially misleading data, it might be better to remove this telemetry capture for now to match the previous behavior. If upsert telemetry is desired, a dedicated CollectionUpsertEvent should probably be created.
Context for Agents
[**BestPractice**]
The `upsert` operation is currently sending a `CollectionAddEvent` for telemetry. This seems incorrect as `upsert` can behave as an update, not just an add. The previous Python implementation did not have telemetry for `upsert` operations.
To avoid sending potentially misleading data, it might be better to remove this telemetry capture for now to match the previous behavior. If `upsert` telemetry is desired, a dedicated `CollectionUpsertEvent` should probably be created.
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1126| if let Some(handler) = EVENT_SENDER.get() { | ||
| if let Err(e) = handler.send(Box::new(event), None).await { | ||
| warn!("Failed to submit telemetry event: {}", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Critical Error Handling Issue: The submit_event function silently discards failures with only a warning log. If telemetry submission fails (network issues, invalid API key, etc.), the error is lost and the application continues unaware.
if let Err(e) = handler.send(Box::new(event), None).await {
warn!("Failed to submit telemetry event: {}", e); // Error is lost!
}This can mask critical telemetry infrastructure problems. Consider:
- Adding metrics to track failure rates
- Implementing exponential backoff retry logic for transient failures
- Surfacing persistent failures to monitoring systems
Context for Agents
[**BestPractice**]
**Critical Error Handling Issue**: The `submit_event` function silently discards failures with only a warning log. If telemetry submission fails (network issues, invalid API key, etc.), the error is lost and the application continues unaware.
```rust
if let Err(e) = handler.send(Box::new(event), None).await {
warn!("Failed to submit telemetry event: {}", e); // Error is lost!
}
```
This can mask critical telemetry infrastructure problems. Consider:
1. Adding metrics to track failure rates
2. Implementing exponential backoff retry logic for transient failures
3. Surfacing persistent failures to monitoring systems
File: rust/telemetry/src/events.rs
Line: 20| receiver: Box<dyn ReceiverForMessage<Box<dyn ProductTelemetryEvent + Send + Sync>>>, | ||
| ) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
Potential Deadlock Risk: The EVENT_SENDER.set() operation in init_receiver can fail silently if called multiple times, but the warning doesn't indicate the severity. If initialization fails, all subsequent submit_event calls will silently do nothing (no telemetry captured).
if EVENT_SENDER.set(receiver).is_err() {
warn!("Failed to initialize telemetry sender"); // System continues broken
}This creates a scenario where the application appears healthy but telemetry is completely broken. Consider:
- Using
expect()to panic on initialization failure (fail-fast) - Or implementing a retry mechanism with exponential backoff
- Adding health check endpoints to verify telemetry system status
Context for Agents
[**BestPractice**]
**Potential Deadlock Risk**: The `EVENT_SENDER.set()` operation in `init_receiver` can fail silently if called multiple times, but the warning doesn't indicate the severity. If initialization fails, all subsequent `submit_event` calls will silently do nothing (no telemetry captured).
```rust
if EVENT_SENDER.set(receiver).is_err() {
warn!("Failed to initialize telemetry sender"); // System continues broken
}
```
This creates a scenario where the application appears healthy but telemetry is completely broken. Consider:
1. Using `expect()` to panic on initialization failure (fail-fast)
2. Or implementing a retry mechanism with exponential backoff
3. Adding health check endpoints to verify telemetry system status
File: rust/telemetry/src/posthog.rs
Line: 169| return; | ||
| } | ||
|
|
||
| let buffer = std::mem::take(&mut self.buffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
Resource Leak on Component Failure: The batch_and_flush method doesn't handle flush failures properly. If flush() fails, the buffer is already consumed by std::mem::take(), causing permanent data loss:
let buffer = std::mem::take(&mut self.buffer); // Buffer emptied first
if let Err(e) = self.client.capture_batch(buffer).await {
warn!("Failed to send telemetry events: {}", e); // Data lost forever
}Recommend:
let buffer_clone = self.buffer.clone();
if let Ok(_) = self.client.capture_batch(buffer_clone).await {
self.buffer.clear(); // Only clear on success
} else {
warn!("Failed to send telemetry events, keeping in buffer for retry");
}Context for Agents
[**CriticalError**]
**Resource Leak on Component Failure**: The `batch_and_flush` method doesn't handle flush failures properly. If `flush()` fails, the `buffer` is already consumed by `std::mem::take()`, causing permanent data loss:
```rust
let buffer = std::mem::take(&mut self.buffer); // Buffer emptied first
if let Err(e) = self.client.capture_batch(buffer).await {
warn!("Failed to send telemetry events: {}", e); // Data lost forever
}
```
Recommend:
```rust
let buffer_clone = self.buffer.clone();
if let Ok(_) = self.client.capture_batch(buffer_clone).await {
self.buffer.clear(); // Only clear on success
} else {
warn!("Failed to send telemetry events, keeping in buffer for retry");
}
```
File: rust/telemetry/src/posthog.rs
Line: 221| if let Some(parent) = path.parent() { | ||
| if fs::create_dir_all(parent).is_err() { | ||
| return UNKNOWN_USER_ID.to_string(); | ||
| } | ||
| } | ||
|
|
||
| let user_id = uuid::Uuid::new_v4().to_string(); | ||
| if fs::write(&path, &user_id).is_err() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
File System Race Condition: The telemetry user ID creation has a race condition where multiple processes could simultaneously create the same directory and file:
if let Some(parent) = path.parent() {
if fs::create_dir_all(parent).is_err() { // Process A creates dir
return UNKNOWN_USER_ID.to_string();
}
}
let user_id = uuid::Uuid::new_v4().to_string();
if fs::write(&path, &user_id).is_err() { // Process B overwrites file
return UNKNOWN_USER_ID.to_string();
}This can result in different processes having different user IDs when they should share the same one. Use file locking or atomic operations:
use std::fs::OpenOptions;
use std::io::Write;
// Try to create file exclusively (atomic)
match OpenOptions::new().create_new(true).write(true).open(&path) {
Ok(mut file) => {
let user_id = uuid::Uuid::new_v4().to_string();
let _ = file.write_all(user_id.as_bytes());
user_id
}
Err(_) => fs::read_to_string(&path).unwrap_or_else(|_| UNKNOWN_USER_ID.to_string())
}Context for Agents
[**CriticalError**]
**File System Race Condition**: The telemetry user ID creation has a race condition where multiple processes could simultaneously create the same directory and file:
```rust
if let Some(parent) = path.parent() {
if fs::create_dir_all(parent).is_err() { // Process A creates dir
return UNKNOWN_USER_ID.to_string();
}
}
let user_id = uuid::Uuid::new_v4().to_string();
if fs::write(&path, &user_id).is_err() { // Process B overwrites file
return UNKNOWN_USER_ID.to_string();
}
```
This can result in different processes having different user IDs when they should share the same one. Use file locking or atomic operations:
```rust
use std::fs::OpenOptions;
use std::io::Write;
// Try to create file exclusively (atomic)
match OpenOptions::new().create_new(true).write(true).open(&path) {
Ok(mut file) => {
let user_id = uuid::Uuid::new_v4().to_string();
let _ = file.write_all(user_id.as_bytes());
user_id
}
Err(_) => fs::read_to_string(&path).unwrap_or_else(|_| UNKNOWN_USER_ID.to_string())
}
```
File: rust/python_bindings/src/bindings.rs
Line: 54| let telemetry_config = TelemetryConfig { | ||
| user_id: Some(telemetry_user_id), | ||
| is_server: false, | ||
| chroma_version: Some(version), | ||
| anonymized_telemetry: Some(anonymized_telemetry), | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The ClientStartEvent, which was previously sent from the Python client constructors, seems to be missing in this new Rust-based telemetry implementation. This event is useful for tracking client initializations. It should probably be sent from the py_new constructor after the telemetry system is configured. This would require adding a ClientStartEvent to rust/telemetry/src/events.rs.
Context for Agents
[**BestPractice**]
The `ClientStartEvent`, which was previously sent from the Python client constructors, seems to be missing in this new Rust-based telemetry implementation. This event is useful for tracking client initializations. It should probably be sent from the `py_new` constructor after the telemetry system is configured. This would require adding a `ClientStartEvent` to `rust/telemetry/src/events.rs`.
File: rust/python_bindings/src/bindings.rs
Line: 158
Description of changes
This PR moves posthog logic to rust for all events. Captures occur on service_based_frontend, and for now only occur when users use via rust bindings.
Test plan
How are these changes tested?
pytestfor python,yarn testfor js,cargo testfor rustDocumentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs repository?