Flume sink plugin for Elasticsearch

Elasticsearch Sink

The sink reads events from a channel, serializes them into JSON documents, and hands them to a bulk processor. The bulk processor batches the writes to Elasticsearch according to the configuration.
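
To make the batching described above concrete, here is a minimal sketch of a batcher that flushes when either an action count or a byte size is reached. It is an illustration only, not the Elasticsearch bulk processor or the plugin's code; the class name `BatchSketch` and its methods are hypothetical, and the two thresholds merely play the role of `es.bulkActions` and `es.bulkSize`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative batcher (hypothetical); not the Elasticsearch bulk processor. */
final class BatchSketch {
    private final int maxActions;  // plays the role of es.bulkActions
    private final long maxBytes;   // plays the role of es.bulkSize, in bytes
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long pendingBytes = 0;

    BatchSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Queue one JSON document and flush if either threshold is reached. */
    void add(String jsonDocument) {
        pending.add(jsonDocument);
        pendingBytes += jsonDocument.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Emit the pending documents as one batch; a timer could also call this. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BatchSketch batcher = new BatchSketch(3, 1024);
        for (int i = 0; i < 7; i++) {
            batcher.add("{\"id\":" + i + "}");
        }
        // prints "2 batches flushed, 1 pending"
        System.out.println(batcher.flushedBatches().size() + " batches flushed, "
                + batcher.pendingCount() + " pending");
    }
}
```

In the sketch the remaining document stays pending until another threshold is hit or `flush()` is called, which is the gap a time-based flush such as `es.flush.interval.time` is meant to cover.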

The Elasticsearch index and type for each event can be defined statically in the configuration file or derived dynamically using a custom IndexBuilder.
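
The difference between static and dynamic resolution can be sketched as follows. The `Resolver` interface here is a hypothetical stand-in written for this example; the plugin's actual IndexBuilder interface may have different method names and parameters. The header keys `index` and `type` are also assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration of static versus header-derived index resolution (hypothetical names). */
final class IndexResolutionSketch {

    /** Stand-in interface for this sketch; not the plugin's IndexBuilder. */
    interface Resolver {
        String index(Map<String, String> headers);

        String type(Map<String, String> headers);
    }

    /** Always returns the configured names, whatever the event carries. */
    static Resolver fixed(String index, String type) {
        return new Resolver() {
            public String index(Map<String, String> headers) {
                return index;
            }

            public String type(Map<String, String> headers) {
                return type;
            }
        };
    }

    /** Reads the assumed "index" and "type" headers, falling back to defaults. */
    static Resolver fromHeaders(String defaultIndex, String defaultType) {
        return new Resolver() {
            public String index(Map<String, String> headers) {
                return headers.getOrDefault("index", defaultIndex);
            }

            public String type(Map<String, String> headers) {
                return headers.getOrDefault("type", defaultType);
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("index", "orders");

        Resolver dynamic = fromHeaders("defaultindex", "defaulttype");
        // prints "orders/defaulttype"
        System.out.println(dynamic.index(headers) + "/" + dynamic.type(headers));
    }
}
```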

By default, events are assumed to be in JSON format. This assumption can be overridden by implementing the Serializer interface.

Follow these steps to use this sink in Apache Flume:

  • Build the plugin. This command will create the zip file inside the target directory.

mvn clean assembly:assembly

  • Extract the file into the plugins.d folder of the Flume installation directory.

  • Configure the sink in the Flume configuration file with the properties below.

Required properties are in bold.

| Property Name | Default | Description |
| --- | --- | --- |
| channel | - | |
| type | - | The component type name; must be com.cognitree.flume.sink.elasticsearch.ElasticSearchSink |
| es.cluster.name | elasticsearch | Name of the Elasticsearch cluster to connect to |
| es.client.hosts | - | Comma-separated hostname:port pairs, e.g. host1:9300,host2:9300. The default port is 9300 |
| es.bulkActions | 1000 | The number of actions to batch into a request |
| es.bulkProcessor.name | flume | Name of the bulk processor |
| es.bulkSize | 5 | Flush the bulk request once it reaches this size |
| es.bulkSize.unit | MB | Unit for the bulk request size; supported values are KB and MB |
| es.concurrent.request | 1 | The maximum number of concurrent requests to allow while accumulating new bulk requests |
| es.flush.interval.time | 10s | Flush a batch as a bulk request at this interval, irrespective of the number of requests |
| es.backoff.policy.time.interval | 50M | Backoff policy time interval; the initial wait is 50 milliseconds |
| es.backoff.policy.retries | 8 | Number of backoff policy retries |
| es.client.transport.sniff | false | Enable or disable the Elasticsearch sniff feature |
| es.client.transport.ignore_cluster_name | false | Ignore cluster name validation of connected nodes |
| es.client.transport.ping_timeout | 5s | The time to wait for a ping response from a node |
| es.client.transport.nodes_sampler_interval | 5s | How often to sample / ping the nodes listed and connected |
| es.index | default | Index name to be used to store the documents |
| es.type | default | Type to be used to store the documents |
| es.index.builder | com.cognitree.flume.sink.elasticsearch.StaticIndexBuilder | Implementation of the com.cognitree.flume.sink.elasticsearch.IndexBuilder interface |
| es.serializer | com.cognitree.flume.sink.elasticsearch.SimpleSerializer | Implementation of the com.cognitree.flume.sink.elasticsearch.Serializer interface |
| es.serializer.csv.fields | - | Comma-separated CSV field names with data types, i.e. column1:type1,column2:type2. Supported data types are string, boolean, int and float |
| es.serializer.csv.delimiter | , (comma) | Delimiter for the data in the Flume event body |
| es.serializer.avro.schema.file | - | Absolute path to the schema configuration file |
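
As an illustration of the `es.client.hosts` format, the sketch below parses a comma-separated list of hostname:port pairs and applies the default port 9300 when none is given. The class `HostListSketch` is hypothetical and is not taken from the plugin; how the sink itself parses this property is not shown in this README. IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Illustrative parser for a host list such as "host1:9300,host2" (hypothetical). */
final class HostListSketch {
    static final int DEFAULT_PORT = 9300;

    /** Split on commas, then split each entry into host and optional port. */
    static List<InetSocketAddress> parse(String hosts) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0
                    ? DEFAULT_PORT
                    : Integer.parseInt(trimmed.substring(colon + 1));
            // Unresolved: no DNS lookup happens while parsing.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("host1:9300, host2")) {
            // prints "host1:9300" then "host2:9300"
            System.out.println(address.getHostString() + ":" + address.getPort());
        }
    }
}
```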

Example configuration for an agent named agent:

  agent.channels = es_channel
  agent.sinks = es_sink
  agent.sinks.es_sink.type=com.cognitree.flume.sink.elasticsearch.ElasticSearchSink
  agent.sinks.es_sink.channel=es_channel
  agent.sinks.es_sink.es.bulkActions=5
  agent.sinks.es_sink.es.bulkProcessor.name=bulkprocessor
  agent.sinks.es_sink.es.bulkSize=5
  agent.sinks.es_sink.es.bulkSize.unit=MB
  agent.sinks.es_sink.es.concurrent.request=1
  agent.sinks.es_sink.es.flush.interval.time=5m
  agent.sinks.es_sink.es.backoff.policy.time.interval=50M
  agent.sinks.es_sink.es.backoff.policy.retries=8
  agent.sinks.es_sink.es.cluster.name=es-cluster
  agent.sinks.es_sink.es.client.transport.sniff=false
  agent.sinks.es_sink.es.client.transport.ignore_cluster_name=false
  agent.sinks.es_sink.es.client.transport.ping_timeout=5s
  agent.sinks.es_sink.es.client.transport.nodes_sampler_interval=5s
  agent.sinks.es_sink.es.client.hosts=127.0.0.1:9300
  agent.sinks.es_sink.es.index.name=defaultindex
  agent.sinks.es_sink.es.index.type=defaulttype
  agent.sinks.es_sink.es.index.builder=com.cognitree.flume.sink.elasticsearch.HeaderBasedIndexBuilder
  agent.sinks.es_sink.es.serializer=com.cognitree.flume.sink.elasticsearch.SimpleSerializer
  agent.sinks.es_sink.es.serializer.csv.fields=id:int,name:string,isemployee:boolean,leaves:float
  agent.sinks.es_sink.es.serializer.csv.delimiter=,
  agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc
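
To show what the `es.serializer.csv.fields` and `es.serializer.csv.delimiter` settings describe, here is a minimal sketch that turns a delimited event body into a typed JSON document using a `column:type` field specification. It is written for this README as an illustration; the class `CsvToJsonSketch` is hypothetical and the plugin's own CSV serializer may behave differently. The small JSON writer escapes only quotes and backslashes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Illustrative CSV-to-JSON conversion driven by a field spec (hypothetical). */
final class CsvToJsonSketch {

    /** Convert one delimited body into typed fields, in field-spec order. */
    static Map<String, Object> parse(String fieldSpec, String delimiter, String body) {
        String[] specs = fieldSpec.split(",");
        String[] values = body.split(Pattern.quote(delimiter), -1);
        if (values.length != specs.length) {
            throw new IllegalArgumentException(
                    "Expected " + specs.length + " values but got " + values.length);
        }
        Map<String, Object> doc = new LinkedHashMap<>();
        for (int i = 0; i < specs.length; i++) {
            String[] nameAndType = specs[i].trim().split(":", 2);
            if (nameAndType.length != 2) {
                throw new IllegalArgumentException("Bad field spec: " + specs[i]);
            }
            doc.put(nameAndType[0], convert(nameAndType[1], values[i].trim()));
        }
        return doc;
    }

    /** Map the four data types listed in the property table to Java values. */
    static Object convert(String type, String raw) {
        switch (type) {
            case "int":
                return Integer.parseInt(raw);
            case "float":
                return Float.parseFloat(raw);
            case "boolean":
                return Boolean.parseBoolean(raw);
            case "string":
                return raw;
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    /** Tiny JSON writer for flat documents; strings are quoted, other values are not. */
    static String toJson(Map<String, Object> doc) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, Object> field : doc.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(field.getKey()).append("\":");
            Object value = field.getValue();
            if (value instanceof String) {
                String escaped = ((String) value).replace("\\", "\\\\").replace("\"", "\\\"");
                json.append('"').append(escaped).append('"');
            } else {
                json.append(value);
            }
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        String spec = "id:int,name:string,isemployee:boolean,leaves:float";
        // prints {"id":1,"name":"alice","isemployee":true,"leaves":2.5}
        System.out.println(toJson(parse(spec, ",", "1,alice,true,2.5")));
    }
}
```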