Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client-reports): Protocol support for client reports #1081

Merged
merged 7 commits into from
Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Add sampling based on transaction name. ([#1058](https://github.com/getsentry/relay/pull/1058))
- Support running Relay without config directory. The most important configuration, including Relay mode and credentials, can now be provided through commandline arguments or environment variables alone. ([#1055](https://github.com/getsentry/relay/pull/1055)
- Protocol support for client reports. ([#1081](https://github.com/getsentry/relay/pull/1081))
- Extract session metrics in non processing relays. ([#1073](https://github.com/getsentry/relay/pull/1073))

**Bug Fixes**:
Expand Down
6 changes: 6 additions & 0 deletions relay-common/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ pub enum DataCategory {
Attachment = 4,
/// Session updates. Quantity is the number of updates in the batch.
Session = 5,
/// Reserved data category that shall not appear in the outcomes.
Internal = -2,
/// Any other data category not known by this Relay.
#[serde(other)]
Unknown = -1,
Expand All @@ -122,6 +124,7 @@ impl DataCategory {
"security" => Self::Security,
"attachment" => Self::Attachment,
"session" => Self::Session,
"internal" => Self::Internal,
_ => Self::Unknown,
}
}
Expand All @@ -135,6 +138,7 @@ impl DataCategory {
Self::Security => "security",
Self::Attachment => "attachment",
Self::Session => "session",
Self::Internal => "internal",
Self::Unknown => "unknown",
}
}
Expand All @@ -146,6 +150,8 @@ impl DataCategory {

/// Returns the numeric value for this outcome.
pub fn value(self) -> Option<u8> {
// negative values (Internal and Unknown) cannot be sent as
// outcomes (internally so!)
(self as i8).try_into().ok()
}
}
Expand Down
14 changes: 13 additions & 1 deletion relay-common/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::fmt;
use std::time::{Duration, Instant, SystemTime};

use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use serde::{Deserialize, Serialize};

/// Converts an `Instant` into a `SystemTime`.
Expand Down Expand Up @@ -83,6 +83,11 @@ impl UnixTimestamp {
self.0
}

/// Returns the timestamp as chrono datetime.
pub fn as_datetime(self) -> DateTime<Utc> {
DateTime::from_utc(NaiveDateTime::from_timestamp(self.0 as i64, 0), Utc)
}

/// Converts the UNIX timestamp into an `Instant` based on the current system timestamp.
///
/// Returns [`MonotonicResult::Instant`] if the timestamp can be represented. Otherwise, returns
Expand Down Expand Up @@ -139,13 +144,20 @@ impl std::ops::Sub for UnixTimestamp {
}
}

#[derive(Debug)]
/// An error returned from parsing [`UnixTimestamp`].
pub struct ParseUnixTimestampError(());

impl std::str::FromStr for UnixTimestamp {
type Err = ParseUnixTimestampError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(datetime) = s.parse::<DateTime<Utc>>() {
let timestamp = datetime.timestamp();
if timestamp >= 0 {
return Ok(UnixTimestamp(timestamp as u64));
}
}
let ts = s.parse().or(Err(ParseUnixTimestampError(())))?;
Ok(Self(ts))
}
Expand Down
36 changes: 36 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ struct Limits {
max_attachment_size: ByteSize,
/// The maximum combined size for all attachments in an envelope or request.
max_attachments_size: ByteSize,
/// The maximum combined size for all client reports in an envelope or request.
max_client_reports_size: ByteSize,
/// The maximum payload size for an entire envelopes. Individual limits still apply.
max_envelope_size: ByteSize,
/// The maximum number of session items per envelope.
Expand Down Expand Up @@ -539,6 +541,7 @@ impl Default for Limits {
max_event_size: ByteSize::mebibytes(1),
max_attachment_size: ByteSize::mebibytes(100),
max_attachments_size: ByteSize::mebibytes(100),
max_client_reports_size: ByteSize::kibibytes(4),
max_envelope_size: ByteSize::mebibytes(100),
max_session_count: 100,
max_api_payload_size: ByteSize::mebibytes(20),
Expand Down Expand Up @@ -824,6 +827,8 @@ pub struct Outcomes {
/// Controls whether outcomes will be emitted when processing is disabled.
/// Processing relays always emit outcomes (for backwards compatibility).
pub emit_outcomes: bool,
/// Controls wheather client reported outcomes should be emitted.
pub emit_client_outcomes: bool,
/// The maximum number of outcomes that are batched before being sent
/// via http to the upstream (only applies to non processing relays).
pub batch_size: usize,
Expand All @@ -839,6 +844,7 @@ impl Default for Outcomes {
fn default() -> Self {
Outcomes {
emit_outcomes: false,
emit_client_outcomes: true,
batch_size: 1000,
batch_interval: 500,
source: None,
Expand Down Expand Up @@ -1017,6 +1023,18 @@ impl Config {
Ok(config)
}

/// Creates a config from a JSON value.
///
/// This is mostly useful for tests.
pub fn from_json_value(value: serde_json::Value) -> Result<Config, ConfigError> {
Ok(Config {
values: serde_json::from_value(value)
.map_err(|err| ConfigError::wrap(err, ConfigErrorKind::BadJson))?,
credentials: None,
path: PathBuf::new(),
})
}

/// Override configuration with values coming from other sources (e.g. env variables or
/// command line parameters)
pub fn apply_override(
Expand Down Expand Up @@ -1307,6 +1325,19 @@ impl Config {
self.values.outcomes.emit_outcomes || self.values.processing.enabled
}

/// Returns whether this Relay should emit client outcomes
///
/// Relays that do not emit client outcomes will forward client recieved outcomes
/// directly to the next relay in the chain as client report envelope. This is only done
/// if this relay emits outcomes at all. A relay that will not emit outcomes
/// will forward the envelope unchanged.
///
/// This flag can be explicitly disabled on processing relays as well to prevent the
/// emitting of client outcomes to the kafka topic.
pub fn emit_client_outcomes(&self) -> bool {
self.values.outcomes.emit_client_outcomes
}

/// Returns the maximum number of outcomes that are batched before being sent
pub fn outcome_batch_size(&self) -> usize {
self.values.outcomes.batch_size
Expand Down Expand Up @@ -1451,6 +1482,11 @@ impl Config {
self.values.limits.max_attachments_size.as_bytes()
}

/// Returns the maxmium combined size of client reports in bytes.
pub fn max_client_reports_size(&self) -> usize {
self.values.limits.max_client_reports_size.as_bytes()
}

/// Returns the maximum size of an envelope payload in bytes.
///
/// Individual item size limits still apply.
Expand Down
81 changes: 81 additions & 0 deletions relay-general/src/protocol/client_report.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use relay_common::{DataCategory, UnixTimestamp};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DiscardedEvent {
pub reason: String,
pub category: DataCategory,
pub quantity: u32,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClientReport {
/// The timestamp of when the report was created.
pub timestamp: Option<UnixTimestamp>,
/// Discard reason counters.
pub discarded_events: Vec<DiscardedEvent>,
}

impl ClientReport {
/// Parses a client report update from JSON.
pub fn parse(payload: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(payload)
}

/// Serializes a client report update back into JSON.
pub fn serialize(&self) -> Result<Vec<u8>, serde_json::Error> {
serde_json::to_vec(self)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_client_report_roundtrip() {
let json = r#"{
"timestamp": "2020-02-07T15:17:00Z",
"discarded_events": [
{"reason": "foo_reason", "category": "error", "quantity": 42},
{"reason": "foo_reason", "category": "transaction", "quantity": 23}
]
}"#;

let output = r#"{
"timestamp": 1581088620,
"discarded_events": [
{
"reason": "foo_reason",
"category": "error",
"quantity": 42
},
{
"reason": "foo_reason",
"category": "transaction",
"quantity": 23
}
]
}"#;

let update = ClientReport {
timestamp: Some("2020-02-07T15:17:00Z".parse().unwrap()),
discarded_events: vec![
DiscardedEvent {
reason: "foo_reason".into(),
category: DataCategory::Error,
quantity: 42,
},
DiscardedEvent {
reason: "foo_reason".into(),
category: DataCategory::Transaction,
quantity: 23,
},
],
};

let parsed = ClientReport::parse(json.as_bytes()).unwrap();
assert_eq_dbg!(update, parsed);
assert_eq_str!(output, serde_json::to_string_pretty(&update).unwrap());
}
}
2 changes: 2 additions & 0 deletions relay-general/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

mod breadcrumb;
mod breakdowns;
mod client_report;
mod clientsdk;
mod constants;
mod contexts;
Expand Down Expand Up @@ -32,6 +33,7 @@ pub use sentry_release_parser::{validate_environment, validate_release};

pub use self::breadcrumb::Breadcrumb;
pub use self::breakdowns::Breakdowns;
pub use self::client_report::ClientReport;
pub use self::clientsdk::{ClientSdkInfo, ClientSdkPackage};
pub use self::constants::VALID_PLATFORMS;
pub use self::contexts::{
Expand Down
2 changes: 1 addition & 1 deletion relay-quotas/src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl CategoryUnit {
| DataCategory::Security => Some(Self::Count),
DataCategory::Attachment => Some(Self::Bytes),
DataCategory::Session => Some(Self::Batched),
DataCategory::Unknown => None,
DataCategory::Internal | DataCategory::Unknown => None,
}
}
}
Expand Down
Loading