
Commit

fix(sinks): resolve memory leak by always setting a request builder concurrency limit (vectordotdev#18637)

* fix(kafka sink): performance improvements and fix memory leak

* clippy

* fix(sinks): always set a request builder concurrency limit

* fmt

* set default to WORKER_THREADS and allow env var override

* fix bad merge

* fix bad merge

* revert cargo.lock

* revert cargo.lock

* fix flakey test
dsmith3197 committed Sep 25, 2023
1 parent 89697d1 commit 6ced6ca
Showing 29 changed files with 117 additions and 243 deletions.
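Every file in this commit replaces a hard-coded request builder concurrency limit (`NonZeroUsize::new(50)`, `NonZeroUsize::new(64)`) or an unbounded `None` with a shared `default_request_builder_concurrency_limit()` helper. The helper's definition is not shown in this diff; a minimal sketch of what it plausibly looks like, assuming `WORKER_THREADS` is a once-initialized cell populated by `build_runtime` (see the `src/app.rs` hunk below) and using an illustrative name for the override variable:

```rust
use std::num::NonZeroUsize;
use std::sync::OnceLock;

/// Set exactly once by `build_runtime`; see the `src/app.rs` hunk below.
pub static WORKER_THREADS: OnceLock<NonZeroUsize> = OnceLock::new();

/// Default request builder concurrency: an environment variable override
/// when set and valid, otherwise the worker thread count, otherwise a small
/// fallback. (`VECTOR_REQUEST_BUILDER_CONCURRENCY` is a hypothetical name;
/// the commit message only says "allow env var override".)
pub fn default_request_builder_concurrency_limit() -> NonZeroUsize {
    if let Some(limit) = std::env::var("VECTOR_REQUEST_BUILDER_CONCURRENCY")
        .ok()
        .and_then(|value| value.parse::<NonZeroUsize>().ok())
    {
        return limit;
    }
    WORKER_THREADS
        .get()
        .copied()
        .unwrap_or_else(|| NonZeroUsize::new(8).expect("8 is non-zero"))
}
```

With `None`, the request builder stage had no concurrency bound, so request building could outpace the downstream service and accumulate built requests without limit; that unbounded buildup is the memory leak the commit title refers to. Tying the default to the worker thread count keeps the bound proportional to the runtime, and the env var override leaves an escape hatch for tuning without a rebuild.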
10 changes: 5 additions & 5 deletions src/app.rs
@@ -468,14 +468,14 @@ pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtim
     rt_builder.enable_all().thread_name(thread_name);
 
     let threads = threads.unwrap_or_else(crate::num_threads);
-    if threads < 1 {
+    let threads = NonZeroUsize::new(threads).ok_or_else(|| {
         error!("The `threads` argument must be greater or equal to 1.");
-        return Err(exitcode::CONFIG);
-    }
+        exitcode::CONFIG
+    })?;
     WORKER_THREADS
-        .set(NonZeroUsize::new(threads).expect("already checked"))
+        .set(threads)
         .expect("double thread initialization");
-    rt_builder.worker_threads(threads);
+    rt_builder.worker_threads(threads.get());
 
     debug!(messaged = "Building runtime.", worker_threads = threads);
     Ok(rt_builder.build().expect("Unable to create async runtime"))
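The rewritten validation folds the `if threads < 1` check into construction of a `NonZeroUsize`, so the "at least one thread" invariant lives in the type and `WORKER_THREADS` can never hold zero. A standalone sketch of the standard library pattern used here:

```rust
use std::num::NonZeroUsize;

fn main() {
    // `NonZeroUsize::new` returns `None` for zero, so an invalid thread
    // count is rejected at construction time rather than by a manual `if`.
    assert!(NonZeroUsize::new(0).is_none());

    let threads = NonZeroUsize::new(4).expect("non-zero");
    // Consumers that need a plain `usize` call `.get()`, as the
    // `rt_builder.worker_threads(threads.get())` line above now does.
    assert_eq!(threads.get(), 4);
}
```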
5 changes: 4 additions & 1 deletion src/sinks/amqp/integration_tests.rs
@@ -10,7 +10,7 @@ use crate::{
     SourceSender,
 };
 use futures::StreamExt;
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashSet, sync::Arc, time::Duration};
 use vector_core::config::LogNamespace;
 
 pub fn make_config() -> AmqpSinkConfig {
@@ -129,6 +129,9 @@ async fn amqp_happy_path() {
     }
 
     assert_eq!(out.len(), input.len());
+
+    let input: HashSet<String> = HashSet::from_iter(input);
+    let out: HashSet<String> = HashSet::from_iter(out);
     assert_eq!(out, input);
 }
 
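This is the "fix flakey test" bullet from the commit message: with request building now running concurrently, events can reach the broker out of order, so the assertion compares contents as `HashSet`s instead of order-sensitive `Vec`s. The existing length assertion is kept, since a pure set comparison would collapse duplicate deliveries.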
2 changes: 1 addition & 1 deletion src/sinks/amqp/sink.rs
@@ -119,7 +119,7 @@ impl AmqpSink {
 
         input
             .filter_map(|event| std::future::ready(self.make_amqp_event(event)))
-            .request_builder(None, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async move {
                 match request {
                     Err(e) => {
15 changes: 2 additions & 13 deletions src/sinks/appsignal/sink.rs
@@ -1,17 +1,6 @@
-use futures::{stream::BoxStream, StreamExt};
 use futures_util::future::ready;
-use tower::{Service, ServiceBuilder};
-use vector_core::{
-    event::Event,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-};
 
-use crate::{
-    codecs::Transformer,
-    internal_events::SinkRequestBuildError,
-    sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression},
-};
+use crate::sinks::{prelude::*, util::buffer::metrics::MetricNormalizer};
 
 use super::{
     encoder::AppsignalEncoder,
@@ -47,7 +36,7 @@ where
             })
             .batched(self.batch_settings.into_byte_size_config())
             .request_builder(
-                None,
+                default_request_builder_concurrency_limit(),
                 AppsignalRequestBuilder {
                     compression: self.compression,
                     encoder: AppsignalEncoder {
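A second pattern recurs from here on: each sink's long import list collapses into `use crate::sinks::prelude::*;`, alone or alongside sink-specific paths. Presumably the prelude re-exports the common sink-building items (`SinkBuilderExt`, `StreamSink`, `BoxStream`, `DriverResponse`, `SinkRequestBuildError`, and `default_request_builder_concurrency_limit()` itself), which is what lets every call site below name the new helper without an explicit import; the exact re-export list is an assumption, as it is not shown in this diff.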
9 changes: 5 additions & 4 deletions src/sinks/aws_kinesis/sink.rs
@@ -1,4 +1,4 @@
-use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize};
+use std::{borrow::Cow, fmt::Debug, marker::PhantomData};
 
 use lookup::lookup_v2::ConfigValuePath;
 use rand::random;
@@ -42,8 +42,6 @@ where
     R: Record + Send + Sync + Unpin + Clone + 'static,
 {
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let request_builder_concurrency_limit = NonZeroUsize::new(50);
-
         input
             .filter_map(|event| {
                 // Panic: This sink only accepts Logs, so this should never panic
@@ -52,7 +50,10 @@ where
 
                 future::ready(processed)
             })
-            .request_builder(request_builder_concurrency_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
20 changes: 5 additions & 15 deletions src/sinks/aws_s_s/sink.rs
@@ -1,18 +1,6 @@
-use std::num::NonZeroUsize;
-
-use futures::stream::BoxStream;
-use futures_util::StreamExt;
-use vector_core::sink::StreamSink;
-
 use super::{client::Client, request_builder::SSRequestBuilder, service::SSService};
-use crate::internal_events::SinkRequestBuildError;
 use crate::sinks::aws_s_s::retry::SSRetryLogic;
-use crate::{
-    event::Event,
-    sinks::util::{
-        builder::SinkBuilderExt, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig,
-    },
-};
+use crate::sinks::prelude::*;
 
 #[derive(Clone, Copy, Debug, Default)]
 pub(crate) struct SqsSinkDefaultBatchSettings;
@@ -55,14 +43,16 @@ where
         let request = self
             .request
             .unwrap_with(&TowerRequestConfig::default().timeout_secs(30));
-        let request_builder_concurrency_limit = NonZeroUsize::new(50);
         let retry_logic: SSRetryLogic<E> = super::retry::SSRetryLogic::new();
         let service = tower::ServiceBuilder::new()
             .settings(request, retry_logic)
             .service(self.service);
 
         input
-            .request_builder(request_builder_concurrency_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|req| async move {
                 req.map_err(|error| {
                     emit!(SinkRequestBuildError { error });
22 changes: 3 additions & 19 deletions src/sinks/azure_common/sink.rs
@@ -1,21 +1,6 @@
-use std::{fmt, num::NonZeroUsize};
+use std::fmt;
 
-use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures_util::StreamExt;
-use tower::Service;
-use vector_common::request_metadata::MetaDescriptive;
-use vector_core::{
-    event::Finalizable,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-};
-
-use crate::{
-    event::Event,
-    internal_events::SinkRequestBuildError,
-    sinks::util::{partitioner::KeyPartitioner, RequestBuilder, SinkBuilderExt},
-};
+use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner};
 
 pub struct AzureBlobSink<Svc, RB> {
     service: Svc,
@@ -54,7 +39,6 @@ where
         let partitioner = self.partitioner;
         let settings = self.batcher_settings;
 
-        let builder_limit = NonZeroUsize::new(64);
         let request_builder = self.request_builder;
 
         input
@@ -65,7 +49,7 @@ where
                 // that occurs.
                 key.map(move |k| (k, batch))
             })
-            .request_builder(builder_limit, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
2 changes: 1 addition & 1 deletion src/sinks/azure_monitor_logs/sink.rs
@@ -41,7 +41,7 @@ where
         input
             .batched(self.batch_settings.into_byte_size_config())
             .request_builder(
-                None,
+                default_request_builder_concurrency_limit(),
                 AzureMonitorLogsRequestBuilder {
                     encoding: self.encoding,
                 },
2 changes: 1 addition & 1 deletion src/sinks/clickhouse/sink.rs
@@ -49,7 +49,7 @@ impl ClickhouseSink {
             )
             .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
             .request_builder(
-                None,
+                default_request_builder_concurrency_limit(),
                 ClickhouseRequestBuilder {
                     compression: self.compression,
                     encoding: self.encoding,
18 changes: 5 additions & 13 deletions src/sinks/databend/sink.rs
@@ -1,14 +1,4 @@
-use std::num::NonZeroUsize;
-
-use futures_util::{stream::BoxStream, StreamExt};
-use vector_core::event::Event;
-use vector_core::sink::StreamSink;
-use vector_core::stream::BatcherSettings;
-
-use crate::{
-    internal_events::SinkRequestBuildError,
-    sinks::util::{service::Svc, SinkBuilderExt},
-};
+use crate::sinks::prelude::*;
 
 use super::request_builder::DatabendRequestBuilder;
 use super::service::{DatabendRetryLogic, DatabendService};
@@ -33,10 +23,12 @@ impl DatabendSink {
     }
 
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let builder_limit = NonZeroUsize::new(64);
         input
             .batched(self.batch_settings.into_byte_size_config())
-            .request_builder(builder_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
18 changes: 7 additions & 11 deletions src/sinks/datadog/events/sink.rs
@@ -1,17 +1,12 @@
-use std::{fmt, num::NonZeroUsize};
+use std::fmt;
 
-use async_trait::async_trait;
-use futures::{stream::BoxStream, StreamExt};
 use lookup::event_path;
-use tower::Service;
-use vector_core::stream::DriverResponse;
 
 use crate::{
-    event::Event,
-    internal_events::{ParserMissingFieldError, SinkRequestBuildError, DROP_EVENT},
+    internal_events::{ParserMissingFieldError, DROP_EVENT},
     sinks::{
         datadog::events::request_builder::{DatadogEventsRequest, DatadogEventsRequestBuilder},
-        util::{SinkBuilderExt, StreamSink},
+        prelude::*,
     },
 };
 
@@ -27,11 +22,12 @@ where
     S::Error: fmt::Debug + Into<crate::Error> + Send,
 {
     async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let concurrency_limit = NonZeroUsize::new(50);
-
         input
             .filter_map(ensure_required_fields)
-            .request_builder(concurrency_limit, DatadogEventsRequestBuilder::new())
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                DatadogEventsRequestBuilder::new(),
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
27 changes: 5 additions & 22 deletions src/sinks/datadog/logs/sink.rs
@@ -1,30 +1,14 @@
-use std::{fmt::Debug, io, num::NonZeroUsize, sync::Arc};
+use std::{fmt::Debug, io, sync::Arc};
 
-use async_trait::async_trait;
 use bytes::Bytes;
 use codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig};
-use futures::stream::{BoxStream, StreamExt};
 use lookup::event_path;
 use snafu::Snafu;
-use tower::Service;
-use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata};
-use vector_core::{
-    event::{Event, EventFinalizers, Finalizable, Value},
-    partition::Partitioner,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-};
 
 use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
-use crate::{
-    codecs::{Encoder, Transformer},
-    internal_events::SinkRequestBuildError,
-    sinks::util::{
-        encoding::{write_all, Encoder as _},
-        metadata::RequestMetadataBuilder,
-        request_builder::EncodeResult,
-        Compression, Compressor, RequestBuilder, SinkBuilderExt,
-    },
+use crate::sinks::{
+    prelude::*,
+    util::{encoding::Encoder as _, Compressor},
 };
 #[derive(Default)]
 struct EventPartitioner;
@@ -278,11 +262,10 @@ where
 
         let partitioner = EventPartitioner;
 
-        let builder_limit = NonZeroUsize::new(64);
         let input = input.batched_partitioned(partitioner, self.batch_settings);
         input
             .request_builder(
-                builder_limit,
+                default_request_builder_concurrency_limit(),
                 LogRequestBuilder {
                     default_api_key,
                     encoding: self.encoding,
18 changes: 6 additions & 12 deletions src/sinks/elasticsearch/sink.rs
@@ -1,22 +1,15 @@
-use std::{fmt, num::NonZeroUsize};
+use std::fmt;
 
-use async_trait::async_trait;
-use futures::{future, stream::BoxStream, StreamExt};
 use lookup::lookup_v2::ConfigValuePath;
-use tower::Service;
-use vector_core::stream::{BatcherSettings, DriverResponse};
 use vrl::path::PathPrefix;
 
 use crate::{
-    codecs::Transformer,
-    event::{Event, LogEvent, Value},
-    internal_events::SinkRequestBuildError,
     sinks::{
         elasticsearch::{
             encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder,
             service::ElasticsearchRequest, BulkAction, ElasticsearchCommonMode,
         },
-        util::{SinkBuilderExt, StreamSink},
+        prelude::*,
     },
     transforms::metric_to_log::MetricToLog,
 };
@@ -67,8 +60,6 @@ where
     S::Error: fmt::Debug + Into<crate::Error> + Send,
 {
     pub async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let request_builder_concurrency_limit = NonZeroUsize::new(50);
-
         let mode = self.mode;
         let id_key_field = self.id_key_field.as_ref();
         let transformer = self.transformer.clone();
@@ -91,7 +82,10 @@ where
                 future::ready(process_log(log, &mode, id_key_field, &transformer))
             })
             .batched(self.batch_settings.into_byte_size_config())
-            .request_builder(request_builder_concurrency_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
5 changes: 4 additions & 1 deletion src/sinks/gcp/stackdriver/logs/sink.rs
@@ -41,7 +41,10 @@ where
                     .into_item_size_config(HttpJsonBatchSizer),
             )
             // Build requests with no concurrency limit.
-            .request_builder(None, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             // Filter out any errors that occurred in the request building.
             .filter_map(|request| async move {
                 match request {
