ref: Remove Compatibility mode for Session Aggregates
Compatibility mode was added in #815 while session aggregates were still waiting on proper support in snuba.
That downstream support landed recently in getsentry/snuba#1492, and compatibility mode has since been switched off in production via a configuration change.
The snuba change also shipped in the 21.1 CalVer release, so it is safe to remove compatibility mode and ship this feature in full fidelity in 21.2.
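
In concrete terms: with compatibility mode on, a processing Relay "exploded" every session aggregate into individual session updates, producing one Kafka message per counted session; with it off, each aggregate bucket is forwarded as-is, one message per (bucket, status) carrying a `quantity`. A minimal sketch of the difference, reusing the payload format from the tests below (`SessionAggregates::parse` and `MAX_EXPLODED_SESSIONS` appear in the diffs; the module path and error type here are assumptions, not the exact producer code):

    // Sketch only: illustrates the behavioral change, not the actual producer.
    use relay_general::protocol::SessionAggregates; // path assumed from the repo layout

    fn sketch() -> Result<(), serde_json::Error> {
        // One aggregate bucket counting three finished sessions.
        let json = r#"{
            "aggregates": [
                {"started": "2020-02-07T14:16:00Z", "exited": 2, "abnormal": 1}
            ],
            "attrs": {"release": "sentry-test@1.0.0", "environment": "production"}
        }"#;
        let aggregates = SessionAggregates::parse(json.as_bytes())?;

        // Before (compatibility mode): `into_updates_iter` expanded this bucket
        // into three individual session updates (2 exited, 1 abnormal), one
        // message each, capped at MAX_EXPLODED_SESSIONS.
        // After: `produce_sessions_from_aggregate` emits one message per
        // (bucket, status) with a `quantity` field (2 for "exited", 1 for
        // "abnormal"), which snuba can now consume directly.
        let _ = aggregates;
        Ok(())
    }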
Swatinem committed Jan 20, 2021
1 parent 2974b9c commit cbabfeb
Showing 4 changed files with 27 additions and 225 deletions.
13 changes: 0 additions & 13 deletions relay-config/src/config.rs
@@ -691,18 +691,11 @@ fn default_max_rate_limit() -> Option<u32> {
Some(300) // 5 minutes
}

fn default_explode_session_aggregates() -> bool {
true
}

/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
/// True if the Relay should do processing. Defaults to `false`.
pub enabled: bool,
/// Indicates if session aggregates should be exploded into individual session updates.
#[serde(default = "default_explode_session_aggregates")]
pub explode_session_aggregates: bool,
/// GeoIp DB file source.
#[serde(default)]
pub geoip_path: Option<PathBuf>,
@@ -739,7 +732,6 @@ impl Default for Processing {
fn default() -> Self {
Self {
enabled: false,
explode_session_aggregates: default_explode_session_aggregates(),
geoip_path: None,
max_secs_in_future: default_max_secs_in_future(),
max_secs_in_past: default_max_secs_in_past(),
@@ -1385,11 +1377,6 @@ impl Config {
self.values.processing.enabled
}

/// Indicates if session aggregates should be exploded into individual session updates.
pub fn explode_session_aggregates(&self) -> bool {
self.values.processing.explode_session_aggregates
}

/// The path to the GeoIp database required for event processing.
pub fn geoip_path(&self) -> Option<&Path> {
self.values.processing.geoip_path.as_deref()
144 changes: 0 additions & 144 deletions relay-general/src/protocol/session.rs
@@ -166,55 +166,6 @@ impl SessionAggregates {
pub fn serialize(&self) -> Result<Vec<u8>, serde_json::Error> {
serde_json::to_vec(self)
}

/// The total number of sessions in this aggregate.
pub fn num_sessions(&self) -> u32 {
self.aggregates
.iter()
.map(|i| i.exited + i.errored + i.abnormal + i.crashed)
.sum()
}

/// Creates individual session updates from the aggregates.
pub fn into_updates_iter(self) -> impl Iterator<Item = SessionUpdate> {
let attributes = self.attributes;
let mut items = self.aggregates;
let mut item_opt = items.pop();
std::iter::from_fn(move || loop {
let item = item_opt.as_mut()?;

let (status, errors) = if item.exited > 0 {
item.exited -= 1;
(SessionStatus::Exited, 0)
} else if item.errored > 0 {
item.errored -= 1;
// when exploding, we create "legacy" session updates that have no `errored` state
(SessionStatus::Exited, 1)
} else if item.abnormal > 0 {
item.abnormal -= 1;
(SessionStatus::Abnormal, 1)
} else if item.crashed > 0 {
item.crashed -= 1;
(SessionStatus::Crashed, 1)
} else {
item_opt = items.pop();
continue;
};
let attributes = attributes.clone();
return Some(SessionUpdate {
session_id: Uuid::new_v4(),
distinct_id: item.distinct_id.clone(),
sequence: 0,
init: true,
timestamp: Utc::now(),
started: item.started,
duration: None,
status,
errors,
attributes,
});
})
}
}

#[cfg(test)]
@@ -341,99 +292,4 @@ mod tests {
let update = SessionUpdate::parse(json.as_bytes()).unwrap();
assert_eq_dbg!(update.attributes.ip_address, Some(IpAddr::auto()));
}

#[test]
fn test_session_aggregates() {
let json = r#"{
"aggregates": [{
"started": "2020-02-07T14:16:00Z",
"exited": 2,
"abnormal": 1
},{
"started": "2020-02-07T14:17:00Z",
"did": "some-user",
"errored": 1
}],
"attrs": {
"release": "sentry-test@1.0.0",
"environment": "production",
"ip_address": "::1",
"user_agent": "Firefox/72.0"
}
}"#;
let aggregates = SessionAggregates::parse(json.as_bytes()).unwrap();
let mut iter = aggregates.into_updates_iter();

let mut settings = insta::Settings::new();
settings.add_redaction(".timestamp", "[TS]");
settings.add_redaction(".sid", "[SID]");
settings.bind(|| {
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
---
sid: "[SID]"
did: some-user
seq: 0
init: true
timestamp: "[TS]"
started: "2020-02-07T14:17:00Z"
status: exited
errors: 1
attrs:
release: sentry-test@1.0.0
environment: production
ip_address: "::1"
user_agent: Firefox/72.0
"###);
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
---
sid: "[SID]"
did: ~
seq: 0
init: true
timestamp: "[TS]"
started: "2020-02-07T14:16:00Z"
status: exited
errors: 0
attrs:
release: sentry-test@1.0.0
environment: production
ip_address: "::1"
user_agent: Firefox/72.0
"###);
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
---
sid: "[SID]"
did: ~
seq: 0
init: true
timestamp: "[TS]"
started: "2020-02-07T14:16:00Z"
status: exited
errors: 0
attrs:
release: sentry-test@1.0.0
environment: production
ip_address: "::1"
user_agent: Firefox/72.0
"###);
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
---
sid: "[SID]"
did: ~
seq: 0
init: true
timestamp: "[TS]"
started: "2020-02-07T14:16:00Z"
status: abnormal
errors: 1
attrs:
release: sentry-test@1.0.0
environment: production
ip_address: "::1"
user_agent: Firefox/72.0
"###);
});

assert_eq!(iter.next(), None);
}
}
36 changes: 10 additions & 26 deletions relay-server/src/actors/store.rs
@@ -168,45 +168,29 @@ impl StoreForwarder {
Ok(session) => session,
Err(_) => return Ok(()),
};

if session.status == SessionStatus::Errored {
// Individual updates should never have the status `errored`
session.status = SessionStatus::Exited;
}
self.produce_session_update(org_id, project_id, event_retention, client, session)?;
self.produce_session_update(org_id, project_id, event_retention, client, session)
}
ItemType::Sessions => {
let aggregates = match SessionAggregates::parse(&item.payload()) {
Ok(aggregates) => aggregates,
Err(_) => return Ok(()),
};

if self.config.explode_session_aggregates() {
if aggregates.num_sessions() as usize > MAX_EXPLODED_SESSIONS {
relay_log::warn!("exploded session items from aggregate exceed threshold");
}

for session in aggregates.into_updates_iter().take(MAX_EXPLODED_SESSIONS) {
self.produce_session_update(
org_id,
project_id,
event_retention,
client,
session,
)?;
}
} else {
self.produce_sessions_from_aggregate(
org_id,
project_id,
event_retention,
client,
aggregates,
)?
}
self.produce_sessions_from_aggregate(
org_id,
project_id,
event_retention,
client,
aggregates,
)
}
_ => {}
_ => Ok(()),
}
Ok(())
}

fn produce_sessions_from_aggregate(
59 changes: 17 additions & 42 deletions tests/integration/test_session.py
@@ -164,11 +164,12 @@ def test_session_with_processing_two_events(


def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consumer):
relay = relay_with_processing({"processing": {"explode_session_aggregates": False}})
relay = relay_with_processing()
sessions_consumer = sessions_consumer()

timestamp = datetime.now(tz=timezone.utc)
started = timestamp - timedelta(hours=1)
started1 = timestamp - timedelta(hours=1)
started2 = started1 - timedelta(hours=1)

project_id = 42
mini_sentry.add_full_project_config(project_id)
@@ -177,10 +178,13 @@ def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consume
{
"aggregates": [
{
"started": started.isoformat(),
"started": started1.isoformat(),
"did": "foobarbaz",
"exited": 2,
"errored": 3,
},
{"started": started2.isoformat(),
"abnormal": 1,
}
],
"attrs": {"release": "sentry-test@1.0.0", "environment": "production",},
@@ -196,7 +200,7 @@ def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consume
"distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
"quantity": 2,
"seq": 0,
"started": started.timestamp(),
"started": started1.timestamp(),
"duration": None,
"status": "exited",
"errors": 0,
@@ -215,7 +219,7 @@ def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consume
"distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
"quantity": 3,
"seq": 0,
"started": started.timestamp(),
"started": started1.timestamp(),
"duration": None,
"status": "errored",
"errors": 1,
@@ -225,54 +229,25 @@ def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consume
"sdk": "raven-node/2.6.3",
}


def test_session_aggregates_explode(
mini_sentry, relay_with_processing, sessions_consumer
):
relay = relay_with_processing({"processing": {"explode_session_aggregates": True}})
sessions_consumer = sessions_consumer()

timestamp = datetime.now(tz=timezone.utc)
started = timestamp - timedelta(hours=1)

project_id = 42
mini_sentry.add_full_project_config(project_id)
relay.send_session_aggregates(
project_id,
{
"aggregates": [
{"started": started.isoformat(), "did": "foobarbaz", "exited": 2,}
],
"attrs": {"release": "sentry-test@1.0.0", "environment": "production",},
},
)

expected = {
session = sessions_consumer.get_session()
del session["received"]
assert session == {
"org_id": 1,
"project_id": project_id,
"distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
"session_id": "00000000-0000-0000-0000-000000000000",
"distinct_id": "00000000-0000-0000-0000-000000000000",
"quantity": 1,
"seq": 0,
"started": started.timestamp(),
"started": started2.timestamp(),
"duration": None,
"status": "exited",
"errors": 0,
"status": "abnormal",
"errors": 1,
"release": "sentry-test@1.0.0",
"environment": "production",
"retention_days": 90,
"sdk": "raven-node/2.6.3",
}

session = sessions_consumer.get_session()
del session["session_id"]
del session["received"]
assert session == expected

session = sessions_consumer.get_session()
del session["session_id"]
del session["received"]
assert session == expected


def test_session_with_custom_retention(
mini_sentry, relay_with_processing, sessions_consumer
