nsq-to-gs

Stream an NSQ channel to Google Cloud Storage

Based on nsq-to-s3 by chrusty

Written (more like adjusted) by Eran Sandler (@erans) http://eran.sandler.co.il

Parameters

  • topic: The NSQ topic to subscribe to
  • channel: An NSQ channel name to use (defaults to an automatically-generated ephemeral channel)
  • max-in-flight: The maximum number of unfinished messages to allow (effectively a flush-batch size)
  • max-in-flight-time: The maximum number of seconds to wait before flushing (in case max-in-flight is not reached)
  • lookupd-http-address: The address of an NSQLookup daemon to connect to
  • nsqd-tcp-address: A specific NSQ daemon to connect to
  • bucket-seconds: The time-bucket size of each file you want to end up with on GS, if we don't hit bucket-messages first (e.g. 3600 will give you one file on GS per hour)
  • bucket-messages: Total number of messages to bucket (if bucket-seconds doesn't elapse first)
  • gsbucket: The GS bucket to store the files on (e.g. "nsq-archive")
  • gspath: A path to store the archive files under (e.g. "/live-dumps")
  • gsfileprefix: The generated file name prefix (e.g. "mylogfile", which would give mylogfile-20151117_1003.json.gz)
  • batchmode: Which mode to run in [memory, disk, channel]
  • bufferfile: The name of a file to use as a local on-disk buffer between flushes to GS (should be something durable)
  • extension: Extension for files on GS (default is json)

Modes (current)

NSQ-to-GS can operate in several different modes, depending on your storage and/or durability requirements:

"Batch-on-disk"

  • Subscribes to NSQ
  • De-dupes in memory (map[string]bool where the string is a hash of the message payload)
  • Once max-in-flight is reached it flushes messages to disk then Finish()es them
  • After the time bucket (bucket-seconds) has elapsed it stops consuming, sticks the file on GS, clears the de-dupe map and continues
  • You would be well-advised to keep the buffer file on some kind of persistent storage
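The in-memory de-dupe step can be sketched roughly as below. This is an illustration only: the deduper type, its method names and the choice of SHA-256 as the hash are assumptions, since the README only says the map key is "a hash of the message payload".

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deduper is a hypothetical wrapper around the map[string]bool described above.
type deduper struct {
	seen map[string]bool
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]bool)}
}

// isNew records the payload and reports whether it was unseen since the last reset.
// SHA-256 is an assumed hash choice; any stable payload hash would do.
func (d *deduper) isNew(payload []byte) bool {
	sum := sha256.Sum256(payload)
	key := hex.EncodeToString(sum[:])
	if d.seen[key] {
		return false
	}
	d.seen[key] = true
	return true
}

// reset clears the map, as would happen after a file has been stored on GS.
func (d *deduper) reset() {
	d.seen = make(map[string]bool)
}

func main() {
	d := newDeduper()
	fmt.Println(d.isNew([]byte("a")), d.isNew([]byte("a")), d.isNew([]byte("b")))
	d.reset()
	fmt.Println(d.isNew([]byte("a")))
}
```

Because the map is cleared at every flush to GS, a message redelivered just after a flush is not recognised as a duplicate of one stored just before it.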

"In-memory"

As with batch-on-disk but all messages are kept in-memory between flushes to GS. If you stop the process then you will lose messages!

Modes (planned)

"Abandoned-channel"

  • Subscribes to NSQ (creates a channel)
  • Waits for the time bucket (bucket-seconds) to elapse
  • Pauses the channel
  • Takes all the messages off the queue, de-dupes in memory, sticks them on GS
  • Finish()es the messages
  • Unpauses the channel
  • Repeat
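Since this mode is only planned, the following is a guess at how the pause and unpause steps might be driven, using nsqd's HTTP API (POST /channel/pause and POST /channel/unpause with topic and channel query parameters). The helper names, the nsqd HTTP address and the channel name "archive" are all assumptions for illustration; the README only lists the TCP and lookupd addresses.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// channelActionURL builds the nsqd HTTP URL for "pause" or "unpause".
// nsqdHTTPAddr is assumed to be nsqd's HTTP address (host:port).
func channelActionURL(nsqdHTTPAddr, action, topic, channel string) string {
	q := url.Values{}
	q.Set("topic", topic)
	q.Set("channel", channel)
	u := url.URL{
		Scheme:   "http",
		Host:     nsqdHTTPAddr,
		Path:     "/channel/" + action,
		RawQuery: q.Encode(), // keys are emitted in sorted order
	}
	return u.String()
}

// postAction sends the request; it is not called in main because it
// needs a running nsqd.
func postAction(target string) error {
	resp, err := http.Post(target, "text/plain", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("nsqd returned %s", resp.Status)
	}
	return nil
}

func main() {
	fmt.Println(channelActionURL("10.0.0.2:4151", "pause", "firehose", "archive"))
	fmt.Println(channelActionURL("10.0.0.2:4151", "unpause", "firehose", "archive"))
}
```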

"Continuous-sync-to-gs"

  • As with batch-on-disk but syncs to GS every x seconds
  • Either overwrites the same file on GS, or piles up new ones
  • At the end of the time-bucket the interim files are removed from GS

Examples

Consuming a topic, buffering on disk, flushing in-flight at 1000 messages, flushing to GS every 5 minutes:

nsq-to-gs -gsproject=myproject -gsbucket=nsq-archive -topic=firehose -channel='nsq-to-gs#ephemeral' -lookupd-http-address=10.0.0.2:4161 -gspath=/live-dumps/firehose -bucket-seconds=300 -max-in-flight=1000 -batchmode=disk

Bugs (current)

  • Dupes can still occur around flush boundaries
  • The timer for flushing to GS is based on events arriving (not on absolute time). This means that the filenames/numbers will creep (just being pedantic)
  • Should optionally compress files for GS