diff --git a/CHANGELOG.md b/CHANGELOG.md index 539c8a468f5..e6ea4f1ad8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - Add InstallableBuild and SizeAnalysis data categories. ([#5084](https://github.com/getsentry/relay/pull/5084)) - Add dynamic PII derivation to `metastructure`. ([#5107](https://github.com/getsentry/relay/pull/5107)) - Detect PII status of attributes based on `sentry-conventions`. ([#5113](https://github.com/getsentry/relay/pull/5113)) +- Add support for an OTLP `/v1/logs` endpoint. This endpoint is gated behind the `organizations:relay-otel-logs-endpoint` feature flag. ([#5130](https://github.com/getsentry/relay/pull/5130)) **Internal**: diff --git a/relay-ourlogs/src/lib.rs b/relay-ourlogs/src/lib.rs index 5296956879c..2b07fc659ab 100644 --- a/relay-ourlogs/src/lib.rs +++ b/relay-ourlogs/src/lib.rs @@ -11,3 +11,5 @@ mod size; pub use self::otel_to_sentry::otel_to_sentry_log; pub use self::size::calculate_size; + +pub use opentelemetry_proto::tonic::logs::v1 as otel_logs; diff --git a/relay-server/src/endpoints/mod.rs b/relay-server/src/endpoints/mod.rs index 073ce5a46c5..ed65186c9eb 100644 --- a/relay-server/src/endpoints/mod.rs +++ b/relay-server/src/endpoints/mod.rs @@ -14,6 +14,8 @@ mod health_check; mod minidump; mod monitor; mod nel; +mod otlp_log; +mod otlp_traces; #[cfg(sentry)] mod playstation; mod project_configs; @@ -21,7 +23,6 @@ mod public_keys; mod security_report; mod statics; mod store; -mod traces; mod unreal; use axum::extract::DefaultBodyLimit; @@ -80,8 +81,9 @@ pub fn routes(config: &Config) -> Router{ // https://opentelemetry.io/docs/specs/otlp/#otlphttp-request // Because we initially released this endpoint with a trailing slash, keeping it for // backwards compatibility. - .route("/api/{project_id}/otlp/v1/traces", traces::route(config)) - .route("/api/{project_id}/otlp/v1/traces/", traces::route(config)); + .route("/api/{project_id}/otlp/v1/traces", otlp_traces::route(config)) + .route("/api/{project_id}/otlp/v1/traces/", otlp_traces::route(config)) + .route("/api/{project_id}/otlp/v1/logs", otlp_log::route(config)); // NOTE: If you add a new (non-experimental) route here, please also list it in // https://github.com/getsentry/sentry-docs/blob/master/docs/product/relay/operating-guidelines.mdx diff --git a/relay-server/src/endpoints/otlp_log.rs b/relay-server/src/endpoints/otlp_log.rs new file mode 100644 index 00000000000..fec9807ae1a --- /dev/null +++ b/relay-server/src/endpoints/otlp_log.rs @@ -0,0 +1,43 @@ +use axum::RequestExt; +use axum::extract::{DefaultBodyLimit, Request}; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::routing::{MethodRouter, post}; +use bytes::Bytes; +use relay_config::Config; +use relay_dynamic_config::Feature; + +use crate::endpoints::common; +use crate::envelope::{ContentType, Envelope, Item, ItemType}; +use crate::extractors::{RawContentType, RequestMeta}; +use crate::service::ServiceState; + +async fn handle( + state: ServiceState, + content_type: RawContentType, + meta: RequestMeta, + request: Request, +) -> axum::response::Result { + let content_type @ (ContentType::Json | ContentType::Protobuf) = + ContentType::from(content_type.as_ref()) + else { + return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE); + }; + let payload: Bytes = request.extract().await?; + let mut envelope = Envelope::from_request(None, meta); + envelope.require_feature(Feature::OtelLogsEndpoint); + + envelope.add_item({ + let mut item = Item::new(ItemType::OtelLogsData); + item.set_payload(content_type, payload); + item + }); + + common::handle_envelope(&state, envelope).await?; + + Ok(StatusCode::ACCEPTED) +} + +pub fn route(config: &Config) -> MethodRouter { + post(handle).route_layer(DefaultBodyLimit::max(config.max_envelope_size())) +} diff --git a/relay-server/src/endpoints/traces.rs b/relay-server/src/endpoints/otlp_traces.rs similarity index 100% rename from relay-server/src/endpoints/traces.rs rename to relay-server/src/endpoints/otlp_traces.rs diff --git a/relay-server/src/envelope/item.rs b/relay-server/src/envelope/item.rs index 4426e3f9bc5..9a5bd64a82c 100644 --- a/relay-server/src/envelope/item.rs +++ b/relay-server/src/envelope/item.rs @@ -136,6 +136,10 @@ impl Item { ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, item_count)], // NOTE: semantically wrong, but too expensive to parse. ItemType::OtelTracesData => smallvec![(DataCategory::Span, item_count)], + ItemType::OtelLogsData => smallvec![ + (DataCategory::LogByte, self.len().max(1)), + (DataCategory::LogItem, item_count) // NOTE: semantically wrong, but too expensive to parse. + ], ItemType::ProfileChunk => match self.profile_type() { Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, item_count)], Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, item_count)], @@ -418,6 +422,7 @@ impl Item { | ItemType::Log | ItemType::OtelSpan | ItemType::OtelTracesData + | ItemType::OtelLogsData | ItemType::ProfileChunk => false, // The unknown item type can observe any behavior, most likely there are going to be no @@ -453,6 +458,7 @@ impl Item { ItemType::Log => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, + ItemType::OtelLogsData => false, ItemType::ProfileChunk => false, // Since this Relay cannot interpret the semantics of this item, it does not know @@ -530,6 +536,8 @@ pub enum ItemType { OtelSpan, /// An OTLP TracesData container. OtelTracesData, + /// An OTLP LogsData container. + OtelLogsData, /// UserReport as an Event UserReportV2, /// ProfileChunk is a chunk of a profiling session. @@ -585,6 +593,7 @@ impl ItemType { Self::Span => "span", Self::OtelSpan => "otel_span", Self::OtelTracesData => "otel_traces_data", + Self::OtelLogsData => "otel_logs_data", Self::ProfileChunk => "profile_chunk", Self::Unknown(_) => "unknown", } @@ -635,7 +644,8 @@ impl ItemType { ItemType::Log => true, ItemType::Span => true, ItemType::OtelSpan => true, - ItemType::OtelTracesData => true, + ItemType::OtelTracesData => false, + ItemType::OtelLogsData => false, ItemType::UserReportV2 => false, ItemType::ProfileChunk => true, ItemType::Unknown(_) => true, @@ -678,6 +688,7 @@ impl std::str::FromStr for ItemType { "span" => Self::Span, "otel_span" => Self::OtelSpan, "otel_traces_data" => Self::OtelTracesData, + "otel_logs_data" => Self::OtelLogsData, "profile_chunk" => Self::ProfileChunk, // "profile_chunk_ui" is to be treated as an alias for `ProfileChunk` // because Android 8.10.0 and 8.11.0 is sending it as the item type. diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 353f820396c..5e94acaa426 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -608,6 +608,8 @@ pub enum DiscardItemType { OtelSpan, /// An OTLP TracesData container. OtelTracesData, + /// An OTLP LogsData container. + OtelLogsData, /// UserReport as an Event UserReportV2, /// ProfileChunk is a chunk of a profiling session. @@ -662,6 +664,7 @@ impl DiscardItemType { Self::Span => "span", Self::OtelSpan => "otel_span", Self::OtelTracesData => "otel_traces_data", + Self::OtelLogsData => "otel_logs_data", Self::UserReportV2 => "user_report_v2", Self::ProfileChunk => "profile_chunk", } @@ -694,6 +697,7 @@ impl From<&ItemType> for DiscardItemType { ItemType::Span => Self::Span, ItemType::OtelSpan => Self::OtelSpan, ItemType::OtelTracesData => Self::OtelTracesData, + ItemType::OtelLogsData => Self::OtelLogsData, ItemType::UserReportV2 => Self::UserReportV2, ItemType::ProfileChunk => Self::ProfileChunk, ItemType::Unknown(_) => Self::Unknown, diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index e711dd736ae..5e53834e210 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -91,6 +91,7 @@ mod dynamic_sampling; mod event; mod metrics; mod nel; +mod ourlog; mod profile; mod profile_chunk; mod replay; @@ -335,7 +336,8 @@ impl ProcessingGroup { } // Extract logs. - let logs_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Log)); + let logs_items = envelope + .take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLogsData)); if !logs_items.is_empty() { grouped_envelopes.push(( ProcessingGroup::Log, @@ -2379,6 +2381,8 @@ impl EnvelopeProcessorService { if matches!(group, ProcessingGroup::Nel) { nel::convert_to_logs(&mut managed_envelope); } + // Convert OTLP logs data to individual log items + ourlog::convert_otel_logs(&mut managed_envelope); self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx) .await } diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 67eda87d4cc..58d95629354 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -467,6 +467,7 @@ fn is_duplicate(item: &Item, processing_enabled: bool) -> bool { ItemType::Span => false, ItemType::OtelSpan => false, ItemType::OtelTracesData => false, + ItemType::OtelLogsData => false, ItemType::ProfileChunk => false, // Without knowing more, `Unknown` items are allowed to be repeated diff --git a/relay-server/src/services/processor/ourlog.rs b/relay-server/src/services/processor/ourlog.rs new file mode 100644 index 00000000000..86580b3be9b --- /dev/null +++ b/relay-server/src/services/processor/ourlog.rs @@ -0,0 +1,122 @@ +use opentelemetry_proto::tonic::common::v1::any_value::Value; +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; +use prost::Message; +use relay_ourlogs::{otel_logs::LogsData, otel_to_sentry_log}; +use relay_protocol::Annotated; +use relay_quotas::DataCategory; + +use crate::envelope::{ContainerItems, ContentType, Item, ItemContainer, ItemType, WithHeader}; +use crate::managed::ManagedEnvelope; +use crate::services::outcome::{DiscardReason, Outcome}; + +pub fn convert_otel_logs(envelope: &mut ManagedEnvelope) { + let items = envelope + .envelope_mut() + .take_items_by(|item| item.ty() == &ItemType::OtelLogsData); + let mut logs = ContainerItems::new(); + + for item in items { + match parse_logs_data(item) { + Ok(logs_data) => { + for resource_logs in logs_data.resource_logs { + for scope_logs in resource_logs.scope_logs { + for mut log_record in scope_logs.log_records { + // Denormalize instrumentation scope and resource attributes into every log record. + if let Some(ref scope) = scope_logs.scope { + if !scope.name.is_empty() { + log_record.attributes.push(KeyValue { + key: "instrumentation.name".to_owned(), + value: Some(AnyValue { + value: Some(Value::StringValue(scope.name.clone())), + }), + }) + } + if !scope.version.is_empty() { + log_record.attributes.push(KeyValue { + key: "instrumentation.version".to_owned(), + value: Some(AnyValue { + value: Some(Value::StringValue(scope.version.clone())), + }), + }) + } + scope.attributes.iter().for_each(|a| { + log_record.attributes.push(KeyValue { + key: format!("instrumentation.{}", a.key), + value: a.value.clone(), + }); + }); + } + if let Some(ref resource) = resource_logs.resource { + resource.attributes.iter().for_each(|a| { + log_record.attributes.push(KeyValue { + key: format!("resource.{}", a.key), + value: a.value.clone(), + }); + }); + } + + match otel_to_sentry_log(log_record) { + Ok(our_log) => { + logs.push(WithHeader::just(Annotated::new(our_log))); + } + Err(_error) => { + relay_log::debug!( + "Failed to convert OTLP log record to Sentry log" + ); + envelope.track_outcome( + Outcome::Invalid(DiscardReason::Internal), + DataCategory::LogItem, + 1, + ); + } + } + } + } + } + } + Err(reason) => { + relay_log::debug!("Failed to parse OTLP logs data"); + envelope.track_outcome(Outcome::Invalid(reason), DataCategory::LogItem, 1); + } + } + } + + if logs.is_empty() { + return; + } + + let mut item = Item::new(ItemType::Log); + match ItemContainer::from(logs).write_to(&mut item) { + Ok(()) => { + envelope.envelope_mut().add_item(item); + } + Err(err) => { + relay_log::error!("failed to serialize logs: {err}"); + envelope.track_outcome( + Outcome::Invalid(DiscardReason::Internal), + DataCategory::LogItem, + 1, + ); + } + } +} + +fn parse_logs_data(item: Item) -> Result { + match item.content_type() { + Some(&ContentType::Json) => serde_json::from_slice(&item.payload()).map_err(|e| { + relay_log::debug!( + error = &e as &dyn std::error::Error, + "Failed to parse logs data as JSON" + ); + DiscardReason::InvalidJson + }), + Some(&ContentType::Protobuf) => LogsData::decode(item.payload()).map_err(|e| { + relay_log::debug!( + error = &e as &dyn std::error::Error, + "Failed to parse logs data as protobuf" + ); + DiscardReason::InvalidProtobuf + }), + _ => Err(DiscardReason::ContentType), + } +} diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 5ed092a937f..7de4eef33c3 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -135,6 +135,7 @@ fn infer_event_category(item: &Item) -> Option { ItemType::Span => None, ItemType::OtelSpan => None, ItemType::OtelTracesData => None, + ItemType::OtelLogsData => None, ItemType::ProfileChunk => None, ItemType::Unknown(_) => None, } @@ -526,7 +527,7 @@ impl Enforcement { ItemType::ReplayRecording => !self.replays.is_active(), ItemType::UserReport => !self.user_reports.is_active(), ItemType::CheckIn => !self.check_ins.is_active(), - ItemType::Log => { + ItemType::Log | ItemType::OtelLogsData => { !(self.log_items.is_active() || self.log_bytes.is_active()) } ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => { diff --git a/relay-server/src/utils/sizes.rs b/relay-server/src/utils/sizes.rs index 2b7a5a3be39..523923f9d93 100644 --- a/relay-server/src/utils/sizes.rs +++ b/relay-server/src/utils/sizes.rs @@ -83,6 +83,7 @@ pub fn check_envelope_size_limits( config.max_span_size() } ItemType::OtelTracesData => config.max_event_size(), // a spans container similar to `Transaction` + ItemType::OtelLogsData => config.max_event_size(), // a logs container similar to `Transaction` ItemType::ProfileChunk => config.max_profile_size(), ItemType::Unknown(_) => NO_LIMIT, }; diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py index 865e206d86a..f54161a492e 100644 --- a/tests/integration/fixtures/__init__.py +++ b/tests/integration/fixtures/__init__.py @@ -230,6 +230,33 @@ def send_otel_span( response.raise_for_status() + def send_otel_logs( + self, + project_id, + json=None, + bytes=None, + headers=None, + dsn_key_idx=0, + dsn_key=None, + ): + + if dsn_key is None: + dsn_key = self.get_dsn_public_key(project_id, dsn_key_idx) + + url = f"/api/{project_id}/otlp/v1/logs?sentry_key={dsn_key}" + + if json: + headers = { + "Content-Type": "application/json", + **(headers or {}), + } + + response = self.post(url, headers=headers, json=json) + else: + response = self.post(url, headers=headers, data=bytes) + + response.raise_for_status() + def send_options(self, project_id, headers=None, dsn_key_idx=0): headers = { "X-Sentry-Auth": self.get_auth_header(project_id, dsn_key_idx), diff --git a/tests/integration/test_otlp_logs.py b/tests/integration/test_otlp_logs.py new file mode 100644 index 00000000000..9a6992eae27 --- /dev/null +++ b/tests/integration/test_otlp_logs.py @@ -0,0 +1,212 @@ +import json +from datetime import datetime, timezone + +from .asserts import time_within_delta + + +def test_otlp_logs_conversion(mini_sentry, relay): + """Test OTLP logs conversion including basic and complex attributes.""" + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:ourlogs-ingestion", + "organizations:relay-otel-logs-endpoint", + ] + relay = relay(mini_sentry) + + otel_logs_payload = { + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": {"stringValue": "test-service"}, + } + ] + }, + "scopeLogs": [ + { + "scope": {"name": "test-library"}, + "logRecords": [ + { + "timeUnixNano": "1544712660300000000", + "observedTimeUnixNano": "1544712660300000000", + "severityNumber": 10, + "severityText": "Information", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": {"stringValue": "Example log record"}, + "attributes": [ + { + "key": "string.attribute", + "value": {"stringValue": "some string"}, + }, + { + "key": "boolean.attribute", + "value": {"boolValue": True}, + }, + { + "key": "int.attribute", + "value": {"intValue": "10"}, + }, + { + "key": "double.attribute", + "value": {"doubleValue": 637.704}, + }, + { + "key": "array.attribute", + "value": { + "arrayValue": { + "values": [ + {"stringValue": "first"}, + {"stringValue": "second"}, + ] + } + }, + }, + { + "key": "map.attribute", + "value": { + "kvlistValue": { + "values": [ + { + "key": "nested.key", + "value": { + "stringValue": "nested value" + }, + } + ] + } + }, + }, + ], + } + ], + } + ], + } + ] + } + + relay.send_otel_logs(project_id, json=otel_logs_payload) + + # Add some debugging to see if any envelope is captured + envelope = mini_sentry.captured_events.get(timeout=5) + + assert [item.type for item in envelope.items] == ["log"] + + log_item = json.loads(envelope.items[0].payload.bytes) + + # Verify the structure contains our log entry + assert "items" in log_item + assert len(log_item["items"]) == 1 + + log_entry = log_item["items"][0] + + # Verify basic structure + assert "timestamp" in log_entry + assert "trace_id" in log_entry + assert "level" in log_entry + assert "body" in log_entry + assert "attributes" in log_entry + + # Verify specific values + assert log_entry["body"] == "Example log record" + assert log_entry["level"] == "info" + assert log_entry["trace_id"] == "5b8efff798038103d269b633813fc60c" + + # Verify attributes were preserved + attributes = log_entry["attributes"] + assert "string.attribute" in attributes + assert attributes["string.attribute"]["value"] == "some string" + assert attributes["string.attribute"]["type"] == "string" + + assert "boolean.attribute" in attributes + assert attributes["boolean.attribute"]["value"] is True + assert attributes["boolean.attribute"]["type"] == "boolean" + + assert "int.attribute" in attributes + assert attributes["int.attribute"]["value"] == 10 + assert attributes["int.attribute"]["type"] == "integer" + + assert "double.attribute" in attributes + assert attributes["double.attribute"]["value"] == 637.704 + assert attributes["double.attribute"]["type"] == "double" + + # Test array attribute (should be serialized as JSON string) + assert "array.attribute" in attributes + assert attributes["array.attribute"]["type"] == "string" + assert attributes["array.attribute"]["value"] == '["first","second"]' + + # Test map attribute (should be serialized as JSON string) + assert "map.attribute" in attributes + assert attributes["map.attribute"]["type"] == "string" + assert attributes["map.attribute"]["value"] == '{"nested.key":"nested value"}' + + # Verify timestamp conversion (from unix nano to seconds) + expected_timestamp = datetime.fromtimestamp(1544712660.3, tz=timezone.utc) + assert log_entry["timestamp"] == time_within_delta(expected_timestamp) + + assert mini_sentry.captured_events.empty() + + +def test_otlp_logs_multiple_records(mini_sentry, relay): + """Test multiple log records in a single payload.""" + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:ourlogs-ingestion", + "organizations:relay-otel-logs-endpoint", + ] + relay = relay(mini_sentry) + + otel_logs_payload = { + "resourceLogs": [ + { + "scopeLogs": [ + { + "logRecords": [ + { + "timeUnixNano": "1544712660300000000", + "severityNumber": 18, + "severityText": "Error", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B174", + "body": {"stringValue": "First log entry"}, + }, + { + "timeUnixNano": "1544712661300000000", + "severityNumber": 6, + "severityText": "Debug", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B175", + "body": {"stringValue": "Second log entry"}, + }, + ] + } + ] + } + ] + } + + relay.send_otel_logs(project_id, json=otel_logs_payload) + envelope = mini_sentry.captured_events.get(timeout=3) + log_item = json.loads(envelope.items[0].payload.bytes) + + # Should have both log entries + assert len(log_item["items"]) == 2 + + # Verify first log entry + first_log = log_item["items"][0] + assert first_log["body"] == "First log entry" + assert first_log["level"] == "error" + assert first_log["span_id"] == "eee19b7ec3c1b174" + + # Verify second log entry + second_log = log_item["items"][1] + assert second_log["body"] == "Second log entry" + assert second_log["level"] == "debug" + assert second_log["span_id"] == "eee19b7ec3c1b175" + + assert mini_sentry.captured_events.empty()