Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Track outcome source #604

Merged
merged 22 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We have switched to [CalVer](https://calver.org/)! Relay's version is always in

**Internal**:

- Add source (who emitted the outcome) to Outcome payload. ([#604](https://github.com/getsentry/relay/pull/604))
- Ignore non-Rust folders for faster rebuilding and testing. ([#578](https://github.com/getsentry/relay/pull/578))
- Invalid session payloads are now logged for SDK debugging. ([#584](https://github.com/getsentry/relay/pull/584), [#591](https://github.com/getsentry/relay/pull/591))
- Add support for Outcomes generation in external Relays. ([#592](https://github.com/getsentry/relay/pull/592))
Expand Down
21 changes: 18 additions & 3 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ pub struct OverridableConfig {
pub secret_key: Option<String>,
/// The public key of the relay
pub public_key: Option<String>,
/// Outcome source
pub outcome_source: Option<String>,
}

/// The relay credentials
Expand Down Expand Up @@ -714,11 +716,14 @@ pub struct Outcomes {
/// Processing relays always emit outcomes (for backwards compatibility).
pub emit_outcomes: bool,
/// The maximum number of outcomes that are batched before being sent
/// via http to the upstream (only applies to non processing relays)
/// via http to the upstream (only applies to non processing relays).
pub batch_size: usize,
/// The maximum time interval (in milliseconds) that an outcome may be batched
/// via http to the upstream (only applies to non processing relays)
/// via http to the upstream (only applies to non processing relays).
pub batch_interval: u64,
/// Defines the source string registered in the outcomes originating from
/// this Relay (typically something like the region or the layer).
pub source: Option<String>,
}

impl Default for Outcomes {
Expand All @@ -727,6 +732,7 @@ impl Default for Outcomes {
emit_outcomes: false,
batch_size: 1000,
batch_interval: 500,
source: None,
}
}
}
Expand Down Expand Up @@ -834,7 +840,7 @@ impl Config {
/// command line parameters)
pub fn apply_override(
&mut self,
overrides: OverridableConfig,
mut overrides: OverridableConfig,
) -> Result<&mut Self, ConfigError> {
let relay = &mut self.values.relay;

Expand Down Expand Up @@ -909,6 +915,10 @@ impl Config {
} else {
None
};
let mut outcomes = &mut self.values.outcomes;
if overrides.outcome_source.is_some() {
outcomes.source = overrides.outcome_source.take();
}

if let Some(credentials) = &mut self.credentials {
//we have existing credentials we may override some entries
Expand Down Expand Up @@ -1080,6 +1090,11 @@ impl Config {
Duration::from_millis(self.values.outcomes.batch_interval)
}

/// The originating source of the outcome
pub fn outcome_source(&self) -> Option<&str> {
self.values.outcomes.source.as_deref()
}

/// Returns the log level.
pub fn log_level_filter(&self) -> log::LevelFilter {
self.values.logging.level
Expand Down
16 changes: 12 additions & 4 deletions relay-server/src/actors/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,13 @@ pub struct TrackRawOutcome {
/// The client ip address.
#[serde(default, skip_serializing_if = "Option::is_none")]
remote_addr: Option<String>,
/// The source of the outcome (which Relay sent it)
#[serde(default, skip_serializing_if = "Option::is_none")]
source: Option<String>,
}

impl From<&TrackOutcome> for TrackRawOutcome {
fn from(msg: &TrackOutcome) -> Self {
impl TrackRawOutcome {
jan-auer marked this conversation as resolved.
Show resolved Hide resolved
fn from_outcome(msg: TrackOutcome, config: &Config) -> Self {
let reason = match msg.outcome.to_reason() {
None => None,
Some(reason) => Some(reason.to_string()),
Expand All @@ -302,6 +305,10 @@ impl From<&TrackOutcome> for TrackRawOutcome {
id => Some(id),
};

// since TrackOutcome objects come only from this Relay (and not any downstream
// Relays), set the source to whatever our current outcome source is.
let source = config.outcome_source().map(str::to_owned);

TrackRawOutcome {
timestamp,
org_id,
Expand All @@ -311,6 +318,7 @@ impl From<&TrackOutcome> for TrackRawOutcome {
reason,
event_id: msg.event_id,
remote_addr: msg.remote_addr.map(|addr| addr.to_string()),
source,
}
}
}
Expand Down Expand Up @@ -468,7 +476,7 @@ mod processing {
type Result = Result<(), OutcomeError>;

fn handle(&mut self, message: TrackOutcome, _ctx: &mut Self::Context) -> Self::Result {
self.handle(TrackRawOutcome::from(&message), _ctx)
self.handle(TrackRawOutcome::from_outcome(message, &self.config), _ctx)
}
}

Expand Down Expand Up @@ -579,6 +587,6 @@ impl Handler<TrackOutcome> for HttpOutcomeProducer {
type Result = Result<(), OutcomeError>;

fn handle(&mut self, message: TrackOutcome, _ctx: &mut Self::Context) -> Self::Result {
self.handle(TrackRawOutcome::from(&message), _ctx)
self.handle(TrackRawOutcome::from_outcome(message, &self.config), _ctx)
}
}
2 changes: 2 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub fn extract_config_args(matches: &ArgMatches) -> OverridableConfig {
id: matches.value_of("id").map(str::to_owned),
public_key: matches.value_of("public_key").map(str::to_owned),
secret_key: matches.value_of("secret_key").map(str::to_owned),
outcome_source: matches.value_of("source_id").map(str::to_owned),
}
}

Expand All @@ -104,6 +105,7 @@ pub fn extract_config_env_vars() -> OverridableConfig {
id: env::var("RELAY_ID").ok(),
public_key: env::var("RELAY_PUBLIC_KEY").ok(),
secret_key: env::var("RELAY_SECRET_KEY").ok(),
outcome_source: None, //already extracted in params
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/cliapp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ pub fn make_app() -> App<'static, 'static> {
.takes_value(true)
.long("redis-url")
.help("Redis server URL."),
)
.arg(
Arg::with_name("source_id")
.value_name("SOURCE_ID")
.takes_value(true)
.long("source-id")
.env("RELAY_SOURCE_ID")
.help("Names the current relay in the outcome source."),
),
)
.subcommand(
Expand Down
47 changes: 47 additions & 0 deletions tests/integration/test_outcome.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,50 @@ def test_outcomes_non_processing_batching(relay, mini_sentry):

# no events received since all have been for an invalid project id
assert mini_sentry.captured_events.empty()


def _send_event(relay):
event_id = uuid.uuid1().hex
message_text = "some message {}".format(datetime.now())
event_body = {
"event_id": event_id,
"message": message_text,
"extra": {"msg_text": message_text},
}

try:
relay.send_event(42, event_body)
except:
pass
return event_id


def test_outcome_source(relay, mini_sentry):
"""
Test that the source is picked from configuration and passed in outcomes
"""
config = {
"outcomes": {
"emit_outcomes": True,
"batch_size": 1,
"batch_interval": 1,
"source": "my-layer",
}
}

relay = relay(mini_sentry, config)
relay.wait_relay_healthcheck()
# hack mini_sentry configures project 42 (remove the configuration so that we get an error for project 42)
mini_sentry.project_configs[42] = None

event_id = _send_event(relay)

outcomes_batch = mini_sentry.captured_outcomes.get(timeout=0.2)
assert mini_sentry.captured_outcomes.qsize() == 0 # we had only one batch

outcomes = outcomes_batch.get("outcomes")
assert len(outcomes) == 1

outcome = outcomes[0]

assert outcome.get("source") == "my-layer"