Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Add InstallableBuild and SizeAnalysis data categories. ([#5084](https://github.com/getsentry/relay/pull/5084))
- Add dynamic PII derivation to `metastructure`. ([#5107](https://github.com/getsentry/relay/pull/5107))
- Detect PII status of attributes based on `sentry-conventions`. ([#5113](https://github.com/getsentry/relay/pull/5113))
- Add support for an OTLP `/v1/logs` endpoint. This endpoint is gated behind the `organizations:relay-otel-logs-endpoint` feature flag. ([#5130](https://github.com/getsentry/relay/pull/5130))

**Internal**:

Expand Down
2 changes: 2 additions & 0 deletions relay-ourlogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ mod size;

pub use self::otel_to_sentry::otel_to_sentry_log;
pub use self::size::calculate_size;

pub use opentelemetry_proto::tonic::logs::v1 as otel_logs;
8 changes: 5 additions & 3 deletions relay-server/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ mod health_check;
mod minidump;
mod monitor;
mod nel;
mod otlp_log;
mod otlp_traces;
#[cfg(sentry)]
mod playstation;
mod project_configs;
mod public_keys;
mod security_report;
mod statics;
mod store;
mod traces;
mod unreal;

use axum::extract::DefaultBodyLimit;
Expand Down Expand Up @@ -80,8 +81,9 @@ pub fn routes(config: &Config) -> Router<ServiceState>{
// https://opentelemetry.io/docs/specs/otlp/#otlphttp-request
// Because we initially released this endpoint with a trailing slash, keeping it for
// backwards compatibility.
.route("/api/{project_id}/otlp/v1/traces", traces::route(config))
.route("/api/{project_id}/otlp/v1/traces/", traces::route(config));
.route("/api/{project_id}/otlp/v1/traces", otlp_traces::route(config))
.route("/api/{project_id}/otlp/v1/traces/", otlp_traces::route(config))
.route("/api/{project_id}/otlp/v1/logs", otlp_log::route(config));
// NOTE: If you add a new (non-experimental) route here, please also list it in
// https://github.com/getsentry/sentry-docs/blob/master/docs/product/relay/operating-guidelines.mdx

Expand Down
43 changes: 43 additions & 0 deletions relay-server/src/endpoints/otlp_log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use axum::RequestExt;
use axum::extract::{DefaultBodyLimit, Request};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{MethodRouter, post};
use bytes::Bytes;
use relay_config::Config;
use relay_dynamic_config::Feature;

use crate::endpoints::common;
use crate::envelope::{ContentType, Envelope, Item, ItemType};
use crate::extractors::{RawContentType, RequestMeta};
use crate::service::ServiceState;

async fn handle(
state: ServiceState,
content_type: RawContentType,
meta: RequestMeta,
request: Request,
) -> axum::response::Result<impl IntoResponse> {
let content_type @ (ContentType::Json | ContentType::Protobuf) =
ContentType::from(content_type.as_ref())
else {
return Ok(StatusCode::UNSUPPORTED_MEDIA_TYPE);
};
let payload: Bytes = request.extract().await?;
let mut envelope = Envelope::from_request(None, meta);
envelope.require_feature(Feature::OtelLogsEndpoint);

envelope.add_item({
let mut item = Item::new(ItemType::OtelLogsData);
item.set_payload(content_type, payload);
item
});

common::handle_envelope(&state, envelope).await?;

Ok(StatusCode::ACCEPTED)
}

pub fn route(config: &Config) -> MethodRouter<ServiceState> {
post(handle).route_layer(DefaultBodyLimit::max(config.max_envelope_size()))
}
13 changes: 12 additions & 1 deletion relay-server/src/envelope/item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ impl Item {
ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, item_count)],
// NOTE: semantically wrong, but too expensive to parse.
ItemType::OtelTracesData => smallvec![(DataCategory::Span, item_count)],
ItemType::OtelLogsData => smallvec![
(DataCategory::LogByte, self.len().max(1)),
(DataCategory::LogItem, item_count) // NOTE: semantically wrong, but too expensive to parse.
],
ItemType::ProfileChunk => match self.profile_type() {
Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, item_count)],
Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, item_count)],
Expand Down Expand Up @@ -418,6 +422,7 @@ impl Item {
| ItemType::Log
| ItemType::OtelSpan
| ItemType::OtelTracesData
| ItemType::OtelLogsData
| ItemType::ProfileChunk => false,

// The unknown item type can observe any behavior, most likely there are going to be no
Expand Down Expand Up @@ -453,6 +458,7 @@ impl Item {
ItemType::Log => false,
ItemType::OtelSpan => false,
ItemType::OtelTracesData => false,
ItemType::OtelLogsData => false,
ItemType::ProfileChunk => false,

// Since this Relay cannot interpret the semantics of this item, it does not know
Expand Down Expand Up @@ -530,6 +536,8 @@ pub enum ItemType {
OtelSpan,
/// An OTLP TracesData container.
OtelTracesData,
/// An OTLP LogsData container.
OtelLogsData,
/// UserReport as an Event
UserReportV2,
/// ProfileChunk is a chunk of a profiling session.
Expand Down Expand Up @@ -585,6 +593,7 @@ impl ItemType {
Self::Span => "span",
Self::OtelSpan => "otel_span",
Self::OtelTracesData => "otel_traces_data",
Self::OtelLogsData => "otel_logs_data",
Self::ProfileChunk => "profile_chunk",
Self::Unknown(_) => "unknown",
}
Expand Down Expand Up @@ -635,7 +644,8 @@ impl ItemType {
ItemType::Log => true,
ItemType::Span => true,
ItemType::OtelSpan => true,
ItemType::OtelTracesData => true,
ItemType::OtelTracesData => false,
ItemType::OtelLogsData => false,
ItemType::UserReportV2 => false,
ItemType::ProfileChunk => true,
ItemType::Unknown(_) => true,
Expand Down Expand Up @@ -678,6 +688,7 @@ impl std::str::FromStr for ItemType {
"span" => Self::Span,
"otel_span" => Self::OtelSpan,
"otel_traces_data" => Self::OtelTracesData,
"otel_logs_data" => Self::OtelLogsData,
"profile_chunk" => Self::ProfileChunk,
// "profile_chunk_ui" is to be treated as an alias for `ProfileChunk`
// because Android 8.10.0 and 8.11.0 is sending it as the item type.
Expand Down
4 changes: 4 additions & 0 deletions relay-server/src/services/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,8 @@ pub enum DiscardItemType {
OtelSpan,
/// An OTLP TracesData container.
OtelTracesData,
/// An OTLP LogsData container.
OtelLogsData,
/// UserReport as an Event
UserReportV2,
/// ProfileChunk is a chunk of a profiling session.
Expand Down Expand Up @@ -662,6 +664,7 @@ impl DiscardItemType {
Self::Span => "span",
Self::OtelSpan => "otel_span",
Self::OtelTracesData => "otel_traces_data",
Self::OtelLogsData => "otel_logs_data",
Self::UserReportV2 => "user_report_v2",
Self::ProfileChunk => "profile_chunk",
}
Expand Down Expand Up @@ -694,6 +697,7 @@ impl From<&ItemType> for DiscardItemType {
ItemType::Span => Self::Span,
ItemType::OtelSpan => Self::OtelSpan,
ItemType::OtelTracesData => Self::OtelTracesData,
ItemType::OtelLogsData => Self::OtelLogsData,
ItemType::UserReportV2 => Self::UserReportV2,
ItemType::ProfileChunk => Self::ProfileChunk,
ItemType::Unknown(_) => Self::Unknown,
Expand Down
6 changes: 5 additions & 1 deletion relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ mod dynamic_sampling;
mod event;
mod metrics;
mod nel;
mod ourlog;
mod profile;
mod profile_chunk;
mod replay;
Expand Down Expand Up @@ -335,7 +336,8 @@ impl ProcessingGroup {
}

// Extract logs.
let logs_items = envelope.take_items_by(|item| matches!(item.ty(), &ItemType::Log));
let logs_items = envelope
.take_items_by(|item| matches!(item.ty(), &ItemType::Log | &ItemType::OtelLogsData));
if !logs_items.is_empty() {
grouped_envelopes.push((
ProcessingGroup::Log,
Expand Down Expand Up @@ -2379,6 +2381,8 @@ impl EnvelopeProcessorService {
if matches!(group, ProcessingGroup::Nel) {
nel::convert_to_logs(&mut managed_envelope);
}
// Convert OTLP logs data to individual log items
ourlog::convert_otel_logs(&mut managed_envelope);
self.process_with_processor(&self.inner.processing.logs, managed_envelope, ctx)
.await
}
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/services/processor/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ fn is_duplicate(item: &Item, processing_enabled: bool) -> bool {
ItemType::Span => false,
ItemType::OtelSpan => false,
ItemType::OtelTracesData => false,
ItemType::OtelLogsData => false,
ItemType::ProfileChunk => false,

// Without knowing more, `Unknown` items are allowed to be repeated
Expand Down
122 changes: 122 additions & 0 deletions relay-server/src/services/processor/ourlog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use opentelemetry_proto::tonic::common::v1::any_value::Value;
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue};
use prost::Message;
use relay_ourlogs::{otel_logs::LogsData, otel_to_sentry_log};
use relay_protocol::Annotated;
use relay_quotas::DataCategory;

use crate::envelope::{ContainerItems, ContentType, Item, ItemContainer, ItemType, WithHeader};
use crate::managed::ManagedEnvelope;
use crate::services::outcome::{DiscardReason, Outcome};

pub fn convert_otel_logs(envelope: &mut ManagedEnvelope) {
let items = envelope
.envelope_mut()
.take_items_by(|item| item.ty() == &ItemType::OtelLogsData);
let mut logs = ContainerItems::new();

for item in items {
match parse_logs_data(item) {
Ok(logs_data) => {
for resource_logs in logs_data.resource_logs {
for scope_logs in resource_logs.scope_logs {
for mut log_record in scope_logs.log_records {
// Denormalize instrumentation scope and resource attributes into every log record.
if let Some(ref scope) = scope_logs.scope {
if !scope.name.is_empty() {
log_record.attributes.push(KeyValue {
key: "instrumentation.name".to_owned(),
value: Some(AnyValue {
value: Some(Value::StringValue(scope.name.clone())),
}),
})
}
if !scope.version.is_empty() {
log_record.attributes.push(KeyValue {
key: "instrumentation.version".to_owned(),
value: Some(AnyValue {
value: Some(Value::StringValue(scope.version.clone())),
}),
})
}
scope.attributes.iter().for_each(|a| {
log_record.attributes.push(KeyValue {
key: format!("instrumentation.{}", a.key),
value: a.value.clone(),
});
});
}
if let Some(ref resource) = resource_logs.resource {
resource.attributes.iter().for_each(|a| {
log_record.attributes.push(KeyValue {
key: format!("resource.{}", a.key),
value: a.value.clone(),
});
});
}

match otel_to_sentry_log(log_record) {
Ok(our_log) => {
logs.push(WithHeader::just(Annotated::new(our_log)));
}
Err(_error) => {
relay_log::debug!(
"Failed to convert OTLP log record to Sentry log"
);
envelope.track_outcome(
Outcome::Invalid(DiscardReason::Internal),
DataCategory::LogItem,
1,
);
}
}
}
}
}
}
Err(reason) => {
relay_log::debug!("Failed to parse OTLP logs data");
envelope.track_outcome(Outcome::Invalid(reason), DataCategory::LogItem, 1);
}
}
}

if logs.is_empty() {
return;
}

let mut item = Item::new(ItemType::Log);
match ItemContainer::from(logs).write_to(&mut item) {
Ok(()) => {
envelope.envelope_mut().add_item(item);
}
Err(err) => {
relay_log::error!("failed to serialize logs: {err}");
envelope.track_outcome(
Outcome::Invalid(DiscardReason::Internal),
DataCategory::LogItem,
1,
);
}
}
}

fn parse_logs_data(item: Item) -> Result<LogsData, DiscardReason> {
match item.content_type() {
Some(&ContentType::Json) => serde_json::from_slice(&item.payload()).map_err(|e| {
relay_log::debug!(
error = &e as &dyn std::error::Error,
"Failed to parse logs data as JSON"
);
DiscardReason::InvalidJson
}),
Some(&ContentType::Protobuf) => LogsData::decode(item.payload()).map_err(|e| {
relay_log::debug!(
error = &e as &dyn std::error::Error,
"Failed to parse logs data as protobuf"
);
DiscardReason::InvalidProtobuf
}),
_ => Err(DiscardReason::ContentType),
}
}
3 changes: 2 additions & 1 deletion relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ fn infer_event_category(item: &Item) -> Option<DataCategory> {
ItemType::Span => None,
ItemType::OtelSpan => None,
ItemType::OtelTracesData => None,
ItemType::OtelLogsData => None,
ItemType::ProfileChunk => None,
ItemType::Unknown(_) => None,
}
Expand Down Expand Up @@ -526,7 +527,7 @@ impl Enforcement {
ItemType::ReplayRecording => !self.replays.is_active(),
ItemType::UserReport => !self.user_reports.is_active(),
ItemType::CheckIn => !self.check_ins.is_active(),
ItemType::Log => {
ItemType::Log | ItemType::OtelLogsData => {
!(self.log_items.is_active() || self.log_bytes.is_active())
}
ItemType::Span | ItemType::OtelSpan | ItemType::OtelTracesData => {
Expand Down
1 change: 1 addition & 0 deletions relay-server/src/utils/sizes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub fn check_envelope_size_limits(
config.max_span_size()
}
ItemType::OtelTracesData => config.max_event_size(), // a spans container similar to `Transaction`
ItemType::OtelLogsData => config.max_event_size(), // a logs container similar to `Transaction`
ItemType::ProfileChunk => config.max_profile_size(),
ItemType::Unknown(_) => NO_LIMIT,
};
Expand Down
27 changes: 27 additions & 0 deletions tests/integration/fixtures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,33 @@ def send_otel_span(

response.raise_for_status()

def send_otel_logs(
self,
project_id,
json=None,
bytes=None,
headers=None,
dsn_key_idx=0,
dsn_key=None,
):

if dsn_key is None:
dsn_key = self.get_dsn_public_key(project_id, dsn_key_idx)

url = f"/api/{project_id}/otlp/v1/logs?sentry_key={dsn_key}"

if json:
headers = {
"Content-Type": "application/json",
**(headers or {}),
}

response = self.post(url, headers=headers, json=json)
else:
response = self.post(url, headers=headers, data=bytes)

response.raise_for_status()

def send_options(self, project_id, headers=None, dsn_key_idx=0):
headers = {
"X-Sentry-Auth": self.get_auth_header(project_id, dsn_key_idx),
Expand Down
Loading
Loading