A multiplexer and simple TCP sink for Cloudera's Flume

This is a collection of plugins for Cloudera's Flume that are used internally at Cloudspace. They make it slightly easier to include Flume in a custom architecture that might not include Hadoop or HDFS for storage. Included are:

  • TCPSink
  • JsonMultiplexDecorator

###Installation###

This package is meant to be checked out as a git submodule inside the Flume source tree and built from there:

    $ cd /usr/local/src
    $ git clone git://github.com/cloudera/flume.git
    $ cd flume
    $ ant
    $ git submodule add git://github.com/cloudspace/flume_multiplex_plugin.git plugins/multiplex
    $ cd plugins/multiplex
    $ ant install

Running ant install builds the plugin jar and copies it to /usr/local/flume/plugins/. Add that jar to your FLUME_CLASSPATH shell variable:

    $ export FLUME_CLASSPATH="$FLUME_CLASSPATH:/usr/local/flume/plugins/cloudspace_multiplex_plugin.jar"

###Usage###

The plugin adds two elements to Flume's dataflow specification language: the tcpSink() sink and the jsonMultiplexDecorator() decorator.

###tcpSink()###

The tcpSink() sink takes two arguments:

  • serverAddress: a quoted String with the DNS name or IP address of the TCP server to send events to
  • serverPort: an Integer giving the port that server is listening on

The sink opens a TCP socket connection to that server and writes every incoming event to it.
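On the other end of that connection you need some TCP server. Below is a minimal receiver sketch in Python, assuming (this is not stated anywhere above) that tcpSink terminates each event with a newline; check the plugin source for the actual framing before relying on it. EventServer, EventHandler and serve() are hypothetical names for illustration only.

```python
import socket
import socketserver
import threading


class EventHandler(socketserver.StreamRequestHandler):
    """Handles one connection opened by tcpSink.

    ASSUMPTION: events are newline-delimited. The README only says the
    sink writes incoming events to the socket.
    """

    def handle(self):
        # self.rfile is a buffered reader over the socket; iterating it
        # yields one line (bytes, trailing newline included) at a time.
        for raw in self.rfile:
            # Strip the line terminator and decode, replacing bad bytes.
            self.server.on_event(raw.rstrip(b"\r\n").decode("utf-8", "replace"))


class EventServer(socketserver.ThreadingTCPServer):
    """Threaded TCP server that passes each received event to a callback."""

    allow_reuse_address = True  # allow quick restarts on the same port
    daemon_threads = True       # don't block interpreter exit on open connections

    def __init__(self, address, on_event):
        self.on_event = on_event  # called with each event as a str
        super().__init__(address, EventHandler)


def serve(port: int = 12345) -> None:
    """Print every event received on `port` until interrupted."""
    with EventServer(("0.0.0.0", port), print) as server:
        server.serve_forever()
```

Calling serve() listens on port 12345, the port used in the dataflow examples, and prints each event. A threaded server is used because several Flume nodes may hold connections open at the same time.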

###jsonMultiplexDecorator()###

The jsonMultiplexDecorator() decorator takes two arguments:

  • serverName: a quoted String naming the server these events belong to
  • logType: a quoted String identifying the kind of log data the events contain (e.g. apache_access, syslog)

The decorator wraps each incoming event in a JSON object that records the server name, the log type and the original event body, for example:

    { "server": "example.com_production", "log_type": "apache_access", "body": "apache_access_event" }
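Raw log lines often contain quotes and backslashes, so the body has to be escaped for the result to stay valid JSON. This Python sketch builds the same shape of object; the key names come from the example above, while the function name multiplex_event is hypothetical, and the plugin's own output (key order, spacing, escaping) may differ.

```python
import json


def multiplex_event(server_name: str, log_type: str, body: str) -> str:
    """Wrap a raw event body the way the README's example shows.

    Illustration only: keys follow the README example; this is not the
    plugin's Java implementation.
    """
    # json.dumps escapes quotes, backslashes and control characters in the
    # log line, so the result always parses back as a JSON object.
    return json.dumps({"server": server_name, "log_type": log_type, "body": body})
```

For instance, multiplex_event("example.com_production", "apache_access", 'GET "/" 200') returns {"server": "example.com_production", "log_type": "apache_access", "body": "GET \"/\" 200"}.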

###Dataflow Specification Examples###

A single agent that tails a log file and sends each event to a TCP listener:

    agent: tail("/var/log/syslog") | tcpSink("", 12345)

Slightly more complex, with the decorator:

    agent: tail("/var/log/syslog") | { jsonMultiplexDecorator("localhost", "syslog") => tcpSink("", 12345) }

Several nodes: two agents send decorated events to a collector, which forwards everything to the TCP listener:

    agent1: tail("/var/log/syslog") | { jsonMultiplexDecorator("example.com", "syslog") => agentSink("collector", 35872) }
    agent2: tail("/var/log/syslog") | { jsonMultiplexDecorator("test.com", "syslog") => agentSink("collector", 35872) }
    collector: collectorSource(35872) | tcpSink("", 12345)
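In this setup the listener on port 12345 receives events from both agents over the collector's single connection. Because every event carries its server and log_type, the receiver can split them back into separate streams. A Python sketch of that step, assuming one JSON object per line; demultiplex is a hypothetical helper, and lines it cannot parse are filed under ("unknown", "unknown") as an arbitrary choice.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def demultiplex(lines: Iterable[str]) -> Dict[Tuple[str, str], List[str]]:
    """Group event bodies by (server, log_type).

    Expects lines shaped like the decorator's output (keys "server",
    "log_type", "body"). Anything else is kept verbatim under
    ("unknown", "unknown") rather than dropped.
    """
    streams: Dict[Tuple[str, str], List[str]] = defaultdict(list)
    for line in lines:
        try:
            event = json.loads(line)  # JSONDecodeError is a ValueError
            key = (event["server"], event["log_type"])
            body = event["body"]
        except (ValueError, KeyError, TypeError):
            # Not JSON, missing keys, or not a JSON object at all.
            key, body = ("unknown", "unknown"), line
        streams[key].append(body)
    return dict(streams)
```

Keeping unparseable lines instead of discarding them makes it easy to spot undecorated events that reached the listener by mistake.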



Copyright (c) Josh Lindsey at Cloudspace. See LICENSE for details.