Skip to content

Commit

Permalink
Update streaming s3 example
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jun 4, 2018
1 parent 99ead64 commit a57c9eb
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 50 deletions.
72 changes: 40 additions & 32 deletions docs/cookbook/streaming-aws-s3-archives.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ This example demonstrates how Benthos can be used to stream an S3 bucket of
`.tar.gz` archives containing JSON documents into any output target. This
example is able to listen for newly added archives and then downloads,
decompresses, unarchives and streams the JSON documents found within to a Kafka
topic.
topic. The Kafka output in this example can be replaced with any Benthos
[output target][outputs].

The method used to stream archives is via an [SQS queue][s3-tracking], which is
a common pattern. Benthos can work either with S3 events sent via SQS directly,
Expand All @@ -18,24 +19,20 @@ The full config for this [example can be found here][example].

``` yaml
input:
type: broker
broker:
copies: 8 # Increase this to gain more parallel consumers
inputs:
- type: amazon_s3
amazon_s3:
region: eu-west-1 # TODO
bucket: TODO
delete_objects: false
sqs_url: TODO
sqs_body_path: Records.s3.object.key
sqs_envelope_path: ""
sqs_max_messages: 10
credentials:
id: "TODO"
secret: "TODO"
token: "TODO"
role: "TODO"
type: amazon_s3
amazon_s3:
region: eu-west-1 # TODO
bucket: TODO
delete_objects: false
sqs_url: TODO
sqs_body_path: Records.s3.object.key
sqs_envelope_path: ""
sqs_max_messages: 10
credentials:
id: "TODO"
secret: "TODO"
token: "TODO"
role: "TODO"
```
This input section contains lots of fields to be completed which are self
Expand All @@ -50,11 +47,23 @@ you will need to fill in the `sqs_envelope_path`, which is the JSON path inside
an SNS message that contains the enveloped S3 event. The value of
`sqs_envelope_path` should be `Message` when using the standard AWS set up.

Another unique aspect of the example input is that we have used a broker instead
of a single input. This is because you can have any number of consumers of the
bucket and messages will automatically be distributed amongst them. Later on in
the pipeline we do some heavy CPU processing so it will be useful to tune the
number of parallel consumers.
This example uses a single consumer, but if the throughput isn't high enough to
keep up with the bucket it is possible to use a `broker` type to have multiple
parallel consumers:

``` yaml
input:
type: broker
broker:
copies: 8 # Increase this to gain more parallel consumers
inputs:
- type: amazon_s3
amazon_s3:
... etc
```

You can have any number of consumers of a bucket and messages (archives) will
automatically be distributed amongst them via the SQS queue.

## Pipeline

Expand All @@ -79,21 +88,20 @@ of the payload. This results in a single payload of multiple documents. The
split processor turns this payload into individual messages.

The final processor is optional. It is a combine stage that bundles the
individual messages back into smaller batches to be sent to the Kafka topic,
increasing the throughput. If the combine processor is used it should be a
factor of the number of messages inside the S3 archives. The size also needs to
be low enough so that the overall size of the batch doesn't exceed the maximum
bytes of a Kafka request.
individual messages back into batches to be sent to the Kafka topic, increasing
the throughput. If the combine processor is used it should be a factor of the
number of messages inside the S3 archives. The size also needs to be low enough
so that the overall size of the batch doesn't exceed the maximum bytes of a
Kafka request.

These processors are heavy on CPU, which is why they are configured inside the
pipeline section. This allows you to explicitly set the number of parallel
threads to exactly match the number of logical CPU cores available. Make sure
the number of parallel consumers is greater than this number so that the
pipelines are fully utilised.
threads to exactly match the number of logical CPU cores available.

## Output

The output config is a standard Kafka output.

[s3-tracking]: https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html
[example]: ./streaming-aws-s3-archives.yaml
[outputs]: ../outputs/README.md
32 changes: 14 additions & 18 deletions docs/cookbook/streaming-aws-s3-archives.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
input:
type: broker
broker:
copies: 8 # Increase this to gain more parallel consumers
inputs:
- type: amazon_s3
amazon_s3:
region: eu-west-1 # TODO
bucket: "TODO"
delete_objects: false
sqs_url: "TODO"
sqs_body_path: Records.s3.object.key
sqs_envelope_path: ""
sqs_max_messages: 10
credentials:
id: "TODO"
secret: "TODO"
token: "TODO"
role: "TODO"
type: amazon_s3
amazon_s3:
region: eu-west-1 # TODO
bucket: "TODO"
delete_objects: false
sqs_url: "TODO"
sqs_body_path: Records.s3.object.key
sqs_envelope_path: ""
sqs_max_messages: 10
credentials:
id: "TODO"
secret: "TODO"
token: "TODO"
role: "TODO"

pipeline:
threads: 4 # Try to match the number of available logical CPU cores
Expand Down

0 comments on commit a57c9eb

Please sign in to comment.