Flume NG Apache Cassandra Sink

A Flume sink using Apache Cassandra

The Cassandra Sink persists Flume events to a Cassandra cluster. It is configured in the Flume config file (see the sample below). Available config parameters, with defaults in square brackets:

  • hosts - comma separated list of Cassandra hosts the sink should connect to
  • port - [9160] Cassandra RPC port for client connections
  • cluster-name - [Logging] name of the Cassandra cluster (changing it may cause trouble with Hector client stats)
  • keyspace-name - [logs] name of keyspace to use
  • records-colfam - [records] name of column family for storing log data
  • socket-timeout-millis - [5000] Hector client socket timeout
  • max-conns-per-host - [2] Hector client number of connections per Cassandra host
  • max-exhausted-wait-millis - [5000] Hector client maximum time to wait for a connection when the pool is exhausted
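
As a rough illustration of how the bracketed defaults above combine with explicitly configured values, here is a minimal sketch. The class SinkSettingsSketch and its methods are hypothetical and are not part of the sink; treating hosts as mandatory is an assumption based only on the list showing no default for it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the sink: merges configured values
// over the defaults shown in square brackets in the parameter list.
class SinkSettingsSketch {
    static final Map<String, String> DEFAULTS = new HashMap<>();
    static {
        DEFAULTS.put("port", "9160");
        DEFAULTS.put("cluster-name", "Logging");
        DEFAULTS.put("keyspace-name", "logs");
        DEFAULTS.put("records-colfam", "records");
        DEFAULTS.put("socket-timeout-millis", "5000");
        DEFAULTS.put("max-conns-per-host", "2");
        DEFAULTS.put("max-exhausted-wait-millis", "5000");
    }

    // Returns the effective settings. Assumption: hosts must be supplied,
    // because the parameter list gives no default for it.
    static Map<String, String> resolve(Map<String, String> configured) {
        String hosts = configured.get("hosts");
        if (hosts == null || hosts.trim().isEmpty()) {
            throw new IllegalArgumentException("hosts must be set");
        }
        Map<String, String> resolved = new HashMap<>(DEFAULTS);
        resolved.putAll(configured); // explicit values win over defaults
        return resolved;
    }

    // Splits the comma separated hosts value into individual host names.
    static String[] hostList(Map<String, String> resolved) {
        return resolved.get("hosts").trim().split("\\s*,\\s*");
    }

    public static void main(String[] args) {
        Map<String, String> configured = new HashMap<>();
        configured.put("hosts", "cass1, cass2");
        configured.put("keyspace-name", "applogs");
        Map<String, String> resolved = resolve(configured);
        System.out.println(Arrays.toString(hostList(resolved)));
        System.out.println(resolved.get("keyspace-name") + " on port " + resolved.get("port"));
    }
}
```

In the Flume config file each of these names is set as agent.sinks.<sink name>.<parameter>, as the hosts line in the sample config shows.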

The sink expects the following Flume event headers to be present:

  • key - used (combined with src) to create the Cassandra row key. It should be generated by the application doing the logging
  • timestamp - time at which the log entry occurred, not necessarily when the Flume event was created
  • src - logical source of the Flume event. This could be the host, but a source will probably span many hosts, so the name of the application is a more likely candidate
  • host - the name of the host where the message was generated
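
The header requirements above can be sketched on the client side as follows. EventHeadersSketch is a hypothetical helper, not part of the sink. Two assumptions are made for illustration: the key is a random UUID, and the timestamp is written as epoch milliseconds (the format Flume's TimestampInterceptor uses); the source does not state what format the sink expects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical client-side helper, not part of the sink.
class EventHeadersSketch {
    // The four headers the sink expects on every event.
    static final List<String> REQUIRED = Arrays.asList("key", "timestamp", "src", "host");

    // Builds the header map for one log entry.
    // Assumptions: key is a random UUID, timestamp is epoch milliseconds.
    static Map<String, String> buildHeaders(String src, String host, long loggedAtMillis) {
        Map<String, String> headers = new HashMap<>();
        headers.put("key", UUID.randomUUID().toString());
        headers.put("timestamp", Long.toString(loggedAtMillis));
        headers.put("src", src);
        headers.put("host", host);
        return headers;
    }

    // Returns the names of required headers that are absent or empty.
    static List<String> missingHeaders(Map<String, String> headers) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = headers.get(name);
            if (value == null || value.isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> headers = buildHeaders("billing-app", "web01", System.currentTimeMillis());
        System.out.println("missing: " + missingHeaders(headers));
    }
}
```

With the Flume client library on the classpath, a map like this can be passed to EventBuilder.withBody(body, headers) to create the event.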

The records column family is keyed by 'src' + 'key' and will contain all the log data. It has these columns:

  • ts - timestamp from the Flume event header
  • src - source from the Flume event header
  • host - host name from the Flume event header
  • data - body of the Flume event
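
The mapping from a Flume event to a row in the records column family might look roughly like the sketch below. RecordRowSketch is hypothetical and does not reproduce the sink's code. In particular, joining src and key by plain concatenation with no separator, and storing the body as a UTF-8 string, are assumptions; the source only says the row is keyed by 'src' + 'key'.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the row layout, not the sink's implementation.
class RecordRowSketch {
    // Assumption: the row key is src followed directly by key, no separator.
    static String rowKey(Map<String, String> headers) {
        return headers.get("src") + headers.get("key");
    }

    // Maps event headers and body onto the four columns ts, src, host, data.
    // Assumption: the body is text and is decoded as UTF-8.
    static Map<String, String> columns(Map<String, String> headers, byte[] body) {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("ts", headers.get("timestamp"));
        cols.put("src", headers.get("src"));
        cols.put("host", headers.get("host"));
        cols.put("data", new String(body, StandardCharsets.UTF_8));
        return cols;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("key", "42");
        headers.put("timestamp", "1362355200000");
        headers.put("src", "billing");
        headers.put("host", "web01");
        byte[] body = "payment accepted".getBytes(StandardCharsets.UTF_8);
        System.out.println(rowKey(headers) + " -> " + columns(headers, body));
    }
}
```

Because the row key is derived from src and key, two events from the same source with the same key would land in the same row, which is why the key should be unique per log entry.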

Cassandra Schema

The following is an example schema. Of course, the hard part is not getting data into Cassandra; it is getting data out! Unless you will only retrieve logs by Cassandra row key, you will probably want to add secondary indexes or use DataStax Enterprise Search with its Solr capabilities.

create keyspace logs with
   strategy_options = {datacenter1:1};

use logs;

create column family records with
   comparator = UTF8Type
   and gc_grace = 86400;

Sample Flume config

agent.sources = avrosource
agent.channels = channel1
agent.sinks = cassandraSink

agent.sources.avrosource.type = avro
agent.sources.avrosource.channels = channel1
agent.sources.avrosource.bind =
agent.sources.avrosource.port = 4141

agent.sources.avrosource.interceptors = addHost addTimestamp
agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.avrosource.interceptors.addHost.preserveExisting = false
agent.sources.avrosource.interceptors.addHost.useIP = false
agent.sources.avrosource.interceptors.addHost.hostHeader = host

agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Cassandra flow
agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000
###agent.channels.channel1.type = FILE
###agent.channels.channel1.checkpointDir = file-channel1/check
###agent.channels.channel1.dataDirs = file-channel1/data

agent.sinks.cassandraSink.channel = channel1

agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
agent.sinks.cassandraSink.hosts = localhost

Building Cassandra Sink

The sink is built using Maven:

mvn clean package -P assemble-artifacts

This runs all the JUnit tests and produces flume-ng-cassandra-sink-1.0.0-SNAPSHOT.jar and flume-ng-cassandra-sink-1.0.0-SNAPSHOT-dist.tar.gz

The tar contains all the dependencies needed, and then some. See the list below for what is actually needed to use the sink in a Flume environment.

Required Dependencies

  • hector-core*
  • guava*
  • speed4j*
  • uuid*
  • libthrift*
  • cassandra-thrift*