From bf7a90f52aac59751ca325ab3f08cac3585b1ad7 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Tue, 26 May 2015 19:59:23 +0200 Subject: [PATCH] [core] cleanup & tests for FileInputFormat followup of f2891ab857e00bc70eb025bb430f46f4f58355a5 --- .../flink/api/common/io/FileInputFormat.java | 65 ++++--------------- .../common/io/EnumerateNestedFilesTest.java | 56 ++++++++++++++++ 2 files changed, 69 insertions(+), 52 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index efa5631c0e7f8..37739f547cf2c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -326,35 +326,23 @@ protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path f // get the file info and check whether the cached statistics are still valid. final FileStatus file = fs.getFileStatus(filePath); - long latestModTime = file.getModificationTime(); long totalLength = 0; - // enumerate all files and check their modification time stamp. + // enumerate all files if (file.isDir()) { - FileStatus[] fss = fs.listStatus(filePath); - files.ensureCapacity(fss.length); - - for (FileStatus s : fss) { - if (!s.isDir()) { - if (acceptFile(s)) { - files.add(s); - totalLength += s.getLen(); - latestModTime = Math.max(s.getModificationTime(), latestModTime); - testForUnsplittable(s); - } - } - else { - if (enumerateNestedFiles && acceptFile(s)) { - totalLength += addNestedFiles(s.getPath(), files, 0, false); - } - } - } + totalLength += addFilesInDir(file.getPath(), files, totalLength, false); } else { files.add(file); testForUnsplittable(file); totalLength += file.getLen(); } + // check the modification time stamp + long latestModTime = 0; + for (FileStatus f : files) { + latestModTime = Math.max(f.getModificationTime(), latestModTime); + } + // check whether the cached statistics are still valid, if we have any if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { return cachedStats; @@ -402,33 +390,7 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { final FileStatus pathFile = fs.getFileStatus(path); if (pathFile.isDir()) { - // input is directory. list all contained files - final FileStatus[] dir = fs.listStatus(path); - for (int i = 0; i < dir.length; i++) { - if (dir[i].isDir()) { - if (enumerateNestedFiles) { - if(acceptFile(dir[i])) { - totalLength += addNestedFiles(dir[i].getPath(), files, 0, true); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Directory "+dir[i].getPath().toString()+" did not pass the file-filter and is excluded."); - } - } - } - } - else { - if (acceptFile(dir[i])) { - files.add(dir[i]); - totalLength += dir[i].getLen(); - // as soon as there is one deflate file in a directory, we can not split it - testForUnsplittable(dir[i]); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("File "+dir[i].getPath().toString()+" did not pass the file-filter and is excluded."); - } - } - } - } + totalLength += addFilesInDir(path, files, totalLength, true); } else { testForUnsplittable(pathFile); @@ -532,18 +494,17 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { } /** - * Recursively traverse the input directory structure - * and enumerate all accepted nested files. + * Enumerate all files in the directory and recursive if enumerateNestedFiles is true. * @return the total length of accepted files. */ - private long addNestedFiles(Path path, List files, long length, boolean logExcludedFiles) + private long addFilesInDir(Path path, List files, long length, boolean logExcludedFiles) throws IOException { final FileSystem fs = path.getFileSystem(); for(FileStatus dir: fs.listStatus(path)) { if (dir.isDir()) { - if (acceptFile(dir)) { - length += addNestedFiles(dir.getPath(), files, length, logExcludedFiles); + if (acceptFile(dir) && enumerateNestedFiles) { + length += addFilesInDir(dir.getPath(), files, length, logExcludedFiles); } else { if (logExcludedFiles && LOG.isDebugEnabled()) { LOG.debug("Directory "+dir.getPath().toString()+" did not pass the file-filter and is excluded."); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java index 1fbf40d83b576..da4b0f1832a2f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java @@ -187,6 +187,53 @@ public void testTwoNestedDirectoriesTrue() { } } + /** + * Tests if the recursion is invoked correctly in nested directories. + */ + @Test + public void testOnlyLevel2NestedDirectories() { + try { + String rootDir = TestFileUtils.randomFileName(); + String nestedDir = TestFileUtils.randomFileName(); + String firstNestedNestedDir = TestFileUtils.randomFileName(); + String secondNestedNestedDir = TestFileUtils.randomFileName(); + + File testDir = new File(tempPath + System.getProperty("file.separator") + rootDir); + testDir.mkdirs(); + testDir.deleteOnExit(); + + File nested = new File(testDir.getAbsolutePath() + System.getProperty("file.separator") + nestedDir); + nested.mkdirs(); + nested.deleteOnExit(); + + File nestedNestedDir1 = new File(nested.getAbsolutePath() + System.getProperty("file.separator") + + firstNestedNestedDir); + nestedNestedDir1.mkdirs(); + nestedNestedDir1.deleteOnExit(); + + File nestedNestedDir2 = new File(nested.getAbsolutePath() + System.getProperty("file.separator") + + secondNestedNestedDir); + nestedNestedDir2.mkdirs(); + nestedNestedDir2.deleteOnExit(); + + // create files in second level + TestFileUtils.createTempFileInDirectory(nestedNestedDir1.getAbsolutePath(), "paella"); + TestFileUtils.createTempFileInDirectory(nestedNestedDir1.getAbsolutePath(), "kalamari"); + TestFileUtils.createTempFileInDirectory(nestedNestedDir2.getAbsolutePath(), "fideua"); + TestFileUtils.createTempFileInDirectory(nestedNestedDir2.getAbsolutePath(), "bravas"); + + this.format.setFilePath(new Path(testDir.getAbsolutePath())); + this.config.setBoolean("recursive.file.enumeration", true); + format.configure(this.config); + + FileInputSplit[] splits = format.createInputSplits(1); + Assert.assertEquals(4, splits.length); + } catch (Exception ex) { + ex.printStackTrace(); + Assert.fail(ex.getMessage()); + } + } + /** * Test with two nested directories and recursive.file.enumeration = true */ @@ -309,6 +356,15 @@ public void testGetStatisticsMultipleNestedFiles() { BaseStatistics stats = format.getStatistics(null); Assert.assertEquals("The file size from the statistics is wrong.", TOTAL, stats.getTotalInputSize()); + + /* Now invalidate the cache and check again */ + Thread.sleep(1000); // accuracy of file modification times is rather low + TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), 42L); + + BaseStatistics stats2 = format.getStatistics(stats); + Assert.assertNotEquals(stats2, stats); + Assert.assertEquals("The file size from the statistics is wrong.", TOTAL + 42L, stats2.getTotalInputSize()); + } catch (Exception ex) { ex.printStackTrace(); Assert.fail(ex.getMessage());