From b8d5b31688c0aed3b5101f2c45557afe518d18be Mon Sep 17 00:00:00 2001 From: Ivan Vasylyev Date: Thu, 27 Nov 2014 15:44:09 +0200 Subject: [PATCH 1/3] https://issues.apache.org/jira/browse/CAMEL-8085 - add handling of offset if auto commit of offset is disabled - add embedded kafka into unit tests --- components/camel-kafka/pom.xml | 4 + .../component/kafka/KafkaConfiguration.java | 18 ++ .../camel/component/kafka/KafkaConsumer.java | 117 +++++++++--- .../camel/component/kafka/KafkaEndpoint.java | 18 +- .../kafka/BaseEmbeddedKafkaTest.java | 58 ++++++ .../kafka/KafkaConsumerBatchSizeTest.java | 112 ++++++++++++ ...umerIT.java => KafkaConsumerFullTest.java} | 12 +- ...ucerIT.java => KafkaProducerFullTest.java} | 17 +- .../kafka/embedded/EmbeddedKafkaCluster.java | 170 ++++++++++++++++++ .../kafka/embedded/EmbeddedZookeeper.java | 113 ++++++++++++ .../component/kafka/embedded/SystemTime.java | 37 ++++ .../component/kafka/embedded/TestUtils.java | 65 +++++++ parent/pom.xml | 2 +- 13 files changed, 698 insertions(+), 45 deletions(-) create mode 100644 components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java create mode 100644 components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java rename components/camel-kafka/src/test/java/org/apache/camel/component/kafka/{KafkaConsumerIT.java => KafkaConsumerFullTest.java} (86%) rename components/camel-kafka/src/test/java/org/apache/camel/component/kafka/{KafkaProducerIT.java => KafkaProducerFullTest.java} (93%) create mode 100644 components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java create mode 100644 components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java create mode 100644 components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java create mode 100644 components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml index 9ea9d8537db16..b6ddff051e0eb 100644 --- a/components/camel-kafka/pom.xml +++ b/components/camel-kafka/pom.xml @@ -50,6 +50,10 @@ org.slf4j slf4j-simple + + scala-library + org.scala-lang + diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 881ef3cb67bae..7d59dd261470b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -28,6 +28,8 @@ public class KafkaConfiguration { private String groupId; private String partitioner = DefaultPartitioner.class.getCanonicalName(); private int consumerStreams = 10; + private int consumersCount = 1; + private int batchSize = 100; //Common configuration properties private String clientId; @@ -197,6 +199,22 @@ public void setConsumerStreams(int consumerStreams) { this.consumerStreams = consumerStreams; } + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public int getConsumersCount() { + return consumersCount; + } + + public void setConsumersCount(int consumersCount) { + this.consumersCount = consumersCount; + } + public String getClientId() { return clientId; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 3ff40eb669e14..4e3cd0affc640 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -16,12 +16,6 @@ */ package org.apache.camel.component.kafka; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutorService; - import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; @@ -30,23 +24,33 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.*; /** * */ public class KafkaConsumer extends DefaultConsumer { + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class); + protected ExecutorService executor; private final KafkaEndpoint endpoint; private final Processor processor; - - private ConsumerConnector consumer; + private Map consumerBarriers; public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; - if (endpoint.getZookeeperConnect() == null) { + this.consumerBarriers = new HashMap(); + if (endpoint.getZookeeperConnect() == null) { throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified"); } if (endpoint.getGroupId() == null) { @@ -65,27 +69,38 @@ Properties getProps() { protected void doStart() throws Exception { super.doStart(); log.info("Starting Kafka consumer"); - - consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps())); - - Map topicCountMap = new HashMap(); - topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams()); - Map>> consumerMap = consumer.createMessageStreams(topicCountMap); - List> streams = consumerMap.get(endpoint.getTopic()); - executor = endpoint.createExecutor(); - for (final KafkaStream stream : streams) { - executor.submit(new ConsumerTask(stream)); + + for (int i = 0; i < endpoint.getConsumersCount(); i++) { + ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps())); + Map topicCountMap = new HashMap(); + topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams()); + Map>> consumerMap = consumer.createMessageStreams(topicCountMap); + List> streams = consumerMap.get(endpoint.getTopic()); + if (endpoint.isAutoCommitEnable() != null && Boolean.FALSE == endpoint.isAutoCommitEnable().booleanValue()) { + CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer)); + for (final KafkaStream stream : streams) { + executor.submit(new BatchingConsumerTask(stream, barrier)); + } + consumerBarriers.put(consumer, barrier); + } else{ + for (final KafkaStream stream : streams) { + executor.submit(new AutoCommitConsumerTask(stream)); + } + consumerBarriers.put(consumer, null); + } } + } @Override protected void doStop() throws Exception { super.doStop(); log.info("Stopping Kafka consumer"); - - if (consumer != null) { - consumer.shutdown(); + for (ConsumerConnector consumer : consumerBarriers.keySet()) { + if (consumer != null) { + consumer.shutdown(); + } } if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { @@ -97,11 +112,63 @@ protected void doStop() throws Exception { executor = null; } - class ConsumerTask implements Runnable { + class BatchingConsumerTask implements Runnable { + + private KafkaStream stream; + private CyclicBarrier berrier; + + public BatchingConsumerTask(KafkaStream stream, CyclicBarrier berrier) { + this.stream = stream; + this.berrier = berrier; + } + + public void run() { + ConsumerIterator it = stream.iterator(); + int processed = 0; + while (it.hasNext()) { + MessageAndMetadata mm = it.next(); + Exchange exchange = endpoint.createKafkaExchange(mm); + try { + processor.process(exchange); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + processed++; + if (processed >= endpoint.getBatchSize()) { + try { + berrier.await(10, TimeUnit.SECONDS); + processed = 0; + } catch (InterruptedException e) { + LOG.error(e.getMessage(), e); + } catch (BrokenBarrierException e) { + LOG.error(e.getMessage(), e); + } catch (TimeoutException e) { + LOG.error(e.getMessage(), e); + } + } + } + } + } + + class CommitOffsetTask implements Runnable { + + private ConsumerConnector consumer; + + public CommitOffsetTask(ConsumerConnector consumer) { + this.consumer = consumer; + } + + @Override + public void run() { + consumer.commitOffsets(); + } + } + + class AutoCommitConsumerTask implements Runnable { private KafkaStream stream; - public ConsumerTask(KafkaStream stream) { + public AutoCommitConsumerTask(KafkaStream stream) { this.stream = stream; } @@ -113,7 +180,7 @@ public void run() { try { processor.process(exchange); } catch (Exception e) { - e.printStackTrace(); + LOG.error(e.getMessage(), e); } } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index af1bcd1b77add..1ca50d9348834 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -162,6 +162,22 @@ public void setConsumerStreams(int consumerStreams) { configuration.setConsumerStreams(consumerStreams); } + public int getBatchSize() { + return configuration.getBatchSize(); + } + + public void setBatchSize(int batchSize) { + this.configuration.setBatchSize(batchSize); + } + + public int getConsumersCount() { + return this.configuration.getConsumersCount(); + } + + public void setConsumersCount(int consumersCount) { + this.configuration.setConsumersCount(consumersCount); + } + public void setConsumerTimeoutMs(int consumerTimeoutMs) { configuration.setConsumerTimeoutMs(consumerTimeoutMs); } @@ -310,7 +326,7 @@ public int getRebalanceMaxRetries() { return configuration.getRebalanceMaxRetries(); } - public boolean isAutoCommitEnable() { + public Boolean isAutoCommitEnable() { return configuration.isAutoCommitEnable(); } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java new file mode 100644 index 0000000000000..b1816101afb44 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import org.apache.camel.component.kafka.embedded.EmbeddedKafkaCluster; +import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +public class BaseEmbeddedKafkaTest extends CamelTestSupport { + + static EmbeddedZookeeper embeddedZookeeper; + static EmbeddedKafkaCluster embeddedKafkaCluster; + + @BeforeClass + public static void beforeClass(){ + embeddedZookeeper = new EmbeddedZookeeper(2181); + List kafkaPorts = new ArrayList(); + // -1 for any available port + kafkaPorts.add(9092); + embeddedKafkaCluster = new EmbeddedKafkaCluster(embeddedZookeeper.getConnection(), new Properties(), kafkaPorts); + try { + embeddedZookeeper.startup(); + } catch (IOException e) { + e.printStackTrace(); + } + System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection()); + embeddedKafkaCluster.startup(); + System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList()); + } + + @AfterClass + public static void afterClass(){ + embeddedKafkaCluster.shutdown(); + embeddedZookeeper.shutdown(); + } + +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java new file mode 100644 index 0000000000000..2b4072434d1b6 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerBatchSizeTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Properties; + +public class KafkaConsumerBatchSizeTest extends BaseEmbeddedKafkaTest { + + public static final String TOPIC = "test"; + + @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181&" + + "groupId=group1&autoOffsetReset=smallest&" + + "autoCommitEnable=false&batchSize=3&consumerStreams=1") + private Endpoint from; + + @EndpointInject(uri = "mock:result") + private MockEndpoint to; + + private Producer producer; + + @Before + public void before() { + Properties props = new Properties(); + props.put("metadata.broker.list", "localhost:9092"); + props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("partitioner.class", "org.apache.camel.component.kafka.SimplePartitioner"); + props.put("request.required.acks", "1"); + + ProducerConfig config = new ProducerConfig(props); + producer = new Producer(config); + } + + @After + public void after() { + producer.close(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(from).autoStartup(true).to(to).setId("First"); + } + }; + } + + @Test + public void kafkaMessagesIsConsumedByCamel() throws Exception { + //First 5 must not be committed since batch size is 3 + to.expectedMessageCount(2); + to.expectedBodiesReceivedInAnyOrder("m1", "m2"); + for (int k = 1; k <= 2; k++) { + String msg = "m" + k; + KeyedMessage data = new KeyedMessage(TOPIC, "1", msg); + producer.send(data); + } + to.assertIsSatisfied(3000); + + //Restart endpoint, + from.getCamelContext().stop(); + from.getCamelContext().start(); + + to.reset(); + to.expectedMessageCount(10); + to.expectedBodiesReceivedInAnyOrder("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8", "m9", "m10"); + + //Second route must wake up and consume all from scratch and commit 9 consumed + for (int k = 3; k <=10; k++) { + String msg = "m" + k; + KeyedMessage data = new KeyedMessage(TOPIC, "1", msg); + producer.send(data); + } + + to.assertIsSatisfied(3000); + + //Restart endpoint, + from.getCamelContext().stop(); + from.getCamelContext().start(); + + to.reset(); + + //Only one message should left to consume by this consumer group + to.expectedMessageCount(1); + } +} + diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java similarity index 86% rename from components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java rename to components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java index 5a4baf73101d3..922ffa664963c 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java @@ -27,20 +27,16 @@ import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.After; import org.junit.Before; import org.junit.Test; -/** - * The Producer IT tests require a Kafka broker running on 9092 and a zookeeper instance running on 2181. - * The broker must have a topic called test created. - */ -public class KafkaConsumerIT extends CamelTestSupport { +public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { public static final String TOPIC = "test"; - @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1") + @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC + "&zookeeperHost=localhost&zookeeperPort=2181" + + "&groupId=group1&autoOffsetReset=smallest") private Endpoint from; @EndpointInject(uri = "mock:result") @@ -79,7 +75,7 @@ public void configure() throws Exception { @Test public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException { to.expectedMessageCount(5); - to.expectedBodiesReceived("message-0", "message-1", "message-2", "message-3", "message-4"); + to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); for (int k = 0; k < 5; k++) { String msg = "message-" + k; KeyedMessage data = new KeyedMessage(TOPIC, "1", msg); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java similarity index 93% rename from components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java rename to components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java index f77d91a31a49e..b86e40257a1c9 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java @@ -36,26 +36,23 @@ import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * The Producer IT tests require a Kafka broker running on 9092 and a zookeeper instance running on 2181. - * The broker must have a topic called test created. - */ -public class KafkaProducerIT extends CamelTestSupport { + +public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest { public static final String TOPIC = "test"; public static final String TOPIC_IN_HEADER = "testHeader"; - private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerIT.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerFullTest.class); @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC - + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder&requestRequiredAcks=1") + + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder" + + "&requestRequiredAcks=-1") private Endpoint to; @Produce(uri = "direct:start") @@ -71,6 +68,7 @@ public void before() { props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); + props.put("auto.offset.reset", "smallest"); kafkaConsumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); } @@ -83,7 +81,6 @@ public void after() { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { - @Override public void configure() throws Exception { from("direct:start").to(to); @@ -108,7 +105,7 @@ public void producedMessageIsReceivedByKafka() throws InterruptedException, IOEx boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS); - assertTrue("Not all messages were published to the kafka topics", allMessagesReceived); + assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived); } private void createKafkaMessageConsumer(CountDownLatch messagesLatch, Map topicCountMap) { diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java new file mode 100644 index 0000000000000..bf32064badda5 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.embedded; + +import kafka.admin.AdminUtils; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import org.I0Itec.zkclient.ZkClient; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.*; + +public class EmbeddedKafkaCluster { + private final List ports; + private final String zkConnection; + private final Properties baseProperties; + + private final String brokerList; + + private final List brokers; + private final List logDirs; + + public EmbeddedKafkaCluster(String zkConnection) { + this(zkConnection, new Properties()); + } + + public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties) { + this(zkConnection, baseProperties, Collections.singletonList(-1)); + } + + public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties, List ports) { + this.zkConnection = zkConnection; + this.ports = resolvePorts(ports); + this.baseProperties = baseProperties; + this.brokers = new ArrayList(); + this.logDirs = new ArrayList(); + + this.brokerList = constructBrokerList(this.ports); + } + + public ZkClient getZkClient(){ + for(KafkaServer server : brokers){ + return server.zkClient(); + } + return null; + } + + public void createTopics(String...topics){ + for(String topic : topics){ + AdminUtils.createTopic(getZkClient(), topic, 2, 1, new Properties()); + } + } + + private List resolvePorts(List ports) { + List resolvedPorts = new ArrayList(); + for (Integer port : ports) { + resolvedPorts.add(resolvePort(port)); + } + return resolvedPorts; + } + + private int resolvePort(int port) { + if (port == -1) { + return TestUtils.getAvailablePort(); + } + return port; + } + + private String constructBrokerList(List ports) { + StringBuilder sb = new StringBuilder(); + for (Integer port : ports) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append("localhost:").append(port); + } + return sb.toString(); + } + + public void startup() { + for (int i = 0; i < ports.size(); i++) { + Integer port = ports.get(i); + File logDir = TestUtils.constructTempDir("kafka-local"); + + Properties properties = new Properties(); + properties.putAll(baseProperties); + properties.setProperty("zookeeper.connect", zkConnection); + properties.setProperty("broker.id", String.valueOf(i + 1)); + properties.setProperty("host.name", "localhost"); + properties.setProperty("port", Integer.toString(port)); + properties.setProperty("log.dir", logDir.getAbsolutePath()); + properties.setProperty("num.partitions", String.valueOf(1)); + properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE)); + System.out.println("EmbeddedKafkaCluster: local directory: " + logDir.getAbsolutePath()); + properties.setProperty("log.flush.interval.messages", String.valueOf(1)); + + KafkaServer broker = startBroker(properties); + + brokers.add(broker); + logDirs.add(logDir); + } + } + + + private KafkaServer startBroker(Properties props) { + KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime()); + server.startup(); + return server; + } + + public Properties getProps() { + Properties props = new Properties(); + props.putAll(baseProperties); + props.put("metadata.broker.list", brokerList); + props.put("zookeeper.connect", zkConnection); + return props; + } + + public String getBrokerList() { + return brokerList; + } + + public List getPorts() { + return ports; + } + + public String getZkConnection() { + return zkConnection; + } + + public void shutdown() { + for (KafkaServer broker : brokers) { + try { + broker.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + for (File logDir : logDirs) { + try { + TestUtils.deleteFile(logDir); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("EmbeddedKafkaCluster{"); + sb.append("brokerList='").append(brokerList).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java new file mode 100644 index 0000000000000..ef8d55e2dcb0a --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.embedded; + +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetSocketAddress; + +public class EmbeddedZookeeper { + private int port = -1; + private int tickTime = 500; + + private ServerCnxnFactory factory; + private File snapshotDir; + private File logDir; + + public EmbeddedZookeeper() { + this(-1); + } + + public EmbeddedZookeeper(int port) { + this(port, 500); + } + + public EmbeddedZookeeper(int port, int tickTime) { + this.port = resolvePort(port); + this.tickTime = tickTime; + } + + private int resolvePort(int port) { + if (port == -1) { + return TestUtils.getAvailablePort(); + } + return port; + } + + public void startup() throws IOException{ + if (this.port == -1) { + this.port = TestUtils.getAvailablePort(); + } + this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), 1024); + this.snapshotDir = TestUtils.constructTempDir("embeeded-zk/snapshot"); + this.logDir = TestUtils.constructTempDir("embeeded-zk/log"); + + try { + factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime)); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + + public void shutdown() { + factory.shutdown(); + try { + TestUtils.deleteFile(snapshotDir); + } catch (FileNotFoundException e) { + // ignore + } + try { + TestUtils.deleteFile(logDir); + } catch (FileNotFoundException e) { + // ignore + } + } + + public String getConnection() { + return "localhost:" + port; + } + + public void setPort(int port) { + this.port = port; + } + + public void setTickTime(int tickTime) { + this.tickTime = tickTime; + } + + public int getPort() { + return port; + } + + public int getTickTime() { + return tickTime; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("EmbeddedZookeeper{"); + sb.append("connection=").append(getConnection()); + sb.append('}'); + return sb.toString(); + } +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java new file mode 100644 index 0000000000000..7ad496528f475 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/SystemTime.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.embedded; + +import kafka.utils.Time; + +class SystemTime implements Time { + public long milliseconds() { + return System.currentTimeMillis(); + } + + public long nanoseconds() { + return System.nanoTime(); + } + + public void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // Ignore + } + } +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java new file mode 100644 index 0000000000000..26574e55218be --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/TestUtils.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.embedded; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.Random; + +class TestUtils { + private static final Random RANDOM = new Random(); + + private TestUtils() { + } + + public static File constructTempDir(String dirPrefix) { + File file = new File(System.getProperty("java.io.tmpdir"), dirPrefix + RANDOM.nextInt(10000000)); + if (!file.mkdirs()) { + throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath()); + } + file.deleteOnExit(); + return file; + } + + public static int getAvailablePort() { + try { + ServerSocket socket = new ServerSocket(0); + try { + return socket.getLocalPort(); + } finally { + socket.close(); + } + } catch (IOException e) { + throw new IllegalStateException("Cannot find available port: " + e.getMessage(), e); + } + } + + public static boolean deleteFile(File path) throws FileNotFoundException { + if (!path.exists()) { + throw new FileNotFoundException(path.getAbsolutePath()); + } + boolean ret = true; + if (path.isDirectory()) { + for (File f : path.listFiles()) { + ret = ret && deleteFile(f); + } + } + return ret && path.delete(); + } +} diff --git a/parent/pom.xml b/parent/pom.xml index 94ed548d19480..869092adfeb5e 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -283,7 +283,7 @@ 4.11 2.5.3 1.1.3 - 0.8.1 + 0.8.1.1 0.8.1_1 2.4.0 3.0.2 From 539781ab99eb70a5881a0c5333276760294ab2ef Mon Sep 17 00:00:00 2001 From: Ivan Vasylyev Date: Mon, 1 Dec 2014 14:19:29 +0200 Subject: [PATCH 2/3] https://issues.apache.org/jira/browse/CAMEL-8085 - break iteration if interrupted or barrier is broken --- .../java/org/apache/camel/component/kafka/KafkaConsumer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 4e3cd0affc640..2711bfa877a06 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -140,8 +140,10 @@ public void run() { processed = 0; } catch (InterruptedException e) { LOG.error(e.getMessage(), e); + break; } catch (BrokenBarrierException e) { LOG.error(e.getMessage(), e); + break; } catch (TimeoutException e) { LOG.error(e.getMessage(), e); } From 0e7533dcbc4e440a4ca923541c364f0af75e9417 Mon Sep 17 00:00:00 2001 From: Ivan Vasylyev Date: Mon, 1 Dec 2014 14:31:39 +0200 Subject: [PATCH 3/3] https://issues.apache.org/jira/browse/CAMEL-8085 - make barrier await timeout configurable --- .../camel/component/kafka/KafkaConfiguration.java | 9 +++++++++ .../apache/camel/component/kafka/KafkaConsumer.java | 11 +++-------- .../apache/camel/component/kafka/KafkaEndpoint.java | 8 ++++++++ 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 7d59dd261470b..669e1a0dc370d 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -30,6 +30,7 @@ public class KafkaConfiguration { private int consumerStreams = 10; private int consumersCount = 1; private int batchSize = 100; + private int barrierAwaitTimeoutMs = 10000; //Common configuration properties private String clientId; @@ -207,6 +208,14 @@ public void setBatchSize(int batchSize) { this.batchSize = batchSize; } + public int getBarrierAwaitTimeoutMs() { + return barrierAwaitTimeoutMs; + } + + public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) { + this.barrierAwaitTimeoutMs = barrierAwaitTimeoutMs; + } + public int getConsumersCount() { return consumersCount; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 2711bfa877a06..181337629b189 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -17,7 +17,6 @@ package org.apache.camel.component.kafka; import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; @@ -123,10 +122,8 @@ public BatchingConsumerTask(KafkaStream stream, CyclicBarrier be } public void run() { - ConsumerIterator it = stream.iterator(); int processed = 0; - while (it.hasNext()) { - MessageAndMetadata mm = it.next(); + for (MessageAndMetadata mm : stream) { Exchange exchange = endpoint.createKafkaExchange(mm); try { processor.process(exchange); @@ -136,7 +133,7 @@ public void run() { processed++; if (processed >= endpoint.getBatchSize()) { try { - berrier.await(10, TimeUnit.SECONDS); + berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS); processed = 0; } catch (InterruptedException e) { LOG.error(e.getMessage(), e); @@ -175,9 +172,7 @@ public AutoCommitConsumerTask(KafkaStream stream) { } public void run() { - ConsumerIterator it = stream.iterator(); - while (it.hasNext()) { - MessageAndMetadata mm = it.next(); + for (MessageAndMetadata mm : stream) { Exchange exchange = endpoint.createKafkaExchange(mm); try { processor.process(exchange); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 1ca50d9348834..d32cb83687104 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -170,6 +170,14 @@ public void setBatchSize(int batchSize) { this.configuration.setBatchSize(batchSize); } + public int getBarrierAwaitTimeoutMs() { + return configuration.getBarrierAwaitTimeoutMs(); + } + + public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) { + this.configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs); + } + public int getConsumersCount() { return this.configuration.getConsumersCount(); }