Skip to content

Commit

Permalink
chore(observability): ensure sent_event and received_event metric…
Browse files Browse the repository at this point in the history
…s are estimated json size (vectordotdev#17465)

This PR creates a newtype -
[`JsonSize`](https://github.com/vectordotdev/vector/blob/stephen/event_json_size/lib/vector-common/src/json_size.rs)
that is returned by the
`EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of` trait
function.

The events that emit a `component_received_event_bytes_total` or
`component_sent_event_bytes_total` event accept `JsonSize`.

This allows us to use the compiler to ensure we are emitting the correct
measurement. A number of components needed changing to ensure this
worked.

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely committed May 31, 2023
1 parent dbd7151 commit 3b2a2be
Show file tree
Hide file tree
Showing 87 changed files with 807 additions and 449 deletions.
2 changes: 1 addition & 1 deletion lib/vector-common/src/internal_event/events_received.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ crate::registered_event!(
#[allow(clippy::cast_precision_loss)]
self.events_count.record(count as f64);
self.events.increment(count as u64);
self.event_bytes.increment(byte_size as u64);
self.event_bytes.increment(byte_size.get() as u64);
}
);
6 changes: 3 additions & 3 deletions lib/vector-common/src/internal_event/events_sent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ crate::registered_event!(

match &self.output {
Some(output) => {
trace!(message = "Events sent.", count = %count, byte_size = %byte_size, output = %output);
trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
}
None => {
trace!(message = "Events sent.", count = %count, byte_size = %byte_size);
trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
}
}

self.events.increment(count as u64);
self.event_bytes.increment(byte_size as u64);
self.event_bytes.increment(byte_size.get() as u64);
}
);

Expand Down
6 changes: 4 additions & 2 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub use events_sent::{EventsSent, DEFAULT_OUTPUT};
pub use prelude::{error_stage, error_type};
pub use service::{CallError, PollReadyError};

use crate::json_size::JsonSize;

pub trait InternalEvent: Sized {
fn emit(self);

Expand Down Expand Up @@ -106,9 +108,9 @@ pub struct ByteSize(pub usize);
#[derive(Clone, Copy)]
pub struct Count(pub usize);

/// Holds the tuple `(count_of_events, size_of_events_in_bytes)`.
/// Holds the tuple `(count_of_events, estimated_json_size_of_events)`.
#[derive(Clone, Copy)]
pub struct CountByteSize(pub usize, pub usize);
pub struct CountByteSize(pub usize, pub JsonSize);

// Wrapper types used to hold parameters for registering events

Expand Down
105 changes: 105 additions & 0 deletions lib/vector-common/src/json_size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use std::{
fmt,
iter::Sum,
ops::{Add, AddAssign, Sub},
};

/// A newtype for the JSON size of an event.
/// Used to emit the `component_received_event_bytes_total` and
/// `component_sent_event_bytes_total` metrics.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct JsonSize(usize);

impl fmt::Display for JsonSize {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

impl Sub for JsonSize {
type Output = JsonSize;

#[inline]
fn sub(mut self, rhs: Self) -> Self::Output {
self.0 -= rhs.0;
self
}
}

impl Add for JsonSize {
type Output = JsonSize;

#[inline]
fn add(mut self, rhs: Self) -> Self::Output {
self.0 += rhs.0;
self
}
}

impl AddAssign for JsonSize {
#[inline]
fn add_assign(&mut self, rhs: Self) {
self.0 += rhs.0;
}
}

impl Sum for JsonSize {
#[inline]
fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
let mut accum = 0;
for val in iter {
accum += val.get();
}

JsonSize::new(accum)
}
}

impl From<usize> for JsonSize {
#[inline]
fn from(value: usize) -> Self {
Self(value)
}
}

impl JsonSize {
/// Create a new instance with the specified size.
#[must_use]
#[inline]
pub const fn new(size: usize) -> Self {
Self(size)
}

/// Create a new instance with size 0.
#[must_use]
#[inline]
pub const fn zero() -> Self {
Self(0)
}

/// Returns the contained size.
#[must_use]
#[inline]
pub fn get(&self) -> usize {
self.0
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[allow(clippy::module_name_repetitions)]
pub struct NonZeroJsonSize(JsonSize);

impl NonZeroJsonSize {
#[must_use]
#[inline]
pub fn new(size: JsonSize) -> Option<Self> {
(size.0 > 0).then_some(NonZeroJsonSize(size))
}
}

impl From<NonZeroJsonSize> for JsonSize {
#[inline]
fn from(value: NonZeroJsonSize) -> Self {
value.0
}
}
2 changes: 2 additions & 0 deletions lib/vector-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub use vrl::btreemap;
#[cfg(feature = "byte_size_of")]
pub mod byte_size_of;

pub mod json_size;

pub mod config;

#[cfg(feature = "conversion")]
Expand Down
10 changes: 6 additions & 4 deletions lib/vector-common/src/request_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::ops::Add;

use crate::json_size::JsonSize;

/// Metadata for batch requests.
#[derive(Clone, Copy, Debug, Default)]
pub struct RequestMetadata {
Expand All @@ -8,7 +10,7 @@ pub struct RequestMetadata {
/// Size, in bytes, of the in-memory representation of all events in this batch request.
events_byte_size: usize,
/// Size, in bytes, of the estimated JSON-encoded representation of all events in this batch request.
events_estimated_json_encoded_byte_size: usize,
events_estimated_json_encoded_byte_size: JsonSize,
/// Uncompressed size, in bytes, of the encoded events in this batch request.
request_encoded_size: usize,
/// On-the-wire size, in bytes, of the batch request itself after compression, etc.
Expand All @@ -25,7 +27,7 @@ impl RequestMetadata {
events_byte_size: usize,
request_encoded_size: usize,
request_wire_size: usize,
events_estimated_json_encoded_byte_size: usize,
events_estimated_json_encoded_byte_size: JsonSize,
) -> Self {
Self {
event_count,
Expand All @@ -47,7 +49,7 @@ impl RequestMetadata {
}

#[must_use]
pub const fn events_estimated_json_encoded_byte_size(&self) -> usize {
pub const fn events_estimated_json_encoded_byte_size(&self) -> JsonSize {
self.events_estimated_json_encoded_byte_size
}

Expand All @@ -64,7 +66,7 @@ impl RequestMetadata {
/// Constructs a `RequestMetadata` by summation of the "batch" of `RequestMetadata` provided.
#[must_use]
pub fn from_batch<T: IntoIterator<Item = RequestMetadata>>(metadata_iter: T) -> Self {
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, 0);
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, JsonSize::zero());

for metadata in metadata_iter {
metadata_sum = metadata_sum + &metadata;
Expand Down
7 changes: 5 additions & 2 deletions lib/vector-core/src/event/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use futures::{stream, Stream};
#[cfg(test)]
use quickcheck::{Arbitrary, Gen};
use vector_buffers::EventCount;
use vector_common::finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable};
use vector_common::{
finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable},
json_size::JsonSize,
};

use super::{
EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMutRef, EventRef,
Expand Down Expand Up @@ -253,7 +256,7 @@ impl ByteSizeOf for EventArray {
}

impl EstimatedJsonEncodedSizeOf for EventArray {
fn estimated_json_encoded_size_of(&self) -> usize {
fn estimated_json_encoded_size_of(&self) -> JsonSize {
match self {
Self::Logs(v) => v.estimated_json_encoded_size_of(),
Self::Traces(v) => v.estimated_json_encoded_size_of(),
Expand Down

0 comments on commit 3b2a2be

Please sign in to comment.