From 5d499854bf4b236b2d5fc7078d7fc5343d89dbeb Mon Sep 17 00:00:00 2001 From: Pete Prokopowicz Date: Wed, 23 Sep 2015 10:09:23 -0500 Subject: [PATCH 1/3] advance kafka offset when deserializer yields no object --- .../src/jvm/storm/kafka/PartitionManager.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index 052d5251f93..fc757aa99bc 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -136,15 +136,15 @@ public EmitState next(SpoutOutputCollector collector) { return EmitState.NO_EMITTED; } Iterable> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); - if (tups != null) { - if(_spoutConfig.topicAsStreamId) { - for (List tup : tups) { - collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset)); - } - } else { - for (List tup : tups) { - collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); - } + if ((tups != null) && (tups.size() > 0)) { + if(_spoutConfig.topicAsStreamId) { + for (List tup : tups) { + collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset)); + } + } else { + for (List tup : tups) { + collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); + } } break; } else { From 8c0dcb1fede52512f2599ae4732372346bb36ba3 Mon Sep 17 00:00:00 2001 From: Pete Prokopowicz Date: Wed, 23 Sep 2015 12:30:06 -0500 Subject: [PATCH 2/3] fixes incorrect use of iterator in earlier commit 5d499854bf4b236b2d5fc7078d7fc5343d89dbeb --- external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index fc757aa99bc..afff87c65d7 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -136,7 +136,7 @@ public EmitState next(SpoutOutputCollector collector) { return EmitState.NO_EMITTED; } Iterable> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); - if ((tups != null) && (tups.size() > 0)) { + if ((tups != null) && tups.iterator.hasNext()) { if(_spoutConfig.topicAsStreamId) { for (List tup : tups) { collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset)); From 77b171107ba52daac34ebf3d8c59d7643ecb6408 Mon Sep 17 00:00:00 2001 From: Pete Prokopowicz Date: Thu, 1 Oct 2015 16:48:14 -0500 Subject: [PATCH 3/3] fixed typo in PartitionManager --- external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index afff87c65d7..10405fad633 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -136,7 +136,7 @@ public EmitState next(SpoutOutputCollector collector) { return EmitState.NO_EMITTED; } Iterable> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); - if ((tups != null) && tups.iterator.hasNext()) { + if ((tups != null) && tups.iterator().hasNext()) { if(_spoutConfig.topicAsStreamId) { for (List tup : tups) { collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset));