Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(replays): Combined Envelope Items (reopen) #3035

Merged
merged 44 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
e775838
feat(replay): combined envelope items
JoshFerge May 31, 2023
dc26a9d
only run in processing mode
JoshFerge May 31, 2023
7033ae0
add integration test. fix processing mode stuff, add store logic
JoshFerge Jun 7, 2023
2c9d294
remove processing mode business, use vec instead of Bytes
JoshFerge Jun 7, 2023
aa0c0ce
fix args in combined store func
JoshFerge Jun 7, 2023
9d21e80
fix tests
JoshFerge Jun 7, 2023
d7d18bb
remove unused test
JoshFerge Jun 7, 2023
5ce488b
merge
JoshFerge Jan 31, 2024
5ad1304
get it compiling
JoshFerge Jan 31, 2024
da62bb8
get tests running
JoshFerge Jan 31, 2024
8dbe24c
remove unused imports
JoshFerge Jan 31, 2024
e29cabf
make clippy even more happy
JoshFerge Jan 31, 2024
3b96c66
add changelog entry
JoshFerge Jan 31, 2024
6f7433e
move logic to store
JoshFerge Feb 6, 2024
6f825d4
fix
JoshFerge Feb 6, 2024
aa4d3be
fix merge
JoshFerge Feb 6, 2024
8d06df8
more cleanup
JoshFerge Feb 6, 2024
d8662fa
Update CHANGELOG.md
JoshFerge Feb 6, 2024
91eeda5
remove feature
JoshFerge Feb 6, 2024
848a513
use flag in header
JoshFerge Feb 8, 2024
6399ce8
merge
JoshFerge Feb 8, 2024
0540710
try to debug
JoshFerge Feb 8, 2024
562c5ef
tests working
JoshFerge Feb 8, 2024
43dd578
dont produce two messages
JoshFerge Feb 9, 2024
b9f7156
cleanup
JoshFerge Feb 9, 2024
c8726f3
remove println
JoshFerge Feb 9, 2024
27b508d
fix lint
JoshFerge Feb 9, 2024
f625668
combined_payload not used in non-processing, allow dead code
JoshFerge Feb 9, 2024
8d0e335
Update relay-server/src/envelope.rs
JoshFerge Feb 9, 2024
fa4afd2
simplify processing feature declare on env item flag
JoshFerge Feb 9, 2024
6383eb0
move logic into one func
JoshFerge Feb 9, 2024
9224202
clean refactor
JoshFerge Feb 9, 2024
f56ab35
add one final test / fix impl
JoshFerge Feb 9, 2024
3b52610
small refactor to produce function
JoshFerge Feb 13, 2024
50de8a2
Merge branch 'master' into jferg/combine-replay-env-items
JoshFerge Feb 13, 2024
1ae7767
Merge branch 'master' into jferg/combine-replay-env-items
JoshFerge Feb 13, 2024
35dd886
only produce replay events if replay recording / event exists
JoshFerge Feb 14, 2024
12ce170
Remove redundant boolean condition
cmanallen Feb 15, 2024
70e20f3
Flatten extraction and consider replay_event size when measuring payl…
cmanallen Feb 15, 2024
68c5a02
Replay-event variable is only populated if the flag is enabled
cmanallen Feb 15, 2024
306245a
Remove redundant method
cmanallen Feb 15, 2024
6ad66f9
Remove isize note
cmanallen Feb 15, 2024
2959b9d
Correct boolean condition
cmanallen Feb 15, 2024
b96ca07
Emit outcome if the payload is too large
cmanallen Feb 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Emit a usage metric for total spans. ([#3007](https://github.com/getsentry/relay/pull/3007))
- Drop timestamp from metrics partition key. ([#3025](https://github.com/getsentry/relay/pull/3025))
- Drop spans ending outside the valid timestamp range. ([#3013](https://github.com/getsentry/relay/pull/3013))
- Add support for combining replay envelope items. ([#3035](https://github.com/getsentry/relay/pull/3035))
- Extract INP metrics from spans. ([#2969](https://github.com/getsentry/relay/pull/2969), [#3041](https://github.com/getsentry/relay/pull/3041))
- Add ability to rate limit metric buckets by namespace. ([#2941](https://github.com/getsentry/relay/pull/2941))
- Upgrade sqlparser to 0.43.1.([#3057](https://github.com/getsentry/relay/pull/3057))
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ make build

#### Snapshot tests

We use `insta` for snapshot testing. It will run as part of the `make test` command
We use `insta` for snapshot testing. It will run as part of the `make test` command
to validate schema/protocol changes. To install the `insta` tool for reviewing snapshots run:
```bash
cargo install cargo-insta
Expand Down
4 changes: 4 additions & 0 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ pub enum Feature {
/// Enables data scrubbing of replay recording payloads.
#[serde(rename = "organizations:session-replay-recording-scrubbing")]
SessionReplayRecordingScrubbing,
/// Enables combining session replay envelope items (Replay Recordings and Replay Events)
/// into one Kafka message.
#[serde(rename = "organizations:session-replay-combined-envelope-items")]
SessionReplayCombinedEnvelopeItems,
/// Enables new User Feedback ingest.
///
/// TODO(jferg): rename to UserFeedbackIngest once old UserReport logic is deprecated.
Expand Down
18 changes: 17 additions & 1 deletion relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ pub struct ItemHeaders {
#[serde(default, skip)]
rate_limited: bool,

/// Indicates that this item should be combined into one payload with the other replay item.
/// NOTE: This is internal-only and not exposed into the Envelope.
#[serde(default, skip)]
replay_combined_payload: bool,

/// Contains the amount of events this item was generated and aggregated from.
///
/// A [metrics buckets](`ItemType::MetricBuckets`) item contains metrics extracted and
Expand Down Expand Up @@ -576,6 +581,7 @@ impl Item {
filename: None,
routing_hint: None,
rate_limited: false,
replay_combined_payload: false,
source_quantities: None,
sample_rates: None,
other: BTreeMap::new(),
Expand Down Expand Up @@ -739,6 +745,17 @@ impl Item {
self.headers.source_quantities = Some(source_quantities);
}

/// Returns whether the payload's replay items should be combined into one Kafka message.
#[cfg(feature = "processing")]
pub fn replay_combined_payload(&self) -> bool {
self.headers.replay_combined_payload
}

/// Sets the replay_combined_payload for this item.
pub fn set_replay_combined_payload(&mut self, combined_payload: bool) {
self.headers.replay_combined_payload = combined_payload;
}

/// Sets sample rates for this item.
pub fn set_sample_rates(&mut self, sample_rates: Value) {
if matches!(sample_rates, Value::Array(ref a) if !a.is_empty()) {
Expand Down Expand Up @@ -858,7 +875,6 @@ impl Item {
ItemType::UnrealReport => true,
ItemType::UserReport => true,
ItemType::UserReportV2 => true,

ItemType::ReplayEvent => true,
ItemType::Session => false,
ItemType::Sessions => false,
Expand Down
8 changes: 8 additions & 0 deletions relay-server/src/services/processor/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,17 @@ pub fn process(state: &mut ProcessEnvelopeState, config: &Config) -> Result<(),
user_agent: meta.user_agent(),
client_hints: meta.client_hints().as_deref(),
};
let combined_envelope_items =
project_state.has_feature(Feature::SessionReplayCombinedEnvelopeItems);

state.managed_envelope.retain_items(|item| match item.ty() {
ItemType::ReplayEvent => {
if !replays_enabled {
return ItemAction::DropSilently;
}
if combined_envelope_items {
item.set_replay_combined_payload(true);
}

match process_replay_event(&item.payload(), project_config, client_addr, user_agent) {
Ok(replay) => match replay.to_json() {
Expand Down Expand Up @@ -84,6 +89,9 @@ pub fn process(state: &mut ProcessEnvelopeState, config: &Config) -> Result<(),
if !replays_enabled {
return ItemAction::DropSilently;
}
if combined_envelope_items {
item.set_replay_combined_payload(true);
}

// XXX: Processing is there just for data scrubbing. Skip the entire expensive
// processing step if we do not need to scrub.
Expand Down
97 changes: 87 additions & 10 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ impl StoreService {

let mut attachments = Vec::new();

let mut replay_event = None;
let mut replay_recording = None;
let mut send_combined_replay_envelope = false;

for item in envelope.items() {
match item.ty() {
ItemType::Attachment => {
Expand Down Expand Up @@ -244,16 +248,17 @@ impl StoreService {
item,
)?,
ItemType::ReplayRecording => {
self.produce_replay_recording(event_id, scoping, item, start_time, retention)?
if item.replay_combined_payload() {
send_combined_replay_envelope = true
}
replay_recording = Some(item);
}
ItemType::ReplayEvent => {
if item.replay_combined_payload() {
send_combined_replay_envelope = true
}
cmanallen marked this conversation as resolved.
Show resolved Hide resolved
replay_event = Some(item);
}
ItemType::ReplayEvent => self.produce_replay_event(
event_id.ok_or(StoreError::NoEventId)?,
scoping.organization_id,
scoping.project_id,
start_time,
retention,
item,
)?,
ItemType::CheckIn => self.produce_check_in(
scoping.organization_id,
scoping.project_id,
Expand All @@ -269,6 +274,16 @@ impl StoreService {
}
}

self.produce_replay_messages(
replay_event,
replay_recording,
event_id.ok_or(StoreError::NoEventId)?,
scoping,
start_time,
retention,
send_combined_replay_envelope,
)?;

if event_item.is_none() && attachments.is_empty() {
// No event-related content. All done.
return Ok(());
Expand Down Expand Up @@ -774,6 +789,7 @@ impl StoreService {
event_id: Option<EventId>,
scoping: Scoping,
item: &Item,
replay_event: Option<&Item>,
start_time: Instant,
retention: u16,
) -> Result<(), StoreError> {
Expand All @@ -783,6 +799,11 @@ impl StoreService {
// Remaining bytes can be filled by the payload.
let max_payload_size = self.config.max_replay_message_size() - max_message_metadata_size;

let mut replay_event_payload = None;
if let Some(replay_event) = replay_event {
replay_event_payload = Some(replay_event.payload());
}
cmanallen marked this conversation as resolved.
Show resolved Hide resolved

if item.payload().len() < max_payload_size {
let message =
KafkaMessage::ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage {
Expand All @@ -793,6 +814,7 @@ impl StoreService {
received: UnixTimestamp::from_instant(start_time).as_secs(),
retention_days: retention,
payload: item.payload(),
replay_event: replay_event_payload,
});

self.produce(
Expand All @@ -812,6 +834,61 @@ impl StoreService {
Ok(())
}

#[allow(clippy::too_many_arguments)]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what to do about the too many args here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of passing replay_event, replay_recording, and a boolean flag, the most idiomatic would probably be to pass an enum like

enum Data {
    Event(&Item),
    Recording(&Item),
    Combined { event: &Item, recording: &Item }
}

That would also make states like `(None, None, true)` impossible.

/// Produces the Kafka messages for the replay items of one envelope.
///
/// The replay event, when present, is always produced on its own. The
/// recording, when present, is produced as well; if
/// `send_combined_replay_envelope` is set, the event payload is additionally
/// embedded into the recording message so downstream consumers receive both
/// in a single message.
fn produce_replay_messages(
    &self,
    replay_event: Option<&Item>,
    replay_recording: Option<&Item>,
    replay_id: EventId,
    scoping: Scoping,
    start_time: Instant,
    retention_days: u16,
    send_combined_replay_envelope: bool,
) -> Result<(), StoreError> {
    if let Some(replay_event) = replay_event {
        // The standalone replay-event message is produced unconditionally,
        // regardless of the combined flag.
        self.produce_replay_event(
            replay_id,
            scoping.organization_id,
            scoping.project_id,
            start_time,
            retention_days,
            replay_event,
        )?;

        if let Some(replay_recording) = replay_recording {
            // Produce the recording; attach the replay-event payload only
            // when the combined-envelope flag was set on the items.
            self.produce_replay_recording(
                Some(replay_id),
                scoping,
                replay_recording,
                if send_combined_replay_envelope {
                    Some(replay_event)
                } else {
                    None
                },
                start_time,
                retention_days,
            )?;
        }
    } else if let Some(replay_recording) = replay_recording {
        // SDKs normally send the replay event and recording together, so this
        // branch should not be reached in practice. Defensively produce a
        // lone recording anyway rather than dropping it.
        self.produce_replay_recording(
            Some(replay_id),
            scoping,
            replay_recording,
            None,
            start_time,
            retention_days,
        )?;
    }
    Ok(())
}

fn produce_check_in(
&self,
organization_id: u64,
Expand Down Expand Up @@ -1072,7 +1149,6 @@ struct ReplayRecordingChunkKafkaMessage {
/// the tuple (id, chunk_index) is the unique identifier for a single chunk.
chunk_index: usize,
}

#[derive(Debug, Serialize)]
struct ReplayRecordingChunkMeta {
/// The attachment ID within the event.
Expand Down Expand Up @@ -1114,6 +1190,7 @@ struct ReplayRecordingNotChunkedKafkaMessage {
received: u64,
retention_days: u16,
payload: Bytes,
replay_event: Option<Bytes>,
}

/// User report for an event wrapped up in a message ready for consumption in Kafka.
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/fixtures/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ def get_chunked_replay(self):
assert v["type"] == "replay_recording", v["type"]
return v

def get_not_chunked_replay(self):
message = self.poll()
def get_not_chunked_replay(self, timeout=None):
message = self.poll(timeout=timeout)
assert message is not None
assert message.error() is None

Expand Down
111 changes: 111 additions & 0 deletions tests/integration/test_replay_combined_payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from sentry_sdk.envelope import Envelope, Item, PayloadRef

from .test_replay_recordings import recording_payload
from .test_replay_events import generate_replay_sdk_event
import json


def test_replay_combined_with_processing(
    mini_sentry,
    relay_with_processing,
    replay_recordings_consumer,
    replay_events_consumer,
):
    """With the combined-envelope-items feature enabled, the recording Kafka
    message embeds the replay-event payload, while the standalone replay-event
    message is still produced."""
    project_id = 42
    replay_id = "515539018c9b4260a6f999572f1661ee"
    relay = relay_with_processing()
    mini_sentry.add_basic_project_config(
        project_id,
        extra={
            "config": {
                "features": [
                    "organizations:session-replay",
                    "organizations:session-replay-combined-envelope-items",
                ]
            }
        },
    )
    recordings = replay_recordings_consumer()
    events = replay_events_consumer(timeout=10)

    recording_bytes = recording_payload(b"[]")
    envelope = Envelope(
        headers=[
            ["event_id", replay_id],
            ["attachment_type", "replay_recording"],
        ]
    )
    envelope.add_item(
        Item(payload=PayloadRef(bytes=recording_bytes), type="replay_recording")
    )
    envelope.add_item(
        Item(
            payload=PayloadRef(json=generate_replay_sdk_event(replay_id=replay_id)),
            type="replay_event",
        )
    )

    relay.send_envelope(project_id, envelope)

    # The recording message carries the replay-event payload alongside the
    # recording payload.
    combined = recordings.get_not_chunked_replay(timeout=10)
    assert combined["type"] == "replay_recording_not_chunked"
    assert combined["replay_id"] == replay_id
    assert combined["payload"] == recording_bytes

    embedded_event = json.loads(combined["replay_event"])
    assert embedded_event["replay_id"] == replay_id

    # The standalone replay-event message is produced as well.
    event, event_message = events.get_replay_event()
    assert event["type"] == "replay_event"
    assert event["replay_id"] == replay_id
    assert event_message["retention_days"] == 90


def test_replay_combined_with_processing_no_flag_set(
    mini_sentry, relay_with_processing, replay_recordings_consumer
):
    """Without the combined-envelope-items feature, the recording message is
    still produced but its replay_event field stays empty."""
    project_id = 42
    replay_id = "515539018c9b4260a6f999572f1661ee"
    relay = relay_with_processing()
    mini_sentry.add_basic_project_config(
        project_id,
        extra={"config": {"features": ["organizations:session-replay"]}},
    )
    recordings = replay_recordings_consumer()

    recording_bytes = recording_payload(b"[]")
    envelope = Envelope(
        headers=[
            ["event_id", replay_id],
            ["attachment_type", "replay_recording"],
        ]
    )
    envelope.add_item(
        Item(payload=PayloadRef(bytes=recording_bytes), type="replay_recording")
    )
    envelope.add_item(
        Item(
            payload=PayloadRef(json=generate_replay_sdk_event(replay_id=replay_id)),
            type="replay_event",
        )
    )

    relay.send_envelope(project_id, envelope)

    message = recordings.get_not_chunked_replay(timeout=10)
    assert message["type"] == "replay_recording_not_chunked"
    assert message["replay_id"] == replay_id
    assert message["payload"] == recording_bytes
    # No combination was requested, so the embedded event payload is absent.
    assert message["replay_event"] is None
Loading
Loading