From 766f86db10df086299390c38392875ea76d954ff Mon Sep 17 00:00:00 2001
From: jeffail
Date: Wed, 12 Dec 2018 22:02:03 +0000
Subject: [PATCH] Rename max_batch_size to max_batch_count

---
 CHANGELOG.md                       |  6 ++++++
 config/amqp.json                   |  2 +-
 config/amqp.yaml                   |  2 +-
 config/env/README.md               |  6 +++---
 config/env/default.yaml            |  6 +++---
 config/everything.yaml             |  6 +++---
 config/kafka.json                  |  2 +-
 config/kafka.yaml                  |  2 +-
 config/kafka_balanced.json         |  2 +-
 config/kafka_balanced.yaml         |  2 +-
 docs/inputs/README.md              | 12 ++++++------
 lib/input/amqp.go                  |  2 +-
 lib/input/kafka.go                 |  2 +-
 lib/input/kafka_balanced.go        |  2 +-
 lib/input/reader/amqp.go           |  6 +++---
 lib/input/reader/amqp_test.go      |  6 +++---
 lib/input/reader/kafka.go          |  6 +++---
 lib/input/reader/kafka_balanced.go |  6 +++---
 18 files changed, 42 insertions(+), 36 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c9dd9a06e9..8033765028 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
 
 ## Unreleased
 
+## 0.41.0 - 2018-12-12
+
+### Changed
+
+- Renamed `max_batch_size` to `max_batch_count` for consistency.
+
 ## 0.40.2 - 2018-12-12
 
 ### Added
diff --git a/config/amqp.json b/config/amqp.json
index 5ec3035abd..b88b164b09 100644
--- a/config/amqp.json
+++ b/config/amqp.json
@@ -10,7 +10,7 @@
         "amqp": {
            "bindings_declare": [],
            "consumer_tag": "benthos-consumer",
-           "max_batch_size": 1,
+           "max_batch_count": 1,
            "prefetch_count": 10,
            "prefetch_size": 0,
            "queue": "benthos-queue",
diff --git a/config/amqp.yaml b/config/amqp.yaml
index 3cfc74eab9..dfb873ada6 100644
--- a/config/amqp.yaml
+++ b/config/amqp.yaml
@@ -9,7 +9,7 @@ input:
   amqp:
     bindings_declare: []
     consumer_tag: benthos-consumer
-    max_batch_size: 1
+    max_batch_count: 1
     prefetch_count: 10
     prefetch_size: 0
     queue: benthos-queue
diff --git a/config/env/README.md b/config/env/README.md
index 95e4d6ebba..3474ed3c3a 100644
--- a/config/env/README.md
+++ b/config/env/README.md
@@ -54,7 +54,7 @@ HTTP_ROOT_PATH = /benthos
 INPUTS = 1
 INPUT_TYPE = dynamic
 INPUT_AMQP_CONSUMER_TAG = benthos-consumer
-INPUT_AMQP_MAX_BATCH_SIZE = 1
+INPUT_AMQP_MAX_BATCH_COUNT = 1
 INPUT_AMQP_PREFETCH_COUNT = 10
 INPUT_AMQP_PREFETCH_SIZE = 0
 INPUT_AMQP_QUEUE = benthos-queue
@@ -117,7 +117,7 @@ INPUT_KAFKA_BALANCED_ADDRESSES = localhost:9092
 INPUT_KAFKA_BALANCED_CLIENT_ID = benthos_kafka_input
 INPUT_KAFKA_BALANCED_COMMIT_PERIOD_MS = 1000
 INPUT_KAFKA_BALANCED_CONSUMER_GROUP = benthos_consumer_group
-INPUT_KAFKA_BALANCED_MAX_BATCH_SIZE = 1
+INPUT_KAFKA_BALANCED_MAX_BATCH_COUNT = 1
 INPUT_KAFKA_BALANCED_START_FROM_OLDEST = true
 INPUT_KAFKA_BALANCED_TARGET_VERSION = 1.0.0
 INPUT_KAFKA_BALANCED_TLS_ENABLED = false
@@ -127,7 +127,7 @@ INPUT_KAFKA_BALANCED_TOPICS = benthos_stream
 INPUT_KAFKA_CLIENT_ID = benthos_kafka_input
 INPUT_KAFKA_COMMIT_PERIOD_MS = 1000
 INPUT_KAFKA_CONSUMER_GROUP = benthos_consumer_group
-INPUT_KAFKA_MAX_BATCH_SIZE = 1
+INPUT_KAFKA_MAX_BATCH_COUNT = 1
 INPUT_KAFKA_PARTITION = 0
 INPUT_KAFKA_START_FROM_OLDEST = true
 INPUT_KAFKA_TARGET_VERSION = 1.0.0
diff --git a/config/env/default.yaml b/config/env/default.yaml
index 06696f1d0d..8b70edc41d 100644
--- a/config/env/default.yaml
+++ b/config/env/default.yaml
@@ -10,7 +10,7 @@ input:
   inputs:
   - amqp:
       consumer_tag: ${INPUT_AMQP_CONSUMER_TAG:benthos-consumer}
-      max_batch_size: ${INPUT_AMQP_MAX_BATCH_SIZE:1}
+      max_batch_count: ${INPUT_AMQP_MAX_BATCH_COUNT:1}
       prefetch_count: ${INPUT_AMQP_PREFETCH_COUNT:10}
      prefetch_size: ${INPUT_AMQP_PREFETCH_SIZE:0}
       queue: ${INPUT_AMQP_QUEUE:benthos-queue}
@@ -90,7 +90,7 @@ input:
       client_id: ${INPUT_KAFKA_CLIENT_ID:benthos_kafka_input}
       commit_period_ms: ${INPUT_KAFKA_COMMIT_PERIOD_MS:1000}
       consumer_group: ${INPUT_KAFKA_CONSUMER_GROUP:benthos_consumer_group}
-      max_batch_size: ${INPUT_KAFKA_MAX_BATCH_SIZE:1}
+      max_batch_count: ${INPUT_KAFKA_MAX_BATCH_COUNT:1}
       partition: ${INPUT_KAFKA_PARTITION:0}
       start_from_oldest: ${INPUT_KAFKA_START_FROM_OLDEST:true}
       target_version: ${INPUT_KAFKA_TARGET_VERSION:1.0.0}
@@ -105,7 +105,7 @@ input:
       client_id: ${INPUT_KAFKA_BALANCED_CLIENT_ID:benthos_kafka_input}
       commit_period_ms: ${INPUT_KAFKA_BALANCED_COMMIT_PERIOD_MS:1000}
       consumer_group: ${INPUT_KAFKA_BALANCED_CONSUMER_GROUP:benthos_consumer_group}
-      max_batch_size: ${INPUT_KAFKA_BALANCED_MAX_BATCH_SIZE:1}
+      max_batch_count: ${INPUT_KAFKA_BALANCED_MAX_BATCH_COUNT:1}
       start_from_oldest: ${INPUT_KAFKA_BALANCED_START_FROM_OLDEST:true}
       target_version: ${INPUT_KAFKA_BALANCED_TARGET_VERSION:1.0.0}
       tls:
diff --git a/config/everything.yaml b/config/everything.yaml
index 0f2f3cd736..0c4b12e4c0 100644
--- a/config/everything.yaml
+++ b/config/everything.yaml
@@ -15,7 +15,7 @@ input:
     consumer_tag: benthos-consumer
     prefetch_count: 10
     prefetch_size: 0
-    max_batch_size: 1
+    max_batch_count: 1
     tls:
       enabled: false
       root_cas_file: ""
@@ -99,7 +99,7 @@ input:
     partition: 0
     start_from_oldest: true
     target_version: 1.0.0
-    max_batch_size: 1
+    max_batch_count: 1
     tls:
       enabled: false
       root_cas_file: ""
@@ -115,7 +115,7 @@ input:
     - benthos_stream
     start_from_oldest: true
     target_version: 1.0.0
-    max_batch_size: 1
+    max_batch_count: 1
     tls:
       enabled: false
       root_cas_file: ""
diff --git a/config/kafka.json b/config/kafka.json
index b9fc7ae019..b86325bc3a 100644
--- a/config/kafka.json
+++ b/config/kafka.json
@@ -14,7 +14,7 @@
            "client_id": "benthos_kafka_input",
            "commit_period_ms": 1000,
            "consumer_group": "benthos_consumer_group",
-           "max_batch_size": 1,
+           "max_batch_count": 1,
            "partition": 0,
            "start_from_oldest": true,
            "target_version": "1.0.0",
diff --git a/config/kafka.yaml b/config/kafka.yaml
index 8e21a248cc..f4c08b25b0 100644
--- a/config/kafka.yaml
+++ b/config/kafka.yaml
@@ -12,7 +12,7 @@ input:
     client_id: benthos_kafka_input
     commit_period_ms: 1000
     consumer_group: benthos_consumer_group
-    max_batch_size: 1
+    max_batch_count: 1
     partition: 0
     start_from_oldest: true
     target_version: 1.0.0
diff --git a/config/kafka_balanced.json b/config/kafka_balanced.json
index 5132f43fb8..81920f2b25 100644
--- a/config/kafka_balanced.json
+++ b/config/kafka_balanced.json
@@ -14,7 +14,7 @@
            "client_id": "benthos_kafka_input",
            "commit_period_ms": 1000,
            "consumer_group": "benthos_consumer_group",
-           "max_batch_size": 1,
+           "max_batch_count": 1,
            "start_from_oldest": true,
            "target_version": "1.0.0",
            "tls": {
diff --git a/config/kafka_balanced.yaml b/config/kafka_balanced.yaml
index 9876c45210..1b5851497f 100644
--- a/config/kafka_balanced.yaml
+++ b/config/kafka_balanced.yaml
@@ -12,7 +12,7 @@ input:
     client_id: benthos_kafka_input
     commit_period_ms: 1000
     consumer_group: benthos_consumer_group
-    max_batch_size: 1
+    max_batch_count: 1
     start_from_oldest: true
     target_version: 1.0.0
     tls:
diff --git a/docs/inputs/README.md b/docs/inputs/README.md
index 77be36cb4c..e46c03ec74 100644
--- a/docs/inputs/README.md
+++ b/docs/inputs/README.md
@@ -70,7 +70,7 @@ type: amqp
 amqp:
   bindings_declare: []
   consumer_tag: benthos-consumer
-  max_batch_size: 1
+  max_batch_count: 1
   prefetch_count: 10
   prefetch_size: 0
   queue: benthos-queue
@@ -88,7 +88,7 @@ amqp:
 Connects to an AMQP (0.91) queue. AMQP is a messaging protocol used by various
 message brokers, including RabbitMQ.
 
-The field `max_batch_size` specifies the maximum number of prefetched
+The field `max_batch_count` specifies the maximum number of prefetched
 messages to be batched together. When more than one message is batched they
 can be split into individual messages with the `split` processor.
 
@@ -412,7 +412,7 @@ kafka:
   client_id: benthos_kafka_input
   commit_period_ms: 1000
   consumer_group: benthos_consumer_group
-  max_batch_size: 1
+  max_batch_count: 1
   partition: 0
   start_from_oldest: true
   target_version: 1.0.0
@@ -429,7 +429,7 @@ consumer group (set via config). Only one partition per input is supported, if
 you wish to balance partitions across a consumer group look at the
 `kafka_balanced` input type instead.
 
-The field `max_batch_size` specifies the maximum number of prefetched
+The field `max_batch_count` specifies the maximum number of prefetched
 messages to be batched together. When more than one message is batched they
 can be split into individual messages with the `split` processor.
 
@@ -482,7 +482,7 @@ kafka_balanced:
   client_id: benthos_kafka_input
   commit_period_ms: 1000
   consumer_group: benthos_consumer_group
-  max_batch_size: 1
+  max_batch_count: 1
   start_from_oldest: true
   target_version: 1.0.0
   tls:
@@ -498,7 +498,7 @@ Connects to a kafka (0.9+) server. Offsets are managed within kafka as per the
 consumer group (set via config), and partitions are automatically balanced
 across any members of the consumer group.
 
-The field `max_batch_size` specifies the maximum number of prefetched
+The field `max_batch_count` specifies the maximum number of prefetched
 messages to be batched together. When more than one message is batched they
 can be split into individual messages with the `split` processor.
 
diff --git a/lib/input/amqp.go b/lib/input/amqp.go
index d229e335cb..dbf6c53964 100644
--- a/lib/input/amqp.go
+++ b/lib/input/amqp.go
@@ -36,7 +36,7 @@ func init() {
 Connects to an AMQP (0.91) queue. AMQP is a messaging protocol used by various
 message brokers, including RabbitMQ.
 
-The field ` + "`max_batch_size`" + ` specifies the maximum number of prefetched
+The field ` + "`max_batch_count`" + ` specifies the maximum number of prefetched
 messages to be batched together. When more than one message is batched they
 can be split into individual messages with the ` + "`split`" + ` processor.
 
diff --git a/lib/input/kafka.go b/lib/input/kafka.go
index 36fa0588c3..886bbba4f0 100644
--- a/lib/input/kafka.go
+++ b/lib/input/kafka.go
@@ -39,7 +39,7 @@ consumer group (set via config). Only one partition per input is supported, if
 you wish to balance partitions across a consumer group look at the
 ` + "`kafka_balanced`" + ` input type instead.
 
-The field ` + "`max_batch_size`" + ` specifies the maximum number of prefetched
+The field ` + "`max_batch_count`" + ` specifies the maximum number of prefetched
 messages to be batched together. When more than one message is batched they
 can be split into individual messages with the ` + "`split`" + ` processor.
 
diff --git a/lib/input/kafka_balanced.go b/lib/input/kafka_balanced.go
index 1c0ec0d0d4..b15873c5f0 100644
--- a/lib/input/kafka_balanced.go
+++ b/lib/input/kafka_balanced.go
@@ -38,7 +38,7 @@ Connects to a kafka (0.9+) server. Offsets are managed within kafka as per the
 consumer group (set via config), and partitions are automatically balanced
 across any members of the consumer group.
 
-The field ` + "`max_batch_size`" + ` specifies the maximum number of prefetched
+The field ` + "`max_batch_count`" + ` specifies the maximum number of prefetched
 messages to be batched together. When more than one message is batched they
 can be split into individual messages with the ` + "`split`" + ` processor.
 
diff --git a/lib/input/reader/amqp.go b/lib/input/reader/amqp.go
index 08e56375f7..14f10f72f9 100644
--- a/lib/input/reader/amqp.go
+++ b/lib/input/reader/amqp.go
@@ -61,7 +61,7 @@ type AMQPConfig struct {
 	ConsumerTag     string             `json:"consumer_tag" yaml:"consumer_tag"`
 	PrefetchCount   int                `json:"prefetch_count" yaml:"prefetch_count"`
 	PrefetchSize    int                `json:"prefetch_size" yaml:"prefetch_size"`
-	MaxBatchSize    int                `json:"max_batch_size" yaml:"max_batch_size"`
+	MaxBatchCount   int                `json:"max_batch_count" yaml:"max_batch_count"`
 	TLS             btls.Config        `json:"tls" yaml:"tls"`
 }
 
@@ -78,7 +78,7 @@ func NewAMQPConfig() AMQPConfig {
 		PrefetchCount:   10,
 		PrefetchSize:    0,
 		TLS:             btls.NewConfig(),
-		MaxBatchSize:    1,
+		MaxBatchCount:   1,
 		BindingsDeclare: []AMQPBindingConfig{},
 	}
 }
@@ -335,7 +335,7 @@ func (a *AMQP) Read() (types.Message, error) {
 	addPart(data)
 
 batchLoop:
-	for i := 1; i < a.conf.MaxBatchSize; i++ {
+	for i := 1; i < a.conf.MaxBatchCount; i++ {
 		select {
 		case data, open = <-c:
 			if !open {
diff --git a/lib/input/reader/amqp_test.go b/lib/input/reader/amqp_test.go
index 87729886a2..933364dcd6 100644
--- a/lib/input/reader/amqp_test.go
+++ b/lib/input/reader/amqp_test.go
@@ -199,7 +199,7 @@ func testAMQPBatch(url string, t *testing.T) {
 	conf := NewAMQPConfig()
 	conf.URL = url
 	conf.QueueDeclare.Enabled = true
-	conf.MaxBatchSize = 10
+	conf.MaxBatchCount = 10
 	conf.BindingsDeclare = append(conf.BindingsDeclare, AMQPBindingConfig{
 		Exchange:   exchange,
 		RoutingKey: key,
 	})
@@ -256,8 +256,8 @@ func testAMQPBatch(url string, t *testing.T) {
 			ContentType:     "application/octet-stream",
 			ContentEncoding: "",
 			Body:            []byte(testStr),
-			DeliveryMode:    1, // 1=non-persistent, 2=persistent
-			Priority:        0, // 0-9
+			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
+			Priority:        0,              // 0-9
 		}); pErr != nil {
 			t.Error(pErr)
 		}
diff --git a/lib/input/reader/kafka.go b/lib/input/reader/kafka.go
index 1fec88ac9c..61a334dea5 100644
--- a/lib/input/reader/kafka.go
+++ b/lib/input/reader/kafka.go
@@ -47,7 +47,7 @@ type KafkaConfig struct {
 	Partition       int32       `json:"partition" yaml:"partition"`
 	StartFromOldest bool        `json:"start_from_oldest" yaml:"start_from_oldest"`
 	TargetVersion   string      `json:"target_version" yaml:"target_version"`
-	MaxBatchSize    int         `json:"max_batch_size" yaml:"max_batch_size"`
+	MaxBatchCount   int         `json:"max_batch_count" yaml:"max_batch_count"`
 	TLS             btls.Config `json:"tls" yaml:"tls"`
 }
 
@@ -62,7 +62,7 @@ func NewKafkaConfig() KafkaConfig {
 		Partition:       0,
 		StartFromOldest: true,
 		TargetVersion:   sarama.V1_0_0_0.String(),
-		MaxBatchSize:    1,
+		MaxBatchCount:   1,
 		TLS:             btls.NewConfig(),
 	}
 }
@@ -299,7 +299,7 @@ func (k *Kafka) Read() (types.Message, error) {
 	addPart(data)
 
 batchLoop:
-	for i := 1; i < k.conf.MaxBatchSize; i++ {
+	for i := 1; i < k.conf.MaxBatchCount; i++ {
 		select {
 		case data, open = <-partConsumer.Messages():
 			if !open {
diff --git a/lib/input/reader/kafka_balanced.go b/lib/input/reader/kafka_balanced.go
index 46768a29bc..ef5ca6137d 100644
--- a/lib/input/reader/kafka_balanced.go
+++ b/lib/input/reader/kafka_balanced.go
@@ -47,7 +47,7 @@ type KafkaBalancedConfig struct {
 	Topics          []string    `json:"topics" yaml:"topics"`
 	StartFromOldest bool        `json:"start_from_oldest" yaml:"start_from_oldest"`
 	TargetVersion   string      `json:"target_version" yaml:"target_version"`
-	MaxBatchSize    int         `json:"max_batch_size" yaml:"max_batch_size"`
+	MaxBatchCount   int         `json:"max_batch_count" yaml:"max_batch_count"`
 	TLS             btls.Config `json:"tls" yaml:"tls"`
 }
 
@@ -61,7 +61,7 @@ func NewKafkaBalancedConfig() KafkaBalancedConfig {
 		Topics:          []string{"benthos_stream"},
 		StartFromOldest: true,
 		TargetVersion:   sarama.V1_0_0_0.String(),
-		MaxBatchSize:    1,
+		MaxBatchCount:   1,
 		TLS:             btls.NewConfig(),
 	}
 }
@@ -265,7 +265,7 @@ func (k *KafkaBalanced) Read() (types.Message, error) {
 	addPart(data)
 
 batchLoop:
-	for i := 1; i < k.conf.MaxBatchSize; i++ {
+	for i := 1; i < k.conf.MaxBatchCount; i++ {
 		select {
 		case data, open = <-consumer.Messages():
 			if !open {
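
Reviewer note: the rename is purely cosmetic; none of the three `batchLoop` hunks change behaviour. They all share one shape: block until a first message arrives, then drain whatever is already prefetched, stopping at the configured cap. The standalone sketch below illustrates that pattern in isolation. It is not part of the patch: `readBatch`, the `string` payload type, and the non-blocking `default` arm are assumptions made for illustration, since the real readers operate on broker-specific delivery types and their remaining loop bodies fall outside the hunks shown above.

package main

import "fmt"

// readBatch blocks for one message, then greedily collects up to
// maxBatchCount-1 more that are already buffered on c. This mirrors the
// batchLoop pattern touched by this patch: the limit caps how many messages
// land in a single batch, which is why "count" is the more accurate name.
func readBatch(c <-chan string, maxBatchCount int) []string {
	batch := []string{<-c} // always wait for the first message

batchLoop:
	for i := 1; i < maxBatchCount; i++ {
		select {
		case msg, open := <-c:
			if !open {
				break batchLoop // source closed; return what we have
			}
			batch = append(batch, msg)
		default:
			// Nothing else is prefetched right now, so ship a partial
			// batch rather than waiting (an assumption of this sketch).
			break batchLoop
		}
	}
	return batch
}

func main() {
	c := make(chan string, 3)
	c <- "a"
	c <- "b"
	c <- "c"
	fmt.Println(readBatch(c, 10)) // prints [a b c]: three ready, cap of ten
}

Because the loop never blocks after the first receive, a batch holds between 1 and max_batch_count messages. The field bounds a message count rather than a byte size, so the new name matches what the code actually does.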