diff --git a/src/client.rs b/src/client.rs index 0ab5fcab4..670f73cf7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,6 +4,7 @@ use std::env; use std::ffi::{OsStr, OsString}; use std::fmt; use std::sync::Arc; +use std::sync::RwLock; use std::time::Duration; use rand::random; @@ -21,10 +22,9 @@ use transport::{DefaultTransportFactory, Transport, TransportFactory}; use utils::{debug_images, server_name}; /// The Sentry client object. -#[derive(Clone)] pub struct Client { options: ClientOptions, - transport: Option>>, + transport: RwLock>>>, } impl fmt::Debug for Client { @@ -36,6 +36,15 @@ impl fmt::Debug for Client { } } +impl Clone for Client { + fn clone(&self) -> Client { + Client { + options: self.options.clone(), + transport: RwLock::new(self.transport.read().unwrap().clone()), + } + } +} + /// Type alias for before event/breadcrumb handlers. pub type BeforeCallback = Arc Option + Send + Sync>>; @@ -81,8 +90,8 @@ pub struct ClientOptions { /// This will default to the `HTTPS_PROXY` environment variable /// or `http_proxy` if that one exists. pub https_proxy: Option>, - /// The timeout on client drop for draining events. - pub shutdown_timeout: Option, + /// The timeout on client drop for draining events on shutdown. + pub shutdown_timeout: Duration, /// Attaches stacktraces to messages. pub attach_stacktrace: bool, /// If turned on some default PII informat is attached. @@ -185,7 +194,7 @@ impl Default for ClientOptions { .map(Cow::Owned) .or_else(|| env::var("HTTPS_PROXY").ok().map(Cow::Owned)) .or_else(|| env::var("http_proxy").ok().map(Cow::Owned)), - shutdown_timeout: Some(Duration::from_secs(2)), + shutdown_timeout: Duration::from_secs(2), attach_stacktrace: false, send_default_pii: false, before_send: None, @@ -338,11 +347,11 @@ impl Client { /// disabled. pub fn with_options(options: ClientOptions) -> Client { #[cfg_attr(feature = "cargo-clippy", allow(question_mark))] - let transport = if options.dsn.is_none() { + let transport = RwLock::new(if options.dsn.is_none() { None } else { Some(Arc::new(options.transport.create_transport(&options))) - }; + }); Client { options, transport } } @@ -364,7 +373,7 @@ impl Client { pub fn disabled_with_options(options: ClientOptions) -> Client { Client { options, - transport: None, + transport: RwLock::new(None), } } @@ -517,7 +526,7 @@ impl Client { /// Captures an event and sends it to sentry. pub fn capture_event(&self, event: Event<'static>, scope: Option<&Scope>) -> Uuid { - if let Some(ref transport) = self.transport { + if let Some(ref transport) = *self.transport.read().unwrap() { if self.sample_should_send() { if let Some(event) = self.prepare_event(event, scope) { let event_id = event.id.unwrap(); @@ -529,14 +538,16 @@ impl Client { Default::default() } - /// Drains all pending events up to the current time. + /// Drains all pending events and shuts down the transport behind the + /// client. After shutting down the transport is removed. /// /// This returns `true` if the queue was successfully drained in the /// given time or `false` if not (for instance because of a timeout). - /// If no timeout is provided the client will wait forever. - pub fn drain_events(&self, timeout: Option) -> bool { - if let Some(ref transport) = self.transport { - transport.drain(timeout) + /// If no timeout is provided the client will wait for as long a + /// `shutdown_timeout` in the client options. + pub fn close(&self, timeout: Option) -> bool { + if let Some(transport) = self.transport.write().unwrap().take() { + transport.shutdown(timeout.unwrap_or(self.options.shutdown_timeout)) } else { true } @@ -559,7 +570,7 @@ pub struct ClientInitGuard(Arc); impl Drop for ClientInitGuard { fn drop(&mut self) { - self.0.drain_events(self.0.options.shutdown_timeout); + self.0.close(None); } } diff --git a/src/hub.rs b/src/hub.rs index f9ec01fee..0047eb17c 100644 --- a/src/hub.rs +++ b/src/hub.rs @@ -311,15 +311,6 @@ impl Hub { }} } - /// Drains the currently pending events. - pub fn drain_events(&self, timeout: Option) { - with_client_impl! {{ - if let Some(ref client) = self.client() { - client.drain_events(timeout); - } - }} - } - /// Returns the currently bound client. #[cfg(feature = "with_client_implementation")] pub fn client(&self) -> Option> { diff --git a/src/transport.rs b/src/transport.rs index 2e626139b..1bf509bb8 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,3 +1,4 @@ +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::{Arc, Condvar, Mutex}; use std::thread::{self, JoinHandle}; @@ -22,9 +23,9 @@ pub trait Transport: Send + Sync + 'static { /// Drains the queue if there is one. /// /// The default implementation does nothing. If the queue was successfully - /// drained the return value should be `true` or `false` if events were + /// shutdowned the return value should be `true` or `false` if events were /// left in it. - fn drain(&self, timeout: Option) -> bool { + fn shutdown(&self, timeout: Duration) -> bool { let _timeout = timeout; true } @@ -72,8 +73,8 @@ impl Transport for Arc { (**self).send_event(event) } - fn drain(&self, timeout: Option) -> bool { - (**self).drain(timeout) + fn shutdown(&self, timeout: Duration) -> bool { + (**self).shutdown(timeout) } } @@ -102,7 +103,8 @@ impl TransportFactory for DefaultTransportFactory { pub struct HttpTransport { dsn: Dsn, sender: Mutex>>>, - drain_signal: Arc, + shutdown_signal: Arc, + shutdown_immediately: Arc, queue_size: Arc>, _handle: Option>, } @@ -112,6 +114,7 @@ fn spawn_http_sender( receiver: Receiver>>, dsn: Dsn, signal: Arc, + shutdown_immediately: Arc, queue_size: Arc>, user_agent: String, ) -> JoinHandle<()> { @@ -121,6 +124,14 @@ fn spawn_http_sender( let url = dsn.store_api_url().to_string(); while let Some(event) = receiver.recv().unwrap_or(None) { + // on drop we want to not continue processing the queue. + if shutdown_immediately.load(Ordering::SeqCst) { + let mut size = queue_size.lock().unwrap(); + *size = 0; + signal.notify_all(); + break; + } + // while we are disabled due to rate limits, skip match disabled { Some((disabled_at, RetryAfter::Delay(disabled_for))) => { @@ -176,7 +187,8 @@ impl HttpTransport { let https_proxy = options.https_proxy.as_ref().map(|x| x.to_string()); let (sender, receiver) = sync_channel(30); - let drain_signal = Arc::new(Condvar::new()); + let shutdown_signal = Arc::new(Condvar::new()); + let shutdown_immediately = Arc::new(AtomicBool::new(false)); #[cfg_attr(feature = "cargo-clippy", allow(mutex_atomic))] let queue_size = Arc::new(Mutex::new(0)); let mut client = Client::builder(); @@ -190,14 +202,16 @@ impl HttpTransport { client.build().unwrap(), receiver, dsn.clone(), - drain_signal.clone(), + shutdown_signal.clone(), + shutdown_immediately.clone(), queue_size.clone(), user_agent, )); HttpTransport { dsn, sender: Mutex::new(sender), - drain_signal, + shutdown_signal, + shutdown_immediately: shutdown_immediately, queue_size, _handle, } @@ -215,21 +229,22 @@ impl Transport for HttpTransport { } } - fn drain(&self, timeout: Option) -> bool { + fn shutdown(&self, timeout: Duration) -> bool { let guard = self.queue_size.lock().unwrap(); if *guard == 0 { - return true; - } - if let Some(timeout) = timeout { - self.drain_signal.wait_timeout(guard, timeout).is_ok() + true } else { - self.drain_signal.wait(guard).is_ok() + if let Ok(sender) = self.sender.lock() { + sender.send(None).ok(); + } + self.shutdown_signal.wait_timeout(guard, timeout).is_ok() } } } impl Drop for HttpTransport { fn drop(&mut self) { + self.shutdown_immediately.store(true, Ordering::SeqCst); if let Ok(sender) = self.sender.lock() { sender.send(None).ok(); }