From ab19a949b1f738a620f5fdc9102c147cc3d05c0f Mon Sep 17 00:00:00 2001 From: narendra_bidari Date: Fri, 1 Jul 2016 20:44:08 -0700 Subject: [PATCH 1/5] Sending the Data to Kafka as a batch, instead of sending one tuple each. 0.10 version uses Producer.scala class which has two methods, we are using now the second method which sends in batch. --- .../kafka/trident/TridentKafkaState.java | 54 ++++++++++++------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java index 402ffb173cb..2cffd22b88a 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java @@ -31,6 +31,7 @@ import storm.trident.state.State; import storm.trident.tuple.TridentTuple; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -76,24 +77,39 @@ public void prepare(Map stormConf) { producer = new Producer(config); } - public void updateState(List tuples, TridentCollector collector) { - String topic = null; - for (TridentTuple tuple : tuples) { - try { - topic = topicSelector.getTopic(tuple); + @SuppressWarnings("rawtypes") + public void updateState(List tuples, TridentCollector collector) { + String topic = null; + List batchList = new ArrayList(tuples.size()); + // Creating Batch + for (TridentTuple tuple : tuples) { + try { + topic = topicSelector.getTopic(tuple); + batchList + .add(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple), + mapper.getMessageFromTuple(tuple))); + LOG.debug("Updated Batch"); + } catch (Exception ex) { + String errorMsg = "Error while filling up List for Batching"; + LOG.warn(errorMsg, ex); + throw new FailedException(errorMsg, ex); + } + } + // Sending Batch + try { + if (batchList != null) { + producer.send(batchList); + LOG.debug("Sending the Batch " + batchList.hashCode()); + } else { + LOG.warn("BatchList is null " + batchList); + } + } catch (Exception ex) { + String errorMsg = "Could not send messages = " + tuples + " to topic = " + + topic; + LOG.warn(errorMsg, ex); + throw new FailedException(errorMsg, ex); + } - if(topic != null) { - producer.send(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple), - mapper.getMessageFromTuple(tuple))); - } else { - LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); - } - } catch (Exception ex) { - String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple) - + " to topic = " + topic; - LOG.warn(errorMsg, ex); - throw new FailedException(errorMsg, ex); - } - } - } + } + } From 518e402e96ea1202f55813f8b94467fbcf93c725 Mon Sep 17 00:00:00 2001 From: narendra_bidari Date: Fri, 1 Jul 2016 20:52:44 -0700 Subject: [PATCH 2/5] Had missed the topic null check during sending message to kafka, added it back --- .../jvm/storm/kafka/trident/TridentKafkaState.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java index 2cffd22b88a..d2e37b6d237 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java @@ -85,10 +85,13 @@ public void updateState(List tuples, TridentCollector collector) { for (TridentTuple tuple : tuples) { try { topic = topicSelector.getTopic(tuple); - batchList - .add(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple), - mapper.getMessageFromTuple(tuple))); - LOG.debug("Updated Batch"); + if (topic != null) { + batchList.add( + new KeyedMessage(topic, mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple))); + LOG.debug("Updated Batch"); + } else { + LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); + } } catch (Exception ex) { String errorMsg = "Error while filling up List for Batching"; LOG.warn(errorMsg, ex); @@ -104,8 +107,7 @@ public void updateState(List tuples, TridentCollector collector) { LOG.warn("BatchList is null " + batchList); } } catch (Exception ex) { - String errorMsg = "Could not send messages = " + tuples + " to topic = " - + topic; + String errorMsg = "Could not send messages = " + tuples + " to topic = " + topic; LOG.warn(errorMsg, ex); throw new FailedException(errorMsg, ex); } From 7b9fac12dd0dc9c583e35f345417863818e45e61 Mon Sep 17 00:00:00 2001 From: narendra_bidari Date: Mon, 4 Jul 2016 02:27:45 -0700 Subject: [PATCH 3/5] Replaced tabs with spaces --- .../kafka/trident/TridentKafkaState.java | 73 +++++++++---------- 1 file changed, 36 insertions(+), 37 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java index d2e37b6d237..c734e3a95ca 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java @@ -77,41 +77,40 @@ public void prepare(Map stormConf) { producer = new Producer(config); } - @SuppressWarnings("rawtypes") - public void updateState(List tuples, TridentCollector collector) { - String topic = null; - List batchList = new ArrayList(tuples.size()); - // Creating Batch - for (TridentTuple tuple : tuples) { - try { - topic = topicSelector.getTopic(tuple); - if (topic != null) { - batchList.add( - new KeyedMessage(topic, mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple))); - LOG.debug("Updated Batch"); - } else { - LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); - } - } catch (Exception ex) { - String errorMsg = "Error while filling up List for Batching"; - LOG.warn(errorMsg, ex); - throw new FailedException(errorMsg, ex); - } - } - // Sending Batch - try { - if (batchList != null) { - producer.send(batchList); - LOG.debug("Sending the Batch " + batchList.hashCode()); - } else { - LOG.warn("BatchList is null " + batchList); - } - } catch (Exception ex) { - String errorMsg = "Could not send messages = " + tuples + " to topic = " + topic; - LOG.warn(errorMsg, ex); - throw new FailedException(errorMsg, ex); - } - - } - + @SuppressWarnings({"rawtypes", "unchecked", "unused"}) + public void updateState(List tuples, TridentCollector collector) { + String topic = null; + List batchList = new ArrayList(tuples.size()); + // Creating Batch + for (TridentTuple tuple : tuples) { + try { + topic = topicSelector.getTopic(tuple); + if (topic != null) { + batchList.add(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple), + mapper.getMessageFromTuple(tuple))); + LOG.debug("Updated Batch"); + } else { + LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + + ", topic selector returned null."); + } + } catch (Exception ex) { + String errorMsg = "Error while filling up List for Batching"; + LOG.warn(errorMsg, ex); + throw new FailedException(errorMsg, ex); + } + } + // Sending Batch + try { + if (batchList != null) { + producer.send(batchList); + LOG.debug("Sending the Batch " + batchList.hashCode()); + } else { + LOG.warn("BatchList is null " + batchList); + } + } catch (Exception ex) { + String errorMsg = "Could not send messages = " + tuples + " to topic = " + topic; + LOG.warn(errorMsg, ex); + throw new FailedException(errorMsg, ex); + } + } } From 2156995c7f196e9e9c7b02c9b14975429f99dfe9 Mon Sep 17 00:00:00 2001 From: narendra_bidari Date: Tue, 5 Jul 2016 10:56:42 -0700 Subject: [PATCH 4/5] Removed the unused check, for batchlist null --- .../src/jvm/storm/kafka/trident/TridentKafkaState.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java index c734e3a95ca..4048a2a9a32 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java @@ -77,7 +77,7 @@ public void prepare(Map stormConf) { producer = new Producer(config); } - @SuppressWarnings({"rawtypes", "unchecked", "unused"}) + @SuppressWarnings({"rawtypes", "unchecked"}) public void updateState(List tuples, TridentCollector collector) { String topic = null; List batchList = new ArrayList(tuples.size()); @@ -101,12 +101,8 @@ public void updateState(List tuples, TridentCollector collector) { } // Sending Batch try { - if (batchList != null) { - producer.send(batchList); - LOG.debug("Sending the Batch " + batchList.hashCode()); - } else { - LOG.warn("BatchList is null " + batchList); - } + producer.send(batchList); + LOG.debug("Sending the Batch " + batchList); } catch (Exception ex) { String errorMsg = "Could not send messages = " + tuples + " to topic = " + topic; LOG.warn(errorMsg, ex); From 43803715cd3734e56d46bd8d27816707c0cb44db Mon Sep 17 00:00:00 2001 From: narendra_bidari Date: Tue, 5 Jul 2016 22:44:53 -0700 Subject: [PATCH 5/5] Removed extra logs as per previous discussion --- .../src/jvm/storm/kafka/trident/TridentKafkaState.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java index 4048a2a9a32..5551e8354cd 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java @@ -88,7 +88,6 @@ public void updateState(List tuples, TridentCollector collector) { if (topic != null) { batchList.add(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple))); - LOG.debug("Updated Batch"); } else { LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); @@ -102,7 +101,7 @@ public void updateState(List tuples, TridentCollector collector) { // Sending Batch try { producer.send(batchList); - LOG.debug("Sending the Batch " + batchList); + LOG.debug("Sending the Batch " + batchList.size()); } catch (Exception ex) { String errorMsg = "Could not send messages = " + tuples + " to topic = " + topic; LOG.warn(errorMsg, ex);