From 2db52989fef2455413d42286893c5227983ee74b Mon Sep 17 00:00:00 2001 From: Juan Miguel Cejuela Date: Fri, 10 Nov 2017 17:57:09 +0100 Subject: [PATCH] compare as strictly SMALLER (not SMALLER OR EQUAL) (as per the doc header "if the modification time of the file is smaller than") Otherwise, some files with same exact timestamp (because they were written at the same exact long time) will be ignored. --- .../api/functions/source/ContinuousFileMonitoringFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index 9f26efcdd5155..4743326ca5370 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -325,7 +325,7 @@ private Map listEligibleFiles(FileSystem fileSystem, Path path */ private boolean shouldIgnore(Path filePath, long modificationTime) { assert (Thread.holdsLock(checkpointLock)); - boolean shouldIgnore = modificationTime <= globalModificationTime; + boolean shouldIgnore = modificationTime < globalModificationTime; if (shouldIgnore && LOG.isDebugEnabled()) { LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);