Skip to content

Commit

Permalink
feat: Add Session Aggregates as new Item
Browse files Browse the repository at this point in the history
The new sessions item has pre-aggregated counts for different session
outcomes.

Whether the aggregates are exploded into individual session updates or
sent unchanged as aggregates to the Kafka topic is configurable.
  • Loading branch information
Swatinem committed Nov 18, 2020
1 parent cc63c7c commit 458c348
Show file tree
Hide file tree
Showing 11 changed files with 491 additions and 18 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

**Features**:

- Relay is now able to ingest pre-aggregated sessions, which will make it possible to efficiently handle applications that produce thousands of sessions per second. ([#815](https://github.com/getsentry/relay/pull/815))

## 20.11.1

- No documented changes.
Expand Down
13 changes: 13 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,11 +734,18 @@ fn default_max_rate_limit() -> Option<u32> {
Some(300) // 5 minutes
}

/// Serde default for the `explode_session_aggregates` processing option.
///
/// Session aggregates are exploded into individual session updates unless the
/// configuration says otherwise.
fn default_explode_session_aggregates() -> bool {
    true
}

/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
/// True if the Relay should do processing. Defaults to `false`.
pub enabled: bool,
/// Indicates if session aggregates should be exploded into individual session updates.
#[serde(default = "default_explode_session_aggregates")]
pub explode_session_aggregates: bool,
/// GeoIp DB file source.
#[serde(default)]
pub geoip_path: Option<PathBuf>,
Expand Down Expand Up @@ -775,6 +782,7 @@ impl Default for Processing {
fn default() -> Self {
Self {
enabled: false,
explode_session_aggregates: default_explode_session_aggregates(),
geoip_path: None,
max_secs_in_future: 0,
max_secs_in_past: 0,
Expand Down Expand Up @@ -1434,6 +1442,11 @@ impl Config {
self.values.processing.enabled
}

/// Returns `true` if session aggregates should be exploded into individual session updates.
pub fn explode_session_aggregates(&self) -> bool {
    let processing = &self.values.processing;
    processing.explode_session_aggregates
}

/// The path to the GeoIp database required for event processing.
pub fn geoip_path(&self) -> Option<&Path> {
self.values.processing.geoip_path.as_deref()
Expand Down
4 changes: 3 additions & 1 deletion relay-general/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ pub use self::request::{Cookies, HeaderName, HeaderValue, Headers, Query, Reques
#[cfg(feature = "jsonschema")]
pub use self::schema::event_json_schema;
pub use self::security_report::{Csp, ExpectCt, ExpectStaple, Hpkp, SecurityReportType};
pub use self::session::{ParseSessionStatusError, SessionAttributes, SessionStatus, SessionUpdate};
pub use self::session::{
ParseSessionStatusError, SessionAggregates, SessionAttributes, SessionStatus, SessionUpdate,
};
pub use self::span::Span;
pub use self::stacktrace::{Frame, FrameData, FrameVars, RawStacktrace, Stacktrace};
pub use self::tags::{TagEntry, Tags};
Expand Down
195 changes: 195 additions & 0 deletions relay-general/src/protocol/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub enum SessionStatus {
Crashed,
/// The session had an unexpected abrupt termination (not crashing).
Abnormal,
/// The session exited cleanly but experienced some errors during its run.
Errored,
}

impl Default for SessionStatus {
Expand All @@ -39,6 +41,7 @@ derive_fromstr_and_display!(SessionStatus, ParseSessionStatusError, {
SessionStatus::Crashed => "crashed",
SessionStatus::Abnormal => "abnormal",
SessionStatus::Exited => "exited",
SessionStatus::Errored => "errored",
});

/// Additional attributes for Sessions.
Expand Down Expand Up @@ -117,6 +120,103 @@ impl SessionUpdate {
}
}

/// Returns `true` if the given counter is zero.
///
/// This is used as a serde `skip_serializing_if` predicate, which is invoked with a reference
/// to the field. The by-reference signature is therefore required and the clippy lint about
/// passing small `Copy` values by reference is silenced.
#[allow(clippy::trivially_copy_pass_by_ref)]
fn is_zero(val: &u32) -> bool {
    let &count = val;
    count == 0
}

/// Pre-aggregated counts of session outcomes that share one `started` timestamp and one
/// optional distinct id.
///
/// Counters that are zero are omitted when serializing and default to zero when they are
/// absent from the payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SessionAggregateItem {
/// The timestamp of when the session itself started.
pub started: DateTime<Utc>,
/// The distinct identifier, (de)serialized under the `did` key.
#[serde(rename = "did", default, skip_serializing_if = "Option::is_none")]
pub distinct_id: Option<String>,
/// The number of exited sessions that occurred.
#[serde(default, skip_serializing_if = "is_zero")]
pub exited: u32,
/// The number of errored sessions that occurred, not including the abnormal and crashed ones.
#[serde(default, skip_serializing_if = "is_zero")]
pub errored: u32,
/// The number of abnormal sessions that occurred.
#[serde(default, skip_serializing_if = "is_zero")]
pub abnormal: u32,
/// The number of crashed sessions that occurred.
#[serde(default, skip_serializing_if = "is_zero")]
pub crashed: u32,
}

/// A batch of pre-aggregated session counts together with the attributes they all share.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SessionAggregates {
/// A batch of sessions that were started.
///
/// Defaults to an empty list when the field is missing from the payload.
#[serde(default)]
pub aggregates: Vec<SessionAggregateItem>,
/// The shared session event attributes, (de)serialized under the `attrs` key.
#[serde(rename = "attrs")]
pub attributes: SessionAttributes,
}

impl SessionAggregates {
    /// Parses a session batch from JSON.
    ///
    /// # Errors
    ///
    /// Returns the underlying `serde_json` error if the payload cannot be deserialized into
    /// `SessionAggregates`.
    pub fn parse(payload: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(payload)
    }

    /// Serializes a session batch back into JSON.
    ///
    /// # Errors
    ///
    /// Returns the underlying `serde_json` error if serialization fails.
    pub fn serialize(&self) -> Result<Vec<u8>, serde_json::Error> {
        serde_json::to_vec(self)
    }

    /// The total number of sessions in this aggregate.
    ///
    /// The counters are taken verbatim from the parsed payload, so their sum can exceed
    /// `u32::MAX`. The result saturates at `u32::MAX` rather than panicking (debug builds) or
    /// silently wrapping around (release builds).
    pub fn num_sessions(&self) -> u32 {
        self.aggregates
            .iter()
            .map(|item| {
                item.exited
                    .saturating_add(item.errored)
                    .saturating_add(item.abnormal)
                    .saturating_add(item.crashed)
            })
            .fold(0, u32::saturating_add)
    }

    /// Creates individual session updates from the aggregates.
    ///
    /// The returned iterator is lazy and yields one `SessionUpdate` per counted session:
    ///
    /// - Items are taken from the back of `aggregates`, so updates for the last item come first.
    /// - Within an item, `exited` sessions are emitted first, followed by `errored`, `abnormal`
    ///   and `crashed` ones.
    /// - `errored` sessions are emitted with status `Exited` and `errors: 1`; `abnormal` and
    ///   `crashed` sessions keep their status and also carry `errors: 1`.
    /// - Every update gets a fresh random session id, `sequence: 0`, `init: true`, no duration,
    ///   the current time as `timestamp`, and a clone of the shared attributes.
    pub fn into_updates_iter(self) -> impl Iterator<Item = SessionUpdate> {
        let attributes = self.attributes;
        let mut remaining = self.aggregates;
        let mut current = remaining.pop();

        std::iter::from_fn(move || loop {
            // `?` ends the iterator once every item has been drained.
            let item = current.as_mut()?;

            let (status, errors) = if item.exited > 0 {
                item.exited -= 1;
                (SessionStatus::Exited, 0)
            } else if item.errored > 0 {
                item.errored -= 1;
                // when exploding, we create "legacy" session updates that have no `errored` state
                (SessionStatus::Exited, 1)
            } else if item.abnormal > 0 {
                item.abnormal -= 1;
                (SessionStatus::Abnormal, 1)
            } else if item.crashed > 0 {
                item.crashed -= 1;
                (SessionStatus::Crashed, 1)
            } else {
                // All counters of this item are used up; move on to the next item.
                current = remaining.pop();
                continue;
            };

            return Some(SessionUpdate {
                session_id: Uuid::new_v4(),
                distinct_id: item.distinct_id.clone(),
                sequence: 0,
                init: true,
                timestamp: Utc::now(),
                started: item.started,
                duration: None,
                status,
                errors,
                attributes: attributes.clone(),
            });
        })
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -241,4 +341,99 @@ mod tests {
let update = SessionUpdate::parse(json.as_bytes()).unwrap();
assert_eq_dbg!(update.attributes.ip_address, Some(IpAddr::auto()));
}

// Checks that a parsed `SessionAggregates` payload explodes into one session update per
// counted session, and that the iterator ends afterwards.
#[test]
fn test_session_aggregates() {
let json = r#"{
"aggregates": [{
"started": "2020-02-07T14:16:00Z",
"exited": 2,
"abnormal": 1
},{
"started": "2020-02-07T14:17:00Z",
"did": "some-user",
"errored": 1
}],
"attrs": {
"release": "sentry-test@1.0.0",
"environment": "production",
"ip_address": "::1",
"user_agent": "Firefox/72.0"
}
}"#;
let aggregates = SessionAggregates::parse(json.as_bytes()).unwrap();
let mut iter = aggregates.into_updates_iter();

// The timestamp and session id are generated while exploding, so they are redacted to keep
// the snapshots stable.
let mut settings = insta::Settings::new();
settings.add_redaction(".timestamp", "[TS]");
settings.add_redaction(".sid", "[SID]");
settings.bind(|| {
// Items are taken from the back of the list, so the second aggregate comes first. Its single
// errored session is emitted as `exited` with one error.
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
---
sid: "[SID]"
did: some-user
seq: 0
init: true
timestamp: "[TS]"
started: "2020-02-07T14:17:00Z"
status: exited
errors: 1
attrs:
release: sentry-test@1.0.0
environment: production
ip_address: "::1"
user_agent: Firefox/72.0
"###);
// First of the two exited sessions of the first aggregate.
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
---
sid: "[SID]"
did: ~
seq: 0
init: true
timestamp: "[TS]"
started: "2020-02-07T14:16:00Z"
status: exited
errors: 0
attrs:
release: sentry-test@1.0.0
environment: production
ip_address: "::1"
user_agent: Firefox/72.0
"###);
// Second of the two exited sessions of the first aggregate.
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
---
sid: "[SID]"
did: ~
seq: 0
init: true
timestamp: "[TS]"
started: "2020-02-07T14:16:00Z"
status: exited
errors: 0
attrs:
release: sentry-test@1.0.0
environment: production
ip_address: "::1"
user_agent: Firefox/72.0
"###);
// The abnormal session of the first aggregate, emitted after its exited sessions.
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
---
sid: "[SID]"
did: ~
seq: 0
init: true
timestamp: "[TS]"
started: "2020-02-07T14:16:00Z"
status: abnormal
errors: 1
attrs:
release: sentry-test@1.0.0
environment: production
ip_address: "::1"
user_agent: Firefox/72.0
"###);
});

// All four counted sessions have been yielded; nothing may follow.
assert_eq!(iter.next(), None);
}
}
1 change: 1 addition & 0 deletions relay-server/src/actors/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ impl EventProcessor {

// session data is never considered as part of deduplication
ItemType::Session => false,
ItemType::Sessions => false,
}
}

Expand Down
Loading

0 comments on commit 458c348

Please sign in to comment.