Skip to content

Commit

Permalink
chore(deps): bump pulsar from 5.1.1 to 6.0.0 (vectordotdev#17587)
Browse files Browse the repository at this point in the history
- Bumps [pulsar](https://github.com/streamnative/pulsar-rs) from 5.1.1
to 6.0.0.
- As part of this, a new config option for batch `max_bytes` is exposed
for the sink.
  • Loading branch information
neuronull committed Jun 2, 2023
1 parent 8549809 commit 3395cfd
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ paste = "1.0.12"
percent-encoding = { version = "2.2.0", default-features = false }
pin-project = { version = "1.1.0", default-features = false }
postgres-openssl = { version = "0.5.0", default-features = false, features = ["runtime"], optional = true }
pulsar = { version = "5.1.1", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true }
pulsar = { version = "6.0.0", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true }
rand = { version = "0.8.5", default-features = false, features = ["small_rng"] }
rand_distr = { version = "0.4.3", default-features = false }
rdkafka = { version = "0.31.0", default-features = false, features = ["tokio", "libz", "ssl", "zstd"], optional = true }
Expand Down
7 changes: 6 additions & 1 deletion src/sinks/pulsar/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,17 @@ pub struct PulsarSinkConfig {
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct PulsarBatchConfig {
/// The maximum size of a batch before it is flushed.
/// The maximum amount of events in a batch before it is flushed.
///
/// Note this is an unsigned 32 bit integer which is a smaller capacity than
/// many of the other sink batch settings.
#[configurable(metadata(docs::type_unit = "events"))]
#[configurable(metadata(docs::examples = 1000))]
pub max_events: Option<u32>,

/// The maximum size of a batch before it is flushed.
#[configurable(metadata(docs::type_unit = "bytes"))]
pub max_bytes: Option<usize>,
}

/// Authentication configuration.
Expand Down Expand Up @@ -235,6 +239,7 @@ impl PulsarSinkConfig {
metadata: Default::default(),
schema: None,
batch_size: self.batch.max_events,
batch_byte_size: self.batch.max_bytes,
compression: None,
};

Expand Down
27 changes: 17 additions & 10 deletions website/cue/reference/components/sinks/base/pulsar.cue
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,24 @@ base: components: sinks: pulsar: configuration: {
batch: {
description: "Event batching behavior."
required: false
type: object: options: max_events: {
description: """
The maximum size of a batch before it is flushed.
type: object: options: {
max_bytes: {
description: "The maximum size of a batch before it is flushed."
required: false
type: uint: unit: "bytes"
}
max_events: {
description: """
The maximum amount of events in a batch before it is flushed.
Note this is an unsigned 32 bit integer which is a smaller capacity than
many of the other sink batch settings.
"""
required: false
type: uint: {
examples: [1000]
unit: "events"
Note this is an unsigned 32 bit integer which is a smaller capacity than
many of the other sink batch settings.
"""
required: false
type: uint: {
examples: [1000]
unit: "events"
}
}
}
}
Expand Down

0 comments on commit 3395cfd

Please sign in to comment.