Skip to content

Commit

Permalink
fix(demo gcp_pubsub internal_metrics source throttle transform): Fix …
Browse files Browse the repository at this point in the history
…`interval` fractional second parsing (vectordotdev#17917)

The demo logs source `interval` parameter examples imply that demo logs
faster than 1/sec are possible by specifying a fractional value.
However, the demo logs source incorrectly parses this value using
serde's
[DurationSeconds](https://docs.rs/serde_with/latest/serde_with/struct.DurationSeconds.html).
This is incorrect - fractional second parts are coerced into a
full-integer value based on rounding rules. So currently, `interval`
values of >= 0.5 to 1 behave the same as `interval = 1`, while
`interval` values of < 0.5 behave the same as `interval = 0`. The
correct serde struct to use is
[DurationSecondsWithFrac](https://docs.rs/serde_with/latest/serde_with/struct.DurationSecondsWithFrac.html).

(edit): This has since expanded to complete replacement of all usages of
`DurationSeconds<f64, Strict>` with `DurationSecondsWithFrac<f64,
Strict>`. This now also affects `gcp_pubsub` and `internal_metrics`
sources, as well as the `throttle` transform.
  • Loading branch information
sbalmos committed Jul 18, 2023
1 parent 32950d8 commit b44a431
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 8 deletions.
4 changes: 2 additions & 2 deletions lib/vector-config/src/external/serde_with.rs
Expand Up @@ -50,7 +50,7 @@ where
impl Configurable for serde_with::DurationSeconds<u64, serde_with::formats::Strict> {
fn referenceable_name() -> Option<&'static str> {
// We're masking the type parameters here because we only deal with whole seconds via this
// version, and handle fractional seconds with `DurationSeconds<f64, Strict>`, which we
// version, and handle fractional seconds with `DurationSecondsWithFrac<f64, Strict>`, which we
// expose as `serde_with::DurationFractionalSeconds`.
Some("serde_with::DurationSeconds")
}
Expand All @@ -76,7 +76,7 @@ impl Configurable for serde_with::DurationSeconds<u64, serde_with::formats::Stri
}
}

impl Configurable for serde_with::DurationSeconds<f64, serde_with::formats::Strict> {
impl Configurable for serde_with::DurationSecondsWithFrac<f64, serde_with::formats::Strict> {
fn referenceable_name() -> Option<&'static str> {
// We're masking the type parameters here because we only deal with fractional seconds via this
// version, and handle whole seconds with `DurationSeconds<u64, Strict>`, which we
Expand Down
2 changes: 1 addition & 1 deletion src/sources/demo_logs.rs
Expand Up @@ -48,7 +48,7 @@ pub struct DemoLogsConfig {
#[derivative(Default(value = "default_interval()"))]
#[serde(default = "default_interval")]
#[configurable(metadata(docs::examples = 1.0, docs::examples = 0.1, docs::examples = 0.01,))]
#[serde_as(as = "serde_with::DurationSeconds<f64>")]
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
pub interval: Duration,

/// The total number of lines to output.
Expand Down
6 changes: 3 additions & 3 deletions src/sources/gcp_pubsub.rs
Expand Up @@ -162,7 +162,7 @@ pub struct PubsubConfig {
/// How often to poll the currently active streams to see if they
/// are all busy and so open a new stream.
#[serde(default = "default_poll_time")]
#[serde_as(as = "serde_with::DurationSeconds<f64>")]
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
#[configurable(metadata(docs::human_name = "Poll Time"))]
pub poll_time_seconds: Duration,

Expand All @@ -184,7 +184,7 @@ pub struct PubsubConfig {

/// The amount of time, in seconds, to wait between retry attempts after an error.
#[serde(default = "default_retry_delay")]
#[serde_as(as = "serde_with::DurationSeconds<f64>")]
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
#[configurable(metadata(docs::human_name = "Retry Delay"))]
pub retry_delay_secs: Duration,

Expand All @@ -198,7 +198,7 @@ pub struct PubsubConfig {
/// before sending a keepalive request. If this is set larger than
/// `60`, you may see periodic errors sent from the server.
#[serde(default = "default_keepalive")]
#[serde_as(as = "serde_with::DurationSeconds<f64>")]
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
#[configurable(metadata(docs::human_name = "Keepalive"))]
pub keepalive_secs: Duration,

Expand Down
2 changes: 1 addition & 1 deletion src/sources/internal_metrics.rs
Expand Up @@ -27,7 +27,7 @@ use crate::{
#[serde(deny_unknown_fields, default)]
pub struct InternalMetricsConfig {
/// The interval between metric gathering, in seconds.
#[serde_as(as = "serde_with::DurationSeconds<f64>")]
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
#[serde(default = "default_scrape_interval")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
pub scrape_interval_secs: Duration,
Expand Down
2 changes: 1 addition & 1 deletion src/transforms/throttle.rs
Expand Up @@ -30,7 +30,7 @@ pub struct ThrottleConfig {
threshold: u32,

/// The time window in which the configured `threshold` is applied, in seconds.
#[serde_as(as = "serde_with::DurationSeconds<f64>")]
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
#[configurable(metadata(docs::human_name = "Time Window"))]
window_secs: Duration,

Expand Down

0 comments on commit b44a431

Please sign in to comment.