Rename max_batch_size to max_batch_count
Jeffail committed Dec 12, 2018
1 parent 8c2b71a commit 766f86d
Showing 18 changed files with 42 additions and 36 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.

## Unreleased

+ ## 0.41.0 - 2018-12-12
+
+ ### Changed
+
+ - Renamed `max_batch_size` to `max_batch_count` for consistency.

## 0.40.2 - 2018-12-12

### Added
2 changes: 1 addition & 1 deletion config/amqp.json
@@ -10,7 +10,7 @@
"amqp": {
"bindings_declare": [],
"consumer_tag": "benthos-consumer",
- "max_batch_size": 1,
+ "max_batch_count": 1,
"prefetch_count": 10,
"prefetch_size": 0,
"queue": "benthos-queue",
2 changes: 1 addition & 1 deletion config/amqp.yaml
@@ -9,7 +9,7 @@ input:
amqp:
bindings_declare: []
consumer_tag: benthos-consumer
- max_batch_size: 1
+ max_batch_count: 1
prefetch_count: 10
prefetch_size: 0
queue: benthos-queue
6 changes: 3 additions & 3 deletions config/env/README.md
@@ -54,7 +54,7 @@ HTTP_ROOT_PATH = /benthos
INPUTS = 1
INPUT_TYPE = dynamic
INPUT_AMQP_CONSUMER_TAG = benthos-consumer
- INPUT_AMQP_MAX_BATCH_SIZE = 1
+ INPUT_AMQP_MAX_BATCH_COUNT = 1
INPUT_AMQP_PREFETCH_COUNT = 10
INPUT_AMQP_PREFETCH_SIZE = 0
INPUT_AMQP_QUEUE = benthos-queue
@@ -117,7 +117,7 @@ INPUT_KAFKA_BALANCED_ADDRESSES = localhost:9092
INPUT_KAFKA_BALANCED_CLIENT_ID = benthos_kafka_input
INPUT_KAFKA_BALANCED_COMMIT_PERIOD_MS = 1000
INPUT_KAFKA_BALANCED_CONSUMER_GROUP = benthos_consumer_group
- INPUT_KAFKA_BALANCED_MAX_BATCH_SIZE = 1
+ INPUT_KAFKA_BALANCED_MAX_BATCH_COUNT = 1
INPUT_KAFKA_BALANCED_START_FROM_OLDEST = true
INPUT_KAFKA_BALANCED_TARGET_VERSION = 1.0.0
INPUT_KAFKA_BALANCED_TLS_ENABLED = false
@@ -127,7 +127,7 @@ INPUT_KAFKA_BALANCED_TOPICS = benthos_stream
INPUT_KAFKA_CLIENT_ID = benthos_kafka_input
INPUT_KAFKA_COMMIT_PERIOD_MS = 1000
INPUT_KAFKA_CONSUMER_GROUP = benthos_consumer_group
- INPUT_KAFKA_MAX_BATCH_SIZE = 1
+ INPUT_KAFKA_MAX_BATCH_COUNT = 1
INPUT_KAFKA_PARTITION = 0
INPUT_KAFKA_START_FROM_OLDEST = true
INPUT_KAFKA_TARGET_VERSION = 1.0.0
6 changes: 3 additions & 3 deletions config/env/default.yaml
@@ -10,7 +10,7 @@ input:
inputs:
- amqp:
consumer_tag: ${INPUT_AMQP_CONSUMER_TAG:benthos-consumer}
- max_batch_size: ${INPUT_AMQP_MAX_BATCH_SIZE:1}
+ max_batch_count: ${INPUT_AMQP_MAX_BATCH_COUNT:1}
prefetch_count: ${INPUT_AMQP_PREFETCH_COUNT:10}
prefetch_size: ${INPUT_AMQP_PREFETCH_SIZE:0}
queue: ${INPUT_AMQP_QUEUE:benthos-queue}
@@ -90,7 +90,7 @@ input:
client_id: ${INPUT_KAFKA_CLIENT_ID:benthos_kafka_input}
commit_period_ms: ${INPUT_KAFKA_COMMIT_PERIOD_MS:1000}
consumer_group: ${INPUT_KAFKA_CONSUMER_GROUP:benthos_consumer_group}
- max_batch_size: ${INPUT_KAFKA_MAX_BATCH_SIZE:1}
+ max_batch_count: ${INPUT_KAFKA_MAX_BATCH_COUNT:1}
partition: ${INPUT_KAFKA_PARTITION:0}
start_from_oldest: ${INPUT_KAFKA_START_FROM_OLDEST:true}
target_version: ${INPUT_KAFKA_TARGET_VERSION:1.0.0}
@@ -105,7 +105,7 @@ input:
client_id: ${INPUT_KAFKA_BALANCED_CLIENT_ID:benthos_kafka_input}
commit_period_ms: ${INPUT_KAFKA_BALANCED_COMMIT_PERIOD_MS:1000}
consumer_group: ${INPUT_KAFKA_BALANCED_CONSUMER_GROUP:benthos_consumer_group}
- max_batch_size: ${INPUT_KAFKA_BALANCED_MAX_BATCH_SIZE:1}
+ max_batch_count: ${INPUT_KAFKA_BALANCED_MAX_BATCH_COUNT:1}
start_from_oldest: ${INPUT_KAFKA_BALANCED_START_FROM_OLDEST:true}
target_version: ${INPUT_KAFKA_BALANCED_TARGET_VERSION:1.0.0}
tls:
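
The `${NAME:default}` placeholders in this file resolve against the environment at startup: the variable's value wins when set, otherwise the fallback after the colon applies. A minimal sketch of that resolution rule (the helper name is hypothetical, not Benthos's actual code):

```go
package main

import (
	"fmt"
	"os"
)

// resolve mimics ${NAME:default} interpolation: the environment variable
// wins when set, otherwise the fallback after the colon is used.
func resolve(name, fallback string) string {
	if v, ok := os.LookupEnv(name); ok {
		return v
	}
	return fallback
}

func main() {
	os.Setenv("INPUT_AMQP_MAX_BATCH_COUNT", "5")
	fmt.Println(resolve("INPUT_AMQP_MAX_BATCH_COUNT", "1")) // 5 (set above)
	fmt.Println(resolve("INPUT_AMQP_PREFETCH_COUNT", "10")) // 10 (unset, default applies)
}
```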
6 changes: 3 additions & 3 deletions config/everything.yaml
@@ -15,7 +15,7 @@ input:
consumer_tag: benthos-consumer
prefetch_count: 10
prefetch_size: 0
- max_batch_size: 1
+ max_batch_count: 1
tls:
enabled: false
root_cas_file: ""
@@ -99,7 +99,7 @@ input:
partition: 0
start_from_oldest: true
target_version: 1.0.0
- max_batch_size: 1
+ max_batch_count: 1
tls:
enabled: false
root_cas_file: ""
@@ -115,7 +115,7 @@ input:
- benthos_stream
start_from_oldest: true
target_version: 1.0.0
- max_batch_size: 1
+ max_batch_count: 1
tls:
enabled: false
root_cas_file: ""
2 changes: 1 addition & 1 deletion config/kafka.json
@@ -14,7 +14,7 @@
"client_id": "benthos_kafka_input",
"commit_period_ms": 1000,
"consumer_group": "benthos_consumer_group",
- "max_batch_size": 1,
+ "max_batch_count": 1,
"partition": 0,
"start_from_oldest": true,
"target_version": "1.0.0",
2 changes: 1 addition & 1 deletion config/kafka.yaml
@@ -12,7 +12,7 @@ input:
client_id: benthos_kafka_input
commit_period_ms: 1000
consumer_group: benthos_consumer_group
- max_batch_size: 1
+ max_batch_count: 1
partition: 0
start_from_oldest: true
target_version: 1.0.0
2 changes: 1 addition & 1 deletion config/kafka_balanced.json
@@ -14,7 +14,7 @@
"client_id": "benthos_kafka_input",
"commit_period_ms": 1000,
"consumer_group": "benthos_consumer_group",
- "max_batch_size": 1,
+ "max_batch_count": 1,
"start_from_oldest": true,
"target_version": "1.0.0",
"tls": {
2 changes: 1 addition & 1 deletion config/kafka_balanced.yaml
@@ -12,7 +12,7 @@ input:
client_id: benthos_kafka_input
commit_period_ms: 1000
consumer_group: benthos_consumer_group
- max_batch_size: 1
+ max_batch_count: 1
start_from_oldest: true
target_version: 1.0.0
tls:
12 changes: 6 additions & 6 deletions docs/inputs/README.md
@@ -70,7 +70,7 @@ type: amqp
amqp:
bindings_declare: []
consumer_tag: benthos-consumer
- max_batch_size: 1
+ max_batch_count: 1
prefetch_count: 10
prefetch_size: 0
queue: benthos-queue
@@ -88,7 +88,7 @@ amqp:
Connects to an AMQP (0.91) queue. AMQP is a messaging protocol used by various
message brokers, including RabbitMQ.

- The field `max_batch_size` specifies the maximum number of prefetched
+ The field `max_batch_count` specifies the maximum number of prefetched
messages to be batched together. When more than one message is batched they can
be split into individual messages with the `split` processor.
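
For programmatic use, the renamed field is also exposed on the exported config struct (see lib/input/reader further down this diff). A minimal sketch, assuming the import path matches this repository's layout at this release:

```go
package main

import (
	"fmt"

	"github.com/Jeffail/benthos/lib/input/reader"
)

func main() {
	// NewAMQPConfig returns the defaults shown above (max_batch_count: 1).
	conf := reader.NewAMQPConfig()
	conf.MaxBatchCount = 10 // batch up to 10 prefetched messages per read
	fmt.Println(conf.MaxBatchCount)
}
```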

Expand Down Expand Up @@ -412,7 +412,7 @@ kafka:
client_id: benthos_kafka_input
commit_period_ms: 1000
consumer_group: benthos_consumer_group
- max_batch_size: 1
+ max_batch_count: 1
partition: 0
start_from_oldest: true
target_version: 1.0.0
@@ -429,7 +429,7 @@ consumer group (set via config). Only one partition per input is supported, if
you wish to balance partitions across a consumer group look at the
`kafka_balanced` input type instead.

- The field `max_batch_size` specifies the maximum number of prefetched
+ The field `max_batch_count` specifies the maximum number of prefetched
messages to be batched together. When more than one message is batched they can
be split into individual messages with the `split` processor.

Expand Down Expand Up @@ -482,7 +482,7 @@ kafka_balanced:
client_id: benthos_kafka_input
commit_period_ms: 1000
consumer_group: benthos_consumer_group
- max_batch_size: 1
+ max_batch_count: 1
start_from_oldest: true
target_version: 1.0.0
tls:
@@ -498,7 +498,7 @@ Connects to a kafka (0.9+) server. Offsets are managed within kafka as per the
consumer group (set via config), and partitions are automatically balanced
across any members of the consumer group.

- The field `max_batch_size` specifies the maximum number of prefetched
+ The field `max_batch_count` specifies the maximum number of prefetched
messages to be batched together. When more than one message is batched they can
be split into individual messages with the `split` processor.

2 changes: 1 addition & 1 deletion lib/input/amqp.go
@@ -36,7 +36,7 @@ func init() {
Connects to an AMQP (0.91) queue. AMQP is a messaging protocol used by various
message brokers, including RabbitMQ.
- The field ` + "`max_batch_size`" + ` specifies the maximum number of prefetched
+ The field ` + "`max_batch_count`" + ` specifies the maximum number of prefetched
messages to be batched together. When more than one message is batched they can
be split into individual messages with the ` + "`split`" + ` processor.
2 changes: 1 addition & 1 deletion lib/input/kafka.go
@@ -39,7 +39,7 @@ consumer group (set via config). Only one partition per input is supported, if
you wish to balance partitions across a consumer group look at the
` + "`kafka_balanced`" + ` input type instead.
- The field ` + "`max_batch_size`" + ` specifies the maximum number of prefetched
+ The field ` + "`max_batch_count`" + ` specifies the maximum number of prefetched
messages to be batched together. When more than one message is batched they can
be split into individual messages with the ` + "`split`" + ` processor.
2 changes: 1 addition & 1 deletion lib/input/kafka_balanced.go
@@ -38,7 +38,7 @@ Connects to a kafka (0.9+) server. Offsets are managed within kafka as per the
consumer group (set via config), and partitions are automatically balanced
across any members of the consumer group.
- The field ` + "`max_batch_size`" + ` specifies the maximum number of prefetched
+ The field ` + "`max_batch_count`" + ` specifies the maximum number of prefetched
messages to be batched together. When more than one message is batched they can
be split into individual messages with the ` + "`split`" + ` processor.
6 changes: 3 additions & 3 deletions lib/input/reader/amqp.go
@@ -61,7 +61,7 @@ type AMQPConfig struct {
ConsumerTag string `json:"consumer_tag" yaml:"consumer_tag"`
PrefetchCount int `json:"prefetch_count" yaml:"prefetch_count"`
PrefetchSize int `json:"prefetch_size" yaml:"prefetch_size"`
- MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
+ MaxBatchCount int `json:"max_batch_count" yaml:"max_batch_count"`
TLS btls.Config `json:"tls" yaml:"tls"`
}
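
The rename has to touch the Go field and its serialisation tags together, because the tags, not the Go name, decide which config key parses. A standalone sketch using a trimmed-down, hypothetical stand-in for the struct above (JSON shown; the yaml tag behaves the same under a YAML parser):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// amqpConfig stands in for AMQPConfig above; only the renamed field is kept.
type amqpConfig struct {
	MaxBatchCount int `json:"max_batch_count" yaml:"max_batch_count"`
}

func main() {
	var conf amqpConfig
	raw := []byte(`{"max_batch_count": 8}`)
	if err := json.Unmarshal(raw, &conf); err != nil {
		panic(err)
	}
	// Prints 8; a config still using the old max_batch_size key would no
	// longer match and would leave the zero value here.
	fmt.Println(conf.MaxBatchCount)
}
```

That silent non-match is why every bundled config file in this commit is updated alongside the code.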

@@ -78,7 +78,7 @@ func NewAMQPConfig() AMQPConfig {
PrefetchCount: 10,
PrefetchSize: 0,
TLS: btls.NewConfig(),
- MaxBatchSize: 1,
+ MaxBatchCount: 1,
BindingsDeclare: []AMQPBindingConfig{},
}
}
@@ -335,7 +335,7 @@ func (a *AMQP) Read() (types.Message, error) {
addPart(data)

batchLoop:
- for i := 1; i < a.conf.MaxBatchSize; i++ {
+ for i := 1; i < a.conf.MaxBatchCount; i++ {
select {
case data, open = <-c:
if !open {
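
The hunk is truncated mid-`select`; as a self-contained sketch of the batching pattern above, assuming (as in the full source) a non-blocking `default` case that ends the batch when nothing more is buffered:

```go
package main

import "fmt"

// readBatch blocks for the first message, then greedily drains up to
// maxBatchCount-1 more that are already buffered, stopping early when the
// channel is empty or closed.
func readBatch(c <-chan string, maxBatchCount int) []string {
	batch := []string{<-c}

batchLoop:
	for i := 1; i < maxBatchCount; i++ {
		select {
		case msg, open := <-c:
			if !open {
				break batchLoop
			}
			batch = append(batch, msg)
		default: // nothing prefetched right now; ship what we have
			break batchLoop
		}
	}
	return batch
}

func main() {
	c := make(chan string, 10)
	for i := 0; i < 5; i++ {
		c <- fmt.Sprintf("msg-%d", i)
	}
	fmt.Println(readBatch(c, 3)) // [msg-0 msg-1 msg-2]
}
```

This is why `max_batch_count` is a cap rather than a guarantee: only messages already prefetched when the read happens are batched together.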
6 changes: 3 additions & 3 deletions lib/input/reader/amqp_test.go
@@ -199,7 +199,7 @@ func testAMQPBatch(url string, t *testing.T) {
conf := NewAMQPConfig()
conf.URL = url
conf.QueueDeclare.Enabled = true
- conf.MaxBatchSize = 10
+ conf.MaxBatchCount = 10
conf.BindingsDeclare = append(conf.BindingsDeclare, AMQPBindingConfig{
Exchange: exchange,
RoutingKey: key,
@@ -256,8 +256,8 @@ ContentType: "application/octet-stream",
ContentType: "application/octet-stream",
ContentEncoding: "",
Body: []byte(testStr),
- DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
- Priority:     0,              // 0-9
+ DeliveryMode: 1, // 1=non-persistent, 2=persistent
+ Priority:     0, // 0-9
}); pErr != nil {
t.Error(pErr)
}
6 changes: 3 additions & 3 deletions lib/input/reader/kafka.go
@@ -47,7 +47,7 @@ type KafkaConfig struct {
Partition int32 `json:"partition" yaml:"partition"`
StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"`
TargetVersion string `json:"target_version" yaml:"target_version"`
- MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
+ MaxBatchCount int `json:"max_batch_count" yaml:"max_batch_count"`
TLS btls.Config `json:"tls" yaml:"tls"`
}

@@ -62,7 +62,7 @@ func NewKafkaConfig() KafkaConfig {
Partition: 0,
StartFromOldest: true,
TargetVersion: sarama.V1_0_0_0.String(),
- MaxBatchSize: 1,
+ MaxBatchCount: 1,
TLS: btls.NewConfig(),
}
}
@@ -299,7 +299,7 @@ func (k *Kafka) Read() (types.Message, error) {
addPart(data)

batchLoop:
- for i := 1; i < k.conf.MaxBatchSize; i++ {
+ for i := 1; i < k.conf.MaxBatchCount; i++ {
select {
case data, open = <-partConsumer.Messages():
if !open {
6 changes: 3 additions & 3 deletions lib/input/reader/kafka_balanced.go
@@ -47,7 +47,7 @@ type KafkaBalancedConfig struct {
Topics []string `json:"topics" yaml:"topics"`
StartFromOldest bool `json:"start_from_oldest" yaml:"start_from_oldest"`
TargetVersion string `json:"target_version" yaml:"target_version"`
- MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
+ MaxBatchCount int `json:"max_batch_count" yaml:"max_batch_count"`
TLS btls.Config `json:"tls" yaml:"tls"`
}

@@ -61,7 +61,7 @@ func NewKafkaBalancedConfig() KafkaBalancedConfig {
Topics: []string{"benthos_stream"},
StartFromOldest: true,
TargetVersion: sarama.V1_0_0_0.String(),
- MaxBatchSize: 1,
+ MaxBatchCount: 1,
TLS: btls.NewConfig(),
}
}
@@ -265,7 +265,7 @@ func (k *KafkaBalanced) Read() (types.Message, error) {
addPart(data)

batchLoop:
- for i := 1; i < k.conf.MaxBatchSize; i++ {
+ for i := 1; i < k.conf.MaxBatchCount; i++ {
select {
case data, open = <-consumer.Messages():
if !open {
