From f8abaf78fb98e91b7a228aaa231f4164d8dcfc97 Mon Sep 17 00:00:00 2001 From: Jarek Jarcec Cecho Date: Tue, 29 Mar 2016 09:42:24 -0700 Subject: [PATCH] FLUME-2821: Flume-Kafka Source with new Consumer (Grigoriy Rozhkov via Jarek Jarcec Cecho) --- flume-ng-doc/sphinx/FlumeUserGuide.rst | 58 ++- flume-ng-sources/flume-kafka-source/pom.xml | 5 + .../flume/source/kafka/KafkaSource.java | 398 ++++++++++++------ .../source/kafka/KafkaSourceConstants.java | 36 +- .../flume/source/kafka/KafkaSourceUtil.java | 112 ----- .../kafka/KafkaSourceEmbeddedKafka.java | 96 +++-- .../kafka/KafkaSourceEmbeddedZookeeper.java | 17 +- .../flume/source/kafka/TestKafkaSource.java | 281 ++++++++++--- .../source/kafka/TestKafkaSourceUtil.java | 92 ---- pom.xml | 5 +- 10 files changed, 649 insertions(+), 451 deletions(-) delete mode 100644 flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java delete mode 100644 flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 423e0cf8a3..341ae4229c 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1188,9 +1188,9 @@ Example for agent named a1: Kafka Source ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic. +Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group -so each will read a unique set of partitions for the topic. +so each will read a unique set of partitions for the topics. @@ -1198,11 +1198,13 @@ so each will read a unique set of partitions for the topic. Property Name Default Description =============================== =========== =================================================== **channels** -- -**type** -- The component type name, needs to be ``org.apache.flume.source.kafka,KafkaSource`` -**zookeeperConnect** -- URI of ZooKeeper used by Kafka cluster -**groupId** flume Unique identified of consumer group. Setting the same id in multiple sources or agents +**type** -- The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource`` +**kafka.bootstrap.servers** -- List of brokers in the Kafka cluster used by the source +kafka.consumer.group.id flume Unique identified of consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group -**topic** -- Kafka topic we'll read messages from. At the time, this is a single topic only. +**kafka.topics** -- Comma-separated list of topics the kafka consumer will read messages from. +**kafka.topics.regex** -- Regex that defines set of topics the source is subscribed on. This property has higher priority + than ``kafka.topics`` and overrides ``kafka.topics`` if exists. batchSize 1000 Maximum number of messages written to Channel in one batch batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached. @@ -1214,31 +1216,49 @@ maxBackoffSleep 5000 Maximum wait time that is triggere ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors. Other Kafka Consumer Properties -- These properties are used to configure the Kafka Consumer. 
Any producer property supported - by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``. - For example: kafka.consumer.timeout.ms - Check `Kafka documentation ` for details + by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.consumer``. + For example: kafka.consumer.auto.offset.reset + Check `Kafka documentation ` for details =============================== =========== =================================================== .. note:: The Kafka Source overrides two Kafka consumer parameters: - auto.commit.enable is set to "false" by the source and we commit every batch. For improved performance - this can be set to "true", however, this can lead to loss of data - consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive - setting this to a higher value can reduce CPU utilization (we'll poll Kafka in less of a tight loop), but also means - higher latency in writing batches to channel (since we'll wait longer for data to arrive). + auto.commit.enable is set to "false" by the source and every batch is committed. Kafka source guarantees at least once + strategy of messages retrieval. The duplicates can be present when the source starts. + The Kafka Source also provides defaults for the key.deserializer(org.apache.kafka.common.serialization.StringSerializer) + and value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended. +Deprecated Properties -Example for agent named tier1: +=============================== =================== ============================================================================================= +Property Name Default Description +=============================== =================== ============================================================================================= +topic -- Use kafka.topics +groupId flume Use kafka.consumer.group.id +zookeeperConnect -- Is no longer supported by kafka consumer client since 0.9.x. Use kafka.bootstrap.servers + to establish connection with kafka cluster +=============================== =================== ============================================================================================= + +Example for topic subscription by comma-separated topic list. .. code-block:: properties tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 - tier1.sources.source1.zookeeperConnect = localhost:2181 - tier1.sources.source1.topic = test1 - tier1.sources.source1.groupId = flume - tier1.sources.source1.kafka.consumer.timeout.ms = 100 + tier1.sources.source1.batchSize = 5000 + tier1.sources.source1.batchDurationMillis = 2000 + tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 + tier1.sources.source1.kafka.topics = test1, test2 + tier1.sources.source1.kafka.consumer.group.id = custom.g.id +Example for topic subscription by regex +.. 
code-block:: properties + + tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource + tier1.sources.source1.channels = channel1 + tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 + tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$ + # the default kafka.consumer.group.id=flume is used NetCat Source diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml index 0f93476c61..5f5c2a8479 100644 --- a/flume-ng-sources/flume-kafka-source/pom.xml +++ b/flume-ng-sources/flume-kafka-source/pom.xml @@ -60,6 +60,11 @@ org.mockito mockito-all + + org.apache.kafka + kafka-clients + ${kafka.version} + org.apache.kafka kafka_2.10 diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index fd1dd3c17b..db806ccfd9 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -17,40 +17,49 @@ package org.apache.flume.source.kafka; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.ConsumerTimeoutException; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; - -import org.apache.flume.*; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; import org.apache.flume.event.EventBuilder; -import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.instrumentation.kafka.KafkaSourceCounter; import org.apache.flume.source.AbstractPollableSource; -import org.apache.flume.source.AbstractSource; -import org.apache.flume.source.BasicSourceSemantics; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** - * A Source for Kafka which reads messages from a kafka topic. + * A Source for Kafka which reads messages from kafka topics. * - * zookeeperConnect: Kafka's zookeeper connection string. - * Required + * kafka.bootstrap.servers: A comma separated list of host:port pairs + * to use for establishing the initial connection to the Kafka cluster. + * For example host1:port1,host2:port2,... + * Required for kafka. *

- * groupId: the group ID of consumer group. Required + * kafka.consumer.group.id: the group ID of the consumer group. Required *

- * topic: the topic to consume messages from. Required + * kafka.topics: the comma-separated list of topics to consume messages from. + * Required *

* maxBatchSize: Maximum number of messages written to Channel in one * batch. Default: 1000 @@ -58,99 +67,167 @@ * maxBatchDurationMillis: Maximum number of milliseconds before a * batch (of any size) will be written to a channel. Default: 1000 *

- * kafka.auto.commit.enable: If true, commit automatically every time - * period. if false, commit on each batch. Default: false + * kafka.consumer.*: Any property starting with "kafka.consumer" will be + * passed to the Kafka consumer, so you can use any configuration supported by Kafka 0.9.0.x *

- * kafka.consumer.timeout.ms: Polling interval for new data for batch. - * Low value means more CPU usage. High value means the time.upper.limit may be - * missed. Default: 10 - * - * Any property starting with "kafka" will be passed to the kafka consumer So - * you can use any configuration supported by Kafka 0.8.1.1 */ public class KafkaSource extends AbstractPollableSource implements Configurable { private static final Logger log = LoggerFactory.getLogger(KafkaSource.class); - private ConsumerConnector consumer; - private ConsumerIterator it; - private String topic; - private int batchUpperLimit; - private int timeUpperLimit; - private int consumerTimeout; - private boolean kafkaAutoCommitEnabled; + private Context context; private Properties kafkaProps; - private final List eventList = new ArrayList(); private KafkaSourceCounter counter; + private KafkaConsumer consumer; + private Iterator> it; + + private final List eventList = new ArrayList(); + private Map tpAndOffsetMetadata; + private AtomicBoolean rebalanceFlag; + + private Map headers; + + private int batchUpperLimit; + private int maxBatchDurationMillis; + + private Subscriber subscriber; + + + /** + * This class is a helper to subscribe for topics by using + * different strategies + */ + public abstract class Subscriber { + public abstract void subscribe(KafkaConsumer consumer, SourceRebalanceListener listener); + public T get() {return null;} + } + + private class TopicListSubscriber extends Subscriber> { + private List topicList; + public TopicListSubscriber(String commaSeparatedTopics) { + this.topicList = Arrays.asList(commaSeparatedTopics.split("^\\s+|\\s*,\\s*|\\s+$")); + } + @Override + public void subscribe(KafkaConsumer consumer, SourceRebalanceListener listener) { + consumer.subscribe(topicList, listener); + } + @Override + public List get() { + return topicList; + } + } + + private class PatternSubscriber extends Subscriber { + private Pattern pattern; + public PatternSubscriber(String regex) { + this.pattern = Pattern.compile(regex); + } + @Override + public void subscribe(KafkaConsumer consumer, SourceRebalanceListener listener) { + consumer.subscribe(pattern, listener); + } + @Override + public Pattern get() { + return pattern; + } + } + @Override protected Status doProcess() throws EventDeliveryException { + final String batchUUID = UUID.randomUUID().toString(); byte[] kafkaMessage; - byte[] kafkaKey; + String kafkaKey; Event event; - Map headers; - long batchStartTime = System.currentTimeMillis(); - long batchEndTime = System.currentTimeMillis() + timeUpperLimit; + try { - boolean iterStatus = false; - long startTime = System.nanoTime(); + // prepare time variables for new batch + final long nanoBatchStartTime = System.nanoTime(); + final long batchStartTime = System.currentTimeMillis(); + final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis; + while (eventList.size() < batchUpperLimit && - System.currentTimeMillis() < batchEndTime) { - iterStatus = hasNext(); - if (iterStatus) { - // get next message - MessageAndMetadata messageAndMetadata = it.next(); - kafkaMessage = messageAndMetadata.message(); - kafkaKey = messageAndMetadata.key(); - - // Add headers to event (topic, timestamp, and key) - headers = new HashMap(); - headers.put(KafkaSourceConstants.TIMESTAMP, - String.valueOf(System.currentTimeMillis())); - headers.put(KafkaSourceConstants.TOPIC, topic); - if (kafkaKey != null) { - headers.put(KafkaSourceConstants.KEY, new String(kafkaKey)); + System.currentTimeMillis() < 
maxBatchEndTime) { + + if (it == null || !it.hasNext()) { + // Obtaining new records + // Poll time is remainder time for current batch. + ConsumerRecords records = consumer.poll( + Math.max(0, maxBatchEndTime - System.currentTimeMillis())); + it = records.iterator(); + + // this flag is set to true in a callback when some partitions are revoked. + // If there are any records we commit them. + if (rebalanceFlag.get()) { + rebalanceFlag.set(false); + break; } - if (log.isDebugEnabled()) { - log.debug("Message: {}", new String(kafkaMessage)); + // check records after poll + if (!it.hasNext()) { + if (log.isDebugEnabled()) { + counter.incrementKafkaEmptyCount(); + log.debug("Returning with backoff. No more data to read"); + } + // batch time exceeded + break; } - event = EventBuilder.withBody(kafkaMessage, headers); - eventList.add(event); } + + // get next message + ConsumerRecord message = it.next(); + kafkaKey = message.key(); + kafkaMessage = message.value(); + + headers.clear(); + // Add headers to event (timestamp, topic, partition, key) + headers.put(KafkaSourceConstants.TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis())); + headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic()); + headers.put(KafkaSourceConstants.PARTITION_HEADER, String.valueOf(message.partition())); + if (kafkaKey != null) { + headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey); + } + + if (log.isDebugEnabled()) { + log.debug("Topic: {} Partition: {} Message: {}", new String[]{ + message.topic(), + String.valueOf(message.partition()), + new String(kafkaMessage)}); + } + + event = EventBuilder.withBody(kafkaMessage, headers); + eventList.add(event); + if (log.isDebugEnabled()) { log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime); log.debug("Event #: {}", eventList.size()); } + + // For each partition store next offset that is going to be read. + tpAndOffsetMetadata.put(new TopicPartition(message.topic(), message.partition()), + new OffsetAndMetadata(message.offset() + 1, batchUUID)); } - long endTime = System.nanoTime(); - counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000)); - counter.addToEventReceivedCount(Long.valueOf(eventList.size())); - // If we have events, send events to channel - // clear the event list - // and commit if Kafka doesn't auto-commit + if (eventList.size() > 0) { + counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000)); + counter.addToEventReceivedCount((long) eventList.size()); getChannelProcessor().processEventBatch(eventList); counter.addToEventAcceptedCount(eventList.size()); - eventList.clear(); if (log.isDebugEnabled()) { log.debug("Wrote {} events to channel", eventList.size()); } - if (!kafkaAutoCommitEnabled) { - // commit the read transactions to Kafka to avoid duplicates + eventList.clear(); + + if (!tpAndOffsetMetadata.isEmpty()) { long commitStartTime = System.nanoTime(); - consumer.commitOffsets(); + consumer.commitSync(tpAndOffsetMetadata); long commitEndTime = System.nanoTime(); - counter.addToKafkaCommitTimer((commitEndTime-commitStartTime)/(1000*1000)); - } - } - if (!iterStatus) { - if (log.isDebugEnabled()) { - counter.incrementKafkaEmptyCount(); - log.debug("Returning with backoff. 
No more data to read"); + counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000)); + tpAndOffsetMetadata.clear(); } - return Status.BACKOFF; + return Status.READY; } - return Status.READY; + + return Status.BACKOFF; } catch (Exception e) { log.error("KafkaSource EXCEPTION, {}", e); return Status.BACKOFF; @@ -161,96 +238,153 @@ protected Status doProcess() throws EventDeliveryException { * We configure the source and generate properties for the Kafka Consumer * * Kafka Consumer properties are generated as follows: - * * 1. Generate a properties object with some static defaults that can be - * overridden by Source configuration 2. We add the configuration users added - * for Kafka (parameters starting with kafka. and must be valid Kafka Consumer - * properties 3. We add the source documented parameters which can override - * other properties - * + * overridden if corresponding properties are specified + * 2. We add the configuration users added for Kafka (parameters starting + * with kafka.consumer and must be valid Kafka Consumer properties + * 3. Add source level properties (with no prefix) * @param context */ @Override protected void doConfigure(Context context) throws FlumeException { this.context = context; + headers = new HashMap(4); + tpAndOffsetMetadata = new HashMap(); + rebalanceFlag = new AtomicBoolean(false); + kafkaProps = new Properties(); + + // can be removed in the next release + // See https://issues.apache.org/jira/browse/FLUME-2896 + translateOldProperties(context); + + String topicProperty = context.getString(KafkaSourceConstants.TOPICS_REGEX); + if (topicProperty != null && !topicProperty.isEmpty()) { + // create subscriber that uses pattern-based subscription + subscriber = new PatternSubscriber(topicProperty); + } else + if((topicProperty = context.getString(KafkaSourceConstants.TOPICS)) != null && !topicProperty.isEmpty()) { + // create subscriber that uses topic list subscription + subscriber = new TopicListSubscriber(topicProperty); + } else + if (subscriber == null) { + throw new ConfigurationException("At least one Kafka topic must be specified."); + } + batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE, KafkaSourceConstants.DEFAULT_BATCH_SIZE); - timeUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS, + maxBatchDurationMillis = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS, KafkaSourceConstants.DEFAULT_BATCH_DURATION); - topic = context.getString(KafkaSourceConstants.TOPIC); - if(topic == null) { - throw new ConfigurationException("Kafka topic must be specified."); + String bootstrapServers = context.getString(KafkaSourceConstants.BOOTSTRAP_SERVERS); + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + throw new ConfigurationException("Bootstrap Servers must be specified"); } - kafkaProps = KafkaSourceUtil.getKafkaProperties(context); - consumerTimeout = Integer.parseInt(kafkaProps.getProperty( - KafkaSourceConstants.CONSUMER_TIMEOUT)); - kafkaAutoCommitEnabled = Boolean.parseBoolean(kafkaProps.getProperty( - KafkaSourceConstants.AUTO_COMMIT_ENABLED)); + setConsumerProps(context, bootstrapServers); if (counter == null) { counter = new KafkaSourceCounter(getName()); } } + + // We can remove this once the properties are officially deprecated + private void translateOldProperties(Context ctx) { + // topic + String topic = context.getString(KafkaSourceConstants.TOPIC); + if (topic != null && !topic.isEmpty()) { + subscriber = new TopicListSubscriber(topic); + log.warn("{} is 
deprecated. Please use the parameter {}", + KafkaSourceConstants.TOPIC, KafkaSourceConstants.TOPICS); + } + + // old groupId + String groupId = ctx.getString(KafkaSourceConstants.OLD_GROUP_ID); + if (groupId != null && !groupId.isEmpty()) { + kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + log.warn("{} is deprecated. Please use the parameter {}", + KafkaSourceConstants.OLD_GROUP_ID, + KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); + } + } + + + private void setConsumerProps(Context ctx, String bootStrapServers) { + String groupId = ctx.getString(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG); + if ((groupId == null || groupId.isEmpty()) && + kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) { + groupId = KafkaSourceConstants.DEFAULT_GROUP_ID; + log.info("Group ID was not specified. Using " + groupId + " as the group id."); + } + kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER); + kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER); + //Defaults overridden based on config + kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX)); + //These always take precedence over config + kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + if (groupId != null) { + kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + } + kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaSourceConstants.DEFAULT_AUTO_COMMIT); + + log.info(kafkaProps.toString()); + } + + Properties getConsumerProps() { + return kafkaProps; + } + + Subscriber getSubscriber() { + return subscriber; + } + @Override protected void doStart() throws FlumeException { log.info("Starting {}...", this); - try { - //initialize a consumer. This creates the connection to ZooKeeper - consumer = KafkaSourceUtil.getConsumer(kafkaProps); - } catch (Exception e) { - throw new FlumeException("Unable to create consumer. " + - "Check whether the ZooKeeper server is up and that the " + - "Flume agent can connect to it.", e); - } + //initialize a consumer. + consumer = new KafkaConsumer(kafkaProps); - Map topicCountMap = new HashMap(); - // We always have just one topic being read by one thread - topicCountMap.put(topic, 1); + // Subscribe for topics by already specified strategy + subscriber.subscribe(consumer, new SourceRebalanceListener(rebalanceFlag)); - // Get the message iterator for our topic - // Note that this succeeds even if the topic doesn't exist - // in that case we simply get no messages for the topic - // Also note that currently we only support a single topic - try { - Map>> consumerMap = - consumer.createMessageStreams(topicCountMap); - List> topicList = consumerMap.get(topic); - KafkaStream stream = topicList.get(0); - it = stream.iterator(); - } catch (Exception e) { - throw new FlumeException("Unable to get message iterator from Kafka", e); - } - log.info("Kafka source {} do started.", getName()); + // Connect to kafka. 1 second is optimal time. + it = consumer.poll(1000).iterator(); + log.info("Kafka source {} started.", getName()); counter.start(); } @Override protected void doStop() throws FlumeException { if (consumer != null) { - // exit cleanly. This syncs offsets of messages read to ZooKeeper - // to avoid reading the same messages again - consumer.shutdown(); + consumer.wakeup(); + consumer.close(); } counter.stop(); - log.info("Kafka Source {} do stopped. 
Metrics: {}", getName(), counter); + log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter); } +} - /** - * Check if there are messages waiting in Kafka, - * waiting until timeout (10ms by default) for messages to arrive. - * and catching the timeout exception to return a boolean - */ - boolean hasNext() { - try { - it.hasNext(); - return true; - } catch (ConsumerTimeoutException e) { - return false; + +class SourceRebalanceListener implements ConsumerRebalanceListener { + private static final Logger log = LoggerFactory.getLogger(SourceRebalanceListener.class); + private AtomicBoolean rebalanceFlag; + + public SourceRebalanceListener(AtomicBoolean rebalanceFlag) { + this.rebalanceFlag = rebalanceFlag; + } + + // Set a flag that a rebalance has occurred. Then commit already read events to kafka. + public void onPartitionsRevoked(Collection partitions) { + for (TopicPartition partition : partitions) { + log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition()); + rebalanceFlag.set(true); } } -} \ No newline at end of file + public void onPartitionsAssigned(Collection partitions) { + for (TopicPartition partition : partitions) { + log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition()); + } + } +} diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 911012cefc..2999cf272b 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -16,25 +16,33 @@ */ package org.apache.flume.source.kafka; +import org.apache.kafka.clients.CommonClientConfigs; + public class KafkaSourceConstants { - public static final String TOPIC = "topic"; - public static final String KEY = "key"; - public static final String TIMESTAMP = "timestamp"; + + public static final String KAFKA_PREFIX = "kafka."; + public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer."; + public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer"; + public static final String BOOTSTRAP_SERVERS = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + public static final String TOPICS = KAFKA_PREFIX + "topics"; + public static final String TOPICS_REGEX = TOPICS + "." 
+ "regex"; + public static final String DEFAULT_AUTO_COMMIT = "false"; public static final String BATCH_SIZE = "batchSize"; public static final String BATCH_DURATION_MS = "batchDurationMillis"; - public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms"; - public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable"; - public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; - public static final String ZOOKEEPER_CONNECT_FLUME = "zookeeperConnect"; - public static final String GROUP_ID = "group.id"; - public static final String GROUP_ID_FLUME = "groupId"; - public static final String PROPERTY_PREFIX = "kafka."; - - public static final int DEFAULT_BATCH_SIZE = 1000; public static final int DEFAULT_BATCH_DURATION = 1000; - public static final String DEFAULT_CONSUMER_TIMEOUT = "10"; - public static final String DEFAULT_AUTO_COMMIT = "false"; public static final String DEFAULT_GROUP_ID = "flume"; + /* Old Properties */ + + public static final String TOPIC = "topic"; + public static final String OLD_GROUP_ID = "groupId"; + + // flume event headers + public static final String TOPIC_HEADER = "topic"; + public static final String KEY_HEADER = "key"; + public static final String TIMESTAMP_HEADER = "timestamp"; + public static final String PARTITION_HEADER = "partition"; + } diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java deleted file mode 100644 index 4a4034bd82..0000000000 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flume.source.kafka; - -import java.util.Map; -import java.util.Properties; - -import kafka.common.KafkaException; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.javaapi.consumer.ConsumerConnector; - -import org.apache.flume.Context; -import org.apache.flume.conf.ConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaSourceUtil { - private static final Logger log = - LoggerFactory.getLogger(KafkaSourceUtil.class); - - public static Properties getKafkaProperties(Context context) { - log.info("context={}",context.toString()); - Properties props = generateDefaultKafkaProps(); - setKafkaProps(context,props); - addDocumentedKafkaProps(context,props); - return props; - } - - public static ConsumerConnector getConsumer(Properties kafkaProps) { - ConsumerConfig consumerConfig = - new ConsumerConfig(kafkaProps); - ConsumerConnector consumer = - Consumer.createJavaConsumerConnector(consumerConfig); - return consumer; - } - - /** - * Generate consumer properties object with some defaults - * @return - */ - private static Properties generateDefaultKafkaProps() { - Properties props = new Properties(); - props.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED, - KafkaSourceConstants.DEFAULT_AUTO_COMMIT); - props.put(KafkaSourceConstants.CONSUMER_TIMEOUT, - KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT); - props.put(KafkaSourceConstants.GROUP_ID, - KafkaSourceConstants.DEFAULT_GROUP_ID); - return props; - } - - /** - * Add all configuration parameters starting with "kafka" - * to consumer properties - */ - private static void setKafkaProps(Context context,Properties kafkaProps) { - - Map kafkaProperties = - context.getSubProperties(KafkaSourceConstants.PROPERTY_PREFIX); - - for (Map.Entry prop : kafkaProperties.entrySet()) { - - kafkaProps.put(prop.getKey(), prop.getValue()); - if (log.isDebugEnabled()) { - log.debug("Reading a Kafka Producer Property: key: " - + prop.getKey() + ", value: " + prop.getValue()); - } - } - } - - /** - * Some of the producer properties are especially important - * We documented them and gave them a camel-case name to match Flume config - * If user set these, we will override any existing parameters with these - * settings. - * Knowledge of which properties are documented is maintained here for now. - * If this will become a maintenance issue we'll set a proper data structure. 
- */ - private static void addDocumentedKafkaProps(Context context, - Properties kafkaProps) - throws ConfigurationException { - String zookeeperConnect = context.getString( - KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME); - if (zookeeperConnect == null) { - throw new ConfigurationException("ZookeeperConnect must contain " + - "at least one ZooKeeper server"); - } - kafkaProps.put(KafkaSourceConstants.ZOOKEEPER_CONNECT, zookeeperConnect); - - String groupID = context.getString(KafkaSourceConstants.GROUP_ID_FLUME); - - if (groupID != null ) { - kafkaProps.put(KafkaSourceConstants.GROUP_ID, groupID); - } - } - -} \ No newline at end of file diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java index 26c5c9d0aa..46d545f90a 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -18,27 +18,59 @@ import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import kafka.admin.AdminUtils; +import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; -import kafka.utils.ZKStringSerializer$; +import org.apache.commons.io.FileUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; public class KafkaSourceEmbeddedKafka { + + public static String HOST; + + static { + try { + HOST = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + throw new RuntimeException("Host address can not be obtained", e); + } + } + KafkaServerStartable kafkaServer; KafkaSourceEmbeddedZookeeper zookeeper; + int zkPort = 21818; // none-standard - Producer producer; + int serverPort = 18922; + + KafkaProducer producer; + File dir; - public KafkaSourceEmbeddedKafka() { + public KafkaSourceEmbeddedKafka(Properties properties) { zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort); + dir = new File(System.getProperty("java.io.tmpdir"), "kafka_log-" + UUID.randomUUID()); + try { + FileUtils.deleteDirectory(dir); + } catch (IOException e) { + e.printStackTrace(); + } Properties props = new Properties(); props.put("zookeeper.connect",zookeeper.getConnectString()); props.put("broker.id","1"); + props.put("host.name", "localhost"); + props.put("port", String.valueOf(serverPort)); + props.put("log.dir", dir.getAbsolutePath()); + if (properties != null) + props.putAll(props); KafkaConfig config = new KafkaConfig(props); kafkaServer = new KafkaServerStartable(config); kafkaServer.startup(); @@ -55,37 +87,49 @@ public String getZkConnectString() { return zookeeper.getConnectString(); } - private void initProducer() - { - Properties props = new Properties(); - props.put("metadata.broker.list","127.0.0.1:" + - kafkaServer.serverConfig().port()); - props.put("serializer.class","kafka.serializer.StringEncoder"); - props.put("request.required.acks", "1"); - - ProducerConfig 
config = new ProducerConfig(props); - - producer = new Producer(config); + public String getBrockers() { + return HOST + ":" + serverPort; + } + private void initProducer() { + Properties props = new Properties(); + props.put("bootstrap.servers", HOST + ":" + serverPort); + props.put("acks", "1"); + producer = new KafkaProducer(props, + new StringSerializer(), new StringSerializer()); } public void produce(String topic, String k, String v) { - KeyedMessage message = new KeyedMessage(topic,k,v); - producer.send(message); + ProducerRecord rec = new ProducerRecord(topic, k, v); + try { + producer.send(rec).get(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } } - public void createTopic(String topicName) { + public void produce(String topic, int partition, String k, String v) { + ProducerRecord rec = new ProducerRecord(topic, partition, k, v); + try { + producer.send(rec).get(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + } + + public void createTopic(String topicName, int numPartitions) { // Create a ZooKeeper client int sessionTimeoutMs = 10000; int connectionTimeoutMs = 10000; - ZkClient zkClient = new ZkClient(zookeeper.getConnectString(), - sessionTimeoutMs, connectionTimeoutMs, - ZKStringSerializer$.MODULE$); - - int numPartitions = 1; + ZkClient zkClient = ZkUtils.createZkClient(HOST + ":" + zkPort, sessionTimeoutMs, connectionTimeoutMs); + ZkUtils zkUtils = ZkUtils.apply(zkClient, false); int replicationFactor = 1; Properties topicConfig = new Properties(); - AdminUtils.createTopic(zkClient, topicName, numPartitions, + AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig); } diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java index 1b8a27106e..db144c2b8e 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.UUID; public class KafkaSourceEmbeddedZookeeper { private int zkPort; @@ -31,19 +32,25 @@ public class KafkaSourceEmbeddedZookeeper { File dir; - public KafkaSourceEmbeddedZookeeper(int zkPort){ - int numConnections = 5000; + public KafkaSourceEmbeddedZookeeper(int zkPort) { int tickTime = 2000; this.zkPort = zkPort; String dataDirectory = System.getProperty("java.io.tmpdir"); - dir = new File(dataDirectory, "zookeeper").getAbsoluteFile(); + dir = new File(dataDirectory, "zookeeper" + UUID.randomUUID()).getAbsoluteFile(); + + try { + FileUtils.deleteDirectory(dir); + } catch (IOException e) { + e.printStackTrace(); + System.exit(1); + } try { this.zookeeper = new ZooKeeperServer(dir,dir,tickTime); this.factory = new NIOServerCnxnFactory(); - factory.configure(new InetSocketAddress("127.0.0.1",zkPort),0); + factory.configure(new InetSocketAddress(KafkaSourceEmbeddedKafka.HOST, zkPort),0); factory.startup(zookeeper); } catch (IOException e) { e.printStackTrace(); @@ -59,6 +66,6 @@ public void stopZookeeper() throws IOException { } public String getConnectString() { - return "127.0.0.1:"+zkPort; + 
return KafkaSourceEmbeddedKafka.HOST + ":" + zkPort; } } diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 8ec14cccf5..8e04da8387 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -23,17 +23,18 @@ import static org.mockito.Mockito.*; import java.util.List; +import java.util.Properties; +import java.util.regex.Pattern; import com.google.common.base.Charsets; import com.google.common.collect.Lists; import junit.framework.Assert; import kafka.common.TopicExistsException; -import kafka.consumer.ConsumerIterator; -import kafka.message.Message; import org.apache.flume.*; import org.apache.flume.PollableSource.Status; import org.apache.flume.channel.ChannelProcessor; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,63 +43,133 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.flume.source.kafka.KafkaSourceConstants.*; + public class TestKafkaSource { private static final Logger log = LoggerFactory.getLogger(TestKafkaSource.class); private KafkaSource kafkaSource; private KafkaSourceEmbeddedKafka kafkaServer; - private ConsumerIterator mockIt; - private Message message; private Context context; private List events; - private String topicName = "test1"; - + private String topic0 = "test1"; + private String topic1 = "topic1"; @SuppressWarnings("unchecked") @Before public void setup() throws Exception { - kafkaSource = new KafkaSource(); - kafkaServer = new KafkaSourceEmbeddedKafka(); + kafkaServer = new KafkaSourceEmbeddedKafka(null); try { - kafkaServer.createTopic(topicName); + kafkaServer.createTopic(topic0, 1); + kafkaServer.createTopic(topic1, 3); } catch (TopicExistsException e) { //do nothing + e.printStackTrace(); } - - - context = new Context(); - context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME, - kafkaServer.getZkConnectString()); - context.put(KafkaSourceConstants.GROUP_ID_FLUME,"flume"); - context.put(KafkaSourceConstants.TOPIC,topicName); - context.put("kafka.consumer.timeout.ms","100"); - + context = prepareDefaultContext(); kafkaSource.setChannelProcessor(createGoodChannel()); } + private Context prepareDefaultContext() { + Context context = new Context(); + context.put(BOOTSTRAP_SERVERS, kafkaServer.getBrockers()); + context.put(KAFKA_CONSUMER_PREFIX + "group.id", "flume-group"); + return context; + } + @After public void tearDown() throws Exception { kafkaSource.stop(); kafkaServer.stop(); } + @SuppressWarnings("unchecked") + @Test + public void testOffsets() throws InterruptedException, EventDeliveryException { + long batchDuration = 2000; + context.put(TOPICS, topic1); + context.put(BATCH_DURATION_MS, + String.valueOf(batchDuration)); + context.put(BATCH_SIZE, "3"); + kafkaSource.configure(context); + kafkaSource.start(); + Thread.sleep(500L); + Status status = kafkaSource.process(); + assertEquals(Status.BACKOFF, status); + assertEquals(0, events.size()); + kafkaServer.produce(topic1, "", "record1"); + kafkaServer.produce(topic1, "", "record2"); + Thread.sleep(500L); + status = kafkaSource.process(); + assertEquals(Status.READY, status); + assertEquals(2, events.size()); + events.clear(); + 
kafkaServer.produce(topic1, "", "record3"); + kafkaServer.produce(topic1, "", "record4"); + kafkaServer.produce(topic1, "", "record5"); + Thread.sleep(500L); + assertEquals(Status.READY, kafkaSource.process()); + assertEquals(3, events.size()); + assertEquals("record3", new String(events.get(0).getBody(), Charsets.UTF_8)); + assertEquals("record4", new String(events.get(1).getBody(), Charsets.UTF_8)); + assertEquals("record5", new String(events.get(2).getBody(), Charsets.UTF_8)); + events.clear(); + kafkaServer.produce(topic1, "", "record6"); + kafkaServer.produce(topic1, "", "record7"); + kafkaServer.produce(topic1, "", "record8"); + kafkaServer.produce(topic1, "", "record9"); + kafkaServer.produce(topic1, "", "record10"); + Thread.sleep(500L); + assertEquals(Status.READY, kafkaSource.process()); + assertEquals(3, events.size()); + assertEquals("record6", new String(events.get(0).getBody(), Charsets.UTF_8)); + assertEquals("record7", new String(events.get(1).getBody(), Charsets.UTF_8)); + assertEquals("record8", new String(events.get(2).getBody(), Charsets.UTF_8)); + events.clear(); + kafkaServer.produce(topic1, "", "record11"); + // status must be READY due to time out exceed. + assertEquals(Status.READY, kafkaSource.process()); + assertEquals(3, events.size()); + assertEquals("record9", new String(events.get(0).getBody(), Charsets.UTF_8)); + assertEquals("record10", new String(events.get(1).getBody(), Charsets.UTF_8)); + assertEquals("record11", new String(events.get(2).getBody(), Charsets.UTF_8)); + events.clear(); + kafkaServer.produce(topic1, "", "record12"); + kafkaServer.produce(topic1, "", "record13"); + // stop kafka source + kafkaSource.stop(); + // start again + kafkaSource = new KafkaSource(); + kafkaSource.setChannelProcessor(createGoodChannel()); + kafkaSource.configure(context); + kafkaSource.start(); + kafkaServer.produce(topic1, "", "record14"); + Thread.sleep(1000L); + assertEquals(Status.READY, kafkaSource.process()); + assertEquals(3, events.size()); + assertEquals("record12", new String(events.get(0).getBody(), Charsets.UTF_8)); + assertEquals("record13", new String(events.get(1).getBody(), Charsets.UTF_8)); + assertEquals("record14", new String(events.get(2).getBody(), Charsets.UTF_8)); + events.clear(); + } + @SuppressWarnings("unchecked") @Test public void testProcessItNotEmpty() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + context.put(TOPICS, topic0); + context.put(BATCH_SIZE, "1"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); - kafkaServer.produce(topicName, "", "hello, world"); + kafkaServer.produce(topic0, "", "hello, world"); Thread.sleep(500L); - Assert.assertEquals(Status.READY, kafkaSource.process()); Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); Assert.assertEquals(1, events.size()); @@ -112,14 +183,15 @@ public void testProcessItNotEmpty() throws EventDeliveryException, public void testProcessItNotEmptyBatch() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.BATCH_SIZE,"2"); + context.put(TOPICS, topic0); + context.put(BATCH_SIZE,"2"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); - kafkaServer.produce(topicName, "", "hello, world"); - kafkaServer.produce(topicName, "", "foo, bar"); + 
kafkaServer.produce(topic0, "", "hello, world"); + kafkaServer.produce(topic0, "", "foo, bar"); Thread.sleep(500L); @@ -138,6 +210,7 @@ public void testProcessItNotEmptyBatch() throws EventDeliveryException, public void testProcessItEmpty() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException, InterruptedException { + context.put(TOPICS, topic0); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); @@ -151,7 +224,7 @@ public void testProcessItEmpty() throws EventDeliveryException, public void testNonExistingTopic() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.TOPIC,"faketopic"); + context.put(TOPICS,"faketopic"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); @@ -162,10 +235,11 @@ public void testNonExistingTopic() throws EventDeliveryException, @SuppressWarnings("unchecked") @Test(expected= FlumeException.class) - public void testNonExistingZk() throws EventDeliveryException, + public void testNonExistingKafkaServer() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,"blabla:666"); + context.put(TOPICS, topic0); + context.put(BOOTSTRAP_SERVERS,"blabla:666"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); @@ -177,37 +251,39 @@ public void testNonExistingZk() throws EventDeliveryException, @Test public void testBatchTime() throws InterruptedException, EventDeliveryException { - context.put(KafkaSourceConstants.BATCH_DURATION_MS,"250"); + context.put(TOPICS, topic0); + context.put(BATCH_DURATION_MS, "250"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); for (int i=1; i<5000; i++) { - kafkaServer.produce(topicName, "", "hello, world " + i); + kafkaServer.produce(topic0, "", "hello, world " + i); } Thread.sleep(500L); + long error = 50; long startTime = System.currentTimeMillis(); Status status = kafkaSource.process(); long endTime = System.currentTimeMillis(); assertEquals(Status.READY, status); assertTrue(endTime - startTime < - ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) + - context.getLong("kafka.consumer.timeout.ms")) ); + (context.getLong(BATCH_DURATION_MS) + error)); } // Consume event, stop source, start again and make sure we are not // consuming same event again @Test public void testCommit() throws InterruptedException, EventDeliveryException { - context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + context.put(TOPICS, topic0); + context.put(BATCH_SIZE, "1"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); - kafkaServer.produce(topicName, "", "hello, world"); + kafkaServer.produce(topic0, "", "hello, world"); Thread.sleep(500L); @@ -224,14 +300,14 @@ public void testCommit() throws InterruptedException, EventDeliveryException { @Test public void testNonCommit() throws EventDeliveryException, InterruptedException { - - context.put(KafkaSourceConstants.BATCH_SIZE,"1"); - context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000"); + context.put(TOPICS, topic0); + context.put(BATCH_SIZE,"1"); + context.put(BATCH_DURATION_MS,"30000"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); - kafkaServer.produce(topicName, "", "hello, world"); + kafkaServer.produce(topic0, "", "hello, 
world"); Thread.sleep(500L); kafkaSource.setChannelProcessor(createBadChannel()); @@ -252,13 +328,14 @@ public void testNonCommit() throws EventDeliveryException, @Test public void testTwoBatches() throws InterruptedException, EventDeliveryException { - context.put(KafkaSourceConstants.BATCH_SIZE,"1"); - context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000"); + context.put(TOPICS, topic0); + context.put(BATCH_SIZE,"1"); + context.put(BATCH_DURATION_MS, "30000"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); - kafkaServer.produce(topicName, "", "event 1"); + kafkaServer.produce(topic0, "", "event 1"); Thread.sleep(500L); kafkaSource.process(); @@ -266,7 +343,7 @@ public void testTwoBatches() throws InterruptedException, Charsets.UTF_8)); events.clear(); - kafkaServer.produce(topicName, "", "event 2"); + kafkaServer.produce(topic0, "", "event 2"); Thread.sleep(500L); kafkaSource.process(); Assert.assertEquals("event 2", new String(events.get(0).getBody(), @@ -276,14 +353,15 @@ public void testTwoBatches() throws InterruptedException, @Test public void testTwoBatchesWithAutocommit() throws InterruptedException, EventDeliveryException { - context.put(KafkaSourceConstants.BATCH_SIZE,"1"); - context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000"); - context.put("kafka.auto.commit.enable","true"); + context.put(TOPICS, topic0); + context.put(BATCH_SIZE,"1"); + context.put(BATCH_DURATION_MS,"30000"); + context.put(KAFKA_CONSUMER_PREFIX + "enable.auto.commit", "true"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); - kafkaServer.produce(topicName, "", "event 1"); + kafkaServer.produce(topic0, "", "event 1"); Thread.sleep(500L); kafkaSource.process(); @@ -291,7 +369,7 @@ public void testTwoBatchesWithAutocommit() throws InterruptedException, Charsets.UTF_8)); events.clear(); - kafkaServer.produce(topicName, "", "event 2"); + kafkaServer.produce(topic0, "", "event 2"); Thread.sleep(500L); kafkaSource.process(); Assert.assertEquals("event 2", new String(events.get(0).getBody(), @@ -304,13 +382,14 @@ public void testTwoBatchesWithAutocommit() throws InterruptedException, public void testNullKey() throws EventDeliveryException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException, InterruptedException { - context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + context.put(TOPICS, topic0); + context.put(BATCH_SIZE, "1"); kafkaSource.configure(context); kafkaSource.start(); Thread.sleep(500L); - kafkaServer.produce(topicName, null , "hello, world"); + kafkaServer.produce(topic0, null, "hello, world"); Thread.sleep(500L); @@ -322,6 +401,110 @@ public void testNullKey() throws EventDeliveryException, Charsets.UTF_8)); } + @Test + public void testSourceProperties() { + Context context = new Context(); + context.put(TOPICS, "test1, test2"); + context.put(TOPICS_REGEX, "^stream[0-9]$"); + context.put(BOOTSTRAP_SERVERS, "bootstrap-servers-list"); + KafkaSource source = new KafkaSource(); + source.doConfigure(context); + + //check that kafka.topics.regex has higher priority than topics + //type of subscriber should be PatternSubscriber + KafkaSource.Subscriber subscriber = source.getSubscriber(); + Pattern pattern = subscriber.get(); + Assert.assertTrue(pattern.matcher("stream1").find()); + } + + @Test + public void testKafkaProperties() { + Context context = new Context(); + context.put(TOPICS, "test1, test2"); + context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.default.group.id"); + 
context.put(KAFKA_CONSUMER_PREFIX + "fake.property", "kafka.property.value"); + context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list"); + context.put(KAFKA_CONSUMER_PREFIX + "bootstrap.servers", "bad-bootstrap-servers-list"); + KafkaSource source = new KafkaSource(); + source.doConfigure(context); + Properties kafkaProps = source.getConsumerProps(); + + //check that we have defaults set + assertEquals( + String.valueOf(DEFAULT_AUTO_COMMIT), + kafkaProps.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + //check that kafka properties override the default and get correct name + assertEquals( + "override.default.group.id", + kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + //check that any kafka property gets in + assertEquals( + "kafka.property.value", + kafkaProps.getProperty("fake.property")); + //check that documented property overrides defaults + assertEquals( + "real-bootstrap-servers-list", + kafkaProps.getProperty("bootstrap.servers")); + } + + @Test + public void testOldProperties() { + Context context = new Context(); + + context.put(TOPIC, "old.topic"); + context.put(OLD_GROUP_ID, "old.groupId"); + context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list"); + KafkaSource source = new KafkaSource(); + source.doConfigure(context); + Properties kafkaProps = source.getConsumerProps(); + + KafkaSource.Subscriber> subscriber = source.getSubscriber(); + //check topic was set + assertEquals( + "old.topic", + subscriber.get().get(0)); + //check that kafka old properties override the default and get correct name + assertEquals( + "old.groupId", + kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + + source = new KafkaSource(); + context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.old.group.id"); + source.doConfigure(context); + kafkaProps = source.getConsumerProps(); + //check that kafka new properties override old + assertEquals( + "override.old.group.id", + kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + + context.clear(); + context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list"); + context.put(TOPIC, "old.topic"); + source = new KafkaSource(); + source.doConfigure(context); + kafkaProps = source.getConsumerProps(); + //check defaults set + assertEquals( + KafkaSourceConstants.DEFAULT_GROUP_ID, + kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); + } + + @Test + public void testPatternBasedSubscription() { + Context context = new Context(); + + context.put(TOPICS_REGEX, "^topic[0-9]$"); + context.put(OLD_GROUP_ID, "old.groupId"); + context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list"); + KafkaSource source = new KafkaSource(); + source.doConfigure(context); + KafkaSource.Subscriber subscriber = source.getSubscriber(); + for (int i = 0; i < 10; i++) { + Assert.assertTrue(subscriber.get().matcher("topic" + i).find()); + } + Assert.assertFalse(subscriber.get().matcher("topic").find()); + } + ChannelProcessor createGoodChannel() { ChannelProcessor channelProcessor = mock(ChannelProcessor.class); @@ -352,4 +535,4 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return channelProcessor; } -} \ No newline at end of file +} diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java deleted file mode 100644 index 0cbb4b69ec..0000000000 --- 
a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flume.source.kafka; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import java.io.IOException; -import java.util.Properties; - -import kafka.javaapi.consumer.ConsumerConnector; -import org.apache.flume.Context; -import org.apache.zookeeper.server.*; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestKafkaSourceUtil { - private Properties props = new Properties(); - private Context context = new Context(); - private int zkPort = 21818; // none-standard - private KafkaSourceEmbeddedZookeeper zookeeper; - - @Before - public void setUp() throws Exception { - context.put("kafka.consumer.timeout", "10"); - context.put("type", "KafkaSource"); - context.put("topic", "test"); - context.put("zookeeperConnect", "127.0.0.1:"+zkPort); - context.put("groupId","test"); - props = KafkaSourceUtil.getKafkaProperties(context); - zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort); - - - } - - @After - public void tearDown() throws Exception { - zookeeper.stopZookeeper(); - } - - - @Test - public void testGetConsumer() { - ConsumerConnector cc = KafkaSourceUtil.getConsumer(props); - assertNotNull(cc); - - } - - @Test - public void testKafkaConsumerProperties() { - Context context = new Context(); - context.put("kafka.auto.commit.enable", "override.default.autocommit"); - context.put("kafka.fake.property", "kafka.property.value"); - context.put("kafka.zookeeper.connect","bad-zookeeper-list"); - context.put("zookeeperConnect","real-zookeeper-list"); - Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context); - - //check that we have defaults set - assertEquals( - kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID), - KafkaSourceConstants.DEFAULT_GROUP_ID); - //check that kafka properties override the default and get correct name - assertEquals( - kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED), - "override.default.autocommit"); - //check that any kafka property gets in - assertEquals(kafkaProps.getProperty("fake.property"), - "kafka.property.value"); - //check that documented property overrides defaults - assertEquals(kafkaProps.getProperty("zookeeper.connect") - ,"real-zookeeper-list"); - } - - -} diff --git a/pom.xml b/pom.xml index 15c086b509..3b2c97ce52 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ limitations under the License. 0.90.1 2.4.0 0.7.0 + 0.9.0.1 1.0.0 1.0.0 2.7.1 @@ -1337,12 +1338,12 @@ limitations under the License. 
org.apache.kafka kafka_2.10 - 0.8.1.1 + ${kafka.version} org.apache.kafka kafka_2.10 - 0.8.1.1 + ${kafka.version} test test
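
Note for reviewers: the at-least-once pattern the reworked KafkaSource relies on in doProcess() above (tracking the next offset per partition and calling commitSync only after the batch has been delivered) can be sketched as a small standalone program. This is only an illustrative sketch, not part of the patch; the broker address, group id, and topic name below are assumptions.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Assumed local broker; the source takes this from kafka.bootstrap.servers.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "flume");
    // Mirrors DEFAULT_AUTO_COMMIT: offsets are committed manually per batch.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
    Map<TopicPartition, OffsetAndMetadata> pending =
        new HashMap<TopicPartition, OffsetAndMetadata>();
    try {
      consumer.subscribe(Collections.singletonList("test1")); // assumed topic

      ConsumerRecords<String, byte[]> records = consumer.poll(1000);
      for (ConsumerRecord<String, byte[]> record : records) {
        // "Deliver" the record (the source hands it to the channel processor).
        System.out.println(record.topic() + "/" + record.partition()
            + " @" + record.offset() + ": " + new String(record.value()));
        // Remember the next offset to read for this partition, as the source does.
        pending.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
      }
      // Commit only after the batch was delivered; a crash before this point
      // means the records are re-read, i.e. at-least-once delivery.
      if (!pending.isEmpty()) {
        consumer.commitSync(pending);
      }
    } finally {
      consumer.close();
    }
  }
}

Committing offset + 1 after delivery is the same bookkeeping doProcess() performs with tpAndOffsetMetadata; duplicates remain possible on restart or rebalance, which is why the user guide changes above document at-least-once semantics.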