From 7cb492410ab083dcc56515a0c653936fe32c2c14 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 7 Oct 2016 20:06:18 +0200 Subject: [PATCH] [FLINK-4777] catch IOException in ContinuousFileMonitoringFunction FileSystem.listStatus(path) may throw an IOException when it lists files and then retrieves their file status. This is quite common, e.g. editors which create temporary files and move them. The ContinuousFileMonitoringFunction can only apply a file path filter afterwards. The solution is to defer file checks until no exception is caught anymore. --- .../source/ContinuousFileMonitoringFunction.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index 8ff4a2af510b5..4b2fbe197201e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -246,12 +246,21 @@ private Map> getInputSplits(List eligible * method to decide which parts of the file to be processed, and forward them downstream. */ private List listEligibleFiles(FileSystem fileSystem) throws IOException { - List files = new ArrayList<>(); - FileStatus[] statuses = fileSystem.listStatus(new Path(path)); + final FileStatus[] statuses; + try { + statuses = fileSystem.listStatus(new Path(path)); + } catch (IOException e) { + // we may run into an IOException if files are moved while listing their status + // delay the check for eligible files in this case + return Collections.emptyList(); + } + if (statuses == null) { LOG.warn("Path does not exist: {}", path); + return Collections.emptyList(); } else { + List files = new ArrayList<>(); // handle the new files for (FileStatus status : statuses) { Path filePath = status.getPath(); @@ -260,8 +269,8 @@ private List listEligibleFiles(FileSystem fileSystem) throws IOExcep files.add(status); } } + return files; } - return files; } /**