This app continuously reads data from a named pipe (FIFO) and publishes it to a Kinesis stream.
FIFOs are a great way to send data from one application to another. Having an open pipe that ships data to Kinesis facilitates a lot of interesting use cases. One such example is using the named pipe support in rsyslog and syslog-ng to send log streams to Kinesis.
Admittedly, it would be easy to achieve the same result with a handful of lines of bash and the AWS CLI. However, fifo2kinesis is designed to reliably handle large volumes of data. It achieves this by making good use of Go's concurrency primitives, buffering and batch publishing the data read from the FIFO, and handling failures in a way that tolerates network and AWS outages.
Either download the latest binary for your platform, or run the following command in the project's root to build the fifo2kinesis binary from source:
GOPATH=$PWD go build -o ./bin/fifo2kinesis fifo2kinesis
Create a named pipe:
mkfifo ./kinesis.pipe
Run the app:
./bin/fifo2kinesis --fifo-name=$(pwd)/kinesis.pipe --stream-name=my-stream
Write to the FIFO:
echo "Streamed at $(date)" > kinesis.pipe
The line will be published to the my-stream Kinesis stream within the default flush interval of 5 seconds.
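If you want to confirm that the record actually landed, you can read it back with the AWS CLI. The commands below are a quick sketch that assumes the CLI is configured for the same account and region and that my-stream has a single shard named shardId-000000000000; adjust the shard ID if your stream is sharded differently.

# Grab an iterator for the stream's only shard (assumed to be shardId-000000000000).
ITERATOR=$(aws kinesis get-shard-iterator --stream-name my-stream --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --query ShardIterator --output text)

# Fetch the records; the Data field comes back base64 encoded.
aws kinesis get-records --shard-iterator "$ITERATOR"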
If you are impatient like me and want your oompa loompa now, modify the --buffer-queue-limit, --flush-interval, and --flush-handler options so that what you send to the FIFO is written to STDOUT immediately instead of a buffered write to Kinesis. This doesn't do much, but it provides immediate gratification and shows how the app works when you play with the options.
./bin/fifo2kinesis --fifo-name=$(pwd)/kinesis.pipe --buffer-queue-limit=1 --flush-interval=0 --flush-handler=logger
Configuration is read from command line options and environment variables, in that order of precedence. The following options and environment variables are available:
--fifo-name, FIFO2KINESIS_FIFO_NAME: The absolute path of the named pipe.
--stream-name, FIFO2KINESIS_STREAM_NAME: The name of the Kinesis stream.
--partition-key, FIFO2KINESIS_PARTITION_KEY: The partition key; a random string is used if omitted.
--buffer-queue-limit, FIFO2KINESIS_BUFFER_QUEUE_LIMIT: The number of buffered items that triggers a flush.
--failed-attempts-dir, FIFO2KINESIS_FAILED_ATTEMPTS_DIR: The directory where failed attempts are logged for retry.
--flush-interval, FIFO2KINESIS_FLUSH_INTERVAL: The number of seconds before the buffer is flushed.
--flush-handler, FIFO2KINESIS_FLUSH_HANDLER: Defaults to "kinesis"; use "logger" for debugging.
--region, FIFO2KINESIS_REGION: The AWS region that the Kinesis stream is provisioned in.
--role-arn, FIFO2KINESIS_ROLE_ARN: The ARN of the AWS role being assumed.
--role-session-name, FIFO2KINESIS_ROLE_SESSION_NAME: The session name used when assuming a role.
--debug, FIFO2KINESIS_DEBUG: Show debug-level log messages.
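As an example, the quick-start run above can be configured entirely through environment variables instead of flags (flags still win when both are set):

export FIFO2KINESIS_FIFO_NAME=$(pwd)/kinesis.pipe
export FIFO2KINESIS_STREAM_NAME=my-stream
./bin/fifo2kinesis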
The application also requires credentials to publish to the specified Kinesis stream. It uses the same configuration mechanism as the AWS CLI tool, minus the command line options.
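In practice that means the usual AWS CLI credential sources, such as environment variables or the shared credentials file (~/.aws/credentials), should work. A minimal example using environment variables:

export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>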
Use Upstart to start fifo2kinesis during boot and supervise it while the system is running. Add a file to /etc/init with the following contents, replacing /path/to and my-stream according to your environment.
description "FIFO to Kinesis Pipeline"
start on runlevel [2345]
respawn
respawn limit 3 30
post-stop exec sleep 5
exec /path/to/fifo2kinesis --fifo-name=/path/to/named.pipe --stream-name=my-stream --region=us-east-1
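Assuming the job file was saved as /etc/init/fifo2kinesis.conf (the name is yours to choose), the job can then be managed with the standard Upstart commands:

sudo start fifo2kinesis
sudo status fifo2kinesis
sudo stop fifo2kinesis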
NOTE: You might also want to check out fluentd and the Amazon Kinesis Agent. You won't find an argument in this README as to why you should choose one over the other; I just want to make sure you have all the options in front of you so that you can make the best decision for your specific use case.
syslog-ng provides the capability to use a named pipe as a destination. Use fifo2kinesis to read log messages from the FIFO and publish them to Kinesis.
Make a FIFO:
mkfifo /var/syslog.pipe
Modify the syslog-ng configuration to send logs to the named pipe. For example, on Ubuntu 14.04 create a file named /etc/syslog-ng/conf.d/01-kinesis.conf with the following configuration:
destination d_pipe { pipe("/var/syslog.pipe"); };
log { source(s_src); destination(d_pipe); };
Start the app:
./bin/fifo2kinesis --fifo-name=/var/syslog.pipe --stream-name=my-stream
Restart syslog-ng:
service syslog-ng restart
The log stream will now be published to Kinesis.
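To sanity-check the whole pipeline, send a test message through syslog and read it back from the stream after the flush interval. The tag used here is arbitrary:

logger -t fifo2kinesis-test "Streamed via syslog-ng at $(date)"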
fifo2kinesis uses Glide to manage dependencies.
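If you are working from a fresh checkout, install the vendored dependencies by running Glide from the directory containing glide.yaml; the path below assumes the package source lives under src/fifo2kinesis in this repository's GOPATH layout, so adjust it if the layout differs.

cd src/fifo2kinesis && glide install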
Run the following commands to execute the tests and generate a coverage report:
GOPATH=$PWD go test -coverprofile=build/coverage.out fifo2kinesis
GOPATH=$PWD go tool cover -html=build/coverage.out