Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add offset handling when auto commit is disabled, to prevent data loss (CAMEL-8085) #342

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions components/camel-kafka/pom.xml
Expand Up @@ -50,6 +50,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down
Expand Up @@ -28,6 +28,9 @@ public class KafkaConfiguration {
private String groupId;
private String partitioner = DefaultPartitioner.class.getCanonicalName();
private int consumerStreams = 10;
private int consumersCount = 1;
private int batchSize = 100;
private int barrierAwaitTimeoutMs = 10000;

//Common configuration properties
private String clientId;
Expand Down Expand Up @@ -197,6 +200,30 @@ public void setConsumerStreams(int consumerStreams) {
this.consumerStreams = consumerStreams;
}

/**
 * Returns the number of messages a stream processes before its offsets are
 * committed (used by the batching consumer when auto commit is disabled).
 */
public int getBatchSize() {
    return this.batchSize;
}

/**
 * Sets the number of messages processed per stream before offsets are committed.
 *
 * @param batchSize messages per commit batch
 */
public void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
}

/**
 * Returns how long (in milliseconds) a stream waits on the commit barrier
 * before timing out.
 */
public int getBarrierAwaitTimeoutMs() {
    return this.barrierAwaitTimeoutMs;
}

/**
 * Sets the barrier await timeout in milliseconds.
 *
 * @param barrierAwaitTimeoutMs timeout applied to the commit barrier's await
 */
public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
    this.barrierAwaitTimeoutMs = barrierAwaitTimeoutMs;
}

/**
 * Returns the number of consumer connectors the endpoint creates on start.
 */
public int getConsumersCount() {
    return this.consumersCount;
}

/**
 * Sets the number of consumer connectors to create on start.
 *
 * @param consumersCount connector count (default 1)
 */
public void setConsumersCount(int consumersCount) {
    this.consumersCount = consumersCount;
}

/**
 * Returns the Kafka client id, or {@code null} if none was configured.
 */
public String getClientId() {
    return this.clientId;
}
Expand Down
Expand Up @@ -16,37 +16,40 @@
*/
package org.apache.camel.component.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.*;

/**
*
*/
public class KafkaConsumer extends DefaultConsumer {

private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);

protected ExecutorService executor;
private final KafkaEndpoint endpoint;
private final Processor processor;

private ConsumerConnector consumer;
private Map<ConsumerConnector, CyclicBarrier> consumerBarriers;

public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
this.processor = processor;
if (endpoint.getZookeeperConnect() == null) {
this.consumerBarriers = new HashMap<ConsumerConnector, CyclicBarrier>();
if (endpoint.getZookeeperConnect() == null) {
throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified");
}
if (endpoint.getGroupId() == null) {
Expand All @@ -65,27 +68,38 @@ Properties getProps() {
protected void doStart() throws Exception {
super.doStart();
log.info("Starting Kafka consumer");

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());

executor = endpoint.createExecutor();
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new ConsumerTask(stream));

for (int i = 0; i < endpoint.getConsumersCount(); i++) {
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(getProps()));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(endpoint.getTopic(), endpoint.getConsumerStreams());
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
if (endpoint.isAutoCommitEnable() != null && Boolean.FALSE == endpoint.isAutoCommitEnable().booleanValue()) {
CyclicBarrier barrier = new CyclicBarrier(endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new BatchingConsumerTask(stream, barrier));
}
consumerBarriers.put(consumer, barrier);
} else{
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new AutoCommitConsumerTask(stream));
}
consumerBarriers.put(consumer, null);
}
}

}

@Override
protected void doStop() throws Exception {
super.doStop();
log.info("Stopping Kafka consumer");

if (consumer != null) {
consumer.shutdown();
for (ConsumerConnector consumer : consumerBarriers.keySet()) {
if (consumer != null) {
consumer.shutdown();
}
}
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
Expand All @@ -97,23 +111,73 @@ protected void doStop() throws Exception {
executor = null;
}

class ConsumerTask implements Runnable {
/**
 * Consumes one Kafka stream and, every {@code batchSize} messages, waits on a
 * barrier shared with the connector's other streams; the barrier action
 * commits the connector's offsets, so no offset is committed before every
 * stream has processed its batch.
 */
class BatchingConsumerTask implements Runnable {

    private final KafkaStream<byte[], byte[]> stream;
    private final CyclicBarrier barrier;

    public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier barrier) {
        this.stream = stream;
        this.barrier = barrier;
    }

    public void run() {
        int processed = 0;
        for (MessageAndMetadata<byte[], byte[]> mm : stream) {
            Exchange exchange = endpoint.createKafkaExchange(mm);
            try {
                processor.process(exchange);
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
            processed++;
            if (processed >= endpoint.getBatchSize()) {
                try {
                    barrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
                    processed = 0;
                } catch (InterruptedException e) {
                    LOG.error(e.getMessage(), e);
                    // Restore the interrupt status so the executor can observe it.
                    Thread.currentThread().interrupt();
                    break;
                } catch (BrokenBarrierException e) {
                    LOG.error(e.getMessage(), e);
                    break;
                } catch (TimeoutException e) {
                    // NOTE(review): a timeout breaks the barrier for the other parties,
                    // so their next await throws BrokenBarrierException. The counter is
                    // left as-is so this stream retries the await on the next message.
                    LOG.error(e.getMessage(), e);
                }
            }
        }
    }
}

/**
 * Barrier action that commits the offsets of a single consumer connector.
 * Executed once per barrier trip, after all of the connector's streams have
 * reached the barrier.
 */
class CommitOffsetTask implements Runnable {

    private final ConsumerConnector consumer;

    public CommitOffsetTask(ConsumerConnector consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        consumer.commitOffsets();
    }
}

class AutoCommitConsumerTask implements Runnable {

private KafkaStream<byte[], byte[]> stream;

public ConsumerTask(KafkaStream<byte[], byte[]> stream) {
public AutoCommitConsumerTask(KafkaStream<byte[], byte[]> stream) {
this.stream = stream;
}

public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> mm = it.next();
for (MessageAndMetadata<byte[], byte[]> mm : stream) {
Exchange exchange = endpoint.createKafkaExchange(mm);
try {
processor.process(exchange);
} catch (Exception e) {
e.printStackTrace();
LOG.error(e.getMessage(), e);
}
}
}
Expand Down
Expand Up @@ -162,6 +162,30 @@ public void setConsumerStreams(int consumerStreams) {
configuration.setConsumerStreams(consumerStreams);
}

/**
 * Returns the per-stream batch size after which offsets are committed.
 * Delegates to the endpoint configuration.
 */
public int getBatchSize() {
    return configuration.getBatchSize();
}

/**
 * Sets the per-stream batch size after which offsets are committed.
 * Delegates to the endpoint configuration.
 */
public void setBatchSize(int batchSize) {
    configuration.setBatchSize(batchSize);
}

/**
 * Returns the commit-barrier await timeout in milliseconds.
 * Delegates to the endpoint configuration.
 */
public int getBarrierAwaitTimeoutMs() {
    return configuration.getBarrierAwaitTimeoutMs();
}

/**
 * Sets the commit-barrier await timeout in milliseconds.
 * Delegates to the endpoint configuration.
 */
public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
    configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
}

/**
 * Returns the number of consumer connectors created on start.
 * Delegates to the endpoint configuration.
 */
public int getConsumersCount() {
    return configuration.getConsumersCount();
}

/**
 * Sets the number of consumer connectors created on start.
 * Delegates to the endpoint configuration.
 */
public void setConsumersCount(int consumersCount) {
    configuration.setConsumersCount(consumersCount);
}

/**
 * Sets the consumer timeout in milliseconds.
 * Delegates to the endpoint configuration.
 */
public void setConsumerTimeoutMs(int consumerTimeoutMs) {
    configuration.setConsumerTimeoutMs(consumerTimeoutMs);
}
Expand Down Expand Up @@ -310,7 +334,7 @@ public int getRebalanceMaxRetries() {
return configuration.getRebalanceMaxRetries();
}

public boolean isAutoCommitEnable() {
/**
 * Returns whether auto commit is enabled, or {@code null} when the option was
 * not explicitly configured. Callers must handle {@code null}: the consumer's
 * doStart treats only an explicit {@code Boolean.FALSE} as "auto commit
 * disabled" and falls back to auto-commit behaviour otherwise.
 */
public Boolean isAutoCommitEnable() {
return configuration.isAutoCommitEnable();
}

Expand Down
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kafka;

import org.apache.camel.component.kafka.embedded.EmbeddedKafkaCluster;
import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Base class for camel-kafka tests: boots one embedded Zookeeper node and a
 * single-broker embedded Kafka cluster shared by all test methods of the
 * subclass, and tears both down after the class finishes.
 */
public class BaseEmbeddedKafkaTest extends CamelTestSupport {

    static EmbeddedZookeeper embeddedZookeeper;
    static EmbeddedKafkaCluster embeddedKafkaCluster;

    @BeforeClass
    public static void beforeClass() {
        embeddedZookeeper = new EmbeddedZookeeper(2181);
        List<Integer> kafkaPorts = new ArrayList<Integer>();
        // Fixed broker port; pass -1 instead to let the cluster pick any free port.
        kafkaPorts.add(9092);
        embeddedKafkaCluster = new EmbeddedKafkaCluster(embeddedZookeeper.getConnection(), new Properties(), kafkaPorts);
        try {
            embeddedZookeeper.startup();
        } catch (IOException e) {
            // Fail fast: starting Kafka against a dead Zookeeper would only produce
            // confusing downstream errors, so surface the root cause immediately.
            throw new IllegalStateException("Failed to start embedded Zookeeper", e);
        }
        System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());
        embeddedKafkaCluster.startup();
        System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());
    }

    @AfterClass
    public static void afterClass() {
        embeddedKafkaCluster.shutdown();
        embeddedZookeeper.shutdown();
    }

}