Skip to content

Commit

Permalink
FLUME-2821: Flume-Kafka Source with new Consumer
Browse files Browse the repository at this point in the history
(Grigoriy Rozhkov via Jarek Jarcec Cecho)
  • Loading branch information
Jarek Jarcec Cecho committed Mar 29, 2016
1 parent 5293eba commit f8abaf7
Show file tree
Hide file tree
Showing 10 changed files with 649 additions and 451 deletions.
58 changes: 39 additions & 19 deletions flume-ng-doc/sphinx/FlumeUserGuide.rst
Expand Up @@ -1188,21 +1188,23 @@ Example for agent named a1:
Kafka Source
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.
Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
If you have multiple Kafka sources running, you can configure them with the same Consumer Group
so each will read a unique set of partitions for the topic.
so each will read a unique set of partitions for the topics.



=============================== =========== ===================================================
Property Name Default Description
=============================== =========== ===================================================
**channels** --
**type** -- The component type name, needs to be ``org.apache.flume.source.kafka,KafkaSource``
**zookeeperConnect** -- URI of ZooKeeper used by Kafka cluster
**groupId** flume Unique identified of consumer group. Setting the same id in multiple sources or agents
**type** -- The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource``
**kafka.bootstrap.servers** -- List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id flume Unique identified of consumer group. Setting the same id in multiple sources or agents
indicates that they are part of the same consumer group
**topic** -- Kafka topic we'll read messages from. At the time, this is a single topic only.
**kafka.topics** -- Comma-separated list of topics the kafka consumer will read messages from.
**kafka.topics.regex** -- Regex that defines set of topics the source is subscribed on. This property has higher priority
than ``kafka.topics`` and overrides ``kafka.topics`` if exists.
batchSize 1000 Maximum number of messages written to Channel in one batch
batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel
The batch will be written whenever the first of size and time will be reached.
Expand All @@ -1214,31 +1216,49 @@ maxBackoffSleep 5000 Maximum wait time that is triggere
ideal for ingestion use cases but a lower value may be required for low latency operations
with interceptors.
Other Kafka Consumer Properties -- These properties are used to configure the Kafka Consumer. Any producer property supported
by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
For example: kafka.consumer.timeout.ms
Check `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>` for details
by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.consumer``.
For example: kafka.consumer.auto.offset.reset
Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>` for details
=============================== =========== ===================================================

.. note:: The Kafka Source overrides two Kafka consumer parameters:
auto.commit.enable is set to "false" by the source and we commit every batch. For improved performance
this can be set to "true", however, this can lead to loss of data
consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive
setting this to a higher value can reduce CPU utilization (we'll poll Kafka in less of a tight loop), but also means
higher latency in writing batches to channel (since we'll wait longer for data to arrive).
auto.commit.enable is set to "false" by the source and every batch is committed. Kafka source guarantees at least once
strategy of messages retrieval. The duplicates can be present when the source starts.
The Kafka Source also provides defaults for the key.deserializer(org.apache.kafka.common.serialization.StringSerializer)
and value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.

Deprecated Properties

Example for agent named tier1:
=============================== =================== =============================================================================================
Property Name Default Description
=============================== =================== =============================================================================================
topic -- Use kafka.topics
groupId flume Use kafka.consumer.group.id
zookeeperConnect -- Is no longer supported by kafka consumer client since 0.9.x. Use kafka.bootstrap.servers
to establish connection with kafka cluster
=============================== =================== =============================================================================================

Example for topic subscription by comma-separated topic list.

.. code-block:: properties
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.zookeeperConnect = localhost:2181
tier1.sources.source1.topic = test1
tier1.sources.source1.groupId = flume
tier1.sources.source1.kafka.consumer.timeout.ms = 100
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex

.. code-block:: properties
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
NetCat Source
Expand Down
5 changes: 5 additions & 0 deletions flume-ng-sources/flume-kafka-source/pom.xml
Expand Up @@ -60,6 +60,11 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
Expand Down

0 comments on commit f8abaf7

Please sign in to comment.