Skip to content

Commit

Permalink
Expose .flush() method on MetricSink trait
Browse files Browse the repository at this point in the history
Allow applications to periodically flush any buffered metrics
before they would otherwise be flushed due to the buffer filling
up.

Fixes #100
  • Loading branch information
56quarters committed Aug 17, 2020
1 parent 9c1eb45 commit 6c0c5ff
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/sinks/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ pub trait MetricSink {
/// Send the Statsd metric using this sink and return the number of bytes
/// written or an I/O error.
fn emit(&self, metric: &str) -> io::Result<usize>;

/// Flush any currently buffered metrics to the underlying backend, returning
/// an I/O error if they could not be written for some reason.
///
/// Note that not all sinks buffer metrics and so the default implementation of
/// this method does nothing.
fn flush(&self) -> io::Result<()> {
Ok(())
}
}

/// Implementation of a `MetricSink` that discards all metrics.
Expand Down
19 changes: 19 additions & 0 deletions src/sinks/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ impl MetricSink for BufferedUdpMetricSink {
let mut writer = self.buffer.lock().unwrap();
writer.write(metric.as_bytes())
}

fn flush(&self) -> io::Result<()> {
let mut writer = self.buffer.lock().unwrap();
writer.flush()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -303,4 +308,18 @@ mod tests {
assert_eq!(9, sink.emit("foo:54|c").unwrap());
assert_eq!(9, sink.emit("foo:67|c").unwrap());
}

#[test]
fn test_buffered_udp_metric_sink_flus() {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
// Set the capacity of the buffer such that it won't be flushed
// from a single write. Thus we can test the flush method.
let sink = BufferedUdpMetricSink::with_capacity("127.0.0.1:8125", socket, 16).unwrap();

// Note that we're including an extra byte in the expected
// number written since each metric is followed by a '\n' at
// the end.
assert_eq!(9, sink.emit("foo:54|c").unwrap());
assert!(sink.flush().is_ok());
}
}
24 changes: 24 additions & 0 deletions src/sinks/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ impl MetricSink for BufferedUnixMetricSink {
let mut writer = self.buffer.lock().unwrap();
writer.write(metric.as_bytes())
}

fn flush(&self) -> io::Result<()> {
let mut writer = self.buffer.lock().unwrap();
writer.flush()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -267,4 +272,23 @@ mod tests {
assert_eq!(9, sink.emit("foo:67|c").unwrap());
});
}

#[test]
fn test_buffered_unix_metric_sink_flush() {
let harness = UnixServerHarness::new("test_buffered_unix_metric_sink_flush");

harness.run_quiet(|path| {
let socket = UnixDatagram::unbound().unwrap();

// Set the capacity of the buffer such that it won't be flushed
// from a single write. Thus we can test the flush method.
let sink = BufferedUnixMetricSink::with_capacity(path, socket, 16);

// Note that we're including an extra byte in the expected
// number written since each metric is followed by a '\n' at
// the end.
assert_eq!(9, sink.emit("foo:54|c").unwrap());
assert!(sink.flush().is_ok());
});
}
}

0 comments on commit 6c0c5ff

Please sign in to comment.