From 9319ab137efa9d606e96d1fc0504502bbaec87e6 Mon Sep 17 00:00:00 2001 From: zhuol Date: Wed, 27 May 2015 10:48:39 -0500 Subject: [PATCH] storm-826, update KafkaBolt to use the new kafka producer APIs Deal with the producer cannot close issue Able to pass unit tests and add callback async Update the producer api in KafkaUtilsTest. Address comments and add test options for async and fireAndForget --- external/storm-kafka/pom.xml | 10 ++- .../src/jvm/storm/kafka/bolt/KafkaBolt.java | 77 ++++++++++++++++--- .../src/test/storm/kafka/KafkaUtilsTest.java | 32 +++++--- .../test/storm/kafka/bolt/KafkaBoltTest.java | 74 +++++++++++++++--- 4 files changed, 157 insertions(+), 36 deletions(-) diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml index 0aba519bb8d..c731a0f7b67 100644 --- a/external/storm-kafka/pom.xml +++ b/external/storm-kafka/pom.xml @@ -113,8 +113,8 @@ org.apache.kafka - kafka_2.9.2 - 0.8.1.1 + kafka_2.10 + 0.8.2.1 provided @@ -128,6 +128,12 @@ + + org.apache.kafka + kafka-clients + 0.8.2.1 + provided + org.apache.storm storm-core diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java index 714ecd3b7e3..738b358c9cb 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java @@ -23,16 +23,18 @@ import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import backtype.storm.utils.TupleUtils; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; import storm.kafka.bolt.mapper.TupleToKafkaMapper; import storm.kafka.bolt.selector.DefaultTopicSelector; import storm.kafka.bolt.selector.KafkaTopicSelector; - +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; import java.util.Map; import java.util.Properties; @@ -45,6 +47,10 @@ * 'kafka.broker.properties' and 'topic' *

* respectively. + *

+ * This bolt uses 0.8.2 Kafka Producer API. + *

+ * It works for sending tuples to older Kafka version (0.8.1). */ public class KafkaBolt extends BaseRichBolt { @@ -53,10 +59,18 @@ public class KafkaBolt extends BaseRichBolt { public static final String TOPIC = "topic"; public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties"; - private Producer producer; + private KafkaProducer producer; private OutputCollector collector; private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; + /** + * With default setting for fireAndForget and async, the callback is called when the sending succeeds. + * By setting fireAndForget true, the send will not wait at all for kafka to ack. + * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set. + * By setting async false, synchronous sending is used. + */ + private boolean fireAndForget = false; + private boolean async = true; public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper mapper) { this.mapper = mapper; @@ -83,18 +97,16 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES); Properties properties = new Properties(); properties.putAll(configMap); - ProducerConfig config = new ProducerConfig(properties); - producer = new Producer(config); + producer = new KafkaProducer(properties); this.collector = collector; } @Override - public void execute(Tuple input) { + public void execute(final Tuple input) { if (TupleUtils.isTick(input)) { collector.ack(input); return; // Do not try to send ticks to Kafka } - K key = null; V message = null; String topic = null; @@ -102,12 +114,40 @@ public void execute(Tuple input) { key = mapper.getKeyFromTuple(input); message = mapper.getMessageFromTuple(input); topic = topicSelector.getTopic(input); - if(topic != null ) { - producer.send(new KeyedMessage(topic, key, message)); + if (topic != null ) { + Callback callback = null; + + if (!fireAndForget && async) { + callback = new Callback() { + @Override + public void onCompletion(RecordMetadata ignored, Exception e) { + synchronized (collector) { + if (e != null) { + collector.reportError(e); + collector.fail(input); + } else { + collector.ack(input); + } + } + } + }; + } + Future result = producer.send(new ProducerRecord(topic, key, message), callback); + if (!async) { + try { + result.get(); + collector.ack(input); + } catch (ExecutionException err) { + collector.reportError(err); + collector.fail(input); + } + } else if (fireAndForget) { + collector.ack(input); + } } else { LOG.warn("skipping key = " + key + ", topic selector returned null."); + collector.ack(input); } - collector.ack(input); } catch (Exception ex) { collector.reportError(ex); collector.fail(input); @@ -118,4 +158,17 @@ public void execute(Tuple input) { public void declareOutputFields(OutputFieldsDeclarer declarer) { } + + @Override + public void cleanup() { + producer.close(); + } + + public void setFireAndForget(boolean fireAndForget) { + this.fireAndForget = fireAndForget; + } + + public void setAsync(boolean async) { + this.async = async; + } } diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java index 965eaeae609..a2d2af8d25a 100644 --- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java +++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java @@ -23,13 +23,14 @@ import kafka.api.OffsetRequest; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.javaapi.producer.Producer; import kafka.message.MessageAndOffset; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.Assert; import storm.kafka.trident.GlobalPartitionInformation; import java.util.List; @@ -39,9 +40,11 @@ import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaUtilsTest { + private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class); private KafkaTestBroker broker; private SimpleConsumer simpleConsumer; private KafkaConfig config; @@ -182,7 +185,6 @@ private void runGetValueOnlyTuplesTest() { } } - private void createTopicAndSendMessage() { createTopicAndSendMessage(null, "someValue"); } @@ -193,14 +195,22 @@ private void createTopicAndSendMessage(String value) { private void createTopicAndSendMessage(String key, String value) { Properties p = new Properties(); - p.setProperty("metadata.broker.list", broker.getBrokerConnectionString()); - p.setProperty("serializer.class", "kafka.serializer.StringEncoder"); - ProducerConfig producerConfig = new ProducerConfig(p); - Producer producer = new Producer(producerConfig); - producer.send(new KeyedMessage(config.topic, key, value)); + p.put("serializer.class", "kafka.serializer.StringEncoder"); + p.put("bootstrap.servers", broker.getBrokerConnectionString()); + p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + p.put("metadata.fetch.timeout.ms", 1000); + KafkaProducer producer = new KafkaProducer(p); + try { + producer.send(new ProducerRecord(config.topic, key, value)).get(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + LOG.error("Failed to do synchronous sending due to " + e, e); + } finally { + producer.close(); + } } - @Test public void assignOnePartitionPerTask() { runPartitionToTaskMappingTest(16, 1); diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java index 576cc12c0f3..05d138b6dc7 100644 --- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java +++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java @@ -34,9 +34,7 @@ import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import storm.kafka.*; @@ -77,9 +75,9 @@ public void initMocks() { public void shutdown() { simpleConsumer.close(); broker.shutdown(); + bolt.cleanup(); } - private void setupKafkaConsumer() { GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(); globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString())); @@ -110,15 +108,60 @@ public void executeWithKey() throws Exception { verifyMessage(key, message); } + /* test synchronous sending */ + @Test + public void executeWithByteArrayKeyAndMessageSync() { + boolean async = false; + boolean fireAndForget = false; + bolt = generateDefaultSerializerBolt(async, fireAndForget); + String keyString = "test-key"; + String messageString = "test-message"; + byte[] key = keyString.getBytes(); + byte[] message = messageString.getBytes(); + Tuple tuple = generateTestTuple(key, message); + bolt.execute(tuple); + verify(collector).ack(tuple); + verifyMessage(keyString, messageString); + } + + /* test asynchronous sending (default) */ + @Test + public void executeWithByteArrayKeyAndMessageAsync() { + boolean async = true; + boolean fireAndForget = false; + bolt = generateDefaultSerializerBolt(async, fireAndForget); + String keyString = "test-key"; + String messageString = "test-message"; + byte[] key = keyString.getBytes(); + byte[] message = messageString.getBytes(); + Tuple tuple = generateTestTuple(key, message); + bolt.execute(tuple); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + verify(collector).ack(tuple); + verifyMessage(keyString, messageString); + } + + /* test with fireAndForget option enabled */ @Test - public void executeWithByteArrayKeyAndMessage() { - bolt = generateDefaultSerializerBolt(); + public void executeWithByteArrayKeyAndMessageFire() { + boolean async = true; + boolean fireAndForget = true; + bolt = generateDefaultSerializerBolt(async, fireAndForget); String keyString = "test-key"; String messageString = "test-message"; byte[] key = keyString.getBytes(); byte[] message = messageString.getBytes(); Tuple tuple = generateTestTuple(key, message); bolt.execute(tuple); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } verify(collector).ack(tuple); verifyMessage(keyString, messageString); } @@ -126,21 +169,32 @@ public void executeWithByteArrayKeyAndMessage() { private KafkaBolt generateStringSerializerBolt() { KafkaBolt bolt = new KafkaBolt(); Properties props = new Properties(); - props.put("metadata.broker.list", broker.getBrokerConnectionString()); props.put("request.required.acks", "1"); props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("bootstrap.servers", broker.getBrokerConnectionString()); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("metadata.fetch.timeout.ms", 1000); config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props); bolt.prepare(config, null, new OutputCollector(collector)); + bolt.setAsync(false); return bolt; } - private KafkaBolt generateDefaultSerializerBolt() { + private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget) { KafkaBolt bolt = new KafkaBolt(); Properties props = new Properties(); - props.put("metadata.broker.list", broker.getBrokerConnectionString()); props.put("request.required.acks", "1"); + props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("bootstrap.servers", broker.getBrokerConnectionString()); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("metadata.fetch.timeout.ms", 1000); + props.put("linger.ms", 0); config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props); bolt.prepare(config, null, new OutputCollector(collector)); + bolt.setAsync(async); + bolt.setFireAndForget(fireAndForget); return bolt; } @@ -163,7 +217,6 @@ public void executeWithBrokerDown() throws Exception { verify(collector).fail(tuple); } - private boolean verifyMessage(String key, String message) { long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1; ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer, @@ -211,5 +264,4 @@ private Tuple mockTickTuple() { assertTrue(TupleUtils.isTick(tuple)); return tuple; } - }