Skip to content
Browse files

fix problem with hostport conversion, logging

  • Loading branch information...
1 parent 9ee99ca commit b5e3c01e3425f29c92353e10b38fd0472a81e3b7 @nathanmarz nathanmarz committed May 30, 2012
View
4 storm-kafka/src/jvm/storm/kafka/HostPort.java
@@ -1,6 +1,8 @@
package storm.kafka;
-public class HostPort {
+import java.io.Serializable;
+
+public class HostPort implements Serializable {
public String host;
public int port;
View
1 storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -44,6 +44,7 @@ public void forceStartOffsetTime(long millis) {
} else {
throw new IllegalArgumentException("Invalid host specification: " + s);
}
+ ret.add(hp);
}
return ret;
}
View
5 storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@ -84,9 +84,12 @@ public PartitionManager(int partition) {
//returns false if it's reached the end of current batch
public EmitState next() {
+ LOG.info("Filling from " + _partitions.getConsumer(_partition).host() + " " + _partition);
if(_waitingToEmit.isEmpty()) fill();
MessageAndOffset toEmit = _waitingToEmit.pollFirst();
- if(toEmit==null) return EmitState.NO_EMITTED;
+ if(toEmit==null) {
+ return EmitState.NO_EMITTED;
+ }
List<Object> tup = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.message().payload()));
_collector.emit(tup, new KafkaMessageId(_partition, actualOffset(toEmit)));
if(_waitingToEmit.size()>0) {

0 comments on commit b5e3c01

Please sign in to comment.
Something went wrong with that request. Please try again.