diff --git a/Cargo.lock b/Cargo.lock index 18176b52a3474..07f0a845e9e39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6340,9 +6340,9 @@ dependencies = [ [[package]] name = "pulsar" -version = "5.1.1" +version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20f237570b5665b38c7d5228f9a1d2990e369c00e635704528996bcd5219f540" +checksum = "06fbacec81fe6fb82f076279c3aaeb05324478f62c3074f13ecd0452cbec27b2" dependencies = [ "async-trait", "bit-vec 0.6.3", diff --git a/Cargo.toml b/Cargo.toml index d42c2efdf5055..aed442756ae8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -288,7 +288,7 @@ paste = "1.0.12" percent-encoding = { version = "2.2.0", default-features = false } pin-project = { version = "1.1.0", default-features = false } postgres-openssl = { version = "0.5.0", default-features = false, features = ["runtime"], optional = true } -pulsar = { version = "5.1.1", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true } +pulsar = { version = "6.0.0", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true } rand = { version = "0.8.5", default-features = false, features = ["small_rng"] } rand_distr = { version = "0.4.3", default-features = false } rdkafka = { version = "0.31.0", default-features = false, features = ["tokio", "libz", "ssl", "zstd"], optional = true } diff --git a/src/sinks/pulsar/config.rs b/src/sinks/pulsar/config.rs index eba7eba4694d8..d7b9f505175b6 100644 --- a/src/sinks/pulsar/config.rs +++ b/src/sinks/pulsar/config.rs @@ -87,13 +87,17 @@ pub struct PulsarSinkConfig { #[configurable_component] #[derive(Clone, Copy, Debug, Default)] pub(crate) struct PulsarBatchConfig { - /// The maximum size of a batch before it is flushed. + /// The maximum amount of events in a batch before it is flushed. 
/// /// Note this is an unsigned 32 bit integer which is a smaller capacity than /// many of the other sink batch settings. #[configurable(metadata(docs::type_unit = "events"))] #[configurable(metadata(docs::examples = 1000))] pub max_events: Option<u32>, + + /// The maximum size of a batch before it is flushed. + #[configurable(metadata(docs::type_unit = "bytes"))] + pub max_bytes: Option<usize>, } /// Authentication configuration. @@ -235,6 +239,7 @@ impl PulsarSinkConfig { metadata: Default::default(), schema: None, batch_size: self.batch.max_events, + batch_byte_size: self.batch.max_bytes, compression: None, }; diff --git a/website/cue/reference/components/sinks/base/pulsar.cue b/website/cue/reference/components/sinks/base/pulsar.cue index cd55deaf6093c..cc62959b42f8a 100644 --- a/website/cue/reference/components/sinks/base/pulsar.cue +++ b/website/cue/reference/components/sinks/base/pulsar.cue @@ -86,17 +86,24 @@ base: components: sinks: pulsar: configuration: { batch: { description: "Event batching behavior." required: false - type: object: options: max_events: { - description: """ - The maximum size of a batch before it is flushed. + type: object: options: { + max_bytes: { + description: "The maximum size of a batch before it is flushed." + required: false + type: uint: unit: "bytes" + } + max_events: { + description: """ + The maximum amount of events in a batch before it is flushed. - Note this is an unsigned 32 bit integer which is a smaller capacity than - many of the other sink batch settings. - """ - required: false - type: uint: { - examples: [1000] - unit: "events" + Note this is an unsigned 32 bit integer which is a smaller capacity than + many of the other sink batch settings. + """ + required: false + type: uint: { + examples: [1000] + unit: "events" + } } } }