HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index #1028

Merged
merged 11 commits on Aug 23, 2019
@@ -883,6 +883,22 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;

Contributor

Could you add an annotation comment for these keys? e.g.:
// NameNode fsimage parallel loading

public static final String DFS_IMAGE_PARALLEL_LOAD_KEY =
"dfs.image.parallel.load";
public static final boolean DFS_IMAGE_PARALLEL_LOAD_DEFAULT = true;

public static final String DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY =
"dfs.image.parallel.target.sections";
public static final int DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT = 12;

public static final String DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY =
"dfs.image.parallel.inode.threshold";
public static final int DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT = 1000000;

public static final String DFS_IMAGE_PARALLEL_THREADS_KEY =
"dfs.image.parallel.threads";
public static final int DFS_IMAGE_PARALLEL_THREADS_DEFAULT = 4;

Contributor

IIUC, the thread count should not be greater than the number of target sections; otherwise the remaining threads will never be used (or other issues may arise). Is it necessary to warn about this configuration limit?
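
As a hedged illustration of that limit (a sketch only, using the new keys added by this patch; the warning check itself is hypothetical and not part of the PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Sketch: set the parallel-load keys and flag the case the reviewer
// describes. If threads > target sections, the extra threads have no
// sub-section to load and simply sit idle.
public class ParallelLoadConfigCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, true);
    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, 12);
    conf.setInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, 4);

    int sections = conf.getInt(
        DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
        DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
    int threads = conf.getInt(
        DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
        DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
    if (threads > sections) {
      // Hypothetical warning; the patch may handle this differently.
      System.err.println("dfs.image.parallel.threads (" + threads
          + ") is greater than dfs.image.parallel.target.sections ("
          + sections + "); the extra threads will be idle.");
    }
  }
}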

// Edit Log segment transfer timeout
public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY =
"dfs.edit.log.transfer.timeout";
@@ -985,7 +985,8 @@ void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
File dstFile = NNStorage.getStorageFile(sd, dstType, txid);

FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context,
conf);
FSImageCompression compression = FSImageCompression.createCompression(conf);
long numErrors = saver.save(newFile, compression);
if (numErrors > 0) {
@@ -25,6 +25,11 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +95,8 @@ public final class FSImageFormatPBINode {
private static final Logger LOG =
LoggerFactory.getLogger(FSImageFormatPBINode.class);

private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000;

// the loader must decode all fields referencing serial number based fields
// via to<Item> methods with the string table.
public final static class Loader {
@@ -197,16 +204,66 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) {
private final FSDirectory dir;
private final FSNamesystem fsn;
private final FSImageFormatProtobuf.Loader parent;
private ReentrantLock cacheNameMapLock;
private ReentrantLock blockMapLock;

Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
this.fsn = fsn;
this.dir = fsn.dir;
this.parent = parent;
cacheNameMapLock = new ReentrantLock(true);
blockMapLock = new ReentrantLock(true);
}

void loadINodeDirectorySectionInParallel(ExecutorService service,
ArrayList<FileSummary.Section> sections, String compressionCodec)
throws IOException {
LOG.info("Loading the INodeDirectory section in parallel with {} sub-" +
"sections", sections.size());
CountDownLatch latch = new CountDownLatch(sections.size());
final CopyOnWriteArrayList<IOException> exceptions =
new CopyOnWriteArrayList<>();
for (FileSummary.Section s : sections) {
service.submit(() -> {
InputStream ins = null;
try {
ins = parent.getInputStreamForSection(s,
compressionCodec);
loadINodeDirectorySection(ins);
} catch (Exception e) {
LOG.error("An exception occurred loading INodeDirectories in " +
"parallel", e);
exceptions.add(new IOException(e));
} finally {
latch.countDown();
try {
if (ins != null) {
ins.close();
}
} catch (IOException ioe) {
LOG.warn("Failed to close the input stream, ignoring", ioe);
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
LOG.error("Interrupted waiting for countdown latch", e);
throw new IOException(e);
}
if (exceptions.size() != 0) {
LOG.error("{} exceptions occurred loading INodeDirectories",
exceptions.size());
throw exceptions.get(0);
}
LOG.info("Completed loading all INodeDirectory sub-sections");
}

void loadINodeDirectorySection(InputStream in) throws IOException {
final List<INodeReference> refList = parent.getLoaderContext()
.getRefList();
ArrayList<INode> inodeList = new ArrayList<>();
Contributor

You use inodeList here to batch the name-cache and blocksMap updates, right?

  1. The following code fragment limits the batch size to 1000; should this value be defined as a constant?
  2. Batch-updating the blocksMap is a good idea, but could it have a negative effect on the cache, e.g. reduced locality? Please correct me if I am wrong.

Contributor

I think the performance benefit of the batch update outweighs the cache-locality concern.

Contributor

@jojochuang Thanks for your feedback; it is true that performance comes first. I am just concerned about the memory overhead, since the cache hit ratio may decrease with the batched caching approach. Of course, it is not a serious issue.

Contributor Author

I missed this comment earlier. I have changed the 1000 to a constant.

As for the cache in use, I have no concerns. It's a filename cache to which all the file names are loaded at startup time, so the same string can be re-used when multiple files share the same name. It is not an LRU cache or anything like that, but a hashmap of filenames used to reduce the overall heap used in the namenode. Loading it in batches like this will not affect its usefulness.
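
For readers unfamiliar with that cache, here is a minimal sketch of the idea (illustrative only; the class and method names are not the actual FSDirectory/NameCache code): identical file names are interned, so INodes sharing a name also share one canonical byte array on the heap.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustrative de-duplicating name cache in the spirit of the NameNode's
// filename cache: a plain hashmap, not an LRU, so the order in which
// names are inserted (per file or in batches) does not change what ends
// up shared.
class NameDedupCache {
  private final Map<String, byte[]> canonical = new HashMap<>();

  byte[] intern(byte[] name) {
    String key = new String(name, StandardCharsets.UTF_8);
    byte[] existing = canonical.putIfAbsent(key, name);
    return existing != null ? existing : name;
  }
}

Because every name stays in the map for the lifetime of the namespace, caching names in batches of DIRECTORY_ENTRY_BATCH_SIZE rather than one at a time does not change the hit ratio, only the locking pattern.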

while (true) {
INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
.parseDelimitedFrom(in);
@@ -217,33 +274,159 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
for (long id : e.getChildrenList()) {
INode child = dir.getInode(id);
addToParent(p, child);
if (addToParent(p, child)) {
if (child.isFile()) {
inodeList.add(child);
}
if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
addToCacheAndBlockMap(inodeList);
inodeList.clear();
}
} else {
LOG.warn("Failed to add the inode {} to the directory {}",
child.getId(), p.getId());
}
}

for (int refId : e.getRefChildrenList()) {
INodeReference ref = refList.get(refId);
addToParent(p, ref);
if (addToParent(p, ref)) {
if (ref.isFile()) {
inodeList.add(ref);
}
if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
addToCacheAndBlockMap(inodeList);
inodeList.clear();
}
} else {
LOG.warn("Failed to add the inode reference {} to the directory {}",
ref.getId(), p.getId());
}
}
}
addToCacheAndBlockMap(inodeList);
}

private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
try {
cacheNameMapLock.lock();
for (INode i : inodeList) {
dir.cacheName(i);
}
} finally {
cacheNameMapLock.unlock();
}

try {
blockMapLock.lock();
for (INode i : inodeList) {
updateBlocksMap(i.asFile(), fsn.getBlockManager());
}
} finally {
blockMapLock.unlock();
}
}

void loadINodeSection(InputStream in, StartupProgress prog,
Step currentStep) throws IOException {
INodeSection s = INodeSection.parseDelimitedFrom(in);
fsn.dir.resetLastInodeId(s.getLastInodeId());
long numInodes = s.getNumInodes();
LOG.info("Loading " + numInodes + " INodes.");
prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
loadINodeSectionHeader(in, prog, currentStep);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
for (int i = 0; i < numInodes; ++i) {
int totalLoaded = loadINodesInSection(in, counter);
LOG.info("Successfully loaded {} inodes", totalLoaded);
}

private int loadINodesInSection(InputStream in, Counter counter)
throws IOException {
// As the input stream is a LimitInputStream, the reading will stop when
// EOF is encountered at the end of the stream.
int cntr = 0;
while (true) {
INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
if (p == null) {
break;
}
if (p.getId() == INodeId.ROOT_INODE_ID) {
loadRootINode(p);
synchronized(this) {
loadRootINode(p);
}
} else {
INode n = loadINode(p);
dir.addToInodeMap(n);
synchronized(this) {
dir.addToInodeMap(n);
}
}
cntr++;
if (counter != null) {
counter.increment();
}
}
return cntr;
}


private long loadINodeSectionHeader(InputStream in, StartupProgress prog,
Step currentStep) throws IOException {
INodeSection s = INodeSection.parseDelimitedFrom(in);
fsn.dir.resetLastInodeId(s.getLastInodeId());
long numInodes = s.getNumInodes();
LOG.info("Loading " + numInodes + " INodes.");
prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
return numInodes;
}

void loadINodeSectionInParallel(ExecutorService service,
ArrayList<FileSummary.Section> sections,
String compressionCodec, StartupProgress prog,
Step currentStep) throws IOException {
LOG.info("Loading the INode section in parallel with {} sub-sections",
sections.size());
long expectedInodes = 0;
CountDownLatch latch = new CountDownLatch(sections.size());
AtomicInteger totalLoaded = new AtomicInteger(0);
final CopyOnWriteArrayList<IOException> exceptions =
new CopyOnWriteArrayList<>();

for (int i=0; i < sections.size(); i++) {
FileSummary.Section s = sections.get(i);
InputStream ins = parent.getInputStreamForSection(s, compressionCodec);
if (i == 0) {
// The first inode section has a header which must be processed first
expectedInodes = loadINodeSectionHeader(ins, prog, currentStep);
}
service.submit(() -> {
try {
totalLoaded.addAndGet(loadINodesInSection(ins, null));
prog.setCount(Phase.LOADING_FSIMAGE, currentStep,
totalLoaded.get());
} catch (Exception e) {
LOG.error("An exception occurred loading INodes in parallel", e);
exceptions.add(new IOException(e));
} finally {
latch.countDown();
try {
ins.close();
} catch (IOException ioe) {
LOG.warn("Failed to close the input stream, ignoring", ioe);
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
LOG.info("Interrupted waiting for countdown latch");
}
if (exceptions.size() != 0) {
LOG.error("{} exceptions occurred loading INodes", exceptions.size());
Contributor

Won't some exceptions be invisible to users? It will not be easy to find the root cause if multiple exceptions occur but the core exception is not at the head of the list.

Contributor Author

All exceptions will be logged by the executor here:

          public void run() {
            try {
              totalLoaded.addAndGet(loadINodesInSection(ins, null));
              prog.setCount(Phase.LOADING_FSIMAGE, currentStep,
                  totalLoaded.get());
            } catch (Exception e) {
              LOG.error("An exception occurred loading INodes in parallel", e);
              exceptions.add(new IOException(e));
            } finally {
            ...

It is fairly likely that, if there are problems, all of the exceptions will share the same cause, so throwing just the first one, while ensuring all of them are logged, should make debugging possible.

The alternative is to create a wrapper exception that all the exceptions are stored into, and then throw that. But the wrapper would need to be caught and each exception it contains logged anyway, and those exceptions are already logged, so it would only duplicate information in the logs.
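
For reference, that wrapper alternative could look roughly like the sketch below (hypothetical, not what the patch does), using Java's suppressed-exception mechanism to carry every failure on a single thrown exception:

import java.io.IOException;
import java.util.List;

// Hypothetical "wrapper exception" variant: attach each worker failure to
// one IOException via addSuppressed() and throw that single exception.
final class WrapperExceptionSketch {
  static void throwIfAnyFailed(List<IOException> exceptions)
      throws IOException {
    if (exceptions.isEmpty()) {
      return;
    }
    IOException wrapper = new IOException(
        exceptions.size() + " sub-sections failed to load");
    for (IOException e : exceptions) {
      wrapper.addSuppressed(e);  // each cause shows up in the stack trace
    }
    throw wrapper;
  }
}

As noted above, the caller would still need to log each contained exception, duplicating what the worker threads already write, which is why the patch simply throws the first collected exception.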

throw exceptions.get(0);
}
if (totalLoaded.get() != expectedInodes) {
throw new IOException("Expected to load "+expectedInodes+" in " +
"parallel, but loaded "+totalLoaded.get()+". The image may " +
"be corrupt.");
}
LOG.info("Completed loading all INode sections. Loaded {} inodes.",
totalLoaded.get());
}

/**
@@ -261,22 +444,18 @@ void loadFilesUnderConstructionSection(InputStream in) throws IOException {
}
}

private void addToParent(INodeDirectory parent, INode child) {
if (parent == dir.rootDir && FSDirectory.isReservedName(child)) {
private boolean addToParent(INodeDirectory parentDir, INode child) {
if (parentDir == dir.rootDir && FSDirectory.isReservedName(child)) {
throw new HadoopIllegalArgumentException("File name \""
+ child.getLocalName() + "\" is reserved. Please "
+ " change the name of the existing file or directory to another "
+ "name before upgrading to this release.");
}
// NOTE: This does not update space counts for parents
if (!parent.addChildAtLoading(child)) {
return;
}
dir.cacheName(child);

if (child.isFile()) {
updateBlocksMap(child.asFile(), fsn.getBlockManager());
if (!parentDir.addChildAtLoading(child)) {
return false;
}
return true;
}

private INode loadINode(INodeSection.INode n) {
@@ -527,6 +706,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
final ArrayList<INodeReference> refList = parent.getSaverContext()
.getRefList();
int i = 0;
int outputInodes = 0;
while (iter.hasNext()) {
INodeWithAdditionalFields n = iter.next();
if (!n.isDirectory()) {
@@ -558,6 +738,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
refList.add(inode.asReference());
b.addRefChildren(refList.size() - 1);
}
outputInodes++;
}
INodeDirectorySection.DirEntry e = b.build();
e.writeDelimitedTo(out);
@@ -567,9 +748,15 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
context.checkCancelled();
}
if (outputInodes >= parent.getInodesPerSubSection()) {
outputInodes = 0;
parent.commitSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
}
}
parent.commitSection(summary,
FSImageFormatProtobuf.SectionName.INODE_DIR);
parent.commitSectionAndSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE_DIR,
FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
}

void serializeINodeSection(OutputStream out) throws IOException {
Expand All @@ -589,8 +776,14 @@ void serializeINodeSection(OutputStream out) throws IOException {
if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
context.checkCancelled();
}
if (i % parent.getInodesPerSubSection() == 0) {
parent.commitSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE_SUB);
}
}
parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
parent.commitSectionAndSubSection(summary,
FSImageFormatProtobuf.SectionName.INODE,
FSImageFormatProtobuf.SectionName.INODE_SUB);
}

void serializeFilesUCSection(OutputStream out) throws IOException {