Skip to content

Commit

Permalink
[BEAM-13] Cleanup destination attribute in the reader and improve wat…
Browse files Browse the repository at this point in the history
…ermark value
  • Loading branch information
jbonofre committed Jul 6, 2016
1 parent 54063b5 commit 5a85bc2
Showing 1 changed file with 4 additions and 4 deletions.
Expand Up @@ -277,13 +277,13 @@ private static class UnboundedJmsReader extends UnboundedReader<JmsRecord> {

private UnboundedJmsSource source;
private JmsCheckpointMark checkpointMark;
private String destination;
private Connection connection;
private Session session;
private MessageConsumer consumer;

private JmsRecord currentRecord;
private Instant currentTimestamp;
private Instant watermark;

public UnboundedJmsReader(
UnboundedJmsSource source,
Expand All @@ -306,10 +306,8 @@ public boolean start() throws IOException {
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (source.topic != null) {
this.consumer = this.session.createConsumer(this.session.createTopic(source.topic));
this.destination = source.topic;
} else {
this.consumer = this.session.createConsumer(this.session.createQueue(source.queue));
this.destination = source.queue;
}

return advance();
Expand Down Expand Up @@ -352,6 +350,8 @@ public boolean advance() throws IOException {
checkpointMark.addMessage(message);

currentRecord = jmsRecord;
// store the "previous" current timestamp in the watermark
watermark = currentTimestamp;
currentTimestamp = new Instant(message.getJMSTimestamp());

return true;
Expand All @@ -370,7 +370,7 @@ public JmsRecord getCurrent() throws NoSuchElementException {

@Override
public Instant getWatermark() {
return currentTimestamp;
return watermark;
}

@Override
Expand Down

0 comments on commit 5a85bc2

Please sign in to comment.