Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -415,16 +416,28 @@ public boolean requiresDeduping() {
private static class RabbitMQCheckpointMark
implements UnboundedSource.CheckpointMark, Serializable {
transient Channel channel;
Instant oldestTimestamp = Instant.now();
Instant latestTimestamp = Instant.now();
final List<Long> sessionIds = new ArrayList<>();

/**
* Advances the watermark to the provided time, provided said time is after the current
* watermark. If the provided time is before the latest, this function no-ops.
*
* @param time The time to advance the watermark to
*/
public void advanceWatermark(Instant time) {
if (time.isAfter(latestTimestamp)) {
latestTimestamp = time;
}
}

@Override
public void finalizeCheckpoint() throws IOException {
for (Long sessionId : sessionIds) {
channel.basicAck(sessionId, false);
}
channel.txCommit();
oldestTimestamp = Instant.now();
latestTimestamp = Instant.now();
sessionIds.clear();
}
}
Expand All @@ -449,7 +462,7 @@ private static class UnboundedRabbitMqReader

@Override
public Instant getWatermark() {
return checkpointMark.oldestTimestamp;
return checkpointMark.latestTimestamp;
}

@Override
Expand Down Expand Up @@ -530,6 +543,10 @@ public boolean advance() throws IOException {
// we consume message without autoAck (we want to do the ack ourselves)
GetResponse delivery = channel.basicGet(queueName, false);
if (delivery == null) {
current = null;
currentRecordId = null;
currentTimestamp = null;
checkpointMark.advanceWatermark(Instant.now());
return false;
}
if (source.spec.useCorrelationId()) {
Expand All @@ -545,10 +562,10 @@ public boolean advance() throws IOException {
checkpointMark.sessionIds.add(deliveryTag);

current = new RabbitMqMessage(source.spec.routingKey(), delivery);
currentTimestamp = new Instant(delivery.getProps().getTimestamp());
if (currentTimestamp.isBefore(checkpointMark.oldestTimestamp)) {
checkpointMark.oldestTimestamp = currentTimestamp;
}
Date deliveryTimestamp = delivery.getProps().getTimestamp();
currentTimestamp =
(deliveryTimestamp != null) ? new Instant(deliveryTimestamp) : Instant.now();
checkpointMark.advanceWatermark(currentTimestamp);
} catch (IOException e) {
throw e;
} catch (Exception e) {
Expand Down