diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 0a69c99cab810..7bd03c75d7484 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -93,7 +93,7 @@ public final class FSImageFormatPBINode {
   public static final int XATTR_NAMESPACE_EXT_OFFSET = 5;
   public static final int XATTR_NAMESPACE_EXT_MASK = 1;
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(FSImageFormatPBINode.class);
 
   private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000;
@@ -227,6 +227,7 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
       LOG.info("Loading the INodeDirectory section in parallel with {} sub-" +
           "sections", sections.size());
       CountDownLatch latch = new CountDownLatch(sections.size());
+      AtomicInteger totalLoaded = new AtomicInteger(0);
       final CopyOnWriteArrayList<IOException> exceptions =
           new CopyOnWriteArrayList<>();
       for (FileSummary.Section s : sections) {
@@ -235,7 +236,7 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
           try {
             ins = parent.getInputStreamForSection(s,
                 compressionCodec);
-            loadINodeDirectorySection(ins);
+            totalLoaded.addAndGet(loadINodeDirectorySection(ins));
           } catch (Exception e) {
             LOG.error("An exception occurred loading INodeDirectories in " +
                 "parallel", e);
@@ -254,6 +255,7 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
       }
       try {
         latch.await();
+        totalLoaded.incrementAndGet(); // increase ROOT_INODE
       } catch (InterruptedException e) {
         LOG.error("Interrupted waiting for countdown latch", e);
         throw new IOException(e);
@@ -263,12 +265,14 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
             exceptions.size());
         throw exceptions.get(0);
       }
-      LOG.info("Completed loading all INodeDirectory sub-sections");
+      LOG.info("Completed loading all INodeDirectory sub-sections. Loaded {} inodes.",
+          totalLoaded.get());
     }
 
-    void loadINodeDirectorySection(InputStream in) throws IOException {
+    int loadINodeDirectorySection(InputStream in) throws IOException {
       final List<INodeReference> refList = parent.getLoaderContext()
           .getRefList();
+      int cntr = 0;
       while (true) {
         INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
             .parseDelimitedFrom(in);
@@ -279,6 +283,9 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
         INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
           INode child = dir.getInode(id);
+          if (child.isDirectory()) {
+            cntr++;
+          }
           if (!addToParent(p, child)) {
             LOG.warn("Failed to add the inode {} to the directory {}",
                 child.getId(), p.getId());
@@ -293,6 +300,8 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
           }
         }
       }
+
+      return cntr;
     }
 
     private void fillUpInodeList(ArrayList<INode> inodeList, INode inode) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 185db6916ab0d..4157bcb82a211 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -84,6 +84,7 @@
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
+import org.slf4j.event.Level;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -1103,6 +1104,99 @@ public void testParallelSaveAndLoad() throws IOException {
     }
   }
 
+  @Test
+  public void testNumInodeWithParallelLoad() throws IOException {
+    Configuration conf = new Configuration();
+
+    MiniDFSCluster cluster = null;
+    try {
+      // generate MiniDFSCluster
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true");
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1");
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4");
+      conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4");
+
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      String baseDir = "/abc";
+      Path basePath = new Path(baseDir);
+      if (fs.exists(basePath)) {
+        fs.delete(basePath, true);
+      }
+      fs.mkdirs(basePath);
+      // create 10 directories with 5 files per directory
+      for (int i = 0; i < 10; i++) {
+        Path dir = new Path(baseDir + "/" + i);
+        for (int j = 0; j < 5; j++) {
+          Path f = new Path(dir, Integer.toString(j));
+          FSDataOutputStream os = fs.create(f);
+          os.write(1);
+          os.close();
+        }
+      }
+
+      GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer.
+          captureLogs(FSImageFormatPBINode.LOG);
+      GenericTestUtils.setLogLevel(FSImageFormatPBINode.LOG, Level.INFO);
+      // checkpoint
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+      logs.clearOutput();
+      cluster.restartNameNode();
+      cluster.waitActive();
+
+      FSDirectory fsDir = cluster.getNamesystem().getFSDirectory();
+      int inodeSize = fsDir.getINodeMap().size();
+      assertEquals(62, inodeSize);
+
+      String content = logs.getOutput();
+      assertTrue(content.contains("Completed loading all INodeDirectory " +
+          "sub-sections. Loaded 12 inodes."));
+      assertTrue(content.contains("Completed loading all INode sections. " +
+          "Loaded 62 inodes."));
+
+      fs.delete(basePath, true);
+      fs.mkdirs(basePath);
+      for (int i = 0; i < 10; i++) {
+        for (int j = 0; j < 5; j++) {
+          Path dir = new Path(baseDir + "/" + i + "/" + j); // multi-level directory
+          for (int t = 0; t < 5; t++) {
+            Path f = new Path(dir, Integer.toString(j));
+            FSDataOutputStream os = fs.create(f);
+            os.write(1);
+            os.close();
+          }
+        }
+      }
+
+      // checkpoint
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+      logs.clearOutput();
+      cluster.restartNameNode();
+      cluster.waitActive();
+
+      fsDir = cluster.getNamesystem().getFSDirectory();
+      inodeSize = fsDir.getINodeMap().size();
+      assertEquals(112, inodeSize);
+
+      content = logs.getOutput();
+      assertTrue(content.contains("Completed loading all INodeDirectory " +
+          "sub-sections. Loaded 62 inodes."));
+      assertTrue(content.contains("Completed loading all INode sections. " +
+          "Loaded 112 inodes."));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   @Test
   public void testNoParallelSectionsWithCompressionEnabled()
       throws IOException {