From 5a85bc26bb5dc502d51740099b6059344eae03e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 6 Jul 2016 08:10:05 +0200 Subject: [PATCH] [BEAM-13] Cleanup destination attribute in the reader and improve watermark value --- .../src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index f69224611c66b..b3a067805099d 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -277,13 +277,13 @@ private static class UnboundedJmsReader extends UnboundedReader { private UnboundedJmsSource source; private JmsCheckpointMark checkpointMark; - private String destination; private Connection connection; private Session session; private MessageConsumer consumer; private JmsRecord currentRecord; private Instant currentTimestamp; + private Instant watermark; public UnboundedJmsReader( UnboundedJmsSource source, @@ -306,10 +306,8 @@ public boolean start() throws IOException { this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); if (source.topic != null) { this.consumer = this.session.createConsumer(this.session.createTopic(source.topic)); - this.destination = source.topic; } else { this.consumer = this.session.createConsumer(this.session.createQueue(source.queue)); - this.destination = source.queue; } return advance(); @@ -352,6 +350,8 @@ public boolean advance() throws IOException { checkpointMark.addMessage(message); currentRecord = jmsRecord; + // store the "previous" current timestamp in the watermark + watermark = currentTimestamp; currentTimestamp = new Instant(message.getJMSTimestamp()); return true; @@ -370,7 +370,7 @@ public JmsRecord getCurrent() throws NoSuchElementException { @Override public Instant getWatermark() { - return currentTimestamp; + return watermark; } @Override