diff --git a/Cargo.lock b/Cargo.lock index 6b565f69ca6..7a4a53c73a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2962,6 +2962,7 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand 0.9.1", + "serde_json", "thiserror 2.0.12", ] diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index 86fab82888c..11d1d977310 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -55,6 +55,11 @@ pub enum Feature { /// Serialized as `projects:relay-otel-endpoint`. #[serde(rename = "projects:relay-otel-endpoint")] OtelEndpoint, + /// Enable standalone log ingestion via the `/logs` OTel endpoint. + /// + /// Serialized as `projects:relay-otel-logs-endpoint` + #[serde(rename = "projects:relay-otel-logs-endpoint")] + OtelLogsEndpoint, /// Enable playstation crash dump ingestion via the `/playstation/` endpoint. /// /// Serialized as `organizations:relay-playstation-ingestion`. diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index f9a1f19d7c8..29971981498 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -57,7 +57,7 @@ mime = { workspace = true } minidump = { workspace = true, optional = true } multer = { workspace = true } once_cell = { workspace = true } -opentelemetry-proto = { workspace = true } +opentelemetry-proto = { workspace = true, features = ["trace", "logs"] } papaya = { workspace = true } pin-project-lite = { workspace = true } priority-queue = { workspace = true } diff --git a/relay-server/src/endpoints/mod.rs b/relay-server/src/endpoints/mod.rs index 073ce5a46c5..861332f7b1b 100644 --- a/relay-server/src/endpoints/mod.rs +++ b/relay-server/src/endpoints/mod.rs @@ -14,6 +14,7 @@ mod health_check; mod minidump; mod monitor; mod nel; +mod ourlogs; #[cfg(sentry)] mod playstation; mod project_configs; @@ -81,7 +82,11 @@ pub fn routes(config: &Config) -> Router{ // Because we initially released this endpoint with a trailing slash, keeping it for // backwards compatibility. .route("/api/{project_id}/otlp/v1/traces", traces::route(config)) - .route("/api/{project_id}/otlp/v1/traces/", traces::route(config)); + .route("/api/{project_id}/otlp/v1/traces/", traces::route(config)) + // The OTLP/HTTP transport defaults to a request suffix of /v1/logs (no trailing slash): + // https://opentelemetry.io/docs/specs/otlp/#otlphttp-request + .route("/api/{project_id}/otlp/v1/logs", ourlogs::route(config)) + .route("/api/{project_id}/otlp/v1/logs/", ourlogs::route(config)); // NOTE: If you add a new (non-experimental) route here, please also list it in // https://github.com/getsentry/sentry-docs/blob/master/docs/product/relay/operating-guidelines.mdx diff --git a/relay-server/src/endpoints/ourlogs.rs b/relay-server/src/endpoints/ourlogs.rs new file mode 100644 index 00000000000..fec9807ae1a --- /dev/null +++ b/relay-server/src/endpoints/ourlogs.rs @@ -0,0 +1,43 @@ +use axum::RequestExt; +use axum::extract::{DefaultBodyLimit, Request}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::routing::{MethodRouter, post}; +use bytes::Bytes; +use relay_config::Config; +use relay_dynamic_config::Feature; + +use crate::endpoints::common; +use crate::envelope::{ContentType, Envelope, Item, ItemType}; +use crate::extractors::{RawContentType, RequestMeta}; +use crate::service::ServiceState; + +async fn handle( + state: ServiceState, + content_type: RawContentType, + meta: RequestMeta, + request: Request, +) -> axum::response::Result { + let content_type @ (ContentType::Json | ContentType::Protobuf) = + ContentType::from(content_type.as_ref()) + else { + return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE); + }; + let payload: Bytes = request.extract().await?; + let mut envelope = Envelope::from_request(None, meta); + envelope.require_feature(Feature::OtelLogsEndpoint); + + envelope.add_item({ + let mut item = Item::new(ItemType::OtelLogsData); + item.set_payload(content_type, payload); + item + }); + + common::handle_envelope(&state, envelope).await?; + + Ok(StatusCode::ACCEPTED) +} + +pub fn route(config: &Config) -> MethodRouter { + post(handle).route_layer(DefaultBodyLimit::max(config.max_envelope_size())) +} diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index 4426e3f9bc5..566cad028df 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -136,6 +136,11 @@ impl Item { ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, item_count)], // NOTE: semantically wrong, but too expensive to parse. ItemType::OtelTracesData => smallvec![(DataCategory::Span, item_count)], + // NOTE: OtelLogsData is mapped to Log data category. + ItemType::OtelLogsData => smallvec![ + (DataCategory::LogByte, self.len().max(1)), + (DataCategory::LogItem, item_count) + ], ItemType::ProfileChunk => match self.profile_type() { Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, item_count)], Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, item_count)], @@ -418,6 +423,7 @@ impl Item { | ItemType::Log | ItemType::OtelSpan | ItemType::OtelTracesData + | ItemType::OtelLogsData | ItemType::ProfileChunk => false, // The unknown item type can observe any behavior, most likely there are going to be no @@ -453,6 +459,7 @@ impl Item { ItemType::Log => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, + ItemType::OtelLogsData => false, ItemType::ProfileChunk => false, // Since this Relay cannot interpret the semantics of this item, it does not know @@ -530,6 +537,8 @@ pub enum ItemType { OtelSpan, /// An OTLP TracesData container. OtelTracesData, + /// An OTLP LogsData container. + OtelLogsData, /// UserReport as an Event UserReportV2, /// ProfileChunk is a chunk of a profiling session. @@ -585,6 +594,7 @@ impl ItemType { Self::Span => "span", Self::OtelSpan => "otel_span", Self::OtelTracesData => "otel_traces_data", + Self::OtelLogsData => "otel_logs_data", Self::ProfileChunk => "profile_chunk", Self::Unknown(_) => "unknown", } @@ -636,6 +646,7 @@ impl ItemType { ItemType::Span => true, ItemType::OtelSpan => true, ItemType::OtelTracesData => true, + ItemType::OtelLogsData => true, ItemType::UserReportV2 => false, ItemType::ProfileChunk => true, ItemType::Unknown(_) => true, @@ -678,6 +689,7 @@ impl std::str::FromStr for ItemType { "span" => Self::Span, "otel_span" => Self::OtelSpan, "otel_traces_data" => Self::OtelTracesData, + "otel_logs_data" => Self::OtelLogsData, "profile_chunk" => Self::ProfileChunk, // "profile_chunk_ui" is to be treated as an alias for `ProfileChunk` // because Android 8.10.0 and 8.11.0 is sending it as the item type. diff --git a/relay-server/src/processing/logs/mod.rs b/relay-server/src/processing/logs/mod.rs index 042f38ea1b5..5730d04e9cf 100644 --- a/relay-server/src/processing/logs/mod.rs +++ b/relay-server/src/processing/logs/mod.rs @@ -1,14 +1,23 @@ use std::sync::Arc; +use chrono::{TimeZone, Utc}; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; +use opentelemetry_proto::tonic::logs::v1::LogRecord; +use prost::Message; use relay_event_schema::processor::ProcessingAction; -use relay_event_schema::protocol::OurLog; +use relay_event_schema::protocol::{ + Attribute, AttributeType, Attributes, OurLog, OurLogLevel, SpanId, Timestamp, TraceId, +}; use relay_filter::FilterStatKey; use relay_pii::PiiConfigError; +use relay_protocol::{Annotated, Object, Value}; use relay_quotas::{DataCategory, RateLimits}; use crate::Envelope; use crate::envelope::{ - ContainerItems, ContainerWriteError, EnvelopeHeaders, Item, ItemContainer, ItemType, Items, + ContainerItems, ContainerWriteError, ContentType, EnvelopeHeaders, Item, ItemContainer, + ItemType, Items, }; use crate::managed::{ Counted, Managed, ManagedEnvelope, ManagedResult as _, OutcomeError, Quantities, @@ -107,6 +116,9 @@ impl processing::Processor for LogsProcessor { ) -> Option> { let headers = envelope.envelope().headers().clone(); + // Convert OTLP logs data to individual log items + convert_otel_logs_data(envelope); + let logs = envelope .envelope_mut() .take_items_by(|item| matches!(*item.ty(), ItemType::Log)) @@ -309,3 +321,919 @@ impl ExpandedLogs { impl CountRateLimited for Managed { type Error = Error; } + +/// Converts OTLP logs data items to individual log items. +pub fn convert_otel_logs_data(envelope: &mut ManagedEnvelope) { + for item in envelope + .envelope_mut() + .take_items_by(|item| item.ty() == &ItemType::OtelLogsData) + { + convert_logs_data(item, envelope); + } +} + +fn convert_logs_data(item: Item, envelope: &mut ManagedEnvelope) { + let logs_request = match parse_logs_data(item) { + Ok(logs_request) => logs_request, + Err(reason) => { + // NOTE: logging quantity=1 is semantically wrong, but we cannot know the real quantity + // without parsing. + envelope.track_outcome( + crate::services::outcome::Outcome::Invalid(reason), + DataCategory::LogItem, + 1, + ); + envelope.track_outcome( + crate::services::outcome::Outcome::Invalid(reason), + DataCategory::LogByte, + 1, + ); + return; + } + }; + + for resource_logs in logs_request.resource_logs { + for scope_logs in resource_logs.scope_logs { + for mut log_record in scope_logs.log_records { + // Denormalize instrumentation scope and resource attributes into every log. + if let Some(ref scope) = scope_logs.scope { + if !scope.name.is_empty() { + log_record.attributes.push(KeyValue { + key: "instrumentation.name".to_owned(), + value: Some(AnyValue { + value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(scope.name.clone())), + }), + }); + } + if !scope.version.is_empty() { + log_record.attributes.push(KeyValue { + key: "instrumentation.version".to_owned(), + value: Some(AnyValue { + value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(scope.version.clone())), + }), + }); + } + scope.attributes.iter().for_each(|a| { + log_record.attributes.push(KeyValue { + key: format!("instrumentation.{}", a.key), + value: a.value.clone(), + }); + }); + } + if let Some(ref resource) = resource_logs.resource { + resource.attributes.iter().for_each(|a| { + log_record.attributes.push(KeyValue { + key: format!("resource.{}", a.key), + value: a.value.clone(), + }); + }); + } + + // Convert OTLP log record to OurLog + match otel_to_sentry_log(&log_record) { + Ok(our_log) => { + // Add individual log to container items + let mut logs_container = ContainerItems::new(); + logs_container + .push(crate::envelope::WithHeader::just(Annotated::new(our_log))); + + if !logs_container.is_empty() { + let mut item = Item::new(ItemType::Log); + if ItemContainer::from(logs_container) + .write_to(&mut item) + .is_err() + { + envelope.track_outcome( + crate::services::outcome::Outcome::Invalid( + crate::services::outcome::DiscardReason::Internal, + ), + DataCategory::LogItem, + 1, + ); + continue; + } + envelope.envelope_mut().add_item(item); + } + } + Err(_) => { + envelope.track_outcome( + crate::services::outcome::Outcome::Invalid( + crate::services::outcome::DiscardReason::Internal, + ), + DataCategory::LogItem, + 1, + ); + continue; + } + } + } + } + } +} + +fn parse_logs_data( + item: Item, +) -> Result { + match item.content_type() { + Some(&ContentType::Json) => serde_json::from_slice(&item.payload()).map_err(|e| { + relay_log::debug!( + error = &e as &dyn std::error::Error, + "Failed to parse logs data as JSON" + ); + crate::services::outcome::DiscardReason::InvalidJson + }), + Some(&ContentType::Protobuf) => { + ExportLogsServiceRequest::decode(item.payload()).map_err(|e| { + relay_log::debug!( + error = &e as &dyn std::error::Error, + "Failed to parse logs data as protobuf" + ); + crate::services::outcome::DiscardReason::InvalidProtobuf + }) + } + _ => Err(crate::services::outcome::DiscardReason::ContentType), + } +} + +fn otel_value_to_log_attribute(value: &AnyValue) -> Option { + match &value.value { + Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue(v)) => { + Some(Attribute::new(AttributeType::Boolean, Value::Bool(*v))) + } + Some(opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue(v)) => { + Some(Attribute::new(AttributeType::Double, Value::F64(*v))) + } + Some(opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue(v)) => { + Some(Attribute::new(AttributeType::Integer, Value::I64(*v))) + } + Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(v)) => Some( + Attribute::new(AttributeType::String, Value::String(v.clone())), + ), + Some(opentelemetry_proto::tonic::common::v1::any_value::Value::BytesValue(v)) => { + String::from_utf8(v.clone()) + .ok() + .map(|s| Attribute::new(AttributeType::String, Value::String(s))) + } + Some(opentelemetry_proto::tonic::common::v1::any_value::Value::ArrayValue(_)) => None, + Some(opentelemetry_proto::tonic::common::v1::any_value::Value::KvlistValue(_)) => None, + None => None, + } +} + +/// Transform an OpenTelemetry LogRecord to a Sentry log. +fn otel_to_sentry_log(log_record: &LogRecord) -> Result { + let severity_number = log_record.severity_number; + let severity_text = log_record.severity_text.clone(); + + // Convert span_id and trace_id from bytes + let span_id = if log_record.span_id.is_empty() { + SpanId::default() + } else { + SpanId::try_from(log_record.span_id.as_slice()) + .map_err(|_| relay_protocol::Error::expected("valid span_id"))? + }; + + let trace_id = if log_record.trace_id.is_empty() { + return Err(relay_protocol::Error::expected("valid trace_id")); + } else { + TraceId::try_from(log_record.trace_id.as_slice()) + .map_err(|_| relay_protocol::Error::expected("valid trace_id"))? + }; + + // Convert timestamp from nanoseconds + let timestamp = if log_record.time_unix_nano > 0 { + Utc.timestamp_nanos(log_record.time_unix_nano as i64) + } else { + return Err(relay_protocol::Error::expected("valid timestamp")); + }; + + // Extract log body + let body = log_record + .body + .as_ref() + .and_then(|v| v.value.as_ref()) + .and_then(|v| match v { + opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s) => { + Some(s.clone()) + } + _ => None, + }) + .unwrap_or_default(); + + // Initialize attributes with severity number + let mut attribute_data = Attributes::new(); + attribute_data.insert("sentry.severity_number".to_owned(), severity_number as i64); + + // Convert OpenTelemetry attributes + for attribute in &log_record.attributes { + if let Some(value) = &attribute.value { + let key = attribute.key.clone(); + if let Some(v) = otel_value_to_log_attribute(value) { + attribute_data.insert_raw(key, Annotated::new(v)); + } + } + } + + // Map severity_number to OurLogLevel, falling back to severity_text if it's not a number. + // Finally default to Info if severity_number is not in range and severity_text is not a valid + // log level. + let level = match severity_number { + 1..=4 => OurLogLevel::Trace, + 5..=8 => OurLogLevel::Debug, + 9..=12 => OurLogLevel::Info, + 13..=16 => OurLogLevel::Warn, + 17..=20 => OurLogLevel::Error, + 21..=24 => OurLogLevel::Fatal, + _ => match severity_text.as_str() { + "trace" => OurLogLevel::Trace, + "debug" => OurLogLevel::Debug, + "info" => OurLogLevel::Info, + "warn" => OurLogLevel::Warn, + "error" => OurLogLevel::Error, + "fatal" => OurLogLevel::Fatal, + _ => OurLogLevel::Info, + }, + }; + + let ourlog = OurLog { + timestamp: Annotated::new(Timestamp(timestamp)), + trace_id: Annotated::new(trace_id), + span_id: Annotated::new(span_id), + level: Annotated::new(level), + attributes: Annotated::new(attribute_data), + body: Annotated::new(body), + other: Object::default(), + }; + + Ok(ourlog) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Envelope; + use crate::managed::{ManagedEnvelope, TypedEnvelope}; + use crate::services::processor::{LogGroup, ProcessingGroup}; + use bytes::Bytes; + use opentelemetry_proto::tonic::common::v1::any_value::Value as OtelValue; + use opentelemetry_proto::tonic::common::v1::{AnyValue, ArrayValue, KeyValue, KeyValueList}; + use relay_protocol::get_path; + use relay_system::Addr; + + #[test] + fn test_attribute_denormalization() { + // Construct an OTLP logs payload with: + // - a resource with one attribute, containing: + // - an instrumentation scope with one attribute, containing: + // - a log record with one attribute + let logs_data = r#" + { + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "resource_key", + "value": { + "stringValue": "resource_value" + } + } + ] + }, + "scopeLogs": [ + { + "scope": { + "name": "test_instrumentation", + "version": "0.0.1", + "attributes": [ + { + "key": "scope_key", + "value": { + "stringValue": "scope_value" + } + } + ] + }, + "logRecords": [ + { + "timeUnixNano": "1640995200000000000", + "severityNumber": 9, + "severityText": "INFO", + "body": { + "stringValue": "Test log message" + }, + "attributes": [ + { + "key": "log_key", + "value": { + "stringValue": "log_value" + } + } + ], + "traceId": "0102030405060708090a0b0c0d0e0f10", + "spanId": "0102030405060708" + } + ] + } + ] + } + ] + } + "#; + + // Build an envelope containing the OTLP logs data. + let bytes = + Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#); + let envelope = Envelope::parse_bytes(bytes).unwrap(); + let (outcome_aggregator, _) = Addr::custom(); + let managed_envelope = ManagedEnvelope::new(envelope, outcome_aggregator); + let mut typed_envelope: TypedEnvelope = + (managed_envelope, ProcessingGroup::Log).try_into().unwrap(); + let mut item = Item::new(ItemType::OtelLogsData); + item.set_payload(ContentType::Json, logs_data); + typed_envelope.envelope_mut().add_item(item.clone()); + + // Convert the OTLP logs data into Log item(s). + convert_logs_data(item, &mut typed_envelope); + + // Assert that the attributes from the resource and instrumentation + // scope were copied. + let item = typed_envelope + .envelope() + .items() + .find(|i| *i.ty() == ItemType::Log) + .expect("converted log missing from envelope"); + + let container = + ItemContainer::::parse(item).expect("unable to parse log container"); + let log = container + .into_items() + .into_iter() + .next() + .expect("no log in container"); + let log = log.value().expect("log has no value"); + + let attributes = &log.attributes.value().expect("log has no attributes").0; + let get_attribute_value = |key: &str| -> String { + let attr = attributes + .get(key) + .unwrap_or_else(|| panic!("attribute {key} missing")) + .value() + .expect("attribute has no value"); + match attr.value.value.value().expect("attribute value missing") { + Value::String(s) => s.clone(), + _ => panic!("attribute {key} not a string"), + } + }; + + assert_eq!( + get_attribute_value("log_key"), + "log_value".to_owned(), + "original log attribute should be present" + ); + assert_eq!( + get_attribute_value("instrumentation.name"), + "test_instrumentation".to_owned(), + "instrumentation name should be in attributes" + ); + assert_eq!( + get_attribute_value("instrumentation.version"), + "0.0.1".to_owned(), + "instrumentation version should be in attributes" + ); + assert_eq!( + get_attribute_value("resource.resource_key"), + "resource_value".to_owned(), + "resource attribute should be copied with prefix" + ); + assert_eq!( + get_attribute_value("instrumentation.scope_key"), + "scope_value".to_owned(), + "instrumentation scope attribute should be copied with prefix" + ); + + // Verify log content + assert_eq!( + log.body.value().expect("log has no body"), + "Test log message" + ); + assert_eq!( + log.level.value().expect("log has no level"), + &OurLogLevel::Info + ); + + // Verify severity number was added + let severity_attr = attributes + .get("sentry.severity_number") + .expect("severity number attribute missing") + .value() + .expect("severity attribute has no value"); + match severity_attr + .value + .value + .value() + .expect("severity value missing") + { + Value::I64(9) => {} + _ => panic!("severity number should be 9"), + } + } + + #[test] + fn test_severity_mapping() { + let test_cases = vec![ + (1, "trace", OurLogLevel::Trace), + (5, "debug", OurLogLevel::Debug), + (9, "info", OurLogLevel::Info), + (13, "warn", OurLogLevel::Warn), + (17, "error", OurLogLevel::Error), + (21, "fatal", OurLogLevel::Fatal), + (0, "unknown", OurLogLevel::Info), // fallback case + ]; + + for (severity_number, severity_text, expected_level) in test_cases { + let log_record = LogRecord { + time_unix_nano: 1640995200000000000, + severity_number, + severity_text: severity_text.to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("test message".to_owned())), + }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], + span_id: vec![1, 2, 3, 4, 5, 6, 7, 8], + observed_time_unix_nano: 0, + event_name: String::new(), + }; + + let result = otel_to_sentry_log(&log_record).expect("conversion should succeed"); + assert_eq!( + result.level.value().expect("level should be present"), + &expected_level, + "severity {} should map to {:?}", + severity_number, + expected_level + ); + } + } + + #[test] + fn test_attribute_types() { + let log_record = LogRecord { + time_unix_nano: 1640995200000000000, + severity_number: 9, + severity_text: "info".to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("test".to_owned())), + }), + attributes: vec![ + KeyValue { + key: "string_attr".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::StringValue("test_string".to_owned())), + }), + }, + KeyValue { + key: "int_attr".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::IntValue(42)), + }), + }, + KeyValue { + key: "double_attr".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::DoubleValue(std::f64::consts::PI)), + }), + }, + KeyValue { + key: "bool_attr".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::BoolValue(true)), + }), + }, + KeyValue { + key: "bytes_attr".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::BytesValue(b"hello".to_vec())), + }), + }, + ], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], + span_id: vec![1, 2, 3, 4, 5, 6, 7, 8], + observed_time_unix_nano: 0, + event_name: String::new(), + }; + + let result = otel_to_sentry_log(&log_record).expect("conversion should succeed"); + let attributes = &result.attributes.value().expect("log has no attributes").0; + + // Check string attribute + let string_attr = attributes.get("string_attr").unwrap().value().unwrap(); + assert_eq!( + string_attr.value.ty.value().unwrap(), + &AttributeType::String + ); + match string_attr.value.value.value().unwrap() { + Value::String(s) => assert_eq!(s, "test_string"), + _ => panic!("expected string value"), + } + + // Check integer attribute + let int_attr = attributes.get("int_attr").unwrap().value().unwrap(); + assert_eq!(int_attr.value.ty.value().unwrap(), &AttributeType::Integer); + match int_attr.value.value.value().unwrap() { + Value::I64(i) => assert_eq!(*i, 42), + _ => panic!("expected integer value"), + } + + // Check double attribute + let double_attr = attributes.get("double_attr").unwrap().value().unwrap(); + assert_eq!( + double_attr.value.ty.value().unwrap(), + &AttributeType::Double + ); + match double_attr.value.value.value().unwrap() { + Value::F64(f) => assert!((f - std::f64::consts::PI).abs() < f64::EPSILON), + _ => panic!("expected double value"), + } + + // Check boolean attribute + let bool_attr = attributes.get("bool_attr").unwrap().value().unwrap(); + assert_eq!(bool_attr.value.ty.value().unwrap(), &AttributeType::Boolean); + match bool_attr.value.value.value().unwrap() { + Value::Bool(b) => assert!(b), + _ => panic!("expected boolean value"), + } + + // Check bytes attribute (converted to string) + let bytes_attr = attributes.get("bytes_attr").unwrap().value().unwrap(); + assert_eq!(bytes_attr.value.ty.value().unwrap(), &AttributeType::String); + match bytes_attr.value.value.value().unwrap() { + Value::String(s) => assert_eq!(s, "hello"), + _ => panic!("expected string value from bytes"), + } + } + + #[test] + fn test_missing_trace_id_error() { + let log_record = LogRecord { + time_unix_nano: 1640995200000000000, + severity_number: 9, + severity_text: "info".to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("test".to_owned())), + }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![], // Empty trace_id should cause error + span_id: vec![1, 2, 3, 4, 5, 6, 7, 8], + observed_time_unix_nano: 0, + event_name: String::new(), + }; + + let result = otel_to_sentry_log(&log_record); + assert!(result.is_err(), "empty trace_id should cause error"); + } + + #[test] + fn test_missing_timestamp_error() { + let log_record = LogRecord { + time_unix_nano: 0, // Zero timestamp should cause error + severity_number: 9, + severity_text: "info".to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("test".to_owned())), + }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], + span_id: vec![1, 2, 3, 4, 5, 6, 7, 8], + observed_time_unix_nano: 0, + event_name: String::new(), + }; + + let result = otel_to_sentry_log(&log_record); + assert!(result.is_err(), "zero timestamp should cause error"); + } + + #[test] + fn test_empty_span_id_defaults() { + let log_record = LogRecord { + time_unix_nano: 1640995200000000000, + severity_number: 9, + severity_text: "info".to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("test".to_owned())), + }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], + span_id: vec![], // Empty span_id should default + observed_time_unix_nano: 0, + event_name: String::new(), + }; + + let result = otel_to_sentry_log(&log_record).expect("conversion should succeed"); + let span_id = result.span_id.value().expect("span_id should be present"); + assert_eq!(*span_id, SpanId::default(), "empty span_id should default"); + } + + #[test] + fn test_different_body_types() { + let test_cases = vec![ + ( + Some(OtelValue::StringValue("string body".to_owned())), + "string body", + ), + (Some(OtelValue::IntValue(42)), ""), // Non-string values should result in empty body + (Some(OtelValue::BoolValue(true)), ""), + (None, ""), // No body should result in empty body + ]; + + for (body_value, expected_body) in test_cases { + let log_record = LogRecord { + time_unix_nano: 1640995200000000000, + severity_number: 9, + severity_text: "info".to_owned(), + body: body_value.clone().map(|v| AnyValue { value: Some(v) }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16], + span_id: vec![1, 2, 3, 4, 5, 6, 7, 8], + observed_time_unix_nano: 0, + event_name: String::new(), + }; + + let result = otel_to_sentry_log(&log_record).expect("conversion should succeed"); + assert_eq!( + result.body.value().expect("body should be present"), + expected_body, + "body value should match expected for {:?}", + body_value + ); + } + } + + #[test] + fn test_parse_otel_log() { + // Based on https://github.com/open-telemetry/opentelemetry-proto/blob/c4214b8168d0ce2a5236185efb8a1c8950cccdd6/examples/logs.json + let log_record = LogRecord { + time_unix_nano: 1544712660300000000, + observed_time_unix_nano: 1544712660300000000, + severity_number: 10, + severity_text: "Information".to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("Example log record".to_owned())), + }), + attributes: vec![ + KeyValue { + key: "string.attribute".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::StringValue("some string".to_owned())), + }), + }, + KeyValue { + key: "boolean.attribute".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::BoolValue(true)), + }), + }, + KeyValue { + key: "int.attribute".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::IntValue(10)), + }), + }, + KeyValue { + key: "double.attribute".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::DoubleValue(637.704)), + }), + }, + KeyValue { + key: "array.attribute".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::ArrayValue(ArrayValue { + values: vec![ + AnyValue { + value: Some(OtelValue::StringValue("many".to_owned())), + }, + AnyValue { + value: Some(OtelValue::StringValue("values".to_owned())), + }, + ], + })), + }), + }, + KeyValue { + key: "map.attribute".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::KvlistValue(KeyValueList { + values: vec![KeyValue { + key: "some.map.key".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::StringValue("some value".to_owned())), + }), + }], + })), + }), + }, + ], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![ + 0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x03, 0x81, 0x03, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, + 0xC6, 0x0C, + ], + span_id: vec![0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x74], + event_name: String::new(), + }; + + let our_log = otel_to_sentry_log(&log_record).expect("conversion should succeed"); + let annotated_log: Annotated = Annotated::new(our_log); + + assert_eq!( + get_path!(annotated_log.body), + Some(&Annotated::new("Example log record".into())) + ); + + // Verify that complex attributes (array, map) are not included since we skip them + let attributes = &annotated_log.value().unwrap().attributes.value().unwrap().0; + assert!(attributes.get("string.attribute").is_some()); + assert!(attributes.get("boolean.attribute").is_some()); + assert!(attributes.get("int.attribute").is_some()); + assert!(attributes.get("double.attribute").is_some()); + assert!(attributes.get("array.attribute").is_none()); // Should be skipped + assert!(attributes.get("map.attribute").is_none()); // Should be skipped + } + + #[test] + fn test_parse_otellog_with_invalid_trace_id() { + let log_record = LogRecord { + time_unix_nano: 1544712660300000000, + observed_time_unix_nano: 1544712660300000000, + severity_number: 10, + severity_text: "Information".to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("Test log".to_owned())), + }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![], // Empty trace_id should cause error + span_id: vec![0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x74], + event_name: String::new(), + }; + + let our_log = otel_to_sentry_log(&log_record); + assert!(our_log.is_err(), "empty trace_id should cause error"); + } + + #[test] + fn test_parse_otel_log_with_db_attributes() { + let log_record = LogRecord { + time_unix_nano: 1544712660300000000, + observed_time_unix_nano: 1544712660300000000, + severity_number: 10, + severity_text: "Information".to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("Database query executed".to_owned())), + }), + attributes: vec![ + KeyValue { + key: "db.name".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::StringValue("database".to_owned())), + }), + }, + KeyValue { + key: "db.type".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::StringValue("sql".to_owned())), + }), + }, + KeyValue { + key: "db.statement".to_owned(), + value: Some(AnyValue { + value: Some(OtelValue::StringValue( + "SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s" + .to_owned(), + )), + }), + }, + ], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![ + 0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x03, 0x81, 0x03, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, + 0xC6, 0x0C, + ], + span_id: vec![0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x74], + event_name: String::new(), + }; + + let our_log = otel_to_sentry_log(&log_record).expect("conversion should succeed"); + let annotated_log: Annotated = Annotated::new(our_log); + + assert_eq!( + get_path!(annotated_log.body), + Some(&Annotated::new("Database query executed".into())) + ); + + let attributes = &annotated_log.value().unwrap().attributes.value().unwrap().0; + let db_statement = attributes.get("db.statement").unwrap().value().unwrap(); + + match db_statement.value.value.value().unwrap() { + Value::String(s) => assert_eq!( + s, + "SELECT \"table\".\"col\" FROM \"table\" WHERE \"table\".\"col\" = %s" + ), + _ => panic!("expected string value for db.statement"), + } + } + + #[test] + fn test_severity_text_fallback() { + // Test severity text fallback when severity number is not in standard range + let test_cases = vec![ + (0, "trace", OurLogLevel::Trace), + (0, "debug", OurLogLevel::Debug), + (0, "info", OurLogLevel::Info), + (0, "warn", OurLogLevel::Warn), + (0, "error", OurLogLevel::Error), + (0, "fatal", OurLogLevel::Fatal), + (99, "unknown", OurLogLevel::Info), // fallback to Info + ]; + + for (severity_number, severity_text, expected_level) in test_cases { + let log_record = LogRecord { + time_unix_nano: 1544712660300000000, + observed_time_unix_nano: 1544712660300000000, + severity_number, + severity_text: severity_text.to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("test message".to_owned())), + }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![ + 0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x03, 0x81, 0x03, 0xD2, 0x69, 0xB6, 0x33, 0x81, + 0x3F, 0xC6, 0x0C, + ], + span_id: vec![0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x74], + event_name: String::new(), + }; + + let result = otel_to_sentry_log(&log_record).expect("conversion should succeed"); + assert_eq!( + result.level.value().expect("level should be present"), + &expected_level, + "severity {} with text '{}' should map to {:?}", + severity_number, + severity_text, + expected_level + ); + } + } + + #[test] + fn test_hex_trace_and_span_id_conversion() { + let log_record = LogRecord { + time_unix_nano: 1544712660300000000, + observed_time_unix_nano: 1544712660300000000, + severity_number: 10, + severity_text: "Information".to_owned(), + body: Some(AnyValue { + value: Some(OtelValue::StringValue("Test message".to_owned())), + }), + attributes: vec![], + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![ + 0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x03, 0x81, 0x03, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, + 0xC6, 0x0C, + ], + span_id: vec![0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x74], + event_name: String::new(), + }; + + let result = otel_to_sentry_log(&log_record).expect("conversion should succeed"); + + // Verify trace_id conversion + let trace_id = result.trace_id.value().expect("trace_id should be present"); + assert_eq!(trace_id.to_string(), "5b8efff798038103d269b633813fc60c"); + + // Verify span_id conversion + let span_id = result.span_id.value().expect("span_id should be present"); + assert_eq!(span_id.to_string(), "eee19b7ec3c1b174"); + } +} diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 353f820396c..5e94acaa426 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -608,6 +608,8 @@ pub enum DiscardItemType { OtelSpan, /// An OTLP TracesData container. OtelTracesData, + /// An OTLP LogsData container. + OtelLogsData, /// UserReport as an Event UserReportV2, /// ProfileChunk is a chunk of a profiling session. @@ -662,6 +664,7 @@ impl DiscardItemType { Self::Span => "span", Self::OtelSpan => "otel_span", Self::OtelTracesData => "otel_traces_data", + Self::OtelLogsData => "otel_logs_data", Self::UserReportV2 => "user_report_v2", Self::ProfileChunk => "profile_chunk", } @@ -694,6 +697,7 @@ impl From<&ItemType> for DiscardItemType { ItemType::Span => Self::Span, ItemType::OtelSpan => Self::OtelSpan, ItemType::OtelTracesData => Self::OtelTracesData, + ItemType::OtelLogsData => Self::OtelLogsData, ItemType::UserReportV2 => Self::UserReportV2, ItemType::ProfileChunk => Self::ProfileChunk, ItemType::Unknown(_) => Self::Unknown, diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 67eda87d4cc..58d95629354 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -467,6 +467,7 @@ fn is_duplicate(item: &Item, processing_enabled: bool) -> bool { ItemType::Span => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, + ItemType::OtelLogsData => false, ItemType::ProfileChunk => false, // Without knowing more, `Unknown` items are allowed to be repeated diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 5ed092a937f..7de4eef33c3 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -135,6 +135,7 @@ fn infer_event_category(item: &Item) -> Option { ItemType::Span => None, ItemType::OtelSpan => None, ItemType::OtelTracesData => None, + ItemType::OtelLogsData => None, ItemType::ProfileChunk => None, ItemType::Unknown(_) => None, } @@ -526,7 +527,7 @@ impl Enforcement { ItemType::ReplayRecording => !self.replays.is_active(), ItemType::UserReport => !self.user_reports.is_active(), ItemType::CheckIn => !self.check_ins.is_active(), - ItemType::Log => { + ItemType::Log | ItemType::OtelLogsData => { !(self.log_items.is_active() || self.log_bytes.is_active()) } ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => { diff --git a/relay-server/src/utils/sizes.rs b/relay-server/src/utils/sizes.rs index 2b7a5a3be39..e5e4b64802f 100644 --- a/relay-server/src/utils/sizes.rs +++ b/relay-server/src/utils/sizes.rs @@ -82,6 +82,7 @@ pub fn check_envelope_size_limits( span_count += item.item_count().unwrap_or(1) as usize; config.max_span_size() } + ItemType::OtelLogsData => config.max_event_size(), // a logs container ItemType::OtelTracesData => config.max_event_size(), // a spans container similar to `Transaction` ItemType::ProfileChunk => config.max_profile_size(), ItemType::Unknown(_) => NO_LIMIT,