feat(profiling): Add a Stacktrace ItemType to represent the stacktraces sent from Sentry SDKs #1179

Merged · 18 commits · Mar 11, 2022
Changes from 5 commits
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -14,6 +14,7 @@

- Spread out metric aggregation over the aggregation window to avoid concentrated waves of metrics requests to the upstream every 10 seconds. Relay now applies jitter to `initial_delay` to spread out requests more evenly over time. ([#1185](https://github.com/getsentry/relay/pull/1185))
- Add new statsd metrics for bucketing efficiency ([#1199](https://github.com/getsentry/relay/pull/1199), [#1192](https://github.com/getsentry/relay/pull/1192), [#1200](https://github.com/getsentry/relay/pull/1200))
- Add a `Profile` `ItemType` to represent the profiling data sent from Sentry SDKs. ([#1179](https://github.com/getsentry/relay/pull/1179))

## 22.2.0

@@ -24,7 +25,7 @@
**Internal**:

- Add an option to dispatch billing outcomes to a dedicated topic. ([#1168](https://github.com/getsentry/relay/pull/1168))
- Add new `ItemType` to handle profiling data. ([#1170](https://github.com/getsentry/relay/pull/1170))
- Add new `ItemType` to handle profiling data from Specto SDKs. ([#1170](https://github.com/getsentry/relay/pull/1170))

**Bug Fixes**:

13 changes: 13 additions & 0 deletions Cargo.lock

(Generated file; diff not shown.)

6 changes: 6 additions & 0 deletions relay-config/src/config.rs
@@ -766,6 +766,8 @@ pub enum KafkaTopic {
ProfilingSessions,
/// Profiling traces
ProfilingTraces,
/// Profiles
Profiles,
}

/// Configuration for topics.
@@ -790,6 +792,8 @@ pub struct TopicAssignments {
pub profiling_sessions: TopicAssignment,
/// Profiling traces topic name
pub profiling_traces: TopicAssignment,
/// Profiles topic name
pub profiles: TopicAssignment,
}

impl TopicAssignments {
@@ -805,6 +809,7 @@
KafkaTopic::Metrics => &self.metrics,
KafkaTopic::ProfilingSessions => &self.profiling_sessions,
KafkaTopic::ProfilingTraces => &self.profiling_traces,
KafkaTopic::Profiles => &self.profiles,
}
}
}
@@ -821,6 +826,7 @@ impl Default for TopicAssignments {
metrics: "ingest-metrics".to_owned().into(),
profiling_sessions: "profiling-sessions".to_owned().into(),
profiling_traces: "profiling-traces".to_owned().into(),
profiles: "profiles".to_owned().into(),
Review comment: Please remember to deploy the prod config in getsentry/ops before deploying this PR.

}
}
}
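For reference, a hedged sketch of how the new topic might be assigned in a Relay configuration file. This assumes the usual `processing.topics` layout that `TopicAssignments` is deserialized from; the key names mirror the struct fields above, and everything else is illustrative:

```yaml
# Illustrative only: assigning the new profiles topic alongside the existing ones.
processing:
  topics:
    profiling_sessions: profiling-sessions
    profiling_traces: profiling-traces
    # Added in this PR; defaults to "profiles" when omitted.
    profiles: profiles
```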
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
@@ -27,6 +27,7 @@ processing = [
[dependencies]
actix = "0.7.9"
actix-web = { version = "0.7.19", default-features = false }
android_trace_log = { version = "0.2.0", features = ["serde"] }
base64 = "0.10.1"
brotli2 = "0.3.2"
bytes = { version = "0.4.12", features = ["serde"] }
39 changes: 37 additions & 2 deletions relay-server/src/actors/envelopes.rs
@@ -63,7 +63,7 @@ use {
crate::actors::store::{StoreEnvelope, StoreError, StoreForwarder},
crate::metrics_extraction::transactions::extract_transaction_metrics,
crate::service::ServerErrorKind,
crate::utils::{EnvelopeLimiter, ErrorBoundary},
crate::utils::{EnvelopeLimiter, ErrorBoundary, ProfileError},
failure::ResultExt,
relay_general::store::{GeoIpLookup, StoreConfig, StoreProcessor},
relay_quotas::{RateLimitingError, RedisRateLimiter},
@@ -159,6 +159,10 @@ enum ProcessingError {

#[fail(display = "event dropped by sampling rule {}", _0)]
EventSampled(RuleId),

#[cfg(feature = "processing")]
#[fail(display = "invalid profile")]
InvalidProfile(#[cause] ProfileError),
}

impl ProcessingError {
@@ -178,6 +182,9 @@
#[cfg(feature = "processing")]
Self::InvalidUnrealReport(_) => Some(Outcome::Invalid(DiscardReason::ProcessUnreal)),

#[cfg(feature = "processing")]
Self::InvalidProfile(_) => Some(Outcome::Invalid(DiscardReason::ProcessProfile)),

// Internal errors
Self::SerializeFailed(_)
| Self::ProjectFailed(_)
@@ -220,6 +227,13 @@
}
}

#[cfg(feature = "processing")]
impl From<ProfileError> for ProcessingError {
fn from(err: ProfileError) -> Self {
ProcessingError::InvalidProfile(err)
}
}

/// Contains the required envelope related information to create an outcome.
#[derive(Clone, Copy, Debug)]
pub struct EnvelopeContext {
@@ -941,7 +955,9 @@ impl EnvelopeProcessor {
let profiling_enabled = state.project_state.has_feature(Feature::Profiling);
state.envelope.retain_items(|item| {
match item.ty() {
ItemType::ProfilingSession | ItemType::ProfilingTrace => profiling_enabled,
ItemType::ProfilingSession | ItemType::ProfilingTrace | ItemType::Profile => {
profiling_enabled
}
_ => true, // Keep all other item types
}
});
@@ -1022,6 +1038,23 @@
Ok(())
}

/// Expands profile items inside an envelope
#[cfg(feature = "processing")]
fn expand_profile(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> {
let envelope = &mut state.envelope;
let meta = envelope.meta();

if meta.client() != Some(utils::ANDROID_SDK_NAME) {
return Ok(());
}

if let Some(item) = &mut envelope.take_item_by(|item| item.ty() == ItemType::Profile) {
utils::expand_profile_envelope(item)?;
}

Ok(())
}

fn event_from_json_payload(
&self,
item: Item,
@@ -1252,6 +1285,7 @@ impl EnvelopeProcessor {
ItemType::ClientReport => false,
ItemType::ProfilingSession => false,
ItemType::ProfilingTrace => false,
ItemType::Profile => false,
}
}

@@ -1761,6 +1795,7 @@ impl EnvelopeProcessor {
if state.creates_event() {
if_processing!({
self.expand_unreal(state)?;
self.expand_profile(state)?;
});

self.extract_event(state)?;
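The `retain_items` gate above depends on the `Feature::Profiling` project feature. As a rough illustration only, a project config fragment enabling it might look like the following; both the config shape and the exact feature string are assumptions, not taken from this diff:

```json
{
  "config": {
    "features": ["organizations:profiling"]
  }
}
```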
4 changes: 4 additions & 0 deletions relay-server/src/actors/outcome.rs
@@ -301,6 +301,9 @@ pub enum DiscardReason {
/// [Relay] The envelope, which contained only a transaction, was discarded by the
/// dynamic sampling rules.
TransactionSampled,

/// [Relay] We failed to parse the profile, so we discard it.
ProcessProfile,
}

impl DiscardReason {
@@ -321,6 +324,7 @@
DiscardReason::SecurityReport => "security_report",
DiscardReason::Cors => "cors",
DiscardReason::ProcessUnreal => "process_unreal",
DiscardReason::ProcessProfile => "process_profile",

// Relay specific reasons (not present in Sentry)
DiscardReason::Payload => "payload",
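When `expand_profile` fails, the envelope is dropped with the new reason string. A hedged sketch of the resulting outcome record, assuming the usual shape of invalid outcomes (field names and values are illustrative):

```json
{
  "org_id": 1,
  "project_id": 42,
  "outcome": 3,
  "reason": "process_profile"
}
```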
33 changes: 31 additions & 2 deletions relay-server/src/actors/store.rs
@@ -60,6 +60,7 @@ struct Producers {
metrics: Producer,
profiling_sessions: Producer,
profiling_traces: Producer,
profiles: Producer,
}

impl Producers {
@@ -78,6 +79,7 @@
KafkaTopic::Metrics => Some(&self.metrics),
KafkaTopic::ProfilingSessions => Some(&self.profiling_sessions),
KafkaTopic::ProfilingTraces => Some(&self.profiling_traces),
KafkaTopic::Profiles => Some(&self.profiles),
}
}
}
@@ -144,6 +146,7 @@ impl StoreForwarder {
&mut reused_producers,
KafkaTopic::ProfilingTraces,
)?,
profiles: make_producer(&*config, &mut reused_producers, KafkaTopic::Profiles)?,
};

Ok(Self { config, producers })
@@ -464,6 +467,26 @@ impl StoreForwarder {
)?;
Ok(())
}

fn produce_profile(
&self,
organization_id: u64,
project_id: ProjectId,
item: &Item,
) -> Result<(), StoreError> {
let message = ProfilingKafkaMessage {
organization_id,
project_id,
payload: item.payload(),
};
relay_log::trace!("Sending profile to Kafka");
self.produce(KafkaTopic::Profiles, KafkaMessage::Profile(message))?;
metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = "profile"
);
Ok(())
}
}

/// StoreMessageForwarder is an async actor since the only thing it does is put the messages
@@ -650,6 +673,7 @@ enum KafkaMessage {
Metric(MetricKafkaMessage),
ProfilingSession(ProfilingKafkaMessage),
ProfilingTrace(ProfilingKafkaMessage),
Profile(ProfilingKafkaMessage),
}

impl KafkaMessage {
@@ -663,6 +687,7 @@
KafkaMessage::Metric(_) => "metric",
KafkaMessage::ProfilingSession(_) => "profiling_session",
KafkaMessage::ProfilingTrace(_) => "profiling_trace",
KafkaMessage::Profile(_) => "profile",
}
}

@@ -675,8 +700,9 @@
Self::UserReport(message) => message.event_id.0,
Self::Session(message) => message.session_id,
Self::Metric(_message) => Uuid::nil(), // TODO(ja): Determine a partitioning key
Self::ProfilingTrace(_message) => Uuid::nil(), // TODO(pierre): parse session_id as uuid
Self::ProfilingSession(_message) => Uuid::nil(), // TODO(pierre): parse session_id as uuid
Self::ProfilingTrace(_message) => Uuid::nil(),
Self::ProfilingSession(_message) => Uuid::nil(),
Self::Profile(_message) => Uuid::nil(),
};

if uuid.is_nil() {
@@ -795,6 +821,9 @@ impl Handler<StoreEnvelope> for StoreForwarder {
scoping.project_id,
item,
)?,
ItemType::Profile => {
self.produce_profile(scoping.organization_id, scoping.project_id, item)?
}
_ => {}
}
}
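The logical content of the new Kafka message follows directly from `ProfilingKafkaMessage` above. A hedged sketch of its fields is shown below; the payload value is a stand-in, and the actual wire encoding used by the producer is not part of this hunk:

```json
{
  "organization_id": 1,
  "project_id": 42,
  "payload": "<raw profile item payload>"
}
```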
7 changes: 6 additions & 1 deletion relay-server/src/envelope.rs
@@ -106,6 +106,8 @@ pub enum ItemType {
ProfilingSession,
/// Profiling data
ProfilingTrace,
/// Profile event payload encoded in JSON
Profile,
}

impl ItemType {
@@ -139,6 +141,7 @@ impl fmt::Display for ItemType {
Self::ClientReport => write!(f, "client report"),
Self::ProfilingSession => write!(f, "profiling session"),
Self::ProfilingTrace => write!(f, "profiling trace"),
Self::Profile => write!(f, "profile"),
}
}
}
@@ -584,7 +587,8 @@ impl Item {
| ItemType::MetricBuckets
| ItemType::ClientReport
| ItemType::ProfilingSession
| ItemType::ProfilingTrace => false,
| ItemType::ProfilingTrace
| ItemType::Profile => false,
}
}

@@ -608,6 +612,7 @@
ItemType::ClientReport => false,
ItemType::ProfilingSession => false,
ItemType::ProfilingTrace => false,
ItemType::Profile => false,
}
}
}
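For context, a hedged sketch of how an SDK might send the new item type, following the usual envelope framing of an envelope header line, an item header line, and the item payload. The profile body shown is purely illustrative; this PR does not specify the payload schema:

```
{"event_id":"9ec79c33ec9942ab8353589fcb2e04dc","sent_at":"2022-03-01T00:00:00.000Z"}
{"type":"profile","length":18}
{"stacktraces":[]}
```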
4 changes: 4 additions & 0 deletions relay-server/src/utils/mod.rs
@@ -17,6 +17,8 @@ mod kafka;
#[cfg(feature = "processing")]
mod native;
#[cfg(feature = "processing")]
mod profile;
#[cfg(feature = "processing")]
mod unreal;

pub use self::actix::*;
@@ -38,4 +40,6 @@ pub use self::kafka::*;
#[cfg(feature = "processing")]
pub use self::native::*;
#[cfg(feature = "processing")]
pub use self::profile::*;
#[cfg(feature = "processing")]
pub use self::unreal::*;
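The new `utils/profile.rs` module itself is not among the commits shown here. Based on the call sites in `envelopes.rs` (`ProfileError`, `utils::expand_profile_envelope`, `utils::ANDROID_SDK_NAME`), a minimal sketch of the interface they imply could look like the following; everything beyond those three names, including the error variant, the SDK-name value, and the JSON validation step, is an assumption made for illustration:

```rust
use failure::Fail;

use crate::envelope::Item;

/// Error returned while expanding a profile item (the variant is illustrative).
#[derive(Debug, Fail)]
pub enum ProfileError {
    #[fail(display = "invalid json in profile")]
    InvalidJson(#[cause] serde_json::Error),
}

/// Client name reported by the Android SDK. The real value is defined in the
/// actual module and is not visible in this diff; this is a stand-in.
pub const ANDROID_SDK_NAME: &str = "sentry.java.android";

/// Validates the profile payload before the envelope moves on to the store.
pub fn expand_profile_envelope(item: &mut Item) -> Result<(), ProfileError> {
    // Reject malformed JSON early; the real implementation presumably does
    // more, e.g. decoding the trace with the new `android_trace_log` dependency.
    serde_json::from_slice::<serde_json::Value>(&item.payload())
        .map_err(ProfileError::InvalidJson)?;
    Ok(())
}
```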