Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Apply quotas per item #636

Merged
merged 4 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@

## Unreleased

**Features**:

- Sessions and attachments can be rate limited now. These rate limits apply separately from error events, which means that you can continue to send Release Health sessions while you're out of quota with errors. ([#636](https://github.com/getsentry/relay/pull/636))

**Bug Fixes**:

- Outcomes from downstream relays were not forwarded upstream. ([#632](https://github.com/getsentry/relay/pull/632))
- Apply clock drift correction to Release Health sessions and validate timestamps. ([#633](https://github.com/getsentry/relay/pull/633))

**Internal**:

- Restructure the envelope and event ingestion paths into a pipeline and apply rate limits to all envelopes. ([#635](https://github.com/getsentry/relay/pull/635), [#636](https://github.com/getsentry/relay/pull/636))

## 20.6.0

We have switched to [CalVer](https://calver.org/)! Relay's version is always in line with the latest version of [Sentry](https://github.com/getsentry/sentry).
Expand Down
46 changes: 37 additions & 9 deletions relay-server/src/actors/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::utils::{self, FormDataIter, FutureExt};
use {
crate::actors::store::{StoreEnvelope, StoreError, StoreForwarder},
crate::service::ServerErrorKind,
crate::utils::EnvelopeLimiter,
failure::ResultExt,
relay_filter::FilterStatKey,
relay_general::protocol::IpAddr,
Expand Down Expand Up @@ -225,6 +226,21 @@ impl ProcessEnvelopeState {
.map(|event| event.ty.value().copied().unwrap_or_default())
}

/// Returns the data category if there is an event.
///
/// The data category is computed from the event type. Both `Default` and `Error` events map to
/// the `Error` data category. If there is no Event, `None` is returned.
#[cfg(feature = "processing")]
fn event_category(&self) -> Option<DataCategory> {
    // Bail out early if the envelope does not carry an event at all.
    let event_type = self.event_type()?;

    let category = match event_type {
        EventType::Default | EventType::Error => DataCategory::Error,
        EventType::Transaction => DataCategory::Transaction,
        EventType::Csp | EventType::Hpkp | EventType::ExpectCT | EventType::ExpectStaple => {
            DataCategory::Security
        }
    };

    Some(category)
}

/// Removes the event payload from this processing state.
#[cfg(feature = "processing")]
fn remove_event(&mut self) {
Expand Down Expand Up @@ -880,12 +896,6 @@ impl EventProcessor {

#[cfg(feature = "processing")]
fn enforce_quotas(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> {
// TODO: Enforce quotas for all envelope items. For now, only consider Envelopes with events
// and assume the `Error` data category.
if state.event_type().is_none() {
return Ok(());
}

let rate_limiter = match self.rate_limiter.as_ref() {
Some(rate_limiter) => rate_limiter,
None => return Ok(()),
Expand All @@ -897,18 +907,36 @@ impl EventProcessor {
return Ok(());
}

let mut remove_event = false;
let category = state.event_category();

// When invoking the rate limiter, capture if the event item has been rate limited to also
// remove it from the processing state eventually.
let mut envelope_limiter = EnvelopeLimiter::new(|item_scope, _quantity| {
let limits = rate_limiter.is_rate_limited(quotas, item_scope)?;
remove_event ^= Some(item_scope.category) == category && limits.is_limited();
Ok(limits)
});

// Tell the envelope limiter about the event, since it has been removed from the Envelope at
// this stage in processing.
if let Some(category) = category {
envelope_limiter.assume_event(category);
}

// Fetch scoping again from the project state. This is a rather cheap operation at this
// point and it is easier than passing scoping through all layers of `process_envelope`.
let scoping = project_state.get_scoping(state.envelope.meta());

state.rate_limits = metric!(timer(RelayTimers::EventProcessingRateLimiting), {
rate_limiter
.is_rate_limited(quotas, scoping.item(DataCategory::Error))
envelope_limiter
.enforce(&mut state.envelope, &scoping)
.map_err(ProcessingError::QuotasFailed)?
});

if state.rate_limits.is_limited() {
if remove_event {
state.remove_event();
debug_assert!(state.envelope.is_empty());
}

Ok(())
Expand Down
25 changes: 24 additions & 1 deletion relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Write;
use std::fmt::{self, Write};

use relay_quotas::{
DataCategories, DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits,
Expand Down Expand Up @@ -131,6 +131,16 @@ where
}
}

/// Assume an event with the given category, even if no item is present in the envelope.
///
/// This ensures that rate limits for the given data category are checked even if there is no
/// matching item in the envelope. Other items are handled according to the rules as if the
/// event item were present.
#[cfg(feature = "processing")]
pub fn assume_event(&mut self, category: DataCategory) {
    // `impl From<T> for Option<T>` wraps the category in `Some`.
    self.event_category = category.into();
}

/// Process rate limits for the envelope, removing offending items and returning applied limits.
pub fn enforce(mut self, envelope: &mut Envelope, scoping: &Scoping) -> Result<RateLimits, E> {
self.aggregate(envelope);
Expand Down Expand Up @@ -207,6 +217,19 @@ where
}
}

impl<F> fmt::Debug for EnvelopeLimiter<F> {
    // Manual implementation: deriving `Debug` would add an `F: Debug` bound,
    // which the rate-limit check closure cannot satisfy.
    // NOTE(review): the check-callback field appears to be omitted from the
    // output for that reason — confirm against the struct definition, which
    // is outside this view.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EnvelopeLimiter")
            .field("event_category", &self.event_category)
            .field("attachment_quantity", &self.attachment_quantity)
            .field("session_quantity", &self.session_quantity)
            .field("remove_event", &self.remove_event)
            .field("remove_attachments", &self.remove_attachments)
            .field("remove_sessions", &self.remove_sessions)
            .finish()
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
75 changes: 75 additions & 0 deletions tests/integration/test_attachments.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import pytest
import time
import uuid

from requests.exceptions import HTTPError

Expand Down Expand Up @@ -132,3 +134,76 @@ def test_empty_attachments_with_processing(
"event_id": event_id,
"project_id": project_id,
}


@pytest.mark.parametrize("rate_limits", [[], ["attachment"]])
def test_attachments_ratelimit(
    mini_sentry, relay_with_processing, outcomes_consumer, rate_limits
):
    """A zero-limit quota rejects attachments.

    An empty ``categories`` list applies the quota to every category, so both
    parametrizations are expected to rate limit the attachment.
    """
    event_id = "515539018c9b4260a6f999572f1661ee"

    relay = relay_with_processing()
    relay.wait_relay_healthcheck()

    config = mini_sentry.full_project_config()
    config["config"]["quotas"] = [
        {"categories": rate_limits, "limit": 0, "reasonCode": "static_disabled_quota"}
    ]
    mini_sentry.project_configs[42] = config

    outcomes_consumer = outcomes_consumer()
    attachments = [("att_1", "foo.txt", b"")]

    # The first attachment is accepted by the endpoint (200) and only rate
    # limited later during processing.
    relay.send_attachments(42, event_id, attachments)
    # TODO: There are no outcomes emitted for attachments yet. Instead, sleep to allow Relay to
    # process the event and cache the rate limit
    # outcomes_consumer.assert_rate_limited("static_disabled_quota")
    time.sleep(0.2)

    # With the rate limit now cached, the endpoint rejects directly with 429.
    with pytest.raises(HTTPError) as excinfo:
        relay.send_attachments(42, event_id, attachments)
    assert excinfo.value.response.status_code == 429
    # outcomes_consumer.assert_rate_limited("static_disabled_quota")


def test_attachments_quotas(
    mini_sentry, relay_with_processing, attachments_consumer, outcomes_consumer,
):
    """An attachment quota of 5 accepts exactly 5 attachments, then rejects.

    Note: this block had GitHub review-comment UI text injected into the
    function body; the code below is the reconstructed, clean version.
    """
    event_id = "515539018c9b4260a6f999572f1661ee"

    relay = relay_with_processing()
    relay.wait_relay_healthcheck()

    project_config = mini_sentry.project_configs[42] = mini_sentry.full_project_config()
    project_config["config"]["quotas"] = [
        {
            "id": "test_rate_limiting_{}".format(uuid.uuid4().hex),
            "categories": ["attachment"],
            "window": 3600,
            "limit": 5,  # TODO: Test attachment size quota once implemented
            "reasonCode": "attachments_exceeded",
        }
    ]

    attachments_consumer = attachments_consumer()
    outcomes_consumer = outcomes_consumer()
    attachments = [("att_1", "foo.txt", b"blabla")]

    # The first five attachments are within quota and reach the consumer.
    for i in range(5):
        relay.send_attachments(42, event_id, [("att_1", "%s.txt" % i, b"")])
        attachment = attachments_consumer.get_individual_attachment()
        assert attachment["attachment"]["name"] == "%s.txt" % i

    # The sixth attachment exceeds the quota. The endpoint still returns 200
    # because enforcement happens asynchronously during processing.
    relay.send_attachments(42, event_id, attachments)
    # TODO: There are no outcomes emitted for attachments yet. Instead, sleep to allow Relay to
    # process the event and cache the rate limit
    # outcomes_consumer.assert_rate_limited("static_disabled_quota")
    time.sleep(0.2)

    # With the rate limit now cached, the endpoint rejects directly with 429.
    with pytest.raises(HTTPError) as excinfo:
        relay.send_attachments(42, event_id, attachments)
    assert excinfo.value.response.status_code == 429
    # outcomes_consumer.assert_rate_limited("static_disabled_quota")
26 changes: 22 additions & 4 deletions tests/integration/test_minidump.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,28 @@ def test_minidump_invalid_nested_formdata(mini_sentry, relay):
relay.send_minidump(project_id=project_id, files=attachments)


@pytest.mark.parametrize("rate_limits", [None, ["attachment"], ["transaction"]])
def test_minidump_with_processing(
mini_sentry, relay_with_processing, attachments_consumer
mini_sentry, relay_with_processing, attachments_consumer, rate_limits
):
project_id = 42
relay = relay_with_processing()
relay.wait_relay_healthcheck()
mini_sentry.project_configs[project_id] = mini_sentry.full_project_config()

project_config = mini_sentry.project_configs[42] = mini_sentry.full_project_config()

# Configure rate limits. The transaction rate limit does not affect minidumps. The attachment
# rate limit would affect them, but since minidumps are required for processing they are still
# passed through. Only when "error" is limited will the minidump be rejected.
if rate_limits:
project_config["config"]["quotas"] = [
{
"categories": rate_limits,
"limit": 0,
"reasonCode": "static_disabled_quota",
}
]

attachments_consumer = attachments_consumer()

attachments = [(MINIDUMP_ATTACHMENT_NAME, "minidump.dmp", "MDMP content")]
Expand Down Expand Up @@ -344,13 +359,16 @@ def test_minidump_with_processing(
]


def test_minidump_ratelimit(mini_sentry, relay_with_processing, outcomes_consumer):
@pytest.mark.parametrize("rate_limits", [[], ["error"], ["error", "attachment"]])
def test_minidump_ratelimit(
mini_sentry, relay_with_processing, outcomes_consumer, rate_limits
):
relay = relay_with_processing()
relay.wait_relay_healthcheck()

project_config = mini_sentry.project_configs[42] = mini_sentry.full_project_config()
project_config["config"]["quotas"] = [
{"limit": 0, "reasonCode": "static_disabled_quota",}
{"categories": rate_limits, "limit": 0, "reasonCode": "static_disabled_quota"}
]

outcomes_consumer = outcomes_consumer()
Expand Down
83 changes: 83 additions & 0 deletions tests/integration/test_session.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from datetime import datetime, timedelta, timezone
import pytest
from requests.exceptions import HTTPError
import six
import uuid


def test_session_with_processing(mini_sentry, relay_with_processing, sessions_consumer):
Expand Down Expand Up @@ -245,3 +249,82 @@ def test_session_release_required(
assert sessions_consumer.poll() is None
assert mini_sentry.test_failures
mini_sentry.test_failures.clear()


def test_session_quotas(mini_sentry, relay_with_processing, sessions_consumer):
    """A session quota of 5 accepts exactly 5 sessions, then rejects."""
    relay = relay_with_processing()
    relay.wait_relay_healthcheck()

    sessions_consumer = sessions_consumer()

    project_config = mini_sentry.full_project_config()
    project_config["config"]["eventRetention"] = 17
    key_id = six.text_type(project_config["publicKeys"][0]["numericId"])
    project_config["config"]["quotas"] = [
        {
            "id": "test_rate_limiting_{}".format(uuid.uuid4().hex),
            "categories": ["session"],
            "scope": "key",
            "scopeId": key_id,
            "window": 3600,
            "limit": 5,
            "reasonCode": "sessions_exceeded",
        }
    ]
    mini_sentry.project_configs[42] = project_config

    now = datetime.now(tz=timezone.utc)
    session = {
        "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
        "timestamp": now.isoformat(),
        "started": (now - timedelta(hours=1)).isoformat(),
        "attrs": {"release": "sentry-test@1.0.0"},
    }

    # The first five sessions are within quota and reach the consumer.
    for _ in range(5):
        relay.send_session(42, session)
        sessions_consumer.get_session()

    # The sixth session is rate limited during processing; the endpoint still
    # responds 200 because enforcement is deferred.
    relay.send_session(42, session)
    assert sessions_consumer.poll() is None

    # With the rate limit now cached, the endpoint rejects the request.
    with pytest.raises(HTTPError):
        relay.send_session(42, session)
    assert sessions_consumer.poll() is None


def test_session_disabled(mini_sentry, relay_with_processing, sessions_consumer):
    """A zero-limit session quota drops every session before the consumer."""
    relay = relay_with_processing()
    relay.wait_relay_healthcheck()

    sessions_consumer = sessions_consumer()

    project_config = mini_sentry.full_project_config()
    project_config["config"]["eventRetention"] = 17
    key_id = six.text_type(project_config["publicKeys"][0]["numericId"])
    project_config["config"]["quotas"] = [
        {
            "categories": ["session"],
            "scope": "key",
            "scopeId": key_id,
            "limit": 0,
            "reasonCode": "sessions_exceeded",
        }
    ]
    mini_sentry.project_configs[42] = project_config

    now = datetime.now(tz=timezone.utc)
    relay.send_session(
        42,
        {
            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
            "timestamp": now.isoformat(),
            "started": (now - timedelta(hours=1)).isoformat(),
            "attrs": {"release": "sentry-test@1.0.0"},
        },
    )

    # The zero-limit quota drops the session during processing.
    assert sessions_consumer.poll() is None
Loading