Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Improved PII Scrubbing for attributes / logs ([#5061](https://github.com/getsentry/relay/pull/5061)))
- Introduces a project scope sampling rule type. ([#5077](https://github.com/getsentry/relay/pull/5077)))
- Produce transactions on `transactions` Kafka topic, even if they have attachments. ([#5081](https://github.com/getsentry/relay/pull/5081))
- Add option gating Snuba publishing to ingest-replay-events for Replays. ([#5088](https://github.com/getsentry/relay/pull/5088))

## 25.8.0

Expand Down
8 changes: 8 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ pub struct Options {
)]
pub http_span_allowed_hosts: Vec<String>,

/// Disables Relay from sending replay-events to Snuba.
#[serde(
rename = "replay.relay-snuba-publishing-disabled",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub replay_relay_snuba_publish_disabled: bool,

/// All other unknown options.
#[serde(flatten)]
other: HashMap<String, Value>,
Expand Down
21 changes: 21 additions & 0 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ impl StoreService {
let mut replay_event = None;
let mut replay_recording = None;

// Whether Relay will submit the replay-event to snuba or not.
let replay_relay_snuba_publish_disabled = self
.global_config
.current()
.options
.replay_relay_snuba_publish_disabled;

for item in envelope.items() {
match item.ty() {
ItemType::Attachment => {
Expand Down Expand Up @@ -324,6 +331,7 @@ impl StoreService {
item.payload(),
received_at,
retention,
replay_relay_snuba_publish_disabled,
)?;
}
ItemType::ReplayRecording => {
Expand All @@ -337,6 +345,7 @@ impl StoreService {
received_at,
retention,
&item.payload(),
replay_relay_snuba_publish_disabled,
)?;
}
ItemType::CheckIn => {
Expand Down Expand Up @@ -423,6 +432,7 @@ impl StoreService {
None,
received_at,
retention,
replay_relay_snuba_publish_disabled,
)?;
}

Expand Down Expand Up @@ -843,7 +853,12 @@ impl StoreService {
received_at: DateTime<Utc>,
retention_days: u16,
payload: &[u8],
relay_snuba_publish_disabled: bool,
) -> Result<(), StoreError> {
if relay_snuba_publish_disabled {
return Ok(());
}

let message = ReplayEventKafkaMessage {
replay_id,
project_id,
Expand All @@ -865,6 +880,7 @@ impl StoreService {
replay_video: Option<&[u8]>,
received_at: DateTime<Utc>,
retention: u16,
relay_snuba_publish_disabled: bool,
) -> Result<(), StoreError> {
// Maximum number of bytes accepted by the consumer.
let max_payload_size = self.config.max_replay_message_size();
Expand Down Expand Up @@ -904,6 +920,7 @@ impl StoreService {
payload,
replay_event,
replay_video,
relay_snuba_publish_disabled,
});

self.produce(KafkaTopic::ReplayRecordings, message)?;
Expand All @@ -918,6 +935,7 @@ impl StoreService {
payload: Bytes,
received_at: DateTime<Utc>,
retention: u16,
relay_snuba_publish_disabled: bool,
) -> Result<(), StoreError> {
#[derive(Deserialize)]
struct VideoEvent<'a> {
Expand Down Expand Up @@ -950,6 +968,7 @@ impl StoreService {
received_at,
retention,
replay_event,
relay_snuba_publish_disabled,
)?;

self.produce_replay_recording(
Expand All @@ -960,6 +979,7 @@ impl StoreService {
Some(replay_video),
received_at,
retention,
relay_snuba_publish_disabled,
)
}

Expand Down Expand Up @@ -1509,6 +1529,7 @@ struct ReplayRecordingNotChunkedKafkaMessage<'a> {
replay_event: Option<&'a [u8]>,
#[serde(with = "serde_bytes")]
replay_video: Option<&'a [u8]>,
relay_snuba_publish_disabled: bool,
}

/// User report for an event wrapped up in a message ready for consumption in Kafka.
Expand Down
32 changes: 30 additions & 2 deletions tests/integration/test_replay_recordings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import zlib

from sentry_sdk.envelope import Envelope, Item, PayloadRef
import pytest

from .test_replay_events import generate_replay_sdk_event


def test_replay_recordings(mini_sentry, relay_chain):
Expand Down Expand Up @@ -30,16 +33,31 @@ def test_replay_recordings(mini_sentry, relay_chain):
assert replay_recording.startswith(b"{}\n") # The body is compressed


@pytest.mark.parametrize(
"value,expected", [(None, False), (False, False), (True, True)]
)
def test_nonchunked_replay_recordings_processing(
mini_sentry, relay_with_processing, replay_recordings_consumer, outcomes_consumer
mini_sentry,
relay_with_processing,
replay_events_consumer,
replay_recordings_consumer,
outcomes_consumer,
value,
expected,
):
project_id = 42
org_id = 0
replay_id = "515539018c9b4260a6f999572f1661ee"
relay = relay_with_processing()

if value is not None:
mini_sentry.global_config["options"][
"replay.relay-snuba-publishing-disabled"
] = value
mini_sentry.add_basic_project_config(
project_id, extra={"config": {"features": ["organizations:session-replay"]}}
)
relay = relay_with_processing()
replay_events_consumer = replay_events_consumer(timeout=10)
replay_recordings_consumer = replay_recordings_consumer()
outcomes_consumer = outcomes_consumer()

Expand All @@ -54,6 +72,8 @@ def test_nonchunked_replay_recordings_processing(
)
payload = recording_payload(b"[]")
envelope.add_item(Item(payload=PayloadRef(bytes=payload), type="replay_recording"))
json_payload = generate_replay_sdk_event()
envelope.add_item(Item(payload=PayloadRef(json=json_payload), type="replay_event"))

relay.send_envelope(project_id, envelope)

Expand All @@ -67,6 +87,14 @@ def test_nonchunked_replay_recordings_processing(
assert replay_recording["retention_days"] == 90
assert replay_recording["payload"] == payload
assert replay_recording["type"] == "replay_recording_not_chunked"
assert replay_recording["relay_snuba_publish_disabled"] is expected

if value is True:
# Nothing produced.
with pytest.raises(AssertionError):
replay_events_consumer.get_replay_event()
else:
assert replay_events_consumer.get_replay_event() is not None

outcomes_consumer.assert_empty()

Expand Down
21 changes: 18 additions & 3 deletions tests/integration/test_replay_videos.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,33 @@

import msgpack
import json
import pytest


@pytest.mark.parametrize(
"value,expected", [(None, False), (False, False), (True, True)]
)
def test_replay_recording_with_video(
mini_sentry,
relay_with_processing,
replay_recordings_consumer,
outcomes_consumer,
replay_events_consumer,
value,
expected,
):
project_id = 42
org_id = 0
replay_id = "515539018c9b4260a6f999572f1661ee"
relay = relay_with_processing()
if value is not None:
mini_sentry.global_config["options"][
"replay.relay-snuba-publishing-disabled"
] = value
mini_sentry.add_basic_project_config(
project_id,
extra={"config": {"features": ["organizations:session-replay"]}},
)
relay = relay_with_processing()
replay = generate_replay_sdk_event(replay_id)
replay_events_consumer = replay_events_consumer(timeout=10)
replay_recordings_consumer = replay_recordings_consumer()
Expand Down Expand Up @@ -61,6 +71,7 @@ def test_replay_recording_with_video(
assert replay_recording["payload"] == _recording_payload
assert replay_recording["type"] == "replay_recording_not_chunked"
assert replay_recording["replay_event"] is not None
assert replay_recording["relay_snuba_publish_disabled"] is expected

# Assert the replay-video bytes were published to the consumer.
assert replay_recording["replay_video"] == b"hello, world!"
Expand All @@ -70,8 +81,12 @@ def test_replay_recording_with_video(
assert replay_event["type"] == "replay_event"
assert replay_event["replay_id"] == "515539018c9b4260a6f999572f1661ee"

replay_event, _ = replay_events_consumer.get_replay_event()
assert_replay_payload_matches(replay, replay_event)
if value is True:
with pytest.raises(AssertionError):
replay_events_consumer.get_replay_event() # Nothing produced.
else:
replay_event, _ = replay_events_consumer.get_replay_event()
assert_replay_payload_matches(replay, replay_event)

# Assert all conumers are empty.
replay_recordings_consumer.assert_empty()
Expand Down