diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index 052d5251f93..10405fad633 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -136,15 +136,15 @@ public EmitState next(SpoutOutputCollector collector) { return EmitState.NO_EMITTED; } Iterable> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); - if (tups != null) { - if(_spoutConfig.topicAsStreamId) { - for (List tup : tups) { - collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset)); - } - } else { - for (List tup : tups) { - collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); - } + if ((tups != null) && tups.iterator().hasNext()) { + if(_spoutConfig.topicAsStreamId) { + for (List tup : tups) { + collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset)); + } + } else { + for (List tup : tups) { + collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); + } } break; } else {