Skip to content

Commit

Permalink
chore(statsd sink): refactor statsd sink to stream-based style (vec…
Browse files Browse the repository at this point in the history
…tordotdev#16199)

This PR completely refactors the `statsd` sink in the "new style", which
uses stream-based combinators to build a `Stream` implementation that
drives component behavior.

At a high-level, the PR is indeed for refactoring the sink, but
ultimately includes as much, if not more, refactoring work around
establishing reusable `Service`-based primitives for building other
sinks like `statsd` i.e. `socket` or `syslog`.

## Reviewer Notes

I've mostly copied the existing shared socket sink types --
`TcpSinkConfig`, etc -- and existing socket services -- `UdpService` --
and created consistent versions of them for TCP, UDP, and Unix Domain
sockets.

This includes a configuration type that is `Configurable`-compatible for
all of them, with socket-specific configurations[1] and then methods for
generating both the `Service` implementation and a `Healthcheck`
implementation. Ultimately, this should form the basis of other sink
refactors that use sockets directly (`socket`, `syslog`, etc) but it may
be desirable to do some more trait-ifying to avoid some of the necessary
boilerplate introduced here.

## Remaining Work

- [x] fix normalizer unit tests + add one for sketches
  • Loading branch information
tobz committed May 25, 2023
1 parent b28d915 commit 2a76cac
Show file tree
Hide file tree
Showing 31 changed files with 2,346 additions and 786 deletions.
1 change: 1 addition & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Enot
Evercoss
Explay
FAQs
FQDNs
Fabro
Figma
Flipboard
Expand Down
6 changes: 6 additions & 0 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ impl From<&'static str> for Protocol {
}
}

impl From<Protocol> for SharedString {
fn from(value: Protocol) -> Self {
value.0
}
}

/// Macro to take care of some of the repetitive boilerplate in implementing a registered event. See
/// the other events in this module for examples of how to use this.
///
Expand Down
13 changes: 1 addition & 12 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,18 +253,7 @@ pub(crate) use self::statsd_sink::*;
pub(crate) use self::tag_cardinality_limit::*;
#[cfg(feature = "transforms-throttle")]
pub(crate) use self::throttle::*;
#[cfg(all(
any(
feature = "sinks-socket",
feature = "sinks-statsd",
feature = "sources-dnstap",
feature = "sources-metrics",
feature = "sources-statsd",
feature = "sources-syslog",
feature = "sources-socket"
),
unix
))]
#[cfg(unix)]
pub(crate) use self::unix::*;
#[cfg(feature = "sinks-websocket")]
pub(crate) use self::websocket::*;
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub enum SocketMode {
}

impl SocketMode {
const fn as_str(self) -> &'static str {
pub const fn as_str(self) -> &'static str {
match self {
Self::Tcp => "tcp",
Self::Udp => "udp",
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/statsd_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use vector_common::internal_event::{
#[derive(Debug)]
pub struct StatsdInvalidMetricError<'a> {
pub value: &'a MetricValue,
pub kind: &'a MetricKind,
pub kind: MetricKind,
}

impl<'a> InternalEvent for StatsdInvalidMetricError<'a> {
Expand Down
28 changes: 28 additions & 0 deletions src/internal_events/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,34 @@ impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
}
}

#[derive(Debug)]
pub struct UnixSendIncompleteError {
pub data_size: usize,
pub sent: usize,
}

impl InternalEvent for UnixSendIncompleteError {
fn emit(self) {
let reason = "Could not send all data in one Unix datagram.";
error!(
message = reason,
data_size = self.data_size,
sent = self.sent,
dropped = self.data_size - self.sent,
error_type = error_type::WRITER_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total", 1,
"error_type" => error_type::WRITER_FAILED,
"stage" => error_stage::SENDING,
);

emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
}
}

#[derive(Debug)]
pub struct UnixSocketFileDeleteError<'a> {
pub path: &'a Path,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub mod line_agg;
pub mod list;
#[cfg(any(feature = "sources-nats", feature = "sinks-nats"))]
pub(crate) mod nats;
pub mod net;
#[allow(unreachable_pub)]
pub(crate) mod proto;
pub mod providers;
Expand Down Expand Up @@ -112,7 +113,6 @@ pub mod trace;
#[allow(unreachable_pub)]
pub mod transforms;
pub mod types;
pub mod udp;
pub mod unit_test;
pub(crate) mod utilization;
pub mod validate;
Expand Down
52 changes: 52 additions & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//! Networking-related helper functions.

use std::{io, time::Duration};

use socket2::{SockRef, TcpKeepalive};
use tokio::net::TcpStream;

/// Sets the receive buffer size for a socket.
///
/// This is the equivalent of setting the `SO_RCVBUF` socket setting directly.
///
/// # Errors
///
/// If there is an error setting the receive buffer size on the given socket, or if the value given
/// as the socket is not a valid socket, an error variant will be returned explaining the underlying
/// I/O error.
pub fn set_receive_buffer_size<'s, S>(socket: &'s S, size: usize) -> io::Result<()>
where
SockRef<'s>: From<&'s S>,
{
SockRef::from(socket).set_recv_buffer_size(size)
}

/// Sets the send buffer size for a socket.
///
/// This is the equivalent of setting the `SO_SNDBUF` socket setting directly.
///
/// # Errors
///
/// If there is an error setting the send buffer size on the given socket, or if the value given
/// as the socket is not a valid socket, an error variant will be returned explaining the underlying
/// I/O error.
pub fn set_send_buffer_size<'s, S>(socket: &'s S, size: usize) -> io::Result<()>
where
SockRef<'s>: From<&'s S>,
{
SockRef::from(socket).set_send_buffer_size(size)
}

/// Sets the TCP keepalive behavior on a socket.
///
/// This is the equivalent of setting the `SO_KEEPALIVE` and `TCP_KEEPALIVE` socket settings
/// directly.
///
/// # Errors
///
/// If there is an error with either enabling keepalive probes or setting the TCP keepalive idle
/// timeout on the given socket, an error variant will be returned explaining the underlying I/O
/// error.
pub fn set_keepalive(socket: &TcpStream, ttl: Duration) -> io::Result<()> {
SockRef::from(socket).set_tcp_keepalive(&TcpKeepalive::new().with_time(ttl))
}
Loading

0 comments on commit 2a76cac

Please sign in to comment.