Skip to content

Commit

Permalink
Network-level telemetry via sink stats
Browse files Browse the repository at this point in the history
  • Loading branch information
mlowicki committed Mar 19, 2024
1 parent e7eeb94 commit a991935
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cadence/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ pub use self::client::{

pub use self::sinks::{
BufferedSpyMetricSink, BufferedUdpMetricSink, MetricSink, NopMetricSink, QueuingMetricSink,
QueuingMetricSinkBuilder, SpyMetricSink, UdpMetricSink,
QueuingMetricSinkBuilder, SinkStats, SpyMetricSink, UdpMetricSink,
};

pub use self::types::{
Expand Down
57 changes: 56 additions & 1 deletion cadence/src/sinks/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,58 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::io;
use std::{
io,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

#[derive(Debug, Default)]
pub struct SinkStats {
pub bytes_sent: u64,
pub packets_sent: u64,
pub bytes_dropped: u64,
pub packets_dropped: u64,
}

#[derive(Debug, Clone, Default)]
pub(crate) struct WriterStats {
bytes_sent: Arc<AtomicU64>,
packets_sent: Arc<AtomicU64>,
bytes_dropped: Arc<AtomicU64>,
packets_dropped: Arc<AtomicU64>,
}

impl WriterStats {
pub(crate) fn incr_bytes_sent(&self, n: u64) {
self.bytes_sent.fetch_add(n, Ordering::Relaxed);
}

pub(crate) fn incr_packets_sent(&self) {
self.packets_sent.fetch_add(1, Ordering::Relaxed);
}

pub(crate) fn incr_bytes_dropped(&self, n: u64) {
self.bytes_dropped.fetch_add(n, Ordering::Relaxed);
}

pub(crate) fn incr_packets_dropped(&self) {
self.packets_dropped.fetch_add(1, Ordering::Relaxed);
}
}

impl From<&WriterStats> for SinkStats {
fn from(stats: &WriterStats) -> Self {
SinkStats {
bytes_sent: stats.bytes_sent.load(Ordering::Relaxed),
packets_sent: stats.packets_sent.load(Ordering::Relaxed),
bytes_dropped: stats.bytes_dropped.load(Ordering::Relaxed),
packets_dropped: stats.packets_dropped.load(Ordering::Relaxed),
}
}
}

/// Trait for various backends that send Statsd metrics somewhere.
///
Expand Down Expand Up @@ -77,6 +128,10 @@ pub trait MetricSink {
fn flush(&self) -> io::Result<()> {
Ok(())
}

fn stats(&self) -> SinkStats {
SinkStats::default()
}
}

/// Implementation of a `MetricSink` that discards all metrics.
Expand Down
2 changes: 1 addition & 1 deletion cadence/src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod queuing;
mod spy;
mod udp;

pub use crate::sinks::core::{MetricSink, NopMetricSink};
pub use crate::sinks::core::{MetricSink, NopMetricSink, SinkStats};
pub use crate::sinks::queuing::{QueuingMetricSink, QueuingMetricSinkBuilder};
pub use crate::sinks::spy::{BufferedSpyMetricSink, SpyMetricSink};
pub use crate::sinks::udp::{BufferedUdpMetricSink, UdpMetricSink};
Expand Down
6 changes: 5 additions & 1 deletion cadence/src/sinks/queuing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::sinks::core::MetricSink;
use crate::sinks::core::{MetricSink, SinkStats};
use crossbeam_channel::{self, Receiver, Sender, TrySendError};
use std::fmt;
use std::io::{self, ErrorKind};
Expand Down Expand Up @@ -273,6 +273,10 @@ impl MetricSink for QueuingMetricSink {
fn flush(&self) -> Result<(), std::io::Error> {
self.sink.flush()
}

fn stats(&self) -> SinkStats {
self.sink.stats()
}
}

impl Drop for QueuingMetricSink {
Expand Down
32 changes: 27 additions & 5 deletions cadence/src/sinks/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::Mutex;

use crate::io::MultiLineWriter;
use crate::sinks::core::MetricSink;
use crate::sinks::core::{MetricSink, SinkStats, WriterStats};
use crate::types::{ErrorKind, MetricError, MetricResult};

// Default size of the buffer for buffered metric sinks. This
Expand Down Expand Up @@ -118,17 +118,29 @@ impl MetricSink for UdpMetricSink {
pub(crate) struct UdpWriteAdapter {
addr: SocketAddr,
socket: UdpSocket,
stats: WriterStats,
}

impl UdpWriteAdapter {
pub(crate) fn new(addr: SocketAddr, socket: UdpSocket) -> UdpWriteAdapter {
UdpWriteAdapter { addr, socket }
pub(crate) fn new(addr: SocketAddr, socket: UdpSocket, stats: WriterStats) -> UdpWriteAdapter {
UdpWriteAdapter { addr, socket, stats }
}
}

impl Write for UdpWriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.socket.send_to(buf, self.addr)
match self.socket.send_to(buf, self.addr) {
Ok(written) => {
self.stats.incr_bytes_sent(written as u64);
self.stats.incr_packets_sent();
Ok(written)
}
Err(e) => {
self.stats.incr_bytes_dropped(buf.len() as u64);
self.stats.incr_packets_dropped();
Err(e)
}
}
}

fn flush(&mut self) -> io::Result<()> {
Expand Down Expand Up @@ -161,6 +173,7 @@ impl Write for UdpWriteAdapter {
#[derive(Debug)]
pub struct BufferedUdpMetricSink {
buffer: Mutex<MultiLineWriter<UdpWriteAdapter>>,
stats: WriterStats,
}

impl BufferedUdpMetricSink {
Expand Down Expand Up @@ -238,8 +251,13 @@ impl BufferedUdpMetricSink {
A: ToSocketAddrs,
{
let addr = get_addr(sink_addr)?;
let stats = WriterStats::default();
Ok(BufferedUdpMetricSink {
buffer: Mutex::new(MultiLineWriter::new(UdpWriteAdapter::new(addr, socket), cap)),
buffer: Mutex::new(MultiLineWriter::new(
UdpWriteAdapter::new(addr, socket, stats.clone()),
cap,
)),
stats,
})
}
}
Expand All @@ -254,6 +272,10 @@ impl MetricSink for BufferedUdpMetricSink {
let mut writer = self.buffer.lock().unwrap();
writer.flush()
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

#[cfg(test)]
Expand Down
31 changes: 27 additions & 4 deletions cadence/src/sinks/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::path::{Path, PathBuf};
use std::sync::Mutex;

use crate::io::MultiLineWriter;
use crate::sinks::core::MetricSink;
use crate::sinks::core::{MetricSink, SinkStats, WriterStats};

// Default size of the buffer for buffered metric sinks. This
// is a rather conservative value, picked for consistency with
Expand Down Expand Up @@ -95,23 +95,36 @@ impl MetricSink for UnixMetricSink {
pub(crate) struct UnixWriteAdapter {
path: PathBuf,
socket: UnixDatagram,
stats: WriterStats,
}

impl UnixWriteAdapter {
fn new<P>(socket: UnixDatagram, path: P) -> UnixWriteAdapter
fn new<P>(socket: UnixDatagram, path: P, stats: WriterStats) -> UnixWriteAdapter
where
P: AsRef<Path>,
{
UnixWriteAdapter {
path: path.as_ref().to_path_buf(),
socket,
stats,
}
}
}

impl Write for UnixWriteAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.socket.send_to(buf, &self.path)
match self.socket.send_to(buf, &self.path) {
Ok(written) => {
self.stats.incr_bytes_sent(written as u64);
self.stats.incr_packets_sent();
Ok(written)
}
Err(e) => {
self.stats.incr_bytes_dropped(buf.len() as u64);
self.stats.incr_packets_dropped();
Err(e)
}
}
}

fn flush(&mut self) -> io::Result<()> {
Expand Down Expand Up @@ -147,6 +160,7 @@ impl Write for UnixWriteAdapter {
#[derive(Debug)]
pub struct BufferedUnixMetricSink {
buffer: Mutex<MultiLineWriter<UnixWriteAdapter>>,
stats: WriterStats,
}

impl BufferedUnixMetricSink {
Expand Down Expand Up @@ -202,8 +216,13 @@ impl BufferedUnixMetricSink {
where
P: AsRef<Path>,
{
let stats = WriterStats::default();
BufferedUnixMetricSink {
buffer: Mutex::new(MultiLineWriter::new(UnixWriteAdapter::new(socket, path), cap)),
buffer: Mutex::new(MultiLineWriter::new(
UnixWriteAdapter::new(socket, path, stats.clone()),
cap,
)),
stats,
}
}
}
Expand All @@ -218,6 +237,10 @@ impl MetricSink for BufferedUnixMetricSink {
let mut writer = self.buffer.lock().unwrap();
writer.flush()
}

fn stats(&self) -> SinkStats {
(&self.stats).into()
}
}

#[cfg(test)]
Expand Down

0 comments on commit a991935

Please sign in to comment.