Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 26 additions & 15 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::env;
use std::ffi::{OsStr, OsString};
use std::fmt;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;

use rand::random;
Expand All @@ -21,10 +22,9 @@ use transport::{DefaultTransportFactory, Transport, TransportFactory};
use utils::{debug_images, server_name};

/// The Sentry client object.
#[derive(Clone)]
pub struct Client {
options: ClientOptions,
transport: Option<Arc<Box<Transport>>>,
transport: RwLock<Option<Arc<Box<Transport>>>>,
}

impl fmt::Debug for Client {
Expand All @@ -36,6 +36,15 @@ impl fmt::Debug for Client {
}
}

impl Clone for Client {
fn clone(&self) -> Client {
Client {
options: self.options.clone(),
transport: RwLock::new(self.transport.read().unwrap().clone()),
}
}
}

/// Type alias for before event/breadcrumb handlers.
pub type BeforeCallback<T> = Arc<Box<Fn(T) -> Option<T> + Send + Sync>>;

Expand Down Expand Up @@ -81,8 +90,8 @@ pub struct ClientOptions {
/// This will default to the `HTTPS_PROXY` environment variable
/// or `http_proxy` if that one exists.
pub https_proxy: Option<Cow<'static, str>>,
/// The timeout on client drop for draining events.
pub shutdown_timeout: Option<Duration>,
/// The timeout on client drop for draining events on shutdown.
pub shutdown_timeout: Duration,
/// Attaches stacktraces to messages.
pub attach_stacktrace: bool,
/// If turned on some default PII informat is attached.
Expand Down Expand Up @@ -185,7 +194,7 @@ impl Default for ClientOptions {
.map(Cow::Owned)
.or_else(|| env::var("HTTPS_PROXY").ok().map(Cow::Owned))
.or_else(|| env::var("http_proxy").ok().map(Cow::Owned)),
shutdown_timeout: Some(Duration::from_secs(2)),
shutdown_timeout: Duration::from_secs(2),
attach_stacktrace: false,
send_default_pii: false,
before_send: None,
Expand Down Expand Up @@ -338,11 +347,11 @@ impl Client {
/// disabled.
pub fn with_options(options: ClientOptions) -> Client {
#[cfg_attr(feature = "cargo-clippy", allow(question_mark))]
let transport = if options.dsn.is_none() {
let transport = RwLock::new(if options.dsn.is_none() {
None
} else {
Some(Arc::new(options.transport.create_transport(&options)))
};
});
Client { options, transport }
}

Expand All @@ -364,7 +373,7 @@ impl Client {
pub fn disabled_with_options(options: ClientOptions) -> Client {
Client {
options,
transport: None,
transport: RwLock::new(None),
}
}

Expand Down Expand Up @@ -517,7 +526,7 @@ impl Client {

/// Captures an event and sends it to sentry.
pub fn capture_event(&self, event: Event<'static>, scope: Option<&Scope>) -> Uuid {
if let Some(ref transport) = self.transport {
if let Some(ref transport) = *self.transport.read().unwrap() {
if self.sample_should_send() {
if let Some(event) = self.prepare_event(event, scope) {
let event_id = event.id.unwrap();
Expand All @@ -529,14 +538,16 @@ impl Client {
Default::default()
}

/// Drains all pending events up to the current time.
/// Drains all pending events and shuts down the transport behind the
/// client. After shutting down the transport is removed.
///
/// This returns `true` if the queue was successfully drained in the
/// given time or `false` if not (for instance because of a timeout).
/// If no timeout is provided the client will wait forever.
pub fn drain_events(&self, timeout: Option<Duration>) -> bool {
if let Some(ref transport) = self.transport {
transport.drain(timeout)
/// If no timeout is provided the client will wait for as long a
/// `shutdown_timeout` in the client options.
pub fn close(&self, timeout: Option<Duration>) -> bool {
if let Some(transport) = self.transport.write().unwrap().take() {
transport.shutdown(timeout.unwrap_or(self.options.shutdown_timeout))
} else {
true
}
Expand All @@ -559,7 +570,7 @@ pub struct ClientInitGuard(Arc<Client>);

impl Drop for ClientInitGuard {
fn drop(&mut self) {
self.0.drain_events(self.0.options.shutdown_timeout);
self.0.close(None);
}
}

Expand Down
9 changes: 0 additions & 9 deletions src/hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,6 @@ impl Hub {
}}
}

/// Drains the currently pending events.
pub fn drain_events(&self, timeout: Option<Duration>) {
with_client_impl! {{
if let Some(ref client) = self.client() {
client.drain_events(timeout);
}
}}
}

/// Returns the currently bound client.
#[cfg(feature = "with_client_implementation")]
pub fn client(&self) -> Option<Arc<Client>> {
Expand Down
43 changes: 29 additions & 14 deletions src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
Expand All @@ -22,9 +23,9 @@ pub trait Transport: Send + Sync + 'static {
/// Drains the queue if there is one.
///
/// The default implementation does nothing. If the queue was successfully
/// drained the return value should be `true` or `false` if events were
/// shutdowned the return value should be `true` or `false` if events were
/// left in it.
fn drain(&self, timeout: Option<Duration>) -> bool {
fn shutdown(&self, timeout: Duration) -> bool {
let _timeout = timeout;
true
}
Expand Down Expand Up @@ -72,8 +73,8 @@ impl<T: Transport> Transport for Arc<T> {
(**self).send_event(event)
}

fn drain(&self, timeout: Option<Duration>) -> bool {
(**self).drain(timeout)
fn shutdown(&self, timeout: Duration) -> bool {
(**self).shutdown(timeout)
}
}

Expand Down Expand Up @@ -102,7 +103,8 @@ impl TransportFactory for DefaultTransportFactory {
pub struct HttpTransport {
dsn: Dsn,
sender: Mutex<SyncSender<Option<Event<'static>>>>,
drain_signal: Arc<Condvar>,
shutdown_signal: Arc<Condvar>,
shutdown_immediately: Arc<AtomicBool>,
queue_size: Arc<Mutex<usize>>,
_handle: Option<JoinHandle<()>>,
}
Expand All @@ -112,6 +114,7 @@ fn spawn_http_sender(
receiver: Receiver<Option<Event<'static>>>,
dsn: Dsn,
signal: Arc<Condvar>,
shutdown_immediately: Arc<AtomicBool>,
queue_size: Arc<Mutex<usize>>,
user_agent: String,
) -> JoinHandle<()> {
Expand All @@ -121,6 +124,14 @@ fn spawn_http_sender(
let url = dsn.store_api_url().to_string();

while let Some(event) = receiver.recv().unwrap_or(None) {
// on drop we want to not continue processing the queue.
if shutdown_immediately.load(Ordering::SeqCst) {
let mut size = queue_size.lock().unwrap();
*size = 0;
signal.notify_all();
break;
}

// while we are disabled due to rate limits, skip
match disabled {
Some((disabled_at, RetryAfter::Delay(disabled_for))) => {
Expand Down Expand Up @@ -176,7 +187,8 @@ impl HttpTransport {
let https_proxy = options.https_proxy.as_ref().map(|x| x.to_string());

let (sender, receiver) = sync_channel(30);
let drain_signal = Arc::new(Condvar::new());
let shutdown_signal = Arc::new(Condvar::new());
let shutdown_immediately = Arc::new(AtomicBool::new(false));
#[cfg_attr(feature = "cargo-clippy", allow(mutex_atomic))]
let queue_size = Arc::new(Mutex::new(0));
let mut client = Client::builder();
Expand All @@ -190,14 +202,16 @@ impl HttpTransport {
client.build().unwrap(),
receiver,
dsn.clone(),
drain_signal.clone(),
shutdown_signal.clone(),
shutdown_immediately.clone(),
queue_size.clone(),
user_agent,
));
HttpTransport {
dsn,
sender: Mutex::new(sender),
drain_signal,
shutdown_signal,
shutdown_immediately: shutdown_immediately,
queue_size,
_handle,
}
Expand All @@ -215,21 +229,22 @@ impl Transport for HttpTransport {
}
}

fn drain(&self, timeout: Option<Duration>) -> bool {
fn shutdown(&self, timeout: Duration) -> bool {
let guard = self.queue_size.lock().unwrap();
if *guard == 0 {
return true;
}
if let Some(timeout) = timeout {
self.drain_signal.wait_timeout(guard, timeout).is_ok()
true
} else {
self.drain_signal.wait(guard).is_ok()
if let Ok(sender) = self.sender.lock() {
sender.send(None).ok();
}
self.shutdown_signal.wait_timeout(guard, timeout).is_ok()
}
}
}

impl Drop for HttpTransport {
fn drop(&mut self) {
self.shutdown_immediately.store(true, Ordering::SeqCst);
if let Ok(sender) = self.sender.lock() {
sender.send(None).ok();
}
Expand Down