Skip to content

Commit

Permalink
chore(observability)!: remove peer_addr internal metric tag (vector…
Browse files Browse the repository at this point in the history
…dotdev#18982)

* OPW-94 remove peer_addr tag

* fmt
  • Loading branch information
dsmith3197 committed Oct 30, 2023
1 parent e61f308 commit b9447f6
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 41 deletions.
3 changes: 1 addition & 2 deletions src/internal_events/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ impl InternalEvent for TcpBytesReceived {
);
counter!(
"component_received_bytes_total", self.byte_size as u64,
"protocol" => "tcp",
"peer_addr" => self.peer_addr.to_string()
"protocol" => "tcp"
);
}
}
54 changes: 27 additions & 27 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ mod test {
sources::util::net::SocketListenAddr,
test_util::{
collect_n, collect_n_limited,
components::{assert_source_compliance, SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS},
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp,
},
tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
Expand All @@ -391,7 +391,7 @@ mod test {
//////// TCP TESTS ////////
#[tokio::test]
async fn tcp_it_includes_host() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand All @@ -416,7 +416,7 @@ mod test {

#[tokio::test]
async fn tcp_it_includes_vector_namespaced_fields() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();
let mut conf = TcpConfig::from_address(addr.into());
Expand Down Expand Up @@ -456,7 +456,7 @@ mod test {

#[tokio::test]
async fn tcp_splits_on_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -488,7 +488,7 @@ mod test {

#[tokio::test]
async fn tcp_it_includes_source_type() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand All @@ -514,7 +514,7 @@ mod test {

#[tokio::test]
async fn tcp_continue_after_long_line() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -555,7 +555,7 @@ mod test {

#[tokio::test]
async fn tcp_with_tls() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -619,7 +619,7 @@ mod test {

#[tokio::test]
async fn tcp_with_tls_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

Expand Down Expand Up @@ -694,7 +694,7 @@ mod test {

#[tokio::test]
async fn tcp_shutdown_simple() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let source_id = ComponentKey::from("tcp_shutdown_simple");
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();
Expand Down Expand Up @@ -962,7 +962,7 @@ mod test {

#[tokio::test]
async fn udp_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -979,7 +979,7 @@ mod test {

#[tokio::test]
async fn udp_message_preserves_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -996,7 +996,7 @@ mod test {

#[tokio::test]
async fn udp_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -1017,7 +1017,7 @@ mod test {

#[tokio::test]
async fn udp_max_length() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
Expand Down Expand Up @@ -1053,7 +1053,7 @@ mod test {
/// Windows will drop the entire packet if we exceed the max_length so we are unable to
/// extract anything.
async fn udp_max_length_delimited() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
Expand Down Expand Up @@ -1084,7 +1084,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_host() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -1099,7 +1099,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_vector_namespaced_fields() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, true).await;

Expand Down Expand Up @@ -1127,7 +1127,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_source_type() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

Expand All @@ -1144,7 +1144,7 @@ mod test {

#[tokio::test]
async fn udp_shutdown_simple() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let source_id = ComponentKey::from("udp_shutdown_simple");

Expand Down Expand Up @@ -1174,7 +1174,7 @@ mod test {

#[tokio::test]
async fn udp_shutdown_infinite_stream() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let source_id = ComponentKey::from("udp_shutdown_infinite_stream");

Expand Down Expand Up @@ -1334,7 +1334,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", false, false).await;
let events = collect_n(rx, 1).await;

Expand Down Expand Up @@ -1401,7 +1401,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message_with_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", false, true).await;
let events = collect_n(rx, 1).await;
let log = events[0].as_log();
Expand All @@ -1426,7 +1426,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message_preserves_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("foo\nbar", false, false).await;
let events = collect_n(rx, 1).await;

Expand All @@ -1446,7 +1446,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
unix_multiple_packets(false).await
})
.await;
Expand Down Expand Up @@ -1513,7 +1513,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", true, false).await;
let events = collect_n(rx, 1).await;

Expand All @@ -1533,7 +1533,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message_with_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", true, true).await;
let events = collect_n(rx, 1).await;
let log = events[0].as_log();
Expand All @@ -1556,7 +1556,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message_splits_on_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("foo\nbar", true, false).await;
let events = collect_n(rx, 2).await;

Expand Down Expand Up @@ -1584,7 +1584,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
unix_multiple_packets(true).await
})
.await;
Expand Down
8 changes: 4 additions & 4 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ mod test {
collect_limited,
components::{
assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS,
SOCKET_PUSH_SOURCE_TAGS,
},
metrics::{assert_counter, assert_distribution, assert_gauge, assert_set},
next_addr,
Expand All @@ -365,7 +365,7 @@ mod test {

#[tokio::test]
async fn test_statsd_udp() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_addr = next_addr();
let config = StatsdConfig::Udp(UdpConfig::from_address(in_addr.into()));
let (sender, mut receiver) = mpsc::channel(200);
Expand All @@ -384,7 +384,7 @@ mod test {

#[tokio::test]
async fn test_statsd_tcp() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_addr = next_addr();
let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
let (sender, mut receiver) = mpsc::channel(200);
Expand Down Expand Up @@ -427,7 +427,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn test_statsd_unix() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test");
let config = StatsdConfig::Unix(UnixConfig {
path: in_path.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1190,14 +1190,14 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn test_unix_stream_syslog() {
use crate::test_util::components::SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS;
use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
use futures_util::{stream, SinkExt};
use std::os::unix::net::UnixStream as StdUnixStream;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio_util::codec::{FramedWrite, LinesCodec};

assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let num_messages: usize = 1;
let in_path = tempfile::tempdir().unwrap().into_path().join("stream_test");

Expand Down
7 changes: 1 addition & 6 deletions src/test_util/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ pub const HTTP_PULL_SOURCE_TAGS: [&str; 2] = ["endpoint", "protocol"];
pub const HTTP_PUSH_SOURCE_TAGS: [&str; 2] = ["http_path", "protocol"];

/// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`.
pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 2] = ["peer_addr", "protocol"];

/// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`, but
/// specifically sources that experience high cardinality i.e. many many clients, where emitting metrics with the peer
/// address as a tag would represent too high of a cost to pay.
pub const SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"];
pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"];

/// The standard set of tags for all generic socket-based sources that poll connections i.e. Redis.
pub const SOCKET_PULL_SOURCE_TAGS: [&str; 2] = ["remote_addr", "protocol"];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Vector's 0.34.0 release includes **breaking changes**:

1. [Removal of Deprecated Datadog Component Config Options](#datadog-deprecated-config-options)
1. [Removal of Deprecated `component_name` Metric Tag](#deprecated-component-name)
1. [Removal of `peer_addr` Metric Tag](#remove-peer-addr)
1. [Blackhole sink no longer reports by default](#blackhole-sink-reporting)

We cover them below to help you upgrade quickly:
Expand All @@ -33,6 +34,10 @@ been removed from the Enterprise configuration. Instead of `region`, `site` shou

The deprecated `component_name` tag has been removed from all internal metrics. Instead the `component_id` tag should be used.

#### Removal of `peer_addr` Metric Tag {#remove-peer-addr}

The `peer_addr` tag has been removed from the `component_received_bytes_total` internal metric for TCP-based sources due to its unbounded cardinality.

#### Blackhole sink no longer reports by default {#blackhole-sink-reporting}

The `blackhole` sink no longer reports events processed every second by default. Instead this
Expand Down

0 comments on commit b9447f6

Please sign in to comment.