Skip to content

Commit

Permalink
add max_batch_size fields to some inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Dec 12, 2018
1 parent e77fcad commit 8c2b71a
Show file tree
Hide file tree
Showing 20 changed files with 403 additions and 160 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.

## Unreleased

## 0.40.2 - 2018-12-12

### Added

- New `max_batch_size` field added to `kafka`, `kafka_balanced` and `amqp`
inputs. This provides a mechanism for creating message batches optimistically.

## 0.40.0 - 2018-12-10

### Added
Expand Down
1 change: 1 addition & 0 deletions config/amqp.json
Expand Up @@ -10,6 +10,7 @@
"amqp": {
"bindings_declare": [],
"consumer_tag": "benthos-consumer",
"max_batch_size": 1,
"prefetch_count": 10,
"prefetch_size": 0,
"queue": "benthos-queue",
Expand Down
1 change: 1 addition & 0 deletions config/amqp.yaml
Expand Up @@ -9,6 +9,7 @@ input:
amqp:
bindings_declare: []
consumer_tag: benthos-consumer
max_batch_size: 1
prefetch_count: 10
prefetch_size: 0
queue: benthos-queue
Expand Down
3 changes: 3 additions & 0 deletions config/env/README.md
Expand Up @@ -54,6 +54,7 @@ HTTP_ROOT_PATH = /benthos
INPUTS = 1
INPUT_TYPE = dynamic
INPUT_AMQP_CONSUMER_TAG = benthos-consumer
INPUT_AMQP_MAX_BATCH_SIZE = 1
INPUT_AMQP_PREFETCH_COUNT = 10
INPUT_AMQP_PREFETCH_SIZE = 0
INPUT_AMQP_QUEUE = benthos-queue
Expand Down Expand Up @@ -116,6 +117,7 @@ INPUT_KAFKA_BALANCED_ADDRESSES = localhost:9092
INPUT_KAFKA_BALANCED_CLIENT_ID = benthos_kafka_input
INPUT_KAFKA_BALANCED_COMMIT_PERIOD_MS = 1000
INPUT_KAFKA_BALANCED_CONSUMER_GROUP = benthos_consumer_group
INPUT_KAFKA_BALANCED_MAX_BATCH_SIZE = 1
INPUT_KAFKA_BALANCED_START_FROM_OLDEST = true
INPUT_KAFKA_BALANCED_TARGET_VERSION = 1.0.0
INPUT_KAFKA_BALANCED_TLS_ENABLED = false
Expand All @@ -125,6 +127,7 @@ INPUT_KAFKA_BALANCED_TOPICS = benthos_stream
INPUT_KAFKA_CLIENT_ID = benthos_kafka_input
INPUT_KAFKA_COMMIT_PERIOD_MS = 1000
INPUT_KAFKA_CONSUMER_GROUP = benthos_consumer_group
INPUT_KAFKA_MAX_BATCH_SIZE = 1
INPUT_KAFKA_PARTITION = 0
INPUT_KAFKA_START_FROM_OLDEST = true
INPUT_KAFKA_TARGET_VERSION = 1.0.0
Expand Down
3 changes: 3 additions & 0 deletions config/env/default.yaml
Expand Up @@ -10,6 +10,7 @@ input:
inputs:
- amqp:
consumer_tag: ${INPUT_AMQP_CONSUMER_TAG:benthos-consumer}
max_batch_size: ${INPUT_AMQP_MAX_BATCH_SIZE:1}
prefetch_count: ${INPUT_AMQP_PREFETCH_COUNT:10}
prefetch_size: ${INPUT_AMQP_PREFETCH_SIZE:0}
queue: ${INPUT_AMQP_QUEUE:benthos-queue}
Expand Down Expand Up @@ -89,6 +90,7 @@ input:
client_id: ${INPUT_KAFKA_CLIENT_ID:benthos_kafka_input}
commit_period_ms: ${INPUT_KAFKA_COMMIT_PERIOD_MS:1000}
consumer_group: ${INPUT_KAFKA_CONSUMER_GROUP:benthos_consumer_group}
max_batch_size: ${INPUT_KAFKA_MAX_BATCH_SIZE:1}
partition: ${INPUT_KAFKA_PARTITION:0}
start_from_oldest: ${INPUT_KAFKA_START_FROM_OLDEST:true}
target_version: ${INPUT_KAFKA_TARGET_VERSION:1.0.0}
Expand All @@ -103,6 +105,7 @@ input:
client_id: ${INPUT_KAFKA_BALANCED_CLIENT_ID:benthos_kafka_input}
commit_period_ms: ${INPUT_KAFKA_BALANCED_COMMIT_PERIOD_MS:1000}
consumer_group: ${INPUT_KAFKA_BALANCED_CONSUMER_GROUP:benthos_consumer_group}
max_batch_size: ${INPUT_KAFKA_BALANCED_MAX_BATCH_SIZE:1}
start_from_oldest: ${INPUT_KAFKA_BALANCED_START_FROM_OLDEST:true}
target_version: ${INPUT_KAFKA_BALANCED_TARGET_VERSION:1.0.0}
tls:
Expand Down
3 changes: 3 additions & 0 deletions config/everything.yaml
Expand Up @@ -15,6 +15,7 @@ input:
consumer_tag: benthos-consumer
prefetch_count: 10
prefetch_size: 0
max_batch_size: 1
tls:
enabled: false
root_cas_file: ""
Expand Down Expand Up @@ -98,6 +99,7 @@ input:
partition: 0
start_from_oldest: true
target_version: 1.0.0
max_batch_size: 1
tls:
enabled: false
root_cas_file: ""
Expand All @@ -113,6 +115,7 @@ input:
- benthos_stream
start_from_oldest: true
target_version: 1.0.0
max_batch_size: 1
tls:
enabled: false
root_cas_file: ""
Expand Down
1 change: 1 addition & 0 deletions config/kafka.json
Expand Up @@ -14,6 +14,7 @@
"client_id": "benthos_kafka_input",
"commit_period_ms": 1000,
"consumer_group": "benthos_consumer_group",
"max_batch_size": 1,
"partition": 0,
"start_from_oldest": true,
"target_version": "1.0.0",
Expand Down
1 change: 1 addition & 0 deletions config/kafka.yaml
Expand Up @@ -12,6 +12,7 @@ input:
client_id: benthos_kafka_input
commit_period_ms: 1000
consumer_group: benthos_consumer_group
max_batch_size: 1
partition: 0
start_from_oldest: true
target_version: 1.0.0
Expand Down
1 change: 1 addition & 0 deletions config/kafka_balanced.json
Expand Up @@ -14,6 +14,7 @@
"client_id": "benthos_kafka_input",
"commit_period_ms": 1000,
"consumer_group": "benthos_consumer_group",
"max_batch_size": 1,
"start_from_oldest": true,
"target_version": "1.0.0",
"tls": {
Expand Down
1 change: 1 addition & 0 deletions config/kafka_balanced.yaml
Expand Up @@ -12,6 +12,7 @@ input:
client_id: benthos_kafka_input
commit_period_ms: 1000
consumer_group: benthos_consumer_group
max_batch_size: 1
start_from_oldest: true
target_version: 1.0.0
tls:
Expand Down
15 changes: 15 additions & 0 deletions docs/inputs/README.md
Expand Up @@ -70,6 +70,7 @@ type: amqp
amqp:
bindings_declare: []
consumer_tag: benthos-consumer
max_batch_size: 1
prefetch_count: 10
prefetch_size: 0
queue: benthos-queue
Expand All @@ -87,6 +88,10 @@ amqp:
Connects to an AMQP (0.91) queue. AMQP is a messaging protocol used by various
message brokers, including RabbitMQ.

The field `max_batch_size` specifies the maximum number of prefetched
messages to be batched together. When more than one message is batched they can
be split into individual messages with the `split` processor.

It's possible for this input type to declare the target queue by setting
`queue_declare.enabled` to `true`, if the queue already exists then
the declaration passively verifies that they match the target fields.
Expand Down Expand Up @@ -407,6 +412,7 @@ kafka:
client_id: benthos_kafka_input
commit_period_ms: 1000
consumer_group: benthos_consumer_group
max_batch_size: 1
partition: 0
start_from_oldest: true
target_version: 1.0.0
Expand All @@ -423,6 +429,10 @@ consumer group (set via config). Only one partition per input is supported, if
you wish to balance partitions across a consumer group look at the
`kafka_balanced` input type instead.

The field `max_batch_size` specifies the maximum number of prefetched
messages to be batched together. When more than one message is batched they can
be split into individual messages with the `split` processor.

The target version by default will be the oldest supported, as it is expected
that the server will be backwards compatible. In order to support newer client
features you should increase this version up to the known version of the target
Expand Down Expand Up @@ -472,6 +482,7 @@ kafka_balanced:
client_id: benthos_kafka_input
commit_period_ms: 1000
consumer_group: benthos_consumer_group
max_batch_size: 1
start_from_oldest: true
target_version: 1.0.0
tls:
Expand All @@ -487,6 +498,10 @@ Connects to a kafka (0.9+) server. Offsets are managed within kafka as per the
consumer group (set via config), and partitions are automatically balanced
across any members of the consumer group.

The field `max_batch_size` specifies the maximum number of prefetched
messages to be batched together. When more than one message is batched they can
be split into individual messages with the `split` processor.

### TLS

Custom TLS settings can be used to override system defaults. This includes
Expand Down
68 changes: 33 additions & 35 deletions go.mod
@@ -1,62 +1,63 @@
module github.com/Jeffail/benthos

require (
cloud.google.com/go v0.30.0
cloud.google.com/go v0.34.0
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/DataDog/zstd v1.3.4 // indirect
github.com/Jeffail/gabs v1.1.1
github.com/Microsoft/go-winio v0.4.11 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/OneOfOne/xxhash v1.2.2
github.com/Shopify/sarama v1.19.0
github.com/Shopify/sarama v1.20.0
github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/armon/go-radix v1.0.0
github.com/aws/aws-sdk-go v1.15.59
github.com/aws/aws-sdk-go v1.16.3
github.com/boltdb/bolt v1.3.1 // indirect
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/cenkalti/backoff v2.0.0+incompatible
github.com/cenkalti/backoff v2.1.0+incompatible
github.com/cespare/xxhash v1.1.0 // indirect
github.com/colinmarc/hdfs v1.1.3
github.com/containerd/continuity v0.0.0-20181003075958-be9bd761db19 // indirect
github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.3.3 // indirect
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.1.1
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712
github.com/fortytw2/leaktest v1.2.0 // indirect
github.com/go-redis/redis v6.14.1+incompatible
github.com/go-sql-driver/mysql v1.4.0 // indirect
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/go-redis/redis v6.14.2+incompatible
github.com/go-sql-driver/mysql v1.4.1 // indirect
github.com/gofrs/uuid v3.1.0+incompatible
github.com/gogo/protobuf v1.1.1 // indirect
github.com/gogo/protobuf v1.2.0 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/go-cmp v0.2.0 // indirect
github.com/googleapis/gax-go v2.0.0+incompatible // indirect
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e // indirect
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2
github.com/gorilla/websocket v1.4.0
github.com/gotestyourself/gotestyourself v2.1.0+incompatible // indirect
github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect
github.com/hashicorp/raft v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/jtolds/gls v4.2.1+incompatible // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/lib/pq v1.0.0 // indirect
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 // indirect
github.com/microcosm-cc/bluemonday v1.0.1
github.com/nats-io/gnatsd v1.3.0 // indirect
github.com/nats-io/go-nats v1.6.0
github.com/nats-io/go-nats v1.7.0
github.com/nats-io/go-nats-streaming v0.4.0
github.com/nats-io/nats-streaming-server v0.11.2 // indirect
github.com/nats-io/nkeys v0.0.2 // indirect
github.com/nats-io/nuid v1.0.0 // indirect
github.com/nsqio/go-nsq v1.0.7
github.com/olivere/elastic v6.2.11+incompatible
github.com/onsi/gomega v1.4.2 // indirect
github.com/olivere/elastic v6.2.14+incompatible
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/opencontainers/runc v0.1.1 // indirect
Expand All @@ -65,31 +66,28 @@ require (
github.com/pebbe/zmq4 v1.0.0
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pkg/errors v0.8.0 // indirect
github.com/prometheus/client_golang v0.9.0
github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39 // indirect
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d // indirect
github.com/prometheus/client_golang v0.9.2
github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/sirupsen/logrus v1.1.1 // indirect
github.com/sirupsen/logrus v1.2.0 // indirect
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d // indirect
github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a // indirect
github.com/spf13/cast v1.2.0
github.com/streadway/amqp v0.0.0-20180806233856-70e15c650864
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c // indirect
github.com/spf13/cast v1.3.0
github.com/streadway/amqp v0.0.0-20181205114330-a314942b2fd9
github.com/trivago/grok v1.0.0
github.com/trivago/tgo v1.0.5 // indirect
go.opencensus.io v0.17.0 // indirect
golang.org/x/crypto v0.0.0-20181015023909-0c41d7ab0a0e // indirect
golang.org/x/net v0.0.0-20181017193950-04a2e542c03f // indirect
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4 // indirect
golang.org/x/sys v0.0.0-20181021155630-eda9bb28ed51 // indirect
google.golang.org/api v0.0.0-20181021000519-a2651947f503 // indirect
google.golang.org/appengine v1.2.0 // indirect
google.golang.org/genproto v0.0.0-20181016170114-94acd270e44e // indirect
google.golang.org/grpc v1.15.0 // indirect
go.opencensus.io v0.18.0 // indirect
golang.org/x/net v0.0.0-20181207154023-610586996380 // indirect
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890 // indirect
golang.org/x/sys v0.0.0-20181212120007-b05ddf57801d // indirect
google.golang.org/api v0.0.0-20181212003324-40e757e92c52 // indirect
google.golang.org/appengine v1.3.0 // indirect
google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 // indirect
google.golang.org/grpc v1.17.0 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/vmihailenco/msgpack.v2 v2.9.1 // indirect
gopkg.in/yaml.v2 v2.2.1
gotest.tools v2.1.0+incompatible // indirect
gopkg.in/yaml.v2 v2.2.2
gotest.tools v2.2.0+incompatible // indirect
nanomsg.org/go-mangos v1.4.0
)

0 comments on commit 8c2b71a

Please sign in to comment.