Skip to content

Commit

Permalink
Make sure nextTuple emits tuple with non-null values (#1000)
Browse files Browse the repository at this point in the history
* Make sure nextTuple emits tuple with non-null values

* remove recursive emit call and reduce receive-timeout
  • Loading branch information
rdhabalia authored and merlimat committed Dec 22, 2017
1 parent df9d4e5 commit 5fa141e
Showing 1 changed file with 24 additions and 6 deletions.
Expand Up @@ -158,6 +158,16 @@ public void fail(Object msgId) {
*/
@Override
public void nextTuple() {
emitNextAvailableTuple();
}

/**
* It makes sure that it emits next available non-tuple to topology unless consumer queue doesn't have any message
* available. It receives message from consumer queue and converts it to tuple and emits to topology. if the
* converted tuple is null then it tries to receives next message and perform the same until it finds non-tuple to
* emit.
*/
public void emitNextAvailableTuple() {
Message msg;

// check if there are any failed messages to re-emit in the topology
Expand All @@ -182,12 +192,18 @@ public void nextTuple() {
LOG.debug("[{}] Receiving the next message from pulsar consumer to emit to the collector", spoutId);
}
try {
msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
++messagesReceived;
messageSizeReceived += msg.getData().length;
boolean done = false;
while (!done) {
msg = consumer.receive(100, TimeUnit.MILLISECONDS);
if (msg != null) {
++messagesReceived;
messageSizeReceived += msg.getData().length;
done = mapToValueAndEmit(msg);
} else {
// queue is empty and nothing to emit
done = true;
}
}
mapToValueAndEmit(msg);
} catch (PulsarClientException e) {
LOG.error("[{}] Error receiving message from pulsar consumer", spoutId, e);
}
Expand Down Expand Up @@ -228,7 +244,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

private void mapToValueAndEmit(Message msg) {
private boolean mapToValueAndEmit(Message msg) {
if (msg != null) {
Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg);
++pendingAcks;
Expand All @@ -244,8 +260,10 @@ private void mapToValueAndEmit(Message msg) {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
}
return true;
}
}
return false;
}

public class MessageRetries {
Expand Down

0 comments on commit 5fa141e

Please sign in to comment.