Commit

Merge branch 'master' into feature/session-batches
Swatinem committed Oct 30, 2020
2 parents d6c4d4f + 464e6f5 commit 91c877f
Showing 6 changed files with 229 additions and 29 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -8,9 +8,16 @@
- Add options for metrics buffering (`metrics.buffering`) and sampling (`metrics.sample_rate`). ([#821](https://github.com/getsentry/relay/pull/821))
- Add a new `session_aggregates` ItemType which contains aggregated session numbers. ([#815](https://github.com/getsentry/relay/pull/815))

**Bug Fixes**:

- Accept sessions with IP address set to `{{auto}}`. This was previously rejected and silently dropped. ([#827](https://github.com/getsentry/relay/pull/827))

**Internal**:

- Always apply cache debouncing for project states. This reduces pressure on the Redis and file system cache. ([#819](https://github.com/getsentry/relay/pull/819))
- Discard invalid user feedback sent as part of envelope. ([#823](https://github.com/getsentry/relay/pull/823))
- Emit event errors and normalization errors for unknown breadcrumb keys. ([#824](https://github.com/getsentry/relay/pull/824))
- Normalize `breadcrumb.ty` into `breadcrumb.type` for broken Python SDK versions. ([#824](https://github.com/getsentry/relay/pull/824))

## 20.10.1

18 changes: 17 additions & 1 deletion relay-general/src/protocol/breadcrumb.rs
@@ -76,7 +76,7 @@ pub struct Breadcrumb {
///
/// Such a breadcrumb's `data` property has the fields `url`, `method`, `status_code`
/// (integer) and `reason` (string).
#[metastructure(field = "type", max_chars = "enumlike")]
#[metastructure(field = "type", legacy_alias = "ty", max_chars = "enumlike")]
pub ty: Annotated<String>,

/// A dotted string indicating what the crumb is or from where it comes. _Optional._
@@ -183,3 +183,19 @@ fn test_breadcrumb_default_values() {
assert_eq_dbg!(breadcrumb, Annotated::from_json(input).unwrap());
assert_eq_str!(output, breadcrumb.to_json().unwrap());
}

#[test]
fn test_python_ty_regression() {
// The Python SDK used to send "ty" instead of "type". We are lenient and accept both.
let input = r#"{"timestamp":946684800,"ty":"http"}"#;
let output = r#"{"timestamp":946684800.0,"type":"http"}"#;

let breadcrumb = Annotated::new(Breadcrumb {
timestamp: Annotated::new(Utc.ymd(2000, 1, 1).and_hms(0, 0, 0).into()),
ty: Annotated::new("http".into()),
..Default::default()
});

assert_eq_dbg!(breadcrumb, Annotated::from_json(input).unwrap());
assert_eq_str!(output, breadcrumb.to_json().unwrap());
}
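Note on the `legacy_alias = "ty"` attribute above: it makes the `metastructure` derive accept the old field name on input while always emitting the canonical `type` key on output, as the regression test shows. The snippet below is only a rough serde-based analogy of that observable behavior, not Relay's actual derive machinery; `BreadcrumbLike` is a made-up type for illustration.

```rust
use serde::{Deserialize, Serialize};

// Analogy only: serde's `rename` + `alias` reproduce the visible effect of
// `legacy_alias = "ty"` — "ty" is accepted when parsing, "type" is written out.
#[derive(Debug, Deserialize, Serialize)]
struct BreadcrumbLike {
    #[serde(rename = "type", alias = "ty")]
    ty: Option<String>,
}

fn main() {
    let parsed: BreadcrumbLike = serde_json::from_str(r#"{"ty":"http"}"#).unwrap();
    assert_eq!(parsed.ty.as_deref(), Some("http"));
    assert_eq!(serde_json::to_string(&parsed).unwrap(), r#"{"type":"http"}"#);
}
```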
19 changes: 17 additions & 2 deletions relay-general/src/protocol/session.rs
@@ -1,11 +1,12 @@
use std::net::IpAddr;
use std::time::SystemTime;

use chrono::{DateTime, Utc};
use failure::Fail;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::protocol::IpAddr;

/// The type of session event we're dealing with.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
@@ -314,7 +315,7 @@ mod tests {
attributes: SessionAttributes {
release: "sentry-test@1.0.0".to_owned(),
environment: Some("production".to_owned()),
ip_address: Some("::1".parse().unwrap()),
ip_address: Some(IpAddr::parse("::1").unwrap()),
user_agent: Some("Firefox/72.0".to_owned()),
},
};
@@ -417,4 +418,18 @@ mod tests {

assert_eq!(iter.next(), None);
}

#[test]
fn test_session_ip_addr_auto() {
let json = r#"{
"started": "2020-02-07T14:16:00Z",
"attrs": {
"release": "sentry-test@1.0.0",
"ip_address": "{{auto}}"
}
}"#;

let update = SessionUpdate::parse(json.as_bytes()).unwrap();
assert_eq_dbg!(update.attributes.ip_address, Some(IpAddr::auto()));
}
}
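Parsing only preserves the `{{auto}}` sentinel; the substitution with the connection's client address happens server-side in `process_sessions` (see the relay-server/src/actors/events.rs hunk further down). A minimal sketch of that step, using the `IpAddr` helpers this diff relies on, but not the exact Relay code path:

```rust
use relay_general::protocol::IpAddr;

// Minimal sketch: replace the `{{auto}}` sentinel with the request's client
// address, leaving concrete addresses untouched.
fn resolve_session_ip(
    current: Option<IpAddr>,
    client_addr: Option<std::net::IpAddr>,
) -> Option<IpAddr> {
    match current {
        Some(ip) if ip.is_auto() => client_addr.map(IpAddr::from),
        other => other,
    }
}
```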
76 changes: 72 additions & 4 deletions relay-general/src/store/remove_other.rs
@@ -1,7 +1,14 @@
use crate::processor::{ProcessValue, ProcessingState, Processor};
use crate::protocol::Event;
use crate::protocol::{Breadcrumb, Event};
use crate::types::{Annotated, ErrorKind, Meta, Object, ProcessingResult, Value};

/// Replace remaining values and all existing meta with errors.
fn create_errors(other: &mut Object<Value>) {
for value in other.values_mut() {
*value = Annotated::from_error(ErrorKind::InvalidAttribute, None);
}
}

pub struct RemoveOtherProcessor;

impl Processor for RemoveOtherProcessor {
@@ -19,6 +26,22 @@ impl Processor for RemoveOtherProcessor {
Ok(())
}

fn process_breadcrumb(
&mut self,
breadcrumb: &mut Breadcrumb,
_meta: &mut Meta,
state: &ProcessingState<'_>,
) -> ProcessingResult {
// Move the current map out so we don't clear it in `process_other`
let mut other = std::mem::take(&mut breadcrumb.other);
create_errors(&mut other);

// Recursively clean all `other`s now. Note that this won't touch the event's other
breadcrumb.process_child_values(self, state)?;
breadcrumb.other = other;
Ok(())
}

fn process_event(
&mut self,
event: &mut Event,
@@ -39,9 +62,7 @@ impl Processor for RemoveOtherProcessor {
other.remove("query");

// Replace remaining values and all existing meta with errors
for value in other.values_mut() {
*value = Annotated::from_error(ErrorKind::InvalidAttribute, None);
}
create_errors(&mut other);

// Recursively clean all `other`s now. Note that this won't touch the event's other
event.process_child_values(self, state)?;
@@ -173,3 +194,50 @@ fn test_retain_context_other() {
&contexts
);
}

#[test]
fn test_breadcrumb_errors() {
use crate::protocol::Values;

let mut event = Annotated::new(Event {
breadcrumbs: Annotated::new(Values::new(vec![Annotated::new(Breadcrumb {
other: {
let mut other = Object::new();
other.insert("foo".to_string(), Value::U64(42).into());
other.insert("bar".to_string(), Value::U64(42).into());
other
},
..Breadcrumb::default()
})])),
..Default::default()
});

process_value(
&mut event,
&mut RemoveOtherProcessor,
ProcessingState::root(),
)
.unwrap();

let other = &event
.value()
.unwrap()
.breadcrumbs
.value()
.unwrap()
.values
.value()
.unwrap()[0]
.value()
.unwrap()
.other;

assert_eq_dbg!(
*other.get("foo").unwrap(),
Annotated::from_error(ErrorKind::InvalidAttribute, None)
);
assert_eq_dbg!(
*other.get("bar").unwrap(),
Annotated::from_error(ErrorKind::InvalidAttribute, None)
);
}
114 changes: 92 additions & 22 deletions relay-server/src/actors/events.rs
@@ -17,8 +17,8 @@ use relay_config::{Config, HttpEncoding, RelayMode};
use relay_general::pii::{PiiAttachmentsProcessor, PiiProcessor};
use relay_general::processor::{process_value, ProcessingState};
use relay_general::protocol::{
Breadcrumb, Csp, Event, EventId, EventType, ExpectCt, ExpectStaple, Hpkp, LenientString,
Metrics, SecurityReportType, SessionUpdate, Timestamp, Values,
Breadcrumb, Csp, Event, EventId, EventType, ExpectCt, ExpectStaple, Hpkp, IpAddr,
LenientString, Metrics, SecurityReportType, SessionUpdate, Timestamp, UserReport, Values,
};
use relay_general::store::ClockDriftProcessor;
use relay_general::types::{Annotated, Array, Object, ProcessingAction, Value};
@@ -45,7 +45,6 @@ use {
failure::ResultExt,
minidump::Minidump,
relay_filter::FilterStatKey,
relay_general::protocol::IpAddr,
relay_general::store::{GeoIpLookup, StoreConfig, StoreProcessor},
relay_quotas::{DataCategory, RateLimitingError, RedisRateLimiter},
};
@@ -269,31 +268,39 @@ pub struct EventProcessor {
}

impl EventProcessor {
#[cfg(feature = "processing")]
pub fn new(
config: Arc<Config>,
rate_limiter: Option<RedisRateLimiter>,
geoip_lookup: Option<Arc<GeoIpLookup>>,
) -> Self {
#[inline]
pub fn new(config: Arc<Config>) -> Self {
Self {
config,
rate_limiter,
geoip_lookup,
#[cfg(feature = "processing")]
rate_limiter: None,
#[cfg(feature = "processing")]
geoip_lookup: None,
}
}

#[cfg(not(feature = "processing"))]
pub fn new(config: Arc<Config>) -> Self {
Self { config }
#[cfg(feature = "processing")]
#[inline]
pub fn with_rate_limiter(mut self, rate_limiter: Option<RedisRateLimiter>) -> Self {
self.rate_limiter = rate_limiter;
self
}

#[cfg(feature = "processing")]
#[inline]
pub fn with_geoip_lookup(mut self, geoip_lookup: Option<Arc<GeoIpLookup>>) -> Self {
self.geoip_lookup = geoip_lookup;
self
}

/// Validates all sessions in the envelope, if any.
///
/// Sessions are removed from the envelope if they contain invalid JSON or if their timestamps
/// are out of range after clock drift correction.
fn process_sessions(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> {
fn process_sessions(&self, state: &mut ProcessEnvelopeState) {
let envelope = &mut state.envelope;
let received = state.received_at;
let client_addr = envelope.meta().client_addr();

let clock_drift_processor =
ClockDriftProcessor::new(envelope.sent_at(), received).at_least(MINIMUM_CLOCK_DRIFT);
Expand Down Expand Up @@ -352,6 +359,13 @@ impl EventProcessor {
return false;
}

if let Some(ref ip_address) = session.attributes.ip_address {
if ip_address.is_auto() {
session.attributes.ip_address = client_addr.map(IpAddr::from);
changed = true;
}
}

if changed {
let json_string = match serde_json::to_string(&session) {
Ok(json) => json,
@@ -363,8 +377,25 @@ impl EventProcessor {

true
});
}

Ok(())
/// Validates all user report/feedback items in the envelope, if any.
///
/// User feedback items are removed from the envelope if they contain invalid JSON or if the
/// JSON violates the schema (basic type validation).
fn process_user_reports(&self, state: &mut ProcessEnvelopeState) {
state.envelope.retain_items(|item| {
if item.ty() != ItemType::UserReport {
return true;
};

if let Err(error) = serde_json::from_slice::<UserReport>(&item.payload()) {
log::error!("failed to store user report: {}", LogError(&error));
return false;
}

true
});
}

/// Creates and initializes the processing state.
@@ -1103,7 +1134,8 @@ impl EventProcessor {
};
}

self.process_sessions(&mut state)?;
self.process_sessions(&mut state);
self.process_user_reports(&mut state);

if state.creates_event() {
if_processing!({
@@ -1244,11 +1276,9 @@ impl EventManager {

SyncArbiter::start(
thread_count,
clone!(config, || EventProcessor::new(
config.clone(),
rate_limiter.clone(),
geoip_lookup.clone(),
)),
clone!(config, || EventProcessor::new(config.clone())
.with_rate_limiter(rate_limiter.clone())
.with_geoip_lookup(geoip_lookup.clone())),
)
};

@@ -1627,6 +1657,8 @@ impl Handler<GetCapturedEvent> for EventManager {
mod tests {
use super::*;

use crate::extractors::RequestMeta;

use chrono::{DateTime, TimeZone, Utc};

fn create_breadcrumbs_item(breadcrumbs: &[(Option<DateTime<Utc>>, &str)]) -> Item {
@@ -1770,4 +1802,42 @@ mod tests {
// regression test to ensure we don't fail parsing an empty file
result.expect("event_from_attachments");
}

#[test]
fn test_user_report_invalid() {
let processor = EventProcessor::new(Arc::new(Default::default()));
let event_id = EventId::new();

let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"
.parse()
.unwrap();

let request_meta = RequestMeta::new(dsn);
let mut envelope = Envelope::from_request(Some(event_id), request_meta);

envelope.add_item({
let mut item = Item::new(ItemType::UserReport);
item.set_payload(ContentType::Json, r###"{"foo": "bar"}"###);
item
});

envelope.add_item({
let mut item = Item::new(ItemType::Event);
item.set_payload(ContentType::Json, "{}");
item
});

let envelope_response = processor
.process(ProcessEnvelope {
envelope,
project_state: Arc::new(ProjectState::allowed()),
start_time: Instant::now(),
})
.unwrap();

let new_envelope = envelope_response.envelope.unwrap();

assert_eq!(new_envelope.len(), 1);
assert_eq!(new_envelope.items().next().unwrap().ty(), ItemType::Event);
}
}
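For orientation, here is how the refactored constructor reads at a call site. This is a crate-internal sketch, not code from the diff: the `with_*` setters only exist when the `processing` feature is enabled, `Config::default()` stands in for a real configuration, and passing `None` simply leaves rate limiting and GeoIP lookup disabled, matching the plain `new(config)` path.

```rust
use std::sync::Arc;
use relay_config::Config;

// Builder-style construction per the diff above (sketch only).
fn build_processor() -> EventProcessor {
    EventProcessor::new(Arc::new(Config::default()))
        .with_rate_limiter(None)
        .with_geoip_lookup(None)
}
```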