@@ -93,7 +93,7 @@ public final class FSImageFormatPBINode {
public static final int XATTR_NAMESPACE_EXT_OFFSET = 5;
public static final int XATTR_NAMESPACE_EXT_MASK = 1;

-  private static final Logger LOG =
+  public static final Logger LOG =
LoggerFactory.getLogger(FSImageFormatPBINode.class);

private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000;
@@ -227,6 +227,7 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
LOG.info("Loading the INodeDirectory section in parallel with {} sub-" +
"sections", sections.size());
CountDownLatch latch = new CountDownLatch(sections.size());
AtomicInteger totalLoaded = new AtomicInteger(0);
final CopyOnWriteArrayList<IOException> exceptions =
new CopyOnWriteArrayList<>();
for (FileSummary.Section s : sections) {
@@ -235,7 +236,7 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
try {
ins = parent.getInputStreamForSection(s,
compressionCodec);
-          loadINodeDirectorySection(ins);
+          totalLoaded.addAndGet(loadINodeDirectorySection(ins));
} catch (Exception e) {
LOG.error("An exception occurred loading INodeDirectories in " +
"parallel", e);
@@ -254,6 +255,7 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
}
try {
latch.await();
totalLoaded.incrementAndGet(); // count the root inode, which has no parent entry in this section
} catch (InterruptedException e) {
LOG.error("Interrupted waiting for countdown latch", e);
throw new IOException(e);
@@ -263,12 +265,14 @@ void loadINodeDirectorySectionInParallel(ExecutorService service,
exceptions.size());
throw exceptions.get(0);
}
LOG.info("Completed loading all INodeDirectory sub-sections");
LOG.info("Completed loading all INodeDirectory sub-sections. Loaded {} inodes.",
totalLoaded.get());
}

-  void loadINodeDirectorySection(InputStream in) throws IOException {
+  int loadINodeDirectorySection(InputStream in) throws IOException {
final List<INodeReference> refList = parent.getLoaderContext()
.getRefList();
int cntr = 0;
while (true) {
INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
.parseDelimitedFrom(in);
@@ -279,6 +283,9 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
for (long id : e.getChildrenList()) {
INode child = dir.getInode(id);
if (child.isDirectory()) {
Contributor (@sodonnel) commented:

We are only incrementing here if it's a directory. The inode table / section contains an entry for every file and directory in the system.

The directory section is what links them all together into the parent-child relationship, so it should contain about the same number of entries as inodes.

I am not sure it makes sense to count just the directories here, as we have already counted them in the inode section.

Why do you want to count just directories? Would it make more sense to count each entry and child entry, to give an idea of the number of entries processed by each parallel section?
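
For illustration only, a minimal sketch of the alternative raised above: counting every entry a sub-section processes rather than directories alone. It reuses the names from this diff (dir, parent, addToParent, LOG) and elides the INodeReference children for brevity; it is not part of the patch.

  // Sketch: count each child entry (file or directory) so the per-section
  // total reflects all work done by the worker thread.
  int loadINodeDirectorySection(InputStream in) throws IOException {
    int entriesProcessed = 0;
    while (true) {
      INodeDirectorySection.DirEntry e =
          INodeDirectorySection.DirEntry.parseDelimitedFrom(in);
      if (e == null) {
        break; // end of this sub-section
      }
      INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
      for (long id : e.getChildrenList()) {
        INode child = dir.getInode(id);
        entriesProcessed++; // files and directories alike
        if (!addToParent(p, child)) {
          LOG.warn("Failed to add the inode {} to the directory {}",
              child.getId(), p.getId());
        }
      }
      // INodeReference children would be counted the same way (elided).
    }
    return entriesProcessed;
  }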

Contributor Author commented:

Thanks @sodonnel for the comment and review.

When loading the FsImage, we already record the total number of inodes (INodeFile and INodeDirectory combined) in the log. For example:

2021-09-30 19:12:55,034 [15609]-INFO [main:FSImageFormatPBINode$Loader@409]-Loading xxxx INodes.

This is useful, but it is only a sum: we cannot tell how many INodeFiles or how many INodeDirectories were loaded. If we log the number of INodeDirectories, the number of INodeFiles follows by subtraction, which helps narrow down the cause when loading goes wrong.

As for why I count INodeDirectories here rather than files: in most cases far more files are created than directories, so counting the directories is the cheaper side to track.
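
As a toy illustration of that subtraction, using values taken from the new test's assertions (the snippet itself is hypothetical, not part of the patch):

  // The two totals this patch logs after a NameNode restart:
  //   "Completed loading all INode sections. Loaded 112 inodes."
  //   "Completed loading all INodeDirectory sub-sections. Loaded 62 inodes."
  long totalInodes = 112;
  long directoryInodes = 62;
  long fileInodes = totalInodes - directoryInodes; // 50 INodeFiles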

cntr++;
}
if (!addToParent(p, child)) {
LOG.warn("Failed to add the inode {} to the directory {}",
child.getId(), p.getId());
@@ -293,6 +300,8 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
}
}
}

return cntr;
}

private void fillUpInodeList(ArrayList<INode> inodeList, INode inode) {
@@ -84,6 +84,7 @@
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.event.Level;

import static org.junit.Assert.assertArrayEquals;

@@ -1103,6 +1104,99 @@ public void testParallelSaveAndLoad() throws IOException {
}
}

@Test
public void testNumInodeWithParallelLoad() throws IOException {
Configuration conf = new Configuration();

MiniDFSCluster cluster = null;
try {
// generate MiniDFSCluster
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true");
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1");
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4");
conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4");

cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
String baseDir = "/abc";
Path basePath = new Path(baseDir);
if (fs.exists(basePath)) {
fs.delete(basePath, true);
}
fs.mkdirs(basePath);
// create 10 directories with 5 files per directory
for (int i = 0; i < 10; i++) {
Path dir = new Path(baseDir + "/" + i);
for (int j = 0; j < 5; j++) {
Path f = new Path(dir, Integer.toString(j));
FSDataOutputStream os = fs.create(f);
os.write(1);
os.close();
}
}

GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer.
captureLogs(FSImageFormatPBINode.LOG);
GenericTestUtils.setLogLevel(FSImageFormatPBINode.LOG, Level.INFO);
// checkpoint
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);

logs.clearOutput();
cluster.restartNameNode();
cluster.waitActive();

FSDirectory fsDir = cluster.getNamesystem().getFSDirectory();
int inodeSize = fsDir.getINodeMap().size();
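// 62 inodes = 1 root + 1 "/abc" + 10 directories + 50 files (12 of them directories)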
assertEquals(62, inodeSize);

String content = logs.getOutput();
assertTrue(content.contains("Completed loading all INodeDirectory " +
"sub-sections. Loaded 12 inodes."));
assertTrue(content.contains("Completed loading all INode sections. " +
"Loaded 62 inodes."));

fs.delete(basePath, true);
fs.mkdirs(basePath);
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 5; j++) {
Path dir = new Path(baseDir + "/" + i + "/" + j); // multi-level directory: /abc/<i>/<j>
for (int t = 0; t < 5; t++) {
// the file name reuses j, so each subdirectory ends up holding a single
// file that is simply overwritten on every pass of this loop
Path f = new Path(dir, Integer.toString(j));
FSDataOutputStream os = fs.create(f);
os.write(1);
os.close();
}
}
}

// checkpoint
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
fs.saveNamespace();
fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);

logs.clearOutput();
cluster.restartNameNode();
cluster.waitActive();

fsDir = cluster.getNamesystem().getFSDirectory();
inodeSize = fsDir.getINodeMap().size();
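// 112 inodes = 1 root + 1 "/abc" + 10 dirs + 50 subdirs + 50 files (62 of them directories)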
assertEquals(112, inodeSize);

content = logs.getOutput();
assertTrue(content.contains("Completed loading all INodeDirectory " +
"sub-sections. Loaded 62 inodes."));
assertTrue(content.contains("Completed loading all INode sections. " +
"Loaded 112 inodes."));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}

@Test
public void testNoParallelSectionsWithCompressionEnabled()
throws IOException {