
feat: trace async to sync demo #3535

Draft: wants to merge 56 commits into base: main

Conversation

@domyway (Contributor) commented May 17, 2024

Summary by CodeRabbit

  • New Features

    • Introduced new metrics for tracking ingestion and span operations.
    • Added new error handling capabilities with BufferWriteError.
  • Enhancements

    • Improved trace request validation and buffer handling.
    • Enhanced SchemaRecords to include record_id for better traceability.
  • Refactor

    • Updated various structs to derive Debug and Clone for improved debugging and cloning capabilities.
    • Replaced tokio::sync::Mutex with tokio::sync::RwLock for better concurrency handling.
  • Bug Fixes

    • Added default handling for missing headers in trace requests.
  • Chores

    • Updated dependencies to improve performance and stability.


coderabbitai bot commented May 27, 2024

Walkthrough

The changes primarily focus on enhancing the tracing and logging capabilities, introducing new metrics for monitoring, and improving error handling. Key updates include adding Clone and Debug traits to several structs, incorporating a new WriteBufferFlusher for trace data management, and adding new metrics for lock times and spans. Additionally, the SchemaRecords struct has been updated across multiple files to include a new record_id field.
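For orientation, here is a hedged sketch of the SchemaRecords shape these changes imply; the field types (in particular the type of the new record_id) are assumptions, and the actual struct in src/common/meta/stream.rs may carry more context:

```rust
use std::sync::Arc;

use arrow_schema::Schema;
use serde_json::Value;

// Sketch only: derives and the record_id field reflect the walkthrough above.
#[derive(Debug, Clone)]
pub struct SchemaRecords {
    pub record_id: u64, // new field added by this PR (type assumed)
    pub schema_key: String,
    pub schema: Arc<Schema>,
    pub records: Vec<Arc<Value>>,
    pub records_size: usize,
}
```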

Changes

  • Cargo.toml, src/ingester/Cargo.toml: Added workspace settings for opentelemetry-proto, crossbeam-channel, and tonic. Updated dependencies.
  • src/common/meta/stream.rs: Added #[derive(Debug, Clone)] to SchemaRecords.
  • src/common/meta/traces.rs: Added the Clone trait to the ExportTraceServiceResponse and ExportTracePartialSuccess structs.
  • src/config/src/metrics.rs: Added new static variables for tracking metrics related to lock times and spans.
  • src/handler/grpc/request/traces.rs: Introduced WriteBufferFlusher and refactored the request-processing logic.
  • src/handler/http/request/traces/mod.rs: Added WriteBufferFlusher and refactored request handling to include a default value for the in_stream_name variable.
  • src/infra/src/errors/mod.rs: Introduced the BufferWriteError enum with various error variants.
  • src/ingester/src/entry.rs: Added a session_id field to the Entry struct and updated relevant methods.
  • src/ingester/src/lib.rs: Made the memtable module public.
  • src/ingester/src/writer.rs: Refactored the Writer struct and methods to use wal_next_seq and memtable_next_seq. Added new methods for writing to the memtable and the WAL.
  • src/job/files/parquet.rs, src/service/enrichment_table/mod.rs, src/service/logs/bulk.rs, src/service/logs/mod.rs, src/service/metadata/distinct_values.rs, src/service/metadata/trace_list_index.rs, src/service/metrics/json.rs, src/service/metrics/otlp_grpc.rs, src/service/metrics/otlp_http.rs, src/service/metrics/prom.rs: Added a record_id field with a default value to the SchemaRecords struct.
  • src/service/traces/flusher.rs: Introduced WriteBufferFlusher and related functionality for buffering and flushing trace data.

Sequence Diagram(s) (Beta)

sequenceDiagram
    participant Client
    participant HTTP_Server
    participant Trace_Handler
    participant WriteBufferFlusher
    participant Buffer
    participant Exporter

    Client->>HTTP_Server: Send Trace Data
    HTTP_Server->>Trace_Handler: Forward Request
    Trace_Handler->>WriteBufferFlusher: Write to Buffer
    WriteBufferFlusher->>Buffer: Buffer Data
    Note right of WriteBufferFlusher: Periodic Flush
    WriteBufferFlusher->>Exporter: Export Buffered Data
    Exporter-->>WriteBufferFlusher: Acknowledge
    WriteBufferFlusher-->>Trace_Handler: Acknowledge
    Trace_Handler-->>HTTP_Server: Response
    HTTP_Server-->>Client: Response

coderabbitai bot left a comment

Actionable comments posted: 3

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits: files that changed from the base of the PR, between 90cc1b2 and 0dfaaad.
Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
Files selected for processing (19)
  • Cargo.toml (3 hunks)
  • src/common/meta/stream.rs (1 hunks)
  • src/common/meta/traces.rs (2 hunks)
  • src/config/src/metrics.rs (4 hunks)
  • src/handler/grpc/request/traces.rs (2 hunks)
  • src/handler/http/request/traces/mod.rs (3 hunks)
  • src/infra/src/errors/mod.rs (1 hunks)
  • src/ingester/Cargo.toml (1 hunks)
  • src/ingester/src/entry.rs (4 hunks)
  • src/ingester/src/lib.rs (1 hunks)
  • src/ingester/src/writer.rs (7 hunks)
  • src/main.rs (11 hunks)
  • src/router/grpc/ingest/traces.rs (4 hunks)
  • src/service/ingestion/mod.rs (5 hunks)
  • src/service/mod.rs (1 hunks)
  • src/service/traces/flusher.rs (1 hunks)
  • src/service/traces/mod.rs (4 hunks)
  • src/service/traces/otlp_http.rs (1 hunks)
  • src/service/traces/validator.rs (1 hunks)
Files skipped from review due to trivial changes (4)
  • src/common/meta/stream.rs
  • src/common/meta/traces.rs
  • src/ingester/Cargo.toml
  • src/ingester/src/lib.rs
Additional comments not posted (41)
src/service/mod.rs (1)

97-102: The modification to handle empty stream names by returning "default" is a good practice for robustness and fault tolerance.
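A minimal sketch of the described fallback; the function and parameter names are illustrative, not the actual code in src/service/mod.rs:

```rust
// Fall back to "default" when the incoming stream name is empty.
fn resolve_stream_name(name: &str) -> String {
    if name.trim().is_empty() {
        "default".to_string()
    } else {
        name.to_string()
    }
}
```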

src/router/grpc/ingest/traces.rs (2)

45-46: Properly generating and inserting the session ID into the metadata enhances traceability and debugging capabilities.


75-85: Including session ID and elapsed time in error logs is a best practice for effective debugging and monitoring.
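A hedged sketch of the logging pattern being praised here; the log prefix, field names, and message format are assumptions. Generating the id itself could be as simple as uuid::Uuid::new_v4().to_string():

```rust
use std::time::Instant;

// Log an export failure together with the session id and elapsed time.
fn log_export_error(session_id: &str, started: Instant, err: &dyn std::fmt::Display) {
    log::error!(
        "[TRACES:ROUTER] export failed, session_id: {}, elapsed: {} ms, error: {}",
        session_id,
        started.elapsed().as_millis(),
        err
    );
}
```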

src/handler/grpc/request/traces.rs (2)

22-30: Introducing a flusher to handle buffered writes of trace data is an effective way to enhance performance and reliability.


47-75: Robust error handling in trace request processing enhances the system's reliability and user experience by providing clear feedback and preventing crashes.

src/service/traces/mod.rs (2)

47-59: Adding new parameters to the handle_trace_request function enhances its flexibility and allows for more detailed handling of trace data.


Line range hint 64-114: Proper error handling and metrics reporting in the handle_trace_request function provide valuable insights and enhance the system's reliability.

src/ingester/src/entry.rs (1)

Line range hint 38-49: Adding a session_id field to the Entry struct is a good practice for enhanced traceability and session management.

src/service/traces/otlp_http.rs (2)

38-70: Introducing new parameters in the traces_proto and traces_json functions enhances their flexibility and allows for more detailed handling of trace data.


47-105: Robust error handling in the traces_proto and traces_json functions enhances the system's reliability and user experience by providing clear feedback and preventing crashes.

Cargo.toml (5)

109-109: Enabled workspace features for opentelemetry-proto.

This change aligns with the PR's focus on enhancing trace functionality.


158-158: Enabled workspace features for crossbeam-channel.

This is likely used to improve the handling of concurrent data structures, which is beneficial for the system's performance.


282-288: Updated opentelemetry-proto dependency with additional features.

The added features (gen-tonic, serde, logs, metrics, trace) are essential for the expanded functionality in trace handling.


289-289: Updated crossbeam-channel dependency.

Ensuring the latest version of crossbeam-channel is used can help leverage the latest improvements and bug fixes.


290-290: Added futures-locks dependency.

This addition supports the new asynchronous to synchronous tracing feature by managing concurrent access more efficiently.

src/service/traces/flusher.rs (4)

36-41: Introduced ExportRequest enum with TraceEntry variant.

This change supports structured handling of trace entries, which is crucial for the new tracing feature.


124-150: Implemented asynchronous write method in WriteBufferFlusher.

This method correctly handles asynchronous writing of trace data, which is essential for performance in a trace-heavy environment.
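A simplified sketch, under assumed names, of the async-write-to-buffered-flush handoff this comment refers to: the caller enqueues a write and awaits the flusher's acknowledgement over a oneshot channel. The real write method and its types may differ:

```rust
use tokio::sync::{mpsc, oneshot};

// A queued write plus a channel for reporting the flush outcome back to the caller.
pub struct BufferedWrite {
    pub payload: Vec<u8>,
    pub response_tx: oneshot::Sender<Result<(), String>>,
}

// Asynchronous write: enqueue the payload, then await the flusher's response.
pub async fn write(
    buffer_tx: &mpsc::Sender<BufferedWrite>,
    payload: Vec<u8>,
) -> Result<(), String> {
    let (response_tx, response_rx) = oneshot::channel();
    buffer_tx
        .send(BufferedWrite { payload, response_tx })
        .await
        .map_err(|e| e.to_string())?;
    response_rx.await.map_err(|e| e.to_string())?
}
```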


157-203: Implemented run_trace_io_flush function to handle IO operations.

This function is well-implemented to handle IO operations efficiently, crucial for the performance of trace data flushing.


205-275: Implemented run_trace_op_buffer function for operation buffering.

This function effectively manages trace operations buffering, which is critical for the new tracing functionality.
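For readers unfamiliar with the pattern, a minimal sketch (not the PR's implementation) of what an op-buffer loop of this kind typically does: accumulate incoming writes and flush them on a timer or on shutdown. Channel types, the flush interval, and how batches reach the blocking IO thread are all assumptions:

```rust
use tokio::sync::{mpsc, watch};
use tokio::time::{interval, Duration};

// Batch incoming writes and flush them on a timer or on shutdown.
async fn run_op_buffer(
    mut buffer_rx: mpsc::Receiver<Vec<u8>>,
    mut shutdown_rx: watch::Receiver<()>,
) {
    let mut pending: Vec<Vec<u8>> = Vec::new();
    let mut ticker = interval(Duration::from_millis(100));
    loop {
        tokio::select! {
            Some(write) = buffer_rx.recv() => pending.push(write),
            _ = ticker.tick() => {
                if !pending.is_empty() {
                    // Hand the accumulated batch to the blocking IO flusher here.
                    pending.clear();
                }
            }
            _ = shutdown_rx.changed() => break,
        }
    }
}
```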

src/infra/src/errors/mod.rs (1)

79-92: Introduced BufferWriteError enum to handle specific buffer write errors.

This new error type is essential for detailed error handling in the buffer management system, improving the robustness of error reporting.
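A hedged sketch of what such an enum could look like. Only HttpBadRequest and IoError actually appear in the code excerpts quoted later in this review; the remaining variant is a placeholder for the "various error variants" the summary mentions, and the thiserror derive is an assumption:

```rust
use thiserror::Error;

#[derive(Debug, Error)]
pub enum BufferWriteError {
    #[error("bad request: {0}")]
    HttpBadRequest(String),
    #[error("io error: {0}")]
    IoError(String),
    #[error("write buffer channel closed")]
    ChannelClosed,
}
```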

src/ingester/src/writer.rs (5)

61-63: Updated sequence handling in Writer struct.

Splitting the sequence into wal_next_seq and memtable_next_seq allows for more granular control and is a good practice for handling different types of data storage.
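An illustrative sketch of why two independent counters help: WAL and memtable writes can advance separately. The real Writer almost certainly stores and guards these differently; the atomic types and method names below are assumptions:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Independent monotonic counters for WAL entries and memtable entries.
struct WriterSequences {
    wal_next_seq: AtomicU64,
    memtable_next_seq: AtomicU64,
}

impl WriterSequences {
    fn next_wal_seq(&self) -> u64 {
        self.wal_next_seq.fetch_add(1, Ordering::SeqCst)
    }

    fn next_memtable_seq(&self) -> u64 {
        self.memtable_next_seq.fetch_add(1, Ordering::SeqCst)
    }
}
```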


Line range hint 144-174: Refactored Writer::new to initialize new sequence variables.

The refactoring enhances clarity and ensures that sequence numbers are handled correctly from the start.


Line range hint 182-234: Enhanced write method in Writer to handle WAL rotations and memtable checks.

This method is crucial for ensuring data integrity and performance by managing how data is written to storage.


297-352: Added write_memtable method to handle writing to the memtable.

This method is well-implemented to handle the complexities of writing to the memtable, including handling TTL and rotation conditions.


354-412: Added write_wal method to handle writing to the WAL.

This method effectively manages WAL operations, crucial for maintaining the integrity and performance of the logging system.

src/handler/http/request/traces/mod.rs (4)

32-35: Ensure that the added imports are used appropriately in the file.


58-58: Ensure that the flusher parameter is properly utilized within the function.


89-91: Check the integration of flusher in the conditional logic for content type handling.


79-87: Validate the default handling for in_stream_name and ensure it aligns with expected behavior when headers are missing.

#!/bin/bash
# Description: Verify the default handling for `in_stream_name`.

# Test: Search for the usage of `in_stream_name`. Expect: Default value "default" when headers are missing.
rg --type rust $'in_stream_name'
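A hedged sketch of the header fallback being verified above; the header name, helper name, and actix-web extraction path are assumptions, not the handler's actual code:

```rust
use actix_web::HttpRequest;

// Read the stream name from a request header, defaulting to "default".
fn in_stream_name(req: &HttpRequest, header: &str) -> String {
    req.headers()
        .get(header)
        .and_then(|v| v.to_str().ok())
        .filter(|s| !s.is_empty())
        .map(|s| s.to_string())
        .unwrap_or_else(|| "default".to_string())
}
```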
src/service/ingestion/mod.rs (3)

388-388: Ensure that the session_id is correctly passed to the write function.


403-438: Review the handling of session_id in the write_wal_file function. Ensure consistency with write_file.


367-372: Check the extraction logic for stream_name and session_id from a combined string. Ensure it handles cases where the separator is not present.

#!/bin/bash
# Description: Verify the session ID extraction logic.

# Test: Search for the usage of `stream_name.split('^')`. Expect: Proper handling when separator is not present.
rg --type rust $'stream_name.split'
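A minimal sketch of separator-tolerant extraction, assuming the '^' separator implied by the rg pattern above; the function name is illustrative:

```rust
// Split a combined "stream^session" value, tolerating a missing separator.
fn split_stream_and_session(combined: &str) -> (&str, Option<&str>) {
    match combined.split_once('^') {
        Some((stream, session)) => (stream, Some(session)),
        None => (combined, None),
    }
}
```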
src/config/src/metrics.rs (4)

201-212: The definition and registration of INGEST_MEMTABLE_LOCK_TIME are correctly implemented.

Also applies to: 650-651


214-225: The definition and registration of INGEST_WAL_LOCK_TIME are correctly implemented.

Also applies to: 653-654


569-584: The definition and registration of SPAN_CALLS are correctly implemented.

Also applies to: 754-755


586-605: The definition and registration of SPAN_DURATION_MILLISECONDS are correctly implemented.

Also applies to: 757-758
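As a rough illustration of what one of these metric definitions could look like with the prometheus crate; the actual code registers its metrics on the project's own registry and may use different names, buckets, and label sets:

```rust
use once_cell::sync::Lazy;
use prometheus::{register_histogram_vec, HistogramVec};

// One of the new lock-time metrics, declared as a labelled histogram.
pub static INGEST_WAL_LOCK_TIME: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "ingest_wal_lock_time_seconds",
        "Time spent waiting for the WAL write lock",
        &["organization", "stream_type"]
    )
    .expect("metric can be created")
});
```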

src/main.rs (3)

88-88: The integration of WriteBufferFlusher into the application's gRPC and HTTP servers is correctly implemented, aligning with the PR's objectives.

Also applies to: 212-218, 243-246, 294-327, 395-484, 490-492


Line range hint 294-327: The modifications in the init_common_grpc_server function to accommodate tracer_flusher are correctly implemented.


Line range hint 395-484: The integration of tracer_flusher in the HTTP server initialization functions is correctly implemented.

Also applies to: 490-492

src/service/traces/validator.rs (2)

92-368: Refactor large function to improve maintainability.
The function validate_trace_request is quite large and handles multiple responsibilities. Consider breaking it down into smaller, more manageable functions. This can improve readability and maintainability.

- async fn validate_trace_request(org_id: &str, request: ExportTraceServiceRequest, in_stream_name: Option<&str>) -> Result<...> {
+ // Refactored into smaller functions
+ async fn validate_trace_request(org_id: &str, request: ExportTraceServiceRequest, in_stream_name: Option<&str>) -> Result<...> {
+     let traces_stream_name = determine_stream_name(in_stream_name);
+     let partition_details = fetch_partition_details(org_id, &traces_stream_name).await?;
+     process_trace_request(org_id, request, &traces_stream_name, partition_details).await
+ }

59-91: Ensure proper error handling for asynchronous operations.

The function get_partition_keys performs several asynchronous operations but does not handle potential errors explicitly, which might lead to panics or unhandled errors. Consider adding error handling for these operations.

#!/bin/bash
# Description: Verify proper error handling in asynchronous operations in `get_partition_keys`.

# Test: Search for error handling patterns in the function.
ast-grep --lang rust --pattern $'async fn get_partition_keys($_, $_, $_) {
  $$$
  .await;
  $$$
}'

Comment on lines 370 to 699
CONFIG.common.column_timestamp.clone(),
json::Value::Number(timestamp.into()),
);

// get distinct_value item
for field in DISTINCT_FIELDS.iter() {
if let Some(val) = record_val.get(field) {
if !val.is_null() {
let (filter_name, filter_value) = if field == "operation_name" {
("service_name".to_string(), service_name.clone())
} else {
("".to_string(), "".to_string())
};
distinct_values.push(MetadataItem::DistinctValues(DvItem {
stream_type: StreamType::Traces,
stream_name: traces_stream_name.to_string(),
field_name: field.to_string(),
field_value: val.as_str().unwrap().to_string(),
filter_name,
filter_value,
}));
}
}
}

// build trace metadata
trace_index.push(MetadataItem::TraceListIndexer(TraceListItem {
stream_name: traces_stream_name.to_string(),
service_name: service_name.clone(),
trace_id,
_timestamp: start_time / 1000,
}));

// check schema
let _ = check_for_schema(
org_id,
&traces_stream_name,
StreamType::Traces,
&mut traces_schema_map,
vec![&record_val],
timestamp.try_into().unwrap(),
)
.await;

if trigger.is_none() && !stream_alerts_map.is_empty() {
// Start check for alert trigger
let key =
format!("{}/{}/{}", &org_id, StreamType::Traces, traces_stream_name);
if let Some(alerts) = stream_alerts_map.get(&key) {
let mut trigger_alerts: TriggerAlertData = Vec::new();
for alert in alerts {
if let Ok(Some(v)) = alert.evaluate(Some(&record_val)).await {
trigger_alerts.push((alert.clone(), v));
}
}
trigger = Some(trigger_alerts);
}
// End check for alert trigger
}

// get hour key
let rec_schema = traces_schema_map
.get(&traces_stream_name)
.unwrap()
.schema()
.clone()
.with_metadata(HashMap::new());
let schema_key = rec_schema.hash_key();
let hour_key = crate::service::ingestion::get_wal_time_key(
timestamp.try_into().unwrap(),
&partition_keys,
partition_time_level,
&record_val,
Some(&schema_key),
);

let hour_buf = data_buf.entry(hour_key).or_insert_with(|| SchemaRecords {
schema_key,
schema: Arc::new(rec_schema),
records: vec![],
records_size: 0,
});
// let record_val = record_val.to_owned();
let record_val = json::Value::Object(record_val);
let record_size = json::estimate_json_bytes(&record_val);
hour_buf.records.push(Arc::new(record_val));
hour_buf.records_size += record_size;
}
}
}
}

Ok((
data_buf,
trigger,
distinct_values,
trace_index,
partial_success,
))
}

Consider using more robust error handling for JSON parsing.

The function validate_json_trace_request assumes the presence of certain JSON keys and structures without validation, which could lead to panics if the expected structure is not met. Consider adding checks before unwrapping.

- let resource = res_span.get("resource").unwrap().as_object().unwrap();
+ let resource = res_span.get("resource").ok_or(BufferWriteError::HttpBadRequest("Missing 'resource' key".to_string()))?.as_object().ok_or(BufferWriteError::HttpBadRequest("Expected 'resource' to be an object".to_string()))?;

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
pub async fn validate_json_trace_request(
org_id: &str,
body: web::Bytes,
in_stream_name: Option<&str>,
) -> Result<
(
HashMap<String, SchemaRecords>,
Option<TriggerAlertData>,
Vec<MetadataItem>,
Vec<MetadataItem>,
ExportTracePartialSuccess,
),
BufferWriteError,
> {
basic_check(org_id)?;
let mut runtime = crate::service::ingestion::init_functions_runtime();
let mut traces_schema_map: HashMap<String, SchemaCache> = HashMap::new();
let mut stream_alerts_map: HashMap<String, Vec<Alert>> = HashMap::new();
let mut distinct_values = Vec::with_capacity(16);
let traces_stream_name = match in_stream_name {
Some(name) => format_stream_name(name),
None => "default".to_string(),
};
let (partition_keys, partition_time_level) =
get_partition_keys(org_id, traces_stream_name.as_str(), &mut traces_schema_map).await;
// Start get stream alerts
crate::service::ingestion::get_stream_alerts(
&[StreamParams {
org_id: org_id.to_owned().into(),
stream_name: traces_stream_name.to_owned().into(),
stream_type: StreamType::Traces,
}],
&mut stream_alerts_map,
)
.await;
// End get stream alert
// Start Register Transforms for stream
let (local_trans, stream_vrl_map) = crate::service::ingestion::register_stream_functions(
org_id,
&StreamType::Traces,
&traces_stream_name,
);
// End Register Transforms for stream
let mut trigger: Option<TriggerAlertData> = None;
let min_ts = (Utc::now() - Duration::try_hours(CONFIG.limit.ingest_allowed_upto).unwrap())
.timestamp_micros();
let mut partial_success = crate::common::meta::traces::ExportTracePartialSuccess::default();
let mut data_buf: HashMap<String, SchemaRecords> = HashMap::new();
let mut service_name: String = traces_stream_name.to_string();
// let export_req: ExportTraceServiceRequest =
// json::from_slice(body.as_ref()).unwrap();
let body: json::Value = match json::from_slice(body.as_ref()) {
Ok(v) => v,
Err(e) => {
return Err(BufferWriteError::HttpBadRequest(format!(
"Invalid json: {}",
e
)));
}
};
let spans = match body.get("resourceSpans") {
Some(json::Value::Array(v)) => v,
_ => {
return Err(BufferWriteError::HttpBadRequest(
"Invalid json: the structure must be {{\"resourceSpans\":[]}}".to_string(),
));
}
};
let mut trace_index = Vec::with_capacity(spans.len());
for res_span in spans.iter() {
let mut service_att_map: HashMap<String, json::Value> = HashMap::new();
if res_span.get("resource").is_some() {
let resource = res_span.get("resource").unwrap().as_object().unwrap();
if resource.get("attributes").is_some() {
let attributes = resource.get("attributes").unwrap().as_array().unwrap();
for res_attr in attributes {
let local_attr = res_attr.as_object().unwrap();
if local_attr
.get("key")
.unwrap()
.as_str()
.unwrap()
.eq(SERVICE_NAME)
{
let loc_service_name =
local_attr.get("value").unwrap().as_object().unwrap();
for item in loc_service_name {
service_name = item.1.as_str().unwrap().to_string();
service_att_map.insert(SERVICE_NAME.to_string(), item.1.clone());
}
} else {
service_att_map.insert(
format!(
"{}.{}",
SERVICE,
local_attr.get("key").unwrap().as_str().unwrap()
),
get_val_for_attr(local_attr.get("value").unwrap().clone()),
);
}
}
}
}
let scope_resources = res_span.get("scopeSpans");
let inst_resources = if let Some(v) = scope_resources {
v.as_array().unwrap()
} else {
res_span
.get("instrumentationLibrarySpans")
.unwrap()
.as_array()
.unwrap()
};
for inst_span in inst_resources {
if inst_span.get("spans").is_some() {
let spans = inst_span.get("spans").unwrap().as_array().unwrap();
for span in spans {
let span_id: String = span.get("spanId").unwrap().as_str().unwrap().to_string();
let trace_id: String =
span.get("traceId").unwrap().as_str().unwrap().to_string();
let mut span_ref = HashMap::new();
if span.get("parentSpanId").is_some() {
span_ref.insert(
PARENT_SPAN_ID.to_string(),
span.get("parentSpanId")
.unwrap()
.as_str()
.unwrap()
.to_string(),
);
span_ref.insert(PARENT_TRACE_ID.to_string(), trace_id.clone());
span_ref
.insert(REF_TYPE.to_string(), format!("{:?}", SpanRefType::ChildOf));
}
let start_time = json::get_uint_value(span.get("startTimeUnixNano").unwrap());
let end_time = json::get_uint_value(span.get("endTimeUnixNano").unwrap());
let mut span_att_map: HashMap<String, json::Value> = HashMap::new();
let attributes = span.get("attributes").unwrap().as_array().unwrap();
for span_att in attributes {
let mut key = span_att.get("key").unwrap().as_str().unwrap().to_string();
if BLOCK_FIELDS.contains(&key.as_str()) {
key = format!("attr_{}", key);
}
span_att_map.insert(
key,
get_val_for_attr(span_att.get("value").unwrap().clone()),
);
}
let mut events = vec![];
let mut event_att_map: HashMap<String, json::Value> = HashMap::new();
let empty_vec = Vec::new();
let span_events = match span.get("events") {
Some(v) => v.as_array().unwrap(),
None => &empty_vec,
};
for event in span_events {
let attributes = event.get("attributes").unwrap().as_array().unwrap();
for event_att in attributes {
event_att_map.insert(
event_att.get("key").unwrap().as_str().unwrap().to_string(),
get_val_for_attr(event_att.get("value").unwrap().clone()),
);
}
events.push(Event {
name: event.get("name").unwrap().as_str().unwrap().to_string(),
_timestamp: json::get_uint_value(event.get("timeUnixNano").unwrap()),
attributes: event_att_map.clone(),
})
}
let timestamp = start_time / 1000;
let local_val = Span {
trace_id: trace_id.clone(),
span_id,
span_kind: span.get("kind").unwrap().to_string(),
span_status: json::get_string_value(
span.get("status")
.unwrap_or(&json::Value::String("UNSET".to_string())),
),
operation_name: span.get("name").unwrap().as_str().unwrap().to_string(),
start_time,
end_time,
duration: (end_time - start_time) / 1000, // microseconds
reference: span_ref,
service_name: service_name.clone(),
attributes: span_att_map,
service: service_att_map.clone(),
flags: 1, // TODO add appropriate value
events: json::to_string(&events).unwrap(),
};
if timestamp < min_ts.try_into().unwrap() {
partial_success.rejected_spans += 1;
continue;
}
let mut value: json::Value = json::to_value(local_val).unwrap();
// JSON Flattening
value = flatten::flatten(value)
.map_err(|e| BufferWriteError::IoError(e.to_string()))?;
if !local_trans.is_empty() {
value = crate::service::ingestion::apply_stream_functions(
&local_trans,
value,
&stream_vrl_map,
&traces_stream_name,
&mut runtime,
)
.map_err(|e| BufferWriteError::IoError(e.to_string()))?;
}
// End row based transform */
// get json object
let mut record_val = match value.take() {
json::Value::Object(v) => v,
_ => unreachable!(),
};
record_val.insert(
CONFIG.common.column_timestamp.clone(),
json::Value::Number(timestamp.into()),
);
// get distinct_value item
for field in DISTINCT_FIELDS.iter() {
if let Some(val) = record_val.get(field) {
if !val.is_null() {
let (filter_name, filter_value) = if field == "operation_name" {
("service_name".to_string(), service_name.clone())
} else {
("".to_string(), "".to_string())
};
distinct_values.push(MetadataItem::DistinctValues(DvItem {
stream_type: StreamType::Traces,
stream_name: traces_stream_name.to_string(),
field_name: field.to_string(),
field_value: val.as_str().unwrap().to_string(),
filter_name,
filter_value,
}));
}
}
}
// build trace metadata
trace_index.push(MetadataItem::TraceListIndexer(TraceListItem {
stream_name: traces_stream_name.to_string(),
service_name: service_name.clone(),
trace_id,
_timestamp: start_time / 1000,
}));
// check schema
let _ = check_for_schema(
org_id,
&traces_stream_name,
StreamType::Traces,
&mut traces_schema_map,
vec![&record_val],
timestamp.try_into().unwrap(),
)
.await;
if trigger.is_none() && !stream_alerts_map.is_empty() {
// Start check for alert trigger
let key =
format!("{}/{}/{}", &org_id, StreamType::Traces, traces_stream_name);
if let Some(alerts) = stream_alerts_map.get(&key) {
let mut trigger_alerts: TriggerAlertData = Vec::new();
for alert in alerts {
if let Ok(Some(v)) = alert.evaluate(Some(&record_val)).await {
trigger_alerts.push((alert.clone(), v));
}
}
trigger = Some(trigger_alerts);
}
// End check for alert trigger
}
// get hour key
let rec_schema = traces_schema_map
.get(&traces_stream_name)
.unwrap()
.schema()
.clone()
.with_metadata(HashMap::new());
let schema_key = rec_schema.hash_key();
let hour_key = crate::service::ingestion::get_wal_time_key(
timestamp.try_into().unwrap(),
&partition_keys,
partition_time_level,
&record_val,
Some(&schema_key),
);
let hour_buf = data_buf.entry(hour_key).or_insert_with(|| SchemaRecords {
schema_key,
schema: Arc::new(rec_schema),
records: vec![],
records_size: 0,
});
// let record_val = record_val.to_owned();
let record_val = json::Value::Object(record_val);
let record_size = json::estimate_json_bytes(&record_val);
hour_buf.records.push(Arc::new(record_val));
hour_buf.records_size += record_size;
}
}
}
}
Ok((
data_buf,
trigger,
distinct_values,
trace_index,
partial_success,
))
}
for res_span in spans.iter() {
let mut service_att_map: HashMap<String, json::Value> = HashMap::new();
if res_span.get("resource").is_some() {
let resource = res_span.get("resource").ok_or(BufferWriteError::HttpBadRequest("Missing 'resource' key".to_string()))?.as_object().ok_or(BufferWriteError::HttpBadRequest("Expected 'resource' to be an object".to_string()))?;
if resource.get("attributes").is_some() {
let attributes = resource.get("attributes").unwrap().as_array().unwrap();
for res_attr in attributes {
let local_attr = res_attr.as_object().unwrap();
if local_attr
.get("key")
.unwrap()
.as_str()
.unwrap()
.eq(SERVICE_NAME)
{
let loc_service_name =
local_attr.get("value").unwrap().as_object().unwrap();
for item in loc_service_name {
service_name = item.1.as_str().unwrap().to_string();
service_att_map.insert(SERVICE_NAME.to_string(), item.1.clone());
}
} else {
service_att_map.insert(
format!(
"{}.{}",
SERVICE,
local_attr.get("key").unwrap().as_str().unwrap()
),
get_val_for_attr(local_attr.get("value").unwrap().clone()),
);
}
}
}
}

Comment on lines +74 to +122
#[derive(Debug)]
pub struct WriteBufferFlusher {
join_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
wal_io_handle: Mutex<Option<std::thread::JoinHandle<()>>>,
pub shutdown_tx: Option<watch::Sender<()>>,
buffer_tx: mpsc::Sender<BufferedWrite>,
}

impl Default for WriteBufferFlusher {
fn default() -> Self {
Self::new()
}
}

impl WriteBufferFlusher {
pub fn new() -> Self {
let (trace_shutdown_tx, trace_shutdown_rx) = watch::channel(());
let (buffer_tx, buffer_rx) = mpsc::channel::<BufferedWrite>(flusher::BUFFER_CHANNEL_LIMIT);
let (io_flush_tx, io_flush_rx) = crossbeam_channel::bounded(1);
let (io_flush_notify_tx, io_flush_notify_rx) = crossbeam_channel::bounded(1);

let flusher = Self {
join_handle: Default::default(),
wal_io_handle: Default::default(),
shutdown_tx: Some(trace_shutdown_tx),
buffer_tx,
};

*flusher.wal_io_handle.lock() = Some(
std::thread::Builder::new()
.name("write buffer io flusher".to_string())
.spawn(move || {
run_trace_io_flush(io_flush_rx, io_flush_notify_tx);
})
.expect("failed to spawn write buffer io flusher thread"),
);

*flusher.join_handle.lock() = Some(tokio::task::spawn(async move {
run_trace_op_buffer(
buffer_rx,
io_flush_tx,
io_flush_notify_rx,
trace_shutdown_rx,
)
.await
}));

flusher
}

Refactored WriteBufferFlusher to handle trace flushing.

The initialization of WriteBufferFlusher could be simplified to improve readability and maintainability.

impl WriteBufferFlusher {
    pub fn new() -> Self {
        let (trace_shutdown_tx, trace_shutdown_rx, buffer_tx, buffer_rx, io_flush_tx, io_flush_rx, io_flush_notify_tx, io_flush_notify_rx) = Self::setup_channels();
        let flusher = Self {
            join_handle: Default::default(),
            wal_io_handle: Default::default(),
            shutdown_tx: Some(trace_shutdown_tx),
            buffer_tx,
        };
        flusher.start_io_flush_thread(io_flush_rx, io_flush_notify_tx);
        flusher.start_trace_op_buffer_thread(buffer_rx, io_flush_tx, io_flush_notify_rx, trace_shutdown_rx);
        flusher
    }
}

coderabbitai bot left a comment

Actionable comments posted: 8

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits: files that changed from the base of the PR, between 0dfaaad and fe25392.
Files selected for processing (14)
  • src/common/meta/stream.rs (1 hunks)
  • src/ingester/src/writer.rs (5 hunks)
  • src/job/files/parquet.rs (1 hunks)
  • src/service/enrichment_table/mod.rs (1 hunks)
  • src/service/ingestion/mod.rs (5 hunks)
  • src/service/logs/bulk.rs (1 hunks)
  • src/service/logs/mod.rs (2 hunks)
  • src/service/metadata/distinct_values.rs (1 hunks)
  • src/service/metadata/trace_list_index.rs (1 hunks)
  • src/service/metrics/json.rs (1 hunks)
  • src/service/metrics/otlp_grpc.rs (1 hunks)
  • src/service/metrics/otlp_http.rs (1 hunks)
  • src/service/metrics/prom.rs (1 hunks)
  • src/service/traces/validator.rs (1 hunks)
Files skipped from review as they are similar to previous changes (2)
  • src/common/meta/stream.rs
  • src/service/ingestion/mod.rs
Additional comments not posted (12)
src/service/metadata/trace_list_index.rs (1)

112-112: The addition of record_id initialized to 0 in SchemaRecords should be reviewed for its intended use and consistency across the application.

src/service/enrichment_table/mod.rs (1)

213-213: The addition of record_id initialized to 0 in SchemaRecords should be reviewed for its intended use and consistency across the application.

src/service/metrics/json.rs (1)

266-266: The addition of record_id initialized to 0 in SchemaRecords should be reviewed for its intended use and consistency across the application.

Verification successful

The record_id field is consistently initialized to 0 across most modules, except in src/service/traces/validator.rs, where it starts at 1 and is incremented within loops. This discrepancy suggests different usage contexts for record_id.

  • In src/service/traces/validator.rs, record_id is used to uniquely identify records within a loop, hence the need for incrementation.
  • In other modules, record_id is initialized to 0, likely serving as a static identifier or placeholder.

Given this context, the addition of record_id initialized to 0 in SchemaRecords in src/service/metrics/json.rs appears consistent with its usage in other modules, except for the specific case in src/service/traces/validator.rs.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the usage of `record_id` across different modules to ensure consistency.

# Test: Search for `record_id` usage in other modules. Expect: Consistent initialization and usage.
rg --type rust $'record_id'

Length of output: 1344



Script:

#!/bin/bash
# Extract relevant code sections from `src/service/traces/validator.rs` to analyze the usage of `record_id`.

# Extract lines containing `record_id` and their surrounding context.
rg --type rust --context 5 'record_id' src/service/traces/validator.rs

Length of output: 2375

src/ingester/src/writer.rs (3)

30-30: Usage of tokio::sync::RwLock is appropriate here for asynchronous read/write operations.


223-227: Good use of metrics to track the lock time for WAL operations.


Line range hint 175-189: Ensure proper error handling for entry.into_bytes() as it might throw an error which is not currently handled.

Verification successful

The rg command output shows two instances of entry.into_bytes() in src/ingester/src/writer.rs. Both instances use the ? operator, which propagates errors to the calling function. This indicates that errors from entry.into_bytes() are being handled by propagating them up the call stack.

To ensure that this error propagation is correctly handled, we need to verify the context in which these instances are used. Specifically, we should check the function signatures and the surrounding code to confirm that errors are properly managed.


The write and write_wal methods in src/ingester/src/writer.rs both use the ? operator to propagate errors from entry.into_bytes(). The write method returns Result<()>, and the write_wal method returns Result<Option<(String, String)>>, ensuring that any errors are properly propagated to the caller.

  • write method:

    pub async fn write(
        &self,
        _schema: Arc<Schema>,
        mut entry: Entry,
        check_ttl: bool,
    ) -> Result<()> {
        if entry.data.is_empty() && !check_ttl {
            return Ok(());
        }
        let entry_bytes = if !check_ttl {
            entry.into_bytes()?
        } else {
            Vec::new()
        };
        // ...
    }
  • write_wal method:

    pub async fn write_wal(&self, mut entry: Entry) -> Result<Option<(String, String)>> {
        if entry.data.is_empty() {
            return Ok(None);
        }
        let entry_bytes = entry.into_bytes()?;
        // ...
    }

Both methods correctly handle errors by propagating them using the ? operator, ensuring that any issues with entry.into_bytes() are managed appropriately.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify proper error handling for `entry.into_bytes()` in `write` method.

# Test: Search for error handling patterns around `entry.into_bytes()`.
rg --type rust 'entry.into_bytes()'

Length of output: 170



Script:

#!/bin/bash
# Description: Verify the context and error handling for `entry.into_bytes()` in `src/ingester/src/writer.rs`.

# Extract the function definitions and surrounding code for the instances of `entry.into_bytes()`.
rg -A 10 -B 10 'entry.into_bytes()' src/ingester/src/writer.rs

Length of output: 1624

src/service/logs/mod.rs (2)

393-393: Adding a record_id with a default value of 0 to SchemaRecords is a straightforward change. Ensure that all uses of SchemaRecords properly handle this new field.

Verification successful

The addition of the record_id field with a default value of 0 to SchemaRecords is handled consistently across the codebase. The field is being set and incremented appropriately in various files.

  • src/job/files/parquet.rs
  • src/service/enrichment_table/mod.rs
  • src/service/ingestion/mod.rs
  • src/service/logs/bulk.rs
  • src/service/logs/mod.rs
  • src/service/metrics/prom.rs
  • src/service/metrics/json.rs
  • src/service/metrics/otlp_grpc.rs
  • src/service/metrics/otlp_http.rs
  • src/service/metadata/distinct_values.rs
  • src/service/metadata/trace_list_index.rs
  • src/service/traces/validator.rs
Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify all uses of `SchemaRecords` handle the new `record_id` field.

# Test: Search for usage of `SchemaRecords` and check handling of `record_id`.
rg --type rust 'SchemaRecords' --files-with-matches | xargs rg 'record_id'

Length of output: 1390


444-444: The addition of record_id in another context within SchemaRecords. Consistency in handling this field across different modules should be verified.

Verification successful

The search results show that record_id is used across multiple files, often initialized to 0 or manipulated in some way. Here are the key observations:

  • In most files, record_id is initialized to 0.
  • In src/service/traces/validator.rs, record_id is manipulated (incremented).
  • In src/service/ingestion/mod.rs, record_id is used in formatting strings.

Given these observations, it appears that record_id is consistently handled as an identifier, often initialized to 0 and sometimes manipulated or used in string formatting. This suggests a consistent pattern across the codebase.

Files with record_id initialization or manipulation:

  • src/service/traces/validator.rs
  • src/service/metrics/prom.rs
  • src/service/metadata/trace_list_index.rs
  • src/service/metrics/otlp_http.rs
  • src/service/metadata/distinct_values.rs
  • src/service/logs/mod.rs
  • src/service/logs/bulk.rs
  • src/service/metrics/json.rs
  • src/service/enrichment_table/mod.rs
  • src/service/metrics/otlp_grpc.rs
  • src/job/files/parquet.rs
  • src/common/meta/stream.rs

Files with record_id used in formatting:

  • src/service/ingestion/mod.rs

Based on this analysis, the handling of record_id appears consistent across different modules.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify consistent handling of `record_id` across different modules.

# Test: Search for `record_id` across all Rust files to ensure consistent handling.
rg --type rust 'record_id' --files-with-matches

Length of output: 447



Script:

#!/bin/bash
# Description: Extract lines of code where `record_id` is used in each file to verify consistent handling.

# List of files containing `record_id`
files=(
  "src/service/traces/validator.rs"
  "src/service/metrics/prom.rs"
  "src/service/metadata/trace_list_index.rs"
  "src/service/metrics/otlp_http.rs"
  "src/service/metadata/distinct_values.rs"
  "src/service/logs/mod.rs"
  "src/service/logs/bulk.rs"
  "src/service/ingestion/mod.rs"
  "src/service/metrics/json.rs"
  "src/service/enrichment_table/mod.rs"
  "src/service/metrics/otlp_grpc.rs"
  "src/job/files/parquet.rs"
  "src/common/meta/stream.rs"
)

# Extract lines containing `record_id` from each file
for file in "${files[@]}"; do
  echo "File: $file"
  rg 'record_id' "$file"
  echo
done

Length of output: 3276

src/service/logs/bulk.rs (1)

657-657: Added record_id field to SchemaRecords.

Ensure that the record_id is being used appropriately across all instances where SchemaRecords is utilized, especially in serialization and deserialization processes.

src/service/metrics/prom.rs (1)

362-362: Added record_id field to SchemaRecords.

Ensure that the record_id is being used appropriately across all instances where SchemaRecords is utilized, especially in serialization and deserialization processes.

src/job/files/parquet.rs (1)

674-674: Confirm the appropriateness of the default value for record_id.

Verification successful

The default value of 0 for record_id is consistently used across the codebase and aligns with its purpose as a counter or unique identifier. This initialization does not conflict with existing functionality.

  • record_id is initialized with 0 in multiple files.
  • It is incremented in some cases, indicating its use as a unique identifier.
Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the usage of `record_id` across the system to ensure it does not conflict with existing functionality.

# Test: Search for `record_id` usage in the codebase. Expect: Consistent usage and initialization.
rg --type rust $'record_id'

Length of output: 1344

src/service/metrics/otlp_http.rs (1)

443-443: Add default value for record_id in SchemaRecords.

The addition of a default value for record_id in SchemaRecords is consistent with the changes described in the AI-generated summary. This ensures that every record can be uniquely identified, which is crucial for data integrity and traceability.

Comment on lines 373 to 707

// get distinct_value item
for field in DISTINCT_FIELDS.iter() {
if let Some(val) = record_val.get(field) {
if !val.is_null() {
let (filter_name, filter_value) = if field == "operation_name" {
("service_name".to_string(), service_name.clone())
} else {
("".to_string(), "".to_string())
};
distinct_values.push(MetadataItem::DistinctValues(DvItem {
stream_type: StreamType::Traces,
stream_name: traces_stream_name.to_string(),
field_name: field.to_string(),
field_value: val.as_str().unwrap().to_string(),
filter_name,
filter_value,
}));
}
}
}

// build trace metadata
trace_index.push(MetadataItem::TraceListIndexer(TraceListItem {
stream_name: traces_stream_name.to_string(),
service_name: service_name.clone(),
trace_id,
_timestamp: start_time / 1000,
}));

// check schema
let _ = check_for_schema(
org_id,
&traces_stream_name,
StreamType::Traces,
&mut traces_schema_map,
vec![&record_val],
timestamp.try_into().unwrap(),
)
.await;

if trigger.is_none() && !stream_alerts_map.is_empty() {
// Start check for alert trigger
let key =
format!("{}/{}/{}", &org_id, StreamType::Traces, traces_stream_name);
if let Some(alerts) = stream_alerts_map.get(&key) {
let mut trigger_alerts: TriggerAlertData = Vec::new();
for alert in alerts {
if let Ok(Some(v)) = alert.evaluate(Some(&record_val)).await {
trigger_alerts.push((alert.clone(), v));
}
}
trigger = Some(trigger_alerts);
}
// End check for alert trigger
}

// get hour key
let rec_schema = traces_schema_map
.get(&traces_stream_name)
.unwrap()
.schema()
.clone()
.with_metadata(HashMap::new());
let schema_key = rec_schema.hash_key();
let hour_key = crate::service::ingestion::get_wal_time_key(
timestamp.try_into().unwrap(),
&partition_keys,
partition_time_level,
&record_val,
Some(&schema_key),
);

let hour_buf = data_buf.entry(hour_key).or_insert_with(|| SchemaRecords {
record_id,
schema_key,
schema: Arc::new(rec_schema),
records: vec![],
records_size: 0,
});
record_id += 1;

// let record_val = record_val.to_owned();
let record_val = json::Value::Object(record_val);
let record_size = json::estimate_json_bytes(&record_val);
hour_buf.records.push(Arc::new(record_val));
hour_buf.records_size += record_size;
}
}
}
}

Ok((
data_buf,
trigger,
distinct_values,
trace_index,
partial_success,
))
}

Ensure robust error handling for JSON parsing in validate_json_trace_request.

- let resource = res_span.get("resource").unwrap().as_object().unwrap();
+ let resource = res_span.get("resource").ok_or(BufferWriteError::HttpBadRequest("Missing 'resource' key".to_string()))?.as_object().ok_or(BufferWriteError::HttpBadRequest("Expected 'resource' to be an object".to_string()))?;

This modification adds necessary checks before unwrapping, which prevents potential panics if the expected JSON structure is not met. It's crucial to validate the presence and type of JSON keys to ensure the stability and reliability of the parsing logic.


Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
pub async fn validate_json_trace_request(
org_id: &str,
body: web::Bytes,
in_stream_name: Option<&str>,
) -> Result<
(
HashMap<String, SchemaRecords>,
Option<TriggerAlertData>,
Vec<MetadataItem>,
Vec<MetadataItem>,
ExportTracePartialSuccess,
),
BufferWriteError,
> {
basic_check(org_id)?;
let mut runtime = crate::service::ingestion::init_functions_runtime();
let mut traces_schema_map: HashMap<String, SchemaCache> = HashMap::new();
let mut stream_alerts_map: HashMap<String, Vec<Alert>> = HashMap::new();
let mut distinct_values = Vec::with_capacity(16);
let traces_stream_name = match in_stream_name {
Some(name) => format_stream_name(name),
None => "default".to_string(),
};
let (partition_keys, partition_time_level) =
get_partition_keys(org_id, traces_stream_name.as_str(), &mut traces_schema_map).await;
// Start get stream alerts
crate::service::ingestion::get_stream_alerts(
&[StreamParams {
org_id: org_id.to_owned().into(),
stream_name: traces_stream_name.to_owned().into(),
stream_type: StreamType::Traces,
}],
&mut stream_alerts_map,
)
.await;
// End get stream alert
// Start Register Transforms for stream
let (local_trans, stream_vrl_map) = crate::service::ingestion::register_stream_functions(
org_id,
&StreamType::Traces,
&traces_stream_name,
);
// End Register Transforms for stream
let mut trigger: Option<TriggerAlertData> = None;
let min_ts = (Utc::now() - Duration::try_hours(CONFIG.limit.ingest_allowed_upto).unwrap())
.timestamp_micros();
let mut partial_success = crate::common::meta::traces::ExportTracePartialSuccess::default();
let mut data_buf: HashMap<String, SchemaRecords> = HashMap::new();
let mut service_name: String = traces_stream_name.to_string();
// let export_req: ExportTraceServiceRequest =
// json::from_slice(body.as_ref()).unwrap();
let body: json::Value = match json::from_slice(body.as_ref()) {
Ok(v) => v,
Err(e) => {
return Err(BufferWriteError::HttpBadRequest(format!(
"Invalid json: {}",
e
)));
}
};
let spans = match body.get("resourceSpans") {
Some(json::Value::Array(v)) => v,
_ => {
return Err(BufferWriteError::HttpBadRequest(
"Invalid json: the structure must be {{\"resourceSpans\":[]}}".to_string(),
));
}
};
let mut trace_index = Vec::with_capacity(spans.len());
for res_span in spans.iter() {
let mut service_att_map: HashMap<String, json::Value> = HashMap::new();
if res_span.get("resource").is_some() {
let resource = res_span.get("resource").unwrap().as_object().unwrap();
if resource.get("attributes").is_some() {
let attributes = resource.get("attributes").unwrap().as_array().unwrap();
for res_attr in attributes {
let local_attr = res_attr.as_object().unwrap();
if local_attr
.get("key")
.unwrap()
.as_str()
.unwrap()
.eq(SERVICE_NAME)
{
let loc_service_name =
local_attr.get("value").unwrap().as_object().unwrap();
for item in loc_service_name {
service_name = item.1.as_str().unwrap().to_string();
service_att_map.insert(SERVICE_NAME.to_string(), item.1.clone());
}
} else {
service_att_map.insert(
format!(
"{}.{}",
SERVICE,
local_attr.get("key").unwrap().as_str().unwrap()
),
get_val_for_attr(local_attr.get("value").unwrap().clone()),
);
}
}
}
}
let scope_resources = res_span.get("scopeSpans");
let inst_resources = if let Some(v) = scope_resources {
v.as_array().unwrap()
} else {
res_span
.get("instrumentationLibrarySpans")
.unwrap()
.as_array()
.unwrap()
};
let mut record_id = 1;
for inst_span in inst_resources {
if inst_span.get("spans").is_some() {
let spans = inst_span.get("spans").unwrap().as_array().unwrap();
for span in spans {
let span_id: String = span.get("spanId").unwrap().as_str().unwrap().to_string();
let trace_id: String =
span.get("traceId").unwrap().as_str().unwrap().to_string();
let mut span_ref = HashMap::new();
if span.get("parentSpanId").is_some() {
span_ref.insert(
PARENT_SPAN_ID.to_string(),
span.get("parentSpanId")
.unwrap()
.as_str()
.unwrap()
.to_string(),
);
span_ref.insert(PARENT_TRACE_ID.to_string(), trace_id.clone());
span_ref
.insert(REF_TYPE.to_string(), format!("{:?}", SpanRefType::ChildOf));
}
let start_time = json::get_uint_value(span.get("startTimeUnixNano").unwrap());
let end_time = json::get_uint_value(span.get("endTimeUnixNano").unwrap());
let mut span_att_map: HashMap<String, json::Value> = HashMap::new();
let attributes = span.get("attributes").unwrap().as_array().unwrap();
for span_att in attributes {
let mut key = span_att.get("key").unwrap().as_str().unwrap().to_string();
if BLOCK_FIELDS.contains(&key.as_str()) {
key = format!("attr_{}", key);
}
span_att_map.insert(
key,
get_val_for_attr(span_att.get("value").unwrap().clone()),
);
}
let mut events = vec![];
let mut event_att_map: HashMap<String, json::Value> = HashMap::new();
let empty_vec = Vec::new();
let span_events = match span.get("events") {
Some(v) => v.as_array().unwrap(),
None => &empty_vec,
};
for event in span_events {
let attributes = event.get("attributes").unwrap().as_array().unwrap();
for event_att in attributes {
event_att_map.insert(
event_att.get("key").unwrap().as_str().unwrap().to_string(),
get_val_for_attr(event_att.get("value").unwrap().clone()),
);
}
events.push(Event {
name: event.get("name").unwrap().as_str().unwrap().to_string(),
_timestamp: json::get_uint_value(event.get("timeUnixNano").unwrap()),
attributes: event_att_map.clone(),
})
}
let timestamp = start_time / 1000;
let local_val = Span {
trace_id: trace_id.clone(),
span_id,
span_kind: span.get("kind").unwrap().to_string(),
span_status: json::get_string_value(
span.get("status")
.unwrap_or(&json::Value::String("UNSET".to_string())),
),
operation_name: span.get("name").unwrap().as_str().unwrap().to_string(),
start_time,
end_time,
duration: (end_time - start_time) / 1000, // microseconds
reference: span_ref,
service_name: service_name.clone(),
attributes: span_att_map,
service: service_att_map.clone(),
flags: 1, // TODO add appropriate value
events: json::to_string(&events).unwrap(),
};
if timestamp < min_ts.try_into().unwrap() {
partial_success.rejected_spans += 1;
continue;
}
let mut value: json::Value = json::to_value(local_val).unwrap();
// JSON Flattening
value = flatten::flatten(value)
.map_err(|e| BufferWriteError::IoError(e.to_string()))?;
if !local_trans.is_empty() {
value = crate::service::ingestion::apply_stream_functions(
&local_trans,
value,
&stream_vrl_map,
&traces_stream_name,
&mut runtime,
)
.map_err(|e| BufferWriteError::IoError(e.to_string()))?;
}
// End row based transform */
// get json object
let mut record_val = match value.take() {
json::Value::Object(v) => v,
_ => unreachable!(),
};
record_val.insert(
CONFIG.common.column_timestamp.clone(),
json::Value::Number(timestamp.into()),
);
// get distinct_value item
for field in DISTINCT_FIELDS.iter() {
if let Some(val) = record_val.get(field) {
if !val.is_null() {
let (filter_name, filter_value) = if field == "operation_name" {
("service_name".to_string(), service_name.clone())
} else {
("".to_string(), "".to_string())
};
distinct_values.push(MetadataItem::DistinctValues(DvItem {
stream_type: StreamType::Traces,
stream_name: traces_stream_name.to_string(),
field_name: field.to_string(),
field_value: val.as_str().unwrap().to_string(),
filter_name,
filter_value,
}));
}
}
}
// build trace metadata
trace_index.push(MetadataItem::TraceListIndexer(TraceListItem {
stream_name: traces_stream_name.to_string(),
service_name: service_name.clone(),
trace_id,
_timestamp: start_time / 1000,
}));
// check schema
let _ = check_for_schema(
org_id,
&traces_stream_name,
StreamType::Traces,
&mut traces_schema_map,
vec![&record_val],
timestamp.try_into().unwrap(),
)
.await;
if trigger.is_none() && !stream_alerts_map.is_empty() {
// Start check for alert trigger
let key =
format!("{}/{}/{}", &org_id, StreamType::Traces, traces_stream_name);
if let Some(alerts) = stream_alerts_map.get(&key) {
let mut trigger_alerts: TriggerAlertData = Vec::new();
for alert in alerts {
if let Ok(Some(v)) = alert.evaluate(Some(&record_val)).await {
trigger_alerts.push((alert.clone(), v));
}
}
trigger = Some(trigger_alerts);
}
// End check for alert trigger
}
// get hour key
let rec_schema = traces_schema_map
.get(&traces_stream_name)
.unwrap()
.schema()
.clone()
.with_metadata(HashMap::new());
let schema_key = rec_schema.hash_key();
let hour_key = crate::service::ingestion::get_wal_time_key(
timestamp.try_into().unwrap(),
&partition_keys,
partition_time_level,
&record_val,
Some(&schema_key),
);
let hour_buf = data_buf.entry(hour_key).or_insert_with(|| SchemaRecords {
record_id,
schema_key,
schema: Arc::new(rec_schema),
records: vec![],
records_size: 0,
});
record_id += 1;
// let record_val = record_val.to_owned();
let record_val = json::Value::Object(record_val);
let record_size = json::estimate_json_bytes(&record_val);
hour_buf.records.push(Arc::new(record_val));
hour_buf.records_size += record_size;
}
}
}
}
Ok((
data_buf,
trigger,
distinct_values,
trace_index,
partial_success,
))
}
for res_span in spans.iter() {
let mut service_att_map: HashMap<String, json::Value> = HashMap::new();
if res_span.get("resource").is_some() {
let resource = res_span.get("resource").ok_or(BufferWriteError::HttpBadRequest("Missing 'resource' key".to_string()))?.as_object().ok_or(BufferWriteError::HttpBadRequest("Expected 'resource' to be an object".to_string()))?;
if resource.get("attributes").is_some() {
let attributes = resource.get("attributes").unwrap().as_array().unwrap();
for res_attr in attributes {
let local_attr = res_attr.as_object().unwrap();
if local_attr
.get("key")
.unwrap()
.as_str()
.unwrap()
.eq(SERVICE_NAME)
{
let loc_service_name =
local_attr.get("value").unwrap().as_object().unwrap();
for item in loc_service_name {
service_name = item.1.as_str().unwrap().to_string();
service_att_map.insert(SERVICE_NAME.to_string(), item.1.clone());
}
} else {
service_att_map.insert(
format!(
"{}.{}",
SERVICE,
local_attr.get("key").unwrap().as_str().unwrap()
),
get_val_for_attr(local_attr.get("value").unwrap().clone()),
);
}
}
}
}

@@ -347,6 +347,7 @@ pub async fn handle_grpc_request(
Some(&schema_key),
);
let hour_buf = buf.entry(hour_key).or_insert_with(|| SchemaRecords {
record_id: 0,

Tip

Codebase Verification

The record_id field is initialized to 0 in multiple files with no accompanying increment logic, unlike in src/service/traces/validator.rs. This could lead to problems if record_id is expected to be unique or monotonically increasing.

  • Files with record_id initialized to 0 without increment logic:
    • src/service/metadata/trace_list_index.rs
    • src/service/metadata/distinct_values.rs
    • src/service/logs/bulk.rs
    • src/service/metrics/prom.rs
    • src/service/logs/mod.rs
    • src/service/metrics/json.rs
    • src/service/metrics/otlp_http.rs
    • src/service/enrichment_table/mod.rs
    • src/service/metrics/otlp_grpc.rs
    • src/job/files/parquet.rs

Please review these files to ensure that record_id is correctly incremented where necessary.
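As a rough illustration of the pattern used in src/service/traces/validator.rs, a per-batch counter can be captured when each new buffer entry is created and then incremented; the sketch below uses a simplified stand-in for SchemaRecords (only record_id and records), so the types and field set are illustrative, not the crate's real struct.

use std::collections::HashMap;

// Simplified stand-in for SchemaRecords; only `record_id` and `records`
// are kept, the remaining fields are omitted for brevity.
#[derive(Debug)]
struct SchemaRecords {
    record_id: u64,
    records: Vec<String>,
}

fn main() {
    let rows = vec![("2024051712", "a"), ("2024051712", "b"), ("2024051713", "c")];
    let mut data_buf: HashMap<String, SchemaRecords> = HashMap::new();
    // Counter mirroring the validator.rs pattern: captured when a new buffer
    // entry is created, then incremented after every record.
    let mut record_id: u64 = 1;

    for (hour_key, row) in rows {
        let hour_buf = data_buf
            .entry(hour_key.to_string())
            .or_insert_with(|| SchemaRecords {
                record_id,
                records: vec![],
            });
        record_id += 1;
        hour_buf.records.push(row.to_string());
    }

    for (key, buf) in &data_buf {
        println!("{key}: record_id={}, {} records", buf.record_id, buf.records.len());
    }
}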

Analysis chain

Verify the usage and increment logic of record_id.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the usage and increment logic of `record_id` across the codebase.

# Test: Search for the usage of `record_id`. Expect: Proper initialization and increment logic.
rg --type rust $'record_id'

Length of output: 1344



Script:

#!/bin/bash
# Description: Extract relevant code snippets from `src/service/traces/validator.rs` to review the increment logic of `record_id`.

# Extract lines containing `record_id` and their surrounding context.
rg --type rust --context 5 'record_id' src/service/traces/validator.rs

Length of output: 2375



Script:

#!/bin/bash
# Description: Extract relevant code snippets from other files where `record_id` is initialized to 0 to review their usage and increment logic.

# Extract lines containing `record_id` and their surrounding context from files where it is initialized to 0.
rg --type rust --context 5 'record_id: 0' src/service/metadata/trace_list_index.rs src/service/metadata/distinct_values.rs src/service/logs/bulk.rs src/service/metrics/prom.rs src/service/logs/mod.rs src/service/metrics/json.rs src/service/metrics/otlp_http.rs src/service/enrichment_table/mod.rs src/service/metrics/otlp_grpc.rs src/job/files/parquet.rs

Length of output: 8628

@@ -24,9 +24,10 @@ use std::{
use arrow_schema::Schema;
use chrono::{Duration, Utc};
use config::{metrics, CONFIG};
use futures::lock::Mutex;

Consider using tokio::sync::Mutex instead of futures::lock::Mutex for consistency with other asynchronous operations in the codebase.
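A minimal sketch of what the suggested swap could look like, assuming tokio (with the rt and macros features) is already available as it is elsewhere in the workspace; the Writer shape below is a placeholder, not the crate's actual struct.

use std::sync::Arc;

use tokio::sync::Mutex; // instead of `use futures::lock::Mutex;`

struct Writer {
    wal: Arc<Mutex<Vec<u8>>>, // placeholder for the real WAL writer state
}

impl Writer {
    async fn append(&self, bytes: &[u8]) {
        // `tokio::sync::Mutex::lock` is also async, so call sites keep the
        // same `.lock().await` shape as with `futures::lock::Mutex`.
        let mut wal = self.wal.lock().await;
        wal.extend_from_slice(bytes);
    }
}

#[tokio::main]
async fn main() {
    let writer = Writer {
        wal: Arc::new(Mutex::new(Vec::new())),
    };
    writer.append(b"entry").await;
    println!("wal bytes: {}", writer.wal.lock().await.len());
}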

@@ -210,34 +210,21 @@ impl Writer {
CONFIG.limit.max_file_size_on_disk as u64,
)
.context(WalSnafu)?;
let old_wal = std::mem::replace(&mut *wal, new_wal);
let _ = std::mem::replace(&mut *wal, new_wal);

Tip

Codebase Verification

The search results indicate that there is no explicit error handling around the std::mem::replace usage in the src/ingester/src/writer.rs file. The std::mem::replace function itself does not return a Result type, so any error handling would need to be implemented around the creation or validation of the new_wal instance before the replacement.

Given the absence of such error handling in the surrounding lines, it is clear that the concern raised in the review comment is valid. Proper error handling should be added to ensure the new WAL instance is valid before replacing the old one.

  • Location in codebase:
    • src/ingester/src/writer.rs, line 213
Analysis chain

Replacing the WAL writer instance should be accompanied by proper error handling to ensure the new WAL is valid.
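One way to keep the swap safe is the construct-first pattern sketched below: build (and validate) the new WAL before touching the current one, and only replace on success. FakeWal and its error type are stand-ins for the real WalWriter, so this is a shape, not the crate's code.

struct FakeWal {
    id: u64,
}

impl FakeWal {
    // Stand-in for `WalWriter::new(...)`, which can fail.
    fn new(id: u64) -> Result<Self, String> {
        if id == 0 {
            Err("invalid wal id".to_string())
        } else {
            Ok(FakeWal { id })
        }
    }
}

fn rotate(current: &mut FakeWal, next_id: u64) -> Result<u64, String> {
    // Create the new WAL first; if this errors, `current` is left untouched.
    let new_wal = FakeWal::new(next_id)?;
    // Only after the new instance is known to be valid is the old one swapped
    // out and returned for archiving.
    let old_wal = std::mem::replace(current, new_wal);
    Ok(old_wal.id)
}

fn main() {
    let mut wal = FakeWal { id: 1 };
    match rotate(&mut wal, 2) {
        Ok(old) => println!("rotated away from wal {old}, now on {}", wal.id),
        Err(e) => println!("rotation failed, still on wal {}: {e}", wal.id),
    }
}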

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify error handling when replacing the WAL writer instance.

# Test: Search for error handling patterns around `std::mem::replace`.
rg --type rust 'std::mem::replace'

Length of output: 625



Script:

#!/bin/bash
# Description: Verify error handling around the usage of `std::mem::replace` in the codebase.

# Search for error handling patterns around the identified lines in the codebase.
rg --type rust -A 5 -B 5 'std::mem::replace'

Length of output: 4605

Comment on lines +290 to +338
pub async fn write_memtable(
&self,
schema: Arc<Schema>,
entry: Entry,
check_ttl: bool,
rotate_mem: Option<&(String, String)>,
) -> Result<()> {
if entry.data.is_empty() && !check_ttl {
return Ok(());
}

let sid = entry.session_id.clone();
if rotate_mem.is_some() {
let mut mem = self.memtable.write().await;
let new_mem = MemTable::new();
let old_mem = std::mem::replace(&mut *mem, new_mem);
drop(mem);
// update created_at
self.created_at
.store(Utc::now().timestamp_micros(), Ordering::Release);
let thread_id = self.thread_id;
let key = self.key.clone();
let path = PathBuf::from(rotate_mem.unwrap().1.as_str());
let path_str = path.display().to_string();
tokio::task::spawn(async move {
log::info!(
"[INGESTER:WAL] [{sid}] start add to IMMUTABLES, file: {}",
path_str
);
IMMUTABLES.write().await.insert(
path.clone(),
Arc::new(immutable::Immutable::new(thread_id, key.clone(), old_mem)),
);
log::info!(
"[INGESTER:WAL] [{sid}] dones add to IMMUTABLES, file: {}",
path_str
);
});
}

let start = std::time::Instant::now();
let mut mem = self.memtable.write().await;
mem.write(schema, entry).await?;
drop(mem);
metrics::INGEST_MEMTABLE_LOCK_TIME
.with_label_values(&[&self.key.org_id])
.observe(start.elapsed().as_millis() as f64);
Ok(())
}

Tip

Codebase Verification

The search results indicate that there are no direct tests for the write_memtable method in the src/ingester/src/writer.rs file. This suggests that the rotation logic introduced in the write_memtable method may not be thoroughly tested.

  • Location of write_memtable method: src/ingester/src/writer.rs
  • Tests found: None directly referencing write_memtable

To ensure the robustness of the new rotation logic, it is crucial to add or verify existing tests that cover this functionality.

Analysis chain

The new write_memtable method introduces handling for rotating the memtable which is a significant enhancement. Ensure that the rotation logic is thoroughly tested, especially the handling of session IDs and paths.
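A test-shaped sketch of the rotation invariant the comment asks about is below; it uses stand-in types instead of the real MemTable/IMMUTABLES, so the names and assertions are illustrative rather than the crate's actual API.

use std::collections::HashMap;

#[derive(Default)]
struct MemTable {
    rows: Vec<String>,
}

struct Writer {
    memtable: MemTable,
    immutables: HashMap<String, MemTable>,
}

impl Writer {
    fn write(&mut self, row: &str, rotate_path: Option<&str>) {
        if let Some(path) = rotate_path {
            // Rotation: park the full memtable under the WAL path and start a
            // fresh one, mirroring what `write_memtable` does for rotate_mem.
            let old = std::mem::take(&mut self.memtable);
            self.immutables.insert(path.to_string(), old);
        }
        self.memtable.rows.push(row.to_string());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rotation_moves_old_memtable_to_immutables() {
        let mut writer = Writer {
            memtable: MemTable::default(),
            immutables: HashMap::new(),
        };
        writer.write("a", None);
        writer.write("b", Some("logs/0/org/traces/1.wal"));

        assert_eq!(writer.immutables["logs/0/org/traces/1.wal"].rows, vec!["a"]);
        assert_eq!(writer.memtable.rows, vec!["b"]);
    }
}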

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify thorough testing of memtable rotation logic in `write_memtable`.

# Test: Search for tests covering `write_memtable`.
rg --type rust 'write_memtable' --files-with-matches | xargs rg 'test'

Length of output: 981



Script:

#!/bin/bash
# Description: Identify the file containing the `write_memtable` method and search for related tests.

# Step 1: Locate the file containing the `write_memtable` method.
write_memtable_file=$(rg --type rust 'pub async fn write_memtable' --files-with-matches)

# Step 2: Search for test functions within the same module or related modules.
if [ -n "$write_memtable_file" ]; then
  rg --type rust 'test' $(dirname "$write_memtable_file")
else
  echo "write_memtable method not found."
fi

Length of output: 1269



Script:

#!/bin/bash
# Description: Verify thorough testing of memtable rotation logic in `write_memtable`.

# Step 1: Locate the file containing the `write_memtable` method.
write_memtable_file=$(rg --type rust 'pub async fn write_memtable' --files-with-matches)

# Step 2: Search for test functions within the same module or related modules.
if [ -n "$write_memtable_file" ]; then
  for file in $write_memtable_file; do
    echo "Searching for tests in $file"
    rg --type rust 'write_memtable' $(dirname "$file")
  done
else
  echo "write_memtable method not found."
fi

Length of output: 868

@@ -236,6 +236,7 @@ impl Metadata for DistinctValues {
let data_size = json::to_vec(&data).unwrap_or_default().len();

let hour_buf = buf.entry(hour_key).or_insert_with(|| SchemaRecords {
record_id: 0,

Tip

Codebase Verification

The record_id field is consistently initialized to 0 across multiple modules, except in src/service/traces/validator.rs, where it is initialized to 1 and incremented. This discrepancy should be reviewed to ensure consistent behavior across the application.

  • src/service/traces/validator.rs: record_id is initialized to 1 and incremented.
  • Other modules: record_id is initialized to 0.

Please review the usage of record_id in src/service/traces/validator.rs to ensure it aligns with the intended behavior across the application.

Analysis chain

The addition of record_id initialized to 0 in SchemaRecords should be reviewed for its intended use and consistency across the application.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the usage of `record_id` across different modules to ensure consistency.

# Test: Search for `record_id` usage in other modules. Expect: Consistent initialization and usage.
rg --type rust $'record_id'

Length of output: 1344

Comment on lines +340 to +391
pub async fn write_wal(&self, mut entry: Entry) -> Result<Option<(String, String)>> {
if entry.data.is_empty() {
return Ok(None);
}
let entry_bytes = entry.into_bytes()?;
let mut wal = self.wal.lock().await;
let mut res = None;
if self.check_wal_threshold(wal.size(), entry_bytes.len())
|| self.check_mem_threshold(self.memtable.read().await.size(), entry.data_size)
{
// sync wal before rotation
wal.sync().context(WalSnafu)?;
// rotation wal
let wal_id = self.next_seq.fetch_add(1, Ordering::SeqCst);
let wal_dir = PathBuf::from(&CONFIG.common.data_wal_dir)
.join("logs")
.join(self.thread_id.to_string());
let sid = entry.session_id;
log::info!(
"[INGESTER:WAL] [{sid}] create file: {}/{}/{}/{}.wal",
wal_dir.display().to_string(),
&self.key.org_id,
&self.key.stream_type,
wal_id
);
let new_wal = WalWriter::new(
wal_dir,
&self.key.org_id,
&self.key.stream_type,
wal_id,
CONFIG.limit.max_file_size_on_disk as u64,
)
.context(WalSnafu)?;
let old_wal = std::mem::replace(&mut *wal, new_wal);
let path_str = old_wal.path().clone().display().to_string();
log::info!(
"[{sid}] old_wal path_str will be record in immutable : {:?}",
path_str
);

res = Some((sid.to_string(), path_str));
}

// write into wal
let start = std::time::Instant::now();
wal.write(&entry_bytes, false).context(WalSnafu)?;
metrics::INGEST_WAL_LOCK_TIME
.with_label_values(&[&self.key.org_id])
.observe(start.elapsed().as_millis() as f64);

Ok(res)
}

Tip

Codebase Verification

The check_wal_threshold and check_mem_threshold methods are implemented in src/ingester/src/writer.rs. However, no test cases covering the WAL rotation logic were found in the codebase.

  • check_wal_threshold method implementation: src/ingester/src/writer.rs
  • check_mem_threshold method implementation: src/ingester/src/writer.rs

It is crucial to add test cases to ensure the WAL rotation logic is correctly implemented and tested.

Analysis chain

The write_wal method has been updated to handle WAL rotation based on thresholds. It's crucial to ensure that the thresholds and rotation logic are correctly implemented and tested.
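A sketch of the missing threshold tests; the constant and the check_wal_threshold body below are simplified stand-ins for the real implementation in src/ingester/src/writer.rs, not copies of it.

const MAX_FILE_SIZE_ON_DISK: u64 = 1024;

// Simplified stand-in: rotate when the current WAL plus the incoming entry
// would exceed the configured on-disk limit.
fn check_wal_threshold(current_size: u64, incoming_bytes: u64) -> bool {
    current_size + incoming_bytes > MAX_FILE_SIZE_ON_DISK
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rotates_only_when_threshold_is_crossed() {
        assert!(!check_wal_threshold(512, 100)); // still under the limit, no rotation
        assert!(check_wal_threshold(1000, 100)); // crossing the limit triggers rotation
    }
}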

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify implementation and testing of WAL rotation logic in `write_wal`.

# Test: Search for implementation details and tests for WAL rotation.
rg --type rust 'write_wal' --files-with-matches | xargs rg 'threshold|test'

Length of output: 1843



Script:

#!/bin/bash
# Retrieve the implementation of `check_wal_threshold` and `check_mem_threshold` methods
ast-grep --lang rust --pattern 'fn check_wal_threshold($_, $_) { $$$ }'
ast-grep --lang rust --pattern 'fn check_mem_threshold($_, $_) { $$$ }'

# Search for any test cases related to WAL rotation logic in the codebase
rg --type rust 'wal rotation|rotate wal|rotate_wal|wal rotation logic' --tests

Length of output: 660



Script:

#!/bin/bash
# Retrieve the implementation of `check_wal_threshold` and `check_mem_threshold` methods
ast-grep --lang rust --pattern 'fn check_wal_threshold($_, $_) { $$$ }'
ast-grep --lang rust --pattern 'fn check_mem_threshold($_, $_) { $$$ }'

# Correct the `rg` command to search for test cases related to WAL rotation logic
rg --type rust 'wal rotation|rotate wal|rotate_wal|wal rotation logic' --text

Length of output: 227



Script:

#!/bin/bash
# Search for the definitions of `check_wal_threshold` and `check_mem_threshold` methods
rg 'fn check_wal_threshold' --type rust
rg 'fn check_mem_threshold' --type rust

# Search for any test cases related to WAL rotation logic using a broader pattern
rg --type rust 'wal|rotate' --text

Length of output: 32152

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits
Files that changed from the base of the PR and between fe25392 and 17730c1.
Files selected for processing (3)
  • src/common/meta/stream.rs (1 hunks)
  • src/service/ingestion/mod.rs (5 hunks)
  • src/service/traces/validator.rs (1 hunks)
Files skipped from review as they are similar to previous changes (2)
  • src/common/meta/stream.rs
  • src/service/ingestion/mod.rs
Additional comments not posted (4)
src/service/traces/validator.rs (4)

39-57: Refine error handling for better clarity and diagnostics.


370-701: Ensure robust error handling for JSON parsing in validate_json_trace_request.


59-91: The implementation of partition key retrieval appears robust and well-handled.


370-701: The JSON trace request validation is handled appropriately with robust error checks.

Comment on lines +92 to +368

// get distinct_value item
for field in DISTINCT_FIELDS.iter() {
if let Some(val) = record_val.get(field) {
if let Some(val) = val.as_str() {
let (filter_name, filter_value) = if field == "operation_name" {
("service_name".to_string(), service_name.clone())
} else {
("".to_string(), "".to_string())
};
distinct_values.push(MetadataItem::DistinctValues(DvItem {
stream_type: StreamType::Traces,
stream_name: traces_stream_name.to_string(),
field_name: field.to_string(),
field_value: val.to_string(),
filter_name,
filter_value,
}));
}
}
}

// build trace metadata
trace_index.push(MetadataItem::TraceListIndexer(TraceListItem {
stream_name: traces_stream_name.to_string(),
service_name: service_name.clone(),
trace_id,
_timestamp: start_time / 1000,
}));

// check schema
let _ = check_for_schema(
org_id,
&traces_stream_name,
StreamType::Traces,
&mut traces_schema_map,
vec![&record_val],
timestamp.try_into().unwrap(),
)
.await;

if trigger.is_none() && !stream_alerts_map.is_empty() {
// Start check for alert trigger
let key = format!("{}/{}/{}", &org_id, StreamType::Traces, traces_stream_name);
if let Some(alerts) = stream_alerts_map.get(&key) {
let mut trigger_alerts: TriggerAlertData = Vec::new();
for alert in alerts {
if let Ok(Some(v)) = alert.evaluate(Some(&record_val)).await {
trigger_alerts.push((alert.clone(), v));
}
}
trigger = Some(trigger_alerts);
}
// End check for alert trigger
}

// get hour key
let rec_schema = traces_schema_map
.get(&traces_stream_name)
.unwrap()
.schema()
.clone()
.with_metadata(HashMap::new());
let schema_key = rec_schema.hash_key();
let hour_key = crate::service::ingestion::get_wal_time_key(
timestamp.try_into().unwrap(),
&partition_keys,
partition_time_level,
&record_val,
Some(&schema_key),
);

let hour_buf = data_buf.entry(hour_key).or_insert_with(|| SchemaRecords {
schema_key,
schema: Arc::new(rec_schema),
records: vec![],
records_size: 0,
});
// let record_val = record_val.to_owned();
let record_val = json::Value::Object(record_val);
let record_size = json::estimate_json_bytes(&record_val);
hour_buf.records.push(Arc::new(record_val));
hour_buf.records_size += record_size;

if timestamp < min_ts.try_into().unwrap() {
partial_success.rejected_spans += 1;
continue;
}
}
}
}

Ok((
data_buf,
trigger,
distinct_values,
trace_index,
partial_success,
))
}

Consider refactoring to improve readability and maintainability.

The function validate_trace_request is quite complex and handles multiple responsibilities. Consider breaking it down into smaller, more focused functions, each handling a specific part of the trace validation process. This will improve readability and maintainability.
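One hypothetical shape for such a split is sketched below; every helper name and simplified signature is illustrative (no schema maps, alerts, or metadata plumbing), and serde_json is assumed to be available as it is elsewhere in the codebase.

use std::collections::HashMap;

type Record = serde_json::Map<String, serde_json::Value>;

// Pull `service.name` (and, in the real code, the remaining resource
// attributes) out of the span's resource block.
fn extract_service_attributes(resource: &serde_json::Value) -> (String, Record) {
    let name = resource
        .pointer("/attributes/0/value/stringValue")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown_service")
        .to_string();
    (name, Record::new())
}

// Flatten one span into the row that later gets schema-checked and buffered.
fn build_record(span: &serde_json::Value, service_name: &str, timestamp: i64) -> Record {
    let mut rec = Record::new();
    rec.insert("service_name".into(), service_name.into());
    rec.insert("_timestamp".into(), timestamp.into());
    rec.insert("span".into(), span.clone());
    rec
}

// Group finished rows by their hour key, as the tail of the function does.
fn bucket_by_hour(records: Vec<(String, Record)>) -> HashMap<String, Vec<Record>> {
    let mut buf: HashMap<String, Vec<Record>> = HashMap::new();
    for (hour_key, rec) in records {
        buf.entry(hour_key).or_default().push(rec);
    }
    buf
}

fn main() {
    let span = serde_json::json!({"name": "GET /api"});
    let resource = serde_json::json!({"attributes": []});
    let (service, _attrs) = extract_service_attributes(&resource);
    let rec = build_record(&span, &service, 1_716_000_000_000);
    let buckets = bucket_by_hour(vec![("2024051712".to_string(), rec)]);
    println!("{} hour bucket(s)", buckets.len());
}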

Comment on lines +39 to +57
fn basic_check(org_id: &str) -> Result<(), BufferWriteError> {
if !cluster::is_ingester(&cluster::LOCAL_NODE_ROLE) {
return Err(BufferWriteError::InternalServerError);
}

if !db::file_list::BLOCKED_ORGS.is_empty() && db::file_list::BLOCKED_ORGS.contains(&org_id) {
return Err(BufferWriteError::HttpForbidden(format!(
"Quota exceeded for this organization [{}]",
org_id
)));
}

// check memtable
if let Err(e) = ingester::check_memtable_size() {
return Err(BufferWriteError::HttpServiceUnavailable(e.to_string()));
}

Ok(())
}

Consider enhancing error messages for better diagnostics and user feedback.

- return Err(BufferWriteError::InternalServerError);
+ return Err(BufferWriteError::InternalServerError("Node role is not set to ingester".to_string()));

- return Err(BufferWriteError::HttpServiceUnavailable(e.to_string()));
+ return Err(BufferWriteError::HttpServiceUnavailable(format!("Memtable size check failed: {}", e)));

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
fn basic_check(org_id: &str) -> Result<(), BufferWriteError> {
if !cluster::is_ingester(&cluster::LOCAL_NODE_ROLE) {
return Err(BufferWriteError::InternalServerError);
}
if !db::file_list::BLOCKED_ORGS.is_empty() && db::file_list::BLOCKED_ORGS.contains(&org_id) {
return Err(BufferWriteError::HttpForbidden(format!(
"Quota exceeded for this organization [{}]",
org_id
)));
}
// check memtable
if let Err(e) = ingester::check_memtable_size() {
return Err(BufferWriteError::HttpServiceUnavailable(e.to_string()));
}
Ok(())
}
fn basic_check(org_id: &str) -> Result<(), BufferWriteError> {
if !cluster::is_ingester(&cluster::LOCAL_NODE_ROLE) {
return Err(BufferWriteError::InternalServerError("Node role is not set to ingester".to_string()));
}
if !db::file_list::BLOCKED_ORGS.is_empty() && db::file_list::BLOCKED_ORGS.contains(&org_id) {
return Err(BufferWriteError::HttpForbidden(format!(
"Quota exceeded for this organization [{}]",
org_id
)));
}
// check memtable
if let Err(e) = ingester::check_memtable_size() {
return Err(BufferWriteError::HttpServiceUnavailable(format!("Memtable size check failed: {}", e)));
}
Ok(())
}

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Review Details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits
Files that changed from the base of the PR and between 17730c1 and a955f2f.
Files selected for processing (2)
  • src/ingester/src/writer.rs (4 hunks)
  • src/service/traces/flusher.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • src/service/traces/flusher.rs
Additional comments not posted (4)
src/ingester/src/writer.rs (4)

27-27: Consider using tokio::sync::Mutex instead of futures::lock::Mutex for consistency with other asynchronous operations in the codebase.


213-213: Ensure proper error handling when replacing the WAL writer instance.


305-353: Verify thorough testing of memtable rotation logic in write_memtable.


355-406: Add test cases to ensure the WAL rotation logic is correctly implemented and tested.

@hengfeiyang hengfeiyang marked this pull request as draft May 30, 2024 15:11