diff --git a/lib/framework/src/common/mod.rs b/lib/framework/src/common/mod.rs deleted file mode 100644 index 860b680e5..000000000 --- a/lib/framework/src/common/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod open; - -pub use open::{OpenGauge, OpenToken}; diff --git a/lib/framework/src/common/open.rs b/lib/framework/src/common/open.rs deleted file mode 100644 index 47f97301f..000000000 --- a/lib/framework/src/common/open.rs +++ /dev/null @@ -1,130 +0,0 @@ -use std::hint; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; - -#[derive(Clone)] -pub struct OpenGauge { - gauge: Arc, -} - -impl OpenGauge { - pub fn new() -> Self { - OpenGauge { - gauge: Arc::default(), - } - } - - /// Increments and emits value once created. - /// Decrements and emits value once dropped. - pub fn open(self, emitter: E) -> OpenToken { - gauge_add(&self.gauge, 1, &emitter); - OpenToken { - gauge: self.gauge, - emitter, - } - } - - pub fn any_open(&self) -> bool { - self.gauge.load(Ordering::Acquire) != 0 - } -} - -impl Default for OpenGauge { - fn default() -> Self { - Self::new() - } -} - -pub struct OpenToken { - gauge: Arc, - emitter: E, -} - -impl Drop for OpenToken { - fn drop(&mut self) { - gauge_add(&self.gauge, -1, &self.emitter); - } -} - -/// If reporting gauges from multiple threads, they can end up in a wrong order -/// resulting in having wrong value for a prolonged period of time. -/// This function performs a synchronization procedure that corrects that. -fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) { - // The goal of this function is to properly sequence calls to `emitter` from - // multiple threads. It is possible that `emitter` will be called multiple - // times -- worst case, `n^2 / 2` times where `n` is the number of parallel - // peers -- but this is acceptable. - // - // The implementation here is a spin lock on the `gauge` value with the - // critical section being solely for updating the `gauge` value by `add` and - // calling `emitter`. If we suffer priority inversion at higher peer counts - // we might consider the use of a mutex, which will participate in the OS's - // scheduler. See [this - // post](https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html) - // for details if you're working on something like that and need background. - // - // The order of calls to `emitter` are not guaranteed but it is guaranteed - // that the most recent holder of the lock will be the most recent caller of - // `emitter`. - let mut value = gauge.load(Ordering::Acquire); - loop { - let new_value = (value as isize + add) as usize; - emitter(new_value); - // Try to update gauge to new value and releasing writes to gauge metric - // in the process. Otherwise acquire new writes to gauge metric. - // - // When `compare_exchange_weak` returns Ok our `new_value` is now the - // current value in memory across all CPUs. When the return is Err we - // retry with the now current value. - match gauge.compare_exchange_weak(value, new_value, Ordering::AcqRel, Ordering::Acquire) { - Ok(_) => break, - Err(x) => { - hint::spin_loop(); - value = x; - } - } - } -} - -#[cfg(test)] -mod tests { - use std::{mem::drop, thread}; - - use super::*; - - /// If this fails at any run, then the algorithm in `gauge_add` is faulty. - #[test] - fn eventually_consistent() { - let n = 8; - let m = 1000; - let gauge = OpenGauge::new(); - let value = Arc::new(AtomicUsize::new(0)); - - let handles = (0..n) - .map(|_| { - let gauge = gauge.clone(); - let value = Arc::clone(&value); - thread::spawn(move || { - let mut work = 0; - for _ in 0..m { - let token = gauge - .clone() - .open(|count| value.store(count, Ordering::Release)); - // Do some work - for i in 0..100 { - work += i; - } - drop(token); - } - work - }) - }) - .collect::>(); - - for handle in handles { - handle.join().unwrap(); - } - - assert_eq!(0, value.load(Ordering::Acquire)); - } -} diff --git a/lib/framework/src/lib.rs b/lib/framework/src/lib.rs index b923080eb..6334a4b4d 100644 --- a/lib/framework/src/lib.rs +++ b/lib/framework/src/lib.rs @@ -4,7 +4,6 @@ pub mod async_read; pub mod batch; -mod common; pub mod config; pub mod dns; mod extension; @@ -33,7 +32,6 @@ mod utilization; use std::sync::OnceLock; -pub use common::*; pub use extension::Extension; pub use pipeline::Pipeline; pub use shutdown::*; diff --git a/lib/framework/src/sink/util/tcp.rs b/lib/framework/src/sink/util/tcp.rs index 058323a68..7df18543c 100644 --- a/lib/framework/src/sink/util/tcp.rs +++ b/lib/framework/src/sink/util/tcp.rs @@ -25,7 +25,7 @@ use crate::sink::util::socket_bytes_sink::{BytesSink, ShutdownCheck}; use crate::sink::VecSinkExt; use crate::tcp::TcpKeepaliveConfig; use crate::tls::{MaybeTlsStream, TlsConfig, TlsError}; -use crate::{dns, Healthcheck, OpenGauge, Sink, StreamSink}; +use crate::{dns, Healthcheck, Sink, StreamSink}; #[derive(Debug, Error)] enum TcpError { @@ -313,7 +313,6 @@ where while Pin::new(&mut input).peek().await.is_some() { let mut sink = self.connect().await; - let _open_token = OpenGauge::new(); let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await { Ok(()) => sink.close().await,