From 09646d014b9bfeae6b922dda2ca053738ea58c3a Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Fri, 28 Jun 2019 12:06:36 +0100 Subject: [PATCH 01/11] Initial code to implement the description in HDFS-14617 --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 16 ++ .../hadoop/hdfs/server/namenode/FSImage.java | 3 +- .../server/namenode/FSImageFormatPBINode.java | 226 ++++++++++++++++-- .../namenode/FSImageFormatProtobuf.java | 220 ++++++++++++++++- .../snapshot/FSImageFormatPBSnapshot.java | 9 +- .../namenode/TestFSImageWithSnapshot.java | 3 +- 6 files changed, 449 insertions(+), 28 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 15f5a417cb167..559008ff7fcad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -883,6 +883,22 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize"; public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024; + public static final String DFS_IMAGE_PARALLEL_LOAD_KEY = + "dfs.image.parallel.load"; + public static final boolean DFS_IMAGE_PARALLEL_LOAD_DEFAULT = true; + + public static final String DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY = + "dfs.image.parallel.target.sections"; + public static final int DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT = 12; + + public static final String DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY = + "dfs.image.parallel.inode.threshold"; + public static final int DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT = 1000000; + + public static final String DFS_IMAGE_PARALLEL_THREADS_KEY = + "dfs.image.parallel.threads"; + public static final int DFS_IMAGE_PARALLEL_THREADS_DEFAULT = 4; + // Edit Log segment transfer timeout public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY = "dfs.edit.log.transfer.timeout"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index cfba091976eb5..cea18b7f00382 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -985,7 +985,8 @@ void saveFSImage(SaveNamespaceContext context, StorageDirectory sd, File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid); File dstFile = NNStorage.getStorageFile(sd, dstType, txid); - FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context); + FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context, + conf); FSImageCompression compression = FSImageCompression.createCompression(conf); long numErrors = saver.save(newFile, compression); if (numErrors > 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 6825a5c4857db..e19dbf2003143 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -25,6 +25,12 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -197,16 +203,66 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) { private final FSDirectory dir; private final FSNamesystem fsn; private final FSImageFormatProtobuf.Loader parent; + private ReentrantLock cacheNameMapLock; + private ReentrantLock blockMapLock; Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) { this.fsn = fsn; this.dir = fsn.dir; this.parent = parent; + cacheNameMapLock = new ReentrantLock(true); + blockMapLock = new ReentrantLock(true); + } + + void loadINodeDirectorySectionInParallel(ExecutorService service, + ArrayList sections, String compressionCodec) + throws IOException { + LOG.info("Loading the INodeDirectory section in parallel with {} sub-" + + "sections", sections.size()); + CountDownLatch latch = new CountDownLatch(sections.size()); + final CopyOnWriteArrayList exceptions = + new CopyOnWriteArrayList<>(); + for (FileSummary.Section s : sections) { + service.submit(new Runnable() { + public void run() { + InputStream ins = null; + try { + ins = parent.getInputStreamForSection(s, + compressionCodec); + loadINodeDirectorySection(ins); + } catch (Exception e) { + LOG.error("An exception occurred loading INodeDirectories in " + + "parallel", e); + exceptions.add(new IOException(e)); + } finally { + latch.countDown(); + try { + ins.close(); + } catch (IOException ioe) { + LOG.warn("Failed to close the input stream, ignoring", ioe); + } + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + LOG.error("Interrupted waiting for countdown latch", e); + throw new IOException(e); + } + if (exceptions.size() != 0) { + LOG.error("{} exceptions occurred loading INodeDirectories", + exceptions.size()); + throw exceptions.get(0); + } + LOG.info("Completed loading all INodeDirectory sub-sections"); } void loadINodeDirectorySection(InputStream in) throws IOException { final List refList = parent.getLoaderContext() .getRefList(); + ArrayList inodeList = new ArrayList<>(); while (true) { INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry .parseDelimitedFrom(in); @@ -217,33 +273,150 @@ void loadINodeDirectorySection(InputStream in) throws IOException { INodeDirectory p = dir.getInode(e.getParent()).asDirectory(); for (long id : e.getChildrenList()) { INode child = dir.getInode(id); - addToParent(p, child); + if (addToParent(p, child)) { + if (child.isFile()) { + inodeList.add(child); + } + if (inodeList.size() >= 1000) { + addToCacheAndBlockMap(inodeList); + inodeList.clear(); + } + } } + for (int refId : e.getRefChildrenList()) { INodeReference ref = refList.get(refId); - addToParent(p, ref); + if (addToParent(p, ref)) { + if (ref.isFile()) { + inodeList.add(ref); + } + if (inodeList.size() >= 1000) { + addToCacheAndBlockMap(inodeList); + inodeList.clear(); + } + } } } + addToCacheAndBlockMap(inodeList); + } + + private void addToCacheAndBlockMap(ArrayList inodeList) { + try { + cacheNameMapLock.lock(); + for (INode i : inodeList) { + dir.cacheName(i); + } + 
} finally { + cacheNameMapLock.unlock(); + } + + try { + blockMapLock.lock(); + for (INode i : inodeList) { + updateBlocksMap(i.asFile(), fsn.getBlockManager()); + } + } finally { + blockMapLock.unlock(); + } } void loadINodeSection(InputStream in, StartupProgress prog, Step currentStep) throws IOException { - INodeSection s = INodeSection.parseDelimitedFrom(in); - fsn.dir.resetLastInodeId(s.getLastInodeId()); - long numInodes = s.getNumInodes(); - LOG.info("Loading " + numInodes + " INodes."); - prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes); + loadINodeSectionHeader(in, prog, currentStep); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep); - for (int i = 0; i < numInodes; ++i) { + int totalLoaded = loadINodesInSection(in, counter); + LOG.info("Successfully loaded {} inodes", totalLoaded); + } + + private int loadINodesInSection(InputStream in, Counter counter) + throws IOException { + // As the input stream is a LimitInputStream, the reading will stop when + // EOF is encountered at the end of the stream. + int cntr = 0; + while (true) { INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in); + if (p == null) { + break; + } if (p.getId() == INodeId.ROOT_INODE_ID) { - loadRootINode(p); + synchronized(this) { + loadRootINode(p); + } } else { INode n = loadINode(p); - dir.addToInodeMap(n); + synchronized(this) { + dir.addToInodeMap(n); + } + } + cntr ++; + if (counter != null) { + counter.increment(); } - counter.increment(); } + return cntr; + } + + + private void loadINodeSectionHeader(InputStream in, StartupProgress prog, + Step currentStep) throws IOException { + INodeSection s = INodeSection.parseDelimitedFrom(in); + fsn.dir.resetLastInodeId(s.getLastInodeId()); + long numInodes = s.getNumInodes(); + LOG.info("Loading " + numInodes + " INodes."); + prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes); + } + + void loadINodeSectionInParallel(ExecutorService service, + ArrayList sections, + String compressionCodec, StartupProgress prog, + Step currentStep) throws IOException { + LOG.info("Loading the INode section in parallel with {} sub-sections", + sections.size()); + CountDownLatch latch = new CountDownLatch(sections.size()); + AtomicInteger totalLoaded = new AtomicInteger(0); + final CopyOnWriteArrayList exceptions = + new CopyOnWriteArrayList<>(); + + for (int i=0; i < sections.size(); i++) { + FileSummary.Section s = sections.get(i); + InputStream ins = parent.getInputStreamForSection(s, compressionCodec); + if (i == 0) { + // The first inode section has a header which must be processed first + loadINodeSectionHeader(ins, prog, currentStep); + } + + service.submit(new Runnable() { + public void run() { + try { + totalLoaded.addAndGet(loadINodesInSection(ins, null)); + prog.setCount(Phase.LOADING_FSIMAGE, currentStep, + totalLoaded.get()); + } catch (Exception e) { + LOG.error("An exception occurred loading INodes in parallel", e); + exceptions.add(new IOException(e)); + } finally { + latch.countDown(); + try { + ins.close(); + } catch (IOException ioe) { + LOG.warn("Failed to close the input stream, ignoring", ioe); + } + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + LOG.info("Interrupted waiting for countdown latch"); + } + if (exceptions.size() != 0) { + LOG.error("{} exceptions occurred loading INodes", exceptions.size()); + throw exceptions.get(0); + } + // TODO - should we fail if total_loaded != total_expected? + LOG.info("Completed loading all INode sections. 
Loaded {} inodes.", + totalLoaded.get()); } /** @@ -261,7 +434,7 @@ void loadFilesUnderConstructionSection(InputStream in) throws IOException { } } - private void addToParent(INodeDirectory parent, INode child) { + private boolean addToParent(INodeDirectory parent, INode child) { if (parent == dir.rootDir && FSDirectory.isReservedName(child)) { throw new HadoopIllegalArgumentException("File name \"" + child.getLocalName() + "\" is reserved. Please " @@ -270,13 +443,9 @@ private void addToParent(INodeDirectory parent, INode child) { } // NOTE: This does not update space counts for parents if (!parent.addChildAtLoading(child)) { - return; - } - dir.cacheName(child); - - if (child.isFile()) { - updateBlocksMap(child.asFile(), fsn.getBlockManager()); + return false; } + return true; } private INode loadINode(INodeSection.INode n) { @@ -527,6 +696,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException { final ArrayList refList = parent.getSaverContext() .getRefList(); int i = 0; + int outputInodes = 0; while (iter.hasNext()) { INodeWithAdditionalFields n = iter.next(); if (!n.isDirectory()) { @@ -558,6 +728,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException { refList.add(inode.asReference()); b.addRefChildren(refList.size() - 1); } + outputInodes ++; } INodeDirectorySection.DirEntry e = b.build(); e.writeDelimitedTo(out); @@ -567,9 +738,16 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException { if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { context.checkCancelled(); } + if (outputInodes >= + FSImageFormatProtobuf.Saver.INODES_PER_SUB_SECTION) { + outputInodes = 0; + parent.commitSubSection(summary, + FSImageFormatProtobuf.SectionName.INODE_DIR_SUB); + } } - parent.commitSection(summary, - FSImageFormatProtobuf.SectionName.INODE_DIR); + parent.commitSectionAndSubSection(summary, + FSImageFormatProtobuf.SectionName.INODE_DIR, + FSImageFormatProtobuf.SectionName.INODE_DIR_SUB); } void serializeINodeSection(OutputStream out) throws IOException { @@ -589,8 +767,14 @@ void serializeINodeSection(OutputStream out) throws IOException { if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { context.checkCancelled(); } + if (i % FSImageFormatProtobuf.Saver.INODES_PER_SUB_SECTION == 0) { + parent.commitSubSection(summary, + FSImageFormatProtobuf.SectionName.INODE_SUB); + } } - parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE); + parent.commitSectionAndSubSection(summary, + FSImageFormatProtobuf.SectionName.INODE, + FSImageFormatProtobuf.SectionName.INODE_SUB); } void serializeFilesUCSection(OutputStream out) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index b887a1438e249..4926aa98bded4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -40,7 +40,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; import 
org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.io.compress.CompressionOutputStream; @@ -150,6 +154,8 @@ public static final class Loader implements FSImageFormat.AbstractLoader { */ private final boolean requireSameLayoutVersion; + private File filename; + Loader(Configuration conf, FSNamesystem fsn, boolean requireSameLayoutVersion) { this.conf = conf; @@ -229,6 +235,7 @@ public String toString() { } void load(File file) throws IOException { + filename = file; long start = Time.monotonicNow(); DigestThread dt = new DigestThread(file); dt.start(); @@ -250,6 +257,73 @@ void load(File file) throws IOException { } } + /** + * Given a FSImage FileSummary.section, return a LimitInput stream set to + * the starting position of the section and limited to the section length + * @param section The FileSummary.Section containing the offset and length + * @param compressionCodec The compression codec in use, if any + * @return + * @throws IOException + */ + public InputStream getInputStreamForSection(FileSummary.Section section, + String compressionCodec) + throws IOException { + FileInputStream fin = new FileInputStream(filename); + FileChannel channel = fin.getChannel(); + channel.position(section.getOffset()); + InputStream in = new BufferedInputStream(new LimitInputStream(fin, + section.getLength())); + + in = FSImageUtil.wrapInputStreamForCompression(conf, + compressionCodec, in); + return in; + } + + /** + * Takes an ArrayList of Section's and removes all Section's whose + * name ends in _SUB, indicating they are sub-sections. The original + * array list is modified and a new list of the removed Section's is + * returned. + * @param sections Array List containing all Sections and Sub Sections + * in the image. + * @return ArrayList of the sections removed, or an empty list if none are + * removed. + */ + private ArrayList getAndRemoveSubSections( + ArrayList sections) { + ArrayList subSections = new ArrayList<>(); + Iterator iter = sections.iterator(); + while (iter.hasNext()) { + FileSummary.Section s = iter.next(); + String name = s.getName(); + if (name.matches(".*_SUB$")) { + subSections.add(s); + iter.remove(); + } + } + return subSections; + } + + /** + * Given an ArrayList of Section's, return all Section's with the given + * name, or an empty list if none are found. + * @param sections ArrayList of the Section's to search though + * @param name The name of the Sections to search for + * @return ArrayList of the sections matching the given name + */ + private ArrayList getSubSectionsOfName( + ArrayList sections, SectionName name) { + ArrayList subSec = new ArrayList<>(); + for (FileSummary.Section s : sections) { + String n = s.getName(); + SectionName sectionName = SectionName.fromString(n); + if (sectionName == name) { + subSec.add(s); + } + } + return subSec; + } + private void loadInternal(RandomAccessFile raFile, FileInputStream fin) throws IOException { if (!FSImageUtil.checkFileFormat(raFile)) { @@ -294,6 +368,19 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { * a particular step to be started for once. 
*/ Step currentStep = null; + boolean loadInParallel = + conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT); + // TODO - check for compression and if enabled disable parallel + + ExecutorService executorService = null; + ArrayList subSections = + getAndRemoveSubSections(sections); + if (loadInParallel) { + executorService = Executors.newFixedThreadPool( + conf.getInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT)); + } for (FileSummary.Section s : sections) { channel.position(s.getOffset()); @@ -308,6 +395,8 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { if (sectionName == null) { throw new IOException("Unrecognized section " + n); } + + ArrayList stageSubSections; switch (sectionName) { case NS_INFO: loadNameSystemSection(in); @@ -318,14 +407,28 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { case INODE: { currentStep = new Step(StepType.INODES); prog.beginStep(Phase.LOADING_FSIMAGE, currentStep); - inodeLoader.loadINodeSection(in, prog, currentStep); + stageSubSections = getSubSectionsOfName( + subSections, SectionName.INODE_SUB); + if (loadInParallel && (stageSubSections.size() > 0)) { + inodeLoader.loadINodeSectionInParallel(executorService, + stageSubSections, summary.getCodec(), prog, currentStep); + } else { + inodeLoader.loadINodeSection(in, prog, currentStep); + } } break; case INODE_REFERENCE: snapshotLoader.loadINodeReferenceSection(in); break; case INODE_DIR: - inodeLoader.loadINodeDirectorySection(in); + stageSubSections = getSubSectionsOfName( + subSections, SectionName.INODE_DIR_SUB); + if (loadInParallel && stageSubSections.size() > 0) { + inodeLoader.loadINodeDirectorySectionInParallel(executorService, + stageSubSections, summary.getCodec()); + } else { + inodeLoader.loadINodeDirectorySection(in); + } break; case FILES_UNDERCONSTRUCTION: inodeLoader.loadFilesUnderConstructionSection(in); @@ -362,6 +465,9 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { break; } } + if (executorService != null) { + executorService.shutdown(); + } } private void loadNameSystemSection(InputStream in) throws IOException { @@ -452,10 +558,13 @@ private void loadErasureCodingSection(InputStream in) public static final class Saver { public static final int CHECK_CANCEL_INTERVAL = 4096; + public static boolean WRITE_SUB_SECTIONS = false; + public static int INODES_PER_SUB_SECTION = Integer.MAX_VALUE; private final SaveNamespaceContext context; private final SaverContext saverContext; private long currentOffset = FSImageUtil.MAGIC_HEADER.length; + private long subSectionOffset = currentOffset; private MD5Hash savedDigest; private FileChannel fileChannel; @@ -463,10 +572,12 @@ public static final class Saver { private OutputStream sectionOutputStream; private CompressionCodec codec; private OutputStream underlyingOutputStream; + private Configuration conf; - Saver(SaveNamespaceContext context) { + Saver(SaveNamespaceContext context, Configuration conf) { this.context = context; this.saverContext = new SaverContext(); + this.conf = conf; } public MD5Hash getSavedDigest() { @@ -481,6 +592,21 @@ public SaverContext getSaverContext() { return saverContext; } + /** + * Commit the length and offset of a fsimage section to the summary index, + * including the sub section, which will be committed before the section is + * committed. 
+ * @param summary The image summary object + * @param name The name of the section to commit + * @param subSectionName The name of the sub-section to commit + * @throws IOException + */ + public void commitSectionAndSubSection(FileSummary.Builder summary, + SectionName name, SectionName subSectionName) throws IOException { + commitSubSection(summary, subSectionName); + commitSection(summary, name); + } + public void commitSection(FileSummary.Builder summary, SectionName name) throws IOException { long oldOffset = currentOffset; @@ -495,6 +621,35 @@ public void commitSection(FileSummary.Builder summary, SectionName name) summary.addSections(FileSummary.Section.newBuilder().setName(name.name) .setLength(length).setOffset(currentOffset)); currentOffset += length; + subSectionOffset = currentOffset; + } + + /** + * Commit the length and offset of a fsimage sub-section to the summary + * index. + * @param summary The image summary object + * @param name The name of the sub-section to commit + * @throws IOException + */ + public void commitSubSection(FileSummary.Builder summary, SectionName name) + throws IOException { + if (!WRITE_SUB_SECTIONS) { + return; + } + + LOG.debug("Saving a subsection for {}", name.toString()); + // The output stream must be flushed before the length is obtained + // as the flush can move the length forward. + sectionOutputStream.flush(); + long length = fileChannel.position() - subSectionOffset; + if (length == 0) { + LOG.warn("The requested section for {} is empty. It will not be " + + "output to the image", name.toString()); + return; + } + summary.addSections(FileSummary.Section.newBuilder().setName(name.name) + .setLength(length).setOffset(subSectionOffset)); + subSectionOffset += length; } private void flushSectionOutputStream() throws IOException { @@ -509,6 +664,7 @@ private void flushSectionOutputStream() throws IOException { * @throws IOException on fatal error. */ long save(File file, FSImageCompression compression) throws IOException { + enableSubSectionsIfRequired(); FileOutputStream fout = new FileOutputStream(file); fileChannel = fout.getChannel(); try { @@ -525,6 +681,60 @@ long save(File file, FSImageCompression compression) throws IOException { } } + private void enableSubSectionsIfRequired() { + boolean parallelEnabled = conf.getBoolean( + DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT); + int inodeThreshold = conf.getInt( + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT); + int targetSections = conf.getInt( + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT); + boolean compressionEnabled = conf.getBoolean( + DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, + DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT); + + + if (parallelEnabled) { + if (compressionEnabled) { + LOG.warn("Parallel Image loading is not supported when {} is set to" + + " true. Parallel loading will be disabled.", + DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY); + WRITE_SUB_SECTIONS = false; + return; + } + if (targetSections <= 0) { + LOG.warn("{} is set to {}. It must be greater than zero. Setting to" + + "default of {}", + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, + targetSections, + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT); + targetSections = + DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT; + } + if (inodeThreshold <= 0) { + LOG.warn("{} is set to {}. It must be greater than zero. 
Setting to" + + "default of {}", + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, + targetSections, + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT); + inodeThreshold = + DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT; + } + int inodeCount = context.getSourceNamesystem().dir.getInodeMapSize(); + // Only enable parallel sections if there are enough inodes + if (inodeCount >= inodeThreshold) { + WRITE_SUB_SECTIONS = true; + // Calculate the inodes per section rounded up to the nearest int + INODES_PER_SUB_SECTION = (inodeCount + targetSections - 1) / + targetSections; + } + } else { + WRITE_SUB_SECTIONS = false; + } + } + private static void saveFileSummary(OutputStream out, FileSummary summary) throws IOException { summary.writeDelimitedTo(out); @@ -737,11 +947,15 @@ public enum SectionName { EXTENDED_ACL("EXTENDED_ACL"), ERASURE_CODING("ERASURE_CODING"), INODE("INODE"), + INODE_SUB("INODE_SUB"), INODE_REFERENCE("INODE_REFERENCE"), + INODE_REFERENCE_SUB("INODE_REFERENCE_SUB"), SNAPSHOT("SNAPSHOT"), INODE_DIR("INODE_DIR"), + INODE_DIR_SUB("INODE_DIR_SUB"), FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"), SNAPSHOT_DIFF("SNAPSHOT_DIFF"), + SNAPSHOT_DIFF_SUB("SNAPSHOT_DIFF_SUB"), SECRET_MANAGER("SECRET_MANAGER"), CACHE_MANAGER("CACHE_MANAGER"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java index 2157554cd62ed..997815b620cc0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java @@ -529,9 +529,14 @@ public void serializeSnapshotDiffSection(OutputStream out) if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { context.checkCancelled(); } + if (i % FSImageFormatProtobuf.Saver.INODES_PER_SUB_SECTION == 0) { + parent.commitSubSection(headers, + FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB); + } } - parent.commitSection(headers, - FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF); + parent.commitSectionAndSubSection(headers, + FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF, + FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB); } private void serializeFileDiffList(INodeFile file, OutputStream out) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java index 6f31b58d1e0a1..f6301146b0eaf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java @@ -142,7 +142,8 @@ private HdfsDataOutputStream appendFileWithoutClosing(Path file, int length) private File saveFSImageToTempFile() throws IOException { SaveNamespaceContext context = new SaveNamespaceContext(fsn, txid, new Canceler()); - FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context); + FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context, + conf); FSImageCompression compression = FSImageCompression.createCompression(conf); File imageFile = getImageFile(testDir, txid); 
fsn.readLock(); From e36537018a124adfda0fe9c0b4929659a4a9a6ea Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Wed, 24 Jul 2019 10:09:10 +0100 Subject: [PATCH 02/11] Addressed most of the check style issues and the find bugs warning --- .../server/namenode/FSImageFormatPBINode.java | 23 ++++++++++--------- .../namenode/FSImageFormatProtobuf.java | 2 +- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index e19dbf2003143..cc2ca4a019b97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -28,7 +28,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -237,7 +236,9 @@ public void run() { } finally { latch.countDown(); try { - ins.close(); + if (ins != null) { + ins.close(); + } } catch (IOException ioe) { LOG.warn("Failed to close the input stream, ignoring", ioe); } @@ -348,7 +349,7 @@ private int loadINodesInSection(InputStream in, Counter counter) dir.addToInodeMap(n); } } - cntr ++; + cntr++; if (counter != null) { counter.increment(); } @@ -386,11 +387,11 @@ void loadINodeSectionInParallel(ExecutorService service, } service.submit(new Runnable() { - public void run() { + public void run() { try { - totalLoaded.addAndGet(loadINodesInSection(ins, null)); - prog.setCount(Phase.LOADING_FSIMAGE, currentStep, - totalLoaded.get()); + totalLoaded.addAndGet(loadINodesInSection(ins, null)); + prog.setCount(Phase.LOADING_FSIMAGE, currentStep, + totalLoaded.get()); } catch (Exception e) { LOG.error("An exception occurred loading INodes in parallel", e); exceptions.add(new IOException(e)); @@ -434,15 +435,15 @@ void loadFilesUnderConstructionSection(InputStream in) throws IOException { } } - private boolean addToParent(INodeDirectory parent, INode child) { - if (parent == dir.rootDir && FSDirectory.isReservedName(child)) { + private boolean addToParent(INodeDirectory parentDir, INode child) { + if (parentDir == dir.rootDir && FSDirectory.isReservedName(child)) { throw new HadoopIllegalArgumentException("File name \"" + child.getLocalName() + "\" is reserved. 
Please " + " change the name of the existing file or directory to another " + "name before upgrading to this release."); } // NOTE: This does not update space counts for parents - if (!parent.addChildAtLoading(child)) { + if (!parentDir.addChildAtLoading(child)) { return false; } return true; @@ -728,7 +729,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException { refList.add(inode.asReference()); b.addRefChildren(refList.size() - 1); } - outputInodes ++; + outputInodes++; } INodeDirectorySection.DirEntry e = b.build(); e.writeDelimitedTo(out); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index 4926aa98bded4..4d55b4c78d0f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -259,7 +259,7 @@ void load(File file) throws IOException { /** * Given a FSImage FileSummary.section, return a LimitInput stream set to - * the starting position of the section and limited to the section length + * the starting position of the section and limited to the section length. * @param section The FileSummary.Section containing the offset and length * @param compressionCodec The compression codec in use, if any * @return From 06ee8c4997089fefb518f3343daed5a640e2f71c Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Wed, 24 Jul 2019 11:47:53 +0100 Subject: [PATCH 03/11] Added the new parameters to hdfs-default.xml --- .../src/main/resources/hdfs-default.xml | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 8b57fde2e53f6..f0bb8e871cfc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1385,6 +1385,54 @@ + + dfs.image.parallel.load + true + + If true, write sub-section entries to the fsimage index so it can + be loaded in parallel. Also controls whether parallel loading + will be used for an image previously created with sub-sections. + If the image contains sub-sections and this is set to false, + parallel loading will not be used. + + + + + dfs.image.parallel.target.sections + 12 + + Controls the number of sub-sections that will be written to + fsimage for each section. This should be larger than + dfs.image.parallel.threads, otherwise all threads will not be + used when loading. Ideally, have at least twice the number + of target sections as threads, so each thread must load more + than one section to avoid one long running section affecting + the load time. + + + + + dfs.image.parallel.inode.threshold + 1000000 + + If the image contains less inodes than this setting, then + do not write sub-sections and hence disable parallel loading. + This is because small images load very quickly in serial and + parallel loading is not needed. + + + + + dfs.image.parallel.threads + 4 + + The number of threads to use when dfs.image.parallel.load is + enabled. This setting should be less than + dfs.image.parallel.target.sections. The optimal number of + threads will depend on the hardware and environment. 
+ + + dfs.edit.log.transfer.timeout 30000 From 7713b7a37740222d35ed592151b9d3558145cae5 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Wed, 24 Jul 2019 11:50:06 +0100 Subject: [PATCH 04/11] Addressed many of the comments from Hexiaoqiao in the initial PR --- .../server/namenode/FSImageFormatPBINode.java | 5 ++-- .../namenode/FSImageFormatProtobuf.java | 27 ++++++++++++------- .../snapshot/FSImageFormatPBSnapshot.java | 2 +- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index cc2ca4a019b97..ba0b7002ccd5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -739,8 +739,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException { if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { context.checkCancelled(); } - if (outputInodes >= - FSImageFormatProtobuf.Saver.INODES_PER_SUB_SECTION) { + if (outputInodes >= parent.getInodesPerSubSection()) { outputInodes = 0; parent.commitSubSection(summary, FSImageFormatProtobuf.SectionName.INODE_DIR_SUB); @@ -768,7 +767,7 @@ void serializeINodeSection(OutputStream out) throws IOException { if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { context.checkCancelled(); } - if (i % FSImageFormatProtobuf.Saver.INODES_PER_SUB_SECTION == 0) { + if (i % parent.getInodesPerSubSection() == 0) { parent.commitSubSection(summary, FSImageFormatProtobuf.SectionName.INODE_SUB); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index 4d55b4c78d0f2..a80335f90750b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -262,7 +262,7 @@ void load(File file) throws IOException { * the starting position of the section and limited to the section length. 
* @param section The FileSummary.Section containing the offset and length * @param compressionCodec The compression codec in use, if any - * @return + * @return An InputStream for the given section * @throws IOException */ public InputStream getInputStreamForSection(FileSummary.Section section, @@ -558,8 +558,8 @@ private void loadErasureCodingSection(InputStream in) public static final class Saver { public static final int CHECK_CANCEL_INTERVAL = 4096; - public static boolean WRITE_SUB_SECTIONS = false; - public static int INODES_PER_SUB_SECTION = Integer.MAX_VALUE; + private boolean writeSubSections = false; + private int inodesPerSubSection = Integer.MAX_VALUE; private final SaveNamespaceContext context; private final SaverContext saverContext; @@ -592,6 +592,14 @@ public SaverContext getSaverContext() { return saverContext; } + public int getInodesPerSubSection() { + return inodesPerSubSection; + } + + public boolean shouldWriteSubSections() { + return writeSubSections; + } + /** * Commit the length and offset of a fsimage section to the summary index, * including the sub section, which will be committed before the section is @@ -633,7 +641,7 @@ public void commitSection(FileSummary.Builder summary, SectionName name) */ public void commitSubSection(FileSummary.Builder summary, SectionName name) throws IOException { - if (!WRITE_SUB_SECTIONS) { + if (!writeSubSections) { return; } @@ -695,13 +703,12 @@ private void enableSubSectionsIfRequired() { DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT); - if (parallelEnabled) { if (compressionEnabled) { LOG.warn("Parallel Image loading is not supported when {} is set to" + " true. Parallel loading will be disabled.", DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY); - WRITE_SUB_SECTIONS = false; + writeSubSections = false; return; } if (targetSections <= 0) { @@ -717,7 +724,7 @@ private void enableSubSectionsIfRequired() { LOG.warn("{} is set to {}. It must be greater than zero. 
Setting to" + "default of {}", DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, - targetSections, + inodeThreshold, DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT); inodeThreshold = DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT; @@ -725,13 +732,13 @@ private void enableSubSectionsIfRequired() { int inodeCount = context.getSourceNamesystem().dir.getInodeMapSize(); // Only enable parallel sections if there are enough inodes if (inodeCount >= inodeThreshold) { - WRITE_SUB_SECTIONS = true; + writeSubSections = true; // Calculate the inodes per section rounded up to the nearest int - INODES_PER_SUB_SECTION = (inodeCount + targetSections - 1) / + inodesPerSubSection = (inodeCount + targetSections - 1) / targetSections; } } else { - WRITE_SUB_SECTIONS = false; + writeSubSections = false; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java index 997815b620cc0..cd5051dd3924c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java @@ -529,7 +529,7 @@ public void serializeSnapshotDiffSection(OutputStream out) if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { context.checkCancelled(); } - if (i % FSImageFormatProtobuf.Saver.INODES_PER_SUB_SECTION == 0) { + if (i % parent.getInodesPerSubSection() == 0) { parent.commitSubSection(headers, FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF_SUB); } From 30bfeeae4ad44bb46eca864cf39cd85dc8739113 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Wed, 24 Jul 2019 15:27:34 +0100 Subject: [PATCH 05/11] Moved runnables to lambda expressions --- .../server/namenode/FSImageFormatPBINode.java | 63 +++++++++---------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index ba0b7002ccd5f..c0d68542b3021 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -222,26 +222,24 @@ void loadINodeDirectorySectionInParallel(ExecutorService service, final CopyOnWriteArrayList exceptions = new CopyOnWriteArrayList<>(); for (FileSummary.Section s : sections) { - service.submit(new Runnable() { - public void run() { - InputStream ins = null; + service.submit(() -> { + InputStream ins = null; + try { + ins = parent.getInputStreamForSection(s, + compressionCodec); + loadINodeDirectorySection(ins); + } catch (Exception e) { + LOG.error("An exception occurred loading INodeDirectories in " + + "parallel", e); + exceptions.add(new IOException(e)); + } finally { + latch.countDown(); try { - ins = parent.getInputStreamForSection(s, - compressionCodec); - loadINodeDirectorySection(ins); - } catch (Exception e) { - LOG.error("An exception occurred loading INodeDirectories in " + - "parallel", e); - exceptions.add(new IOException(e)); - } finally { - latch.countDown(); - try { - if (ins != null) { - ins.close(); - } - } 
catch (IOException ioe) { - LOG.warn("Failed to close the input stream, ignoring", ioe); + if (ins != null) { + ins.close(); } + } catch (IOException ioe) { + LOG.warn("Failed to close the input stream, ignoring", ioe); } } }); @@ -385,23 +383,20 @@ void loadINodeSectionInParallel(ExecutorService service, // The first inode section has a header which must be processed first loadINodeSectionHeader(ins, prog, currentStep); } - - service.submit(new Runnable() { - public void run() { + service.submit(() -> { + try { + totalLoaded.addAndGet(loadINodesInSection(ins, null)); + prog.setCount(Phase.LOADING_FSIMAGE, currentStep, + totalLoaded.get()); + } catch (Exception e) { + LOG.error("An exception occurred loading INodes in parallel", e); + exceptions.add(new IOException(e)); + } finally { + latch.countDown(); try { - totalLoaded.addAndGet(loadINodesInSection(ins, null)); - prog.setCount(Phase.LOADING_FSIMAGE, currentStep, - totalLoaded.get()); - } catch (Exception e) { - LOG.error("An exception occurred loading INodes in parallel", e); - exceptions.add(new IOException(e)); - } finally { - latch.countDown(); - try { - ins.close(); - } catch (IOException ioe) { - LOG.warn("Failed to close the input stream, ignoring", ioe); - } + ins.close(); + } catch (IOException ioe) { + LOG.warn("Failed to close the input stream, ignoring", ioe); } } }); From 23efe96ad9345e0cf9cbf284a99694977e3e3561 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Thu, 25 Jul 2019 15:35:37 +0100 Subject: [PATCH 06/11] Added a test to ensure the parallel load works and check the sub-section offsets align when the image is created --- .../hdfs/server/namenode/FSImageTestUtil.java | 23 ++++ .../hdfs/server/namenode/TestFSImage.java | 112 +++++++++++++++++- 2 files changed, 134 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java index 985ab35ba1dcc..c82d317d88e32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java @@ -606,4 +606,27 @@ public static long getStorageTxId(NameNode node, URI storageUri) getStorageDirectory(storageUri); return NNStorage.readTransactionIdFile(sDir); } + + /** + * Returns the summary section from the latest fsimage stored on the cluster. + * This is effectively the image index which contains the offset of each + * section and subsection. 
+ * @param cluster The cluster to load the image from + * @return The FileSummary section of the fsimage + * @throws IOException + */ + public static FsImageProto.FileSummary getLatestImageSummary( + MiniDFSCluster cluster) throws IOException { + RandomAccessFile raFile = null; + try { + File image = FSImageTestUtil.findLatestImageFile(FSImageTestUtil + .getFSImage(cluster.getNameNode()).getStorage().getStorageDir(0)); + raFile = new RandomAccessFile(image, "r"); + return FSImageUtil.loadSummary(raFile); + } finally { + if (raFile != null) { + raFile.close(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java index 0beb7582e945f..0af1dcccc0fc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java @@ -32,8 +32,10 @@ import java.io.ByteArrayOutputStream; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.EnumSet; +import com.google.common.collect.Lists; import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.Block; @@ -72,6 +74,8 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection; +import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary.Section; +import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName; import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; @@ -1000,4 +1004,110 @@ private boolean isPolicyEnabledInFsImage(ErasureCodingPolicy testPolicy) { } throw new AssertionError("Policy is not found!"); } -} + + private ArrayList
<Section> getSubSectionsOfName(ArrayList<Section> sections, + FSImageFormatProtobuf.SectionName name) { + ArrayList<Section>
subSec = new ArrayList<>(); + for (Section s : sections) { + if (s.getName().equals(name.toString())) { + subSec.add(s); + } + } + return subSec; + } + + @Test + public void testParallelSaveAndLoad() throws IOException { + Configuration conf = new Configuration(); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true"); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1"); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4"); + conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4"); + + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + + // Create 10 directories, each containing 5 files + String baseDir = "/abc/def"; + for (int i=0; i<10; i++) { + Path dir = new Path(baseDir+"/"+i); + for (int j=0; j<5; j++) { + Path f = new Path(dir, Integer.toString(j)); + FSDataOutputStream os = fs.create(f); + os.write(1); + os.close(); + } + } + + // checkpoint + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + + cluster.restartNameNode(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + + // Ensure all the files created above exist, proving they were loaded + // correctly + for (int i=0; i<10; i++) { + Path dir = new Path(baseDir+"/"+i); + assertTrue(fs.getFileStatus(dir).isDirectory()); + for (int j=0; j<5; j++) { + Path f = new Path(dir, Integer.toString(j)); + assertTrue(fs.exists(f)); + } + } + + // Obtain the image summary section to check the sub-sections + // are being correctly created when the image is saved. + FsImageProto.FileSummary summary = FSImageTestUtil. + getLatestImageSummary(cluster); + ArrayList
<Section> sections = Lists.newArrayList( + summary.getSectionsList()); + + ArrayList<Section>
 inodeSubSections = + getSubSectionsOfName(sections, SectionName.INODE_SUB); + ArrayList<Section>
 dirSubSections = + getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB); + Section inodeSection = + getSubSectionsOfName(sections, SectionName.INODE).get(0); + Section dirSection = getSubSectionsOfName(sections, + SectionName.INODE_DIR).get(0); + + // Expect 4 sub-sections for inodes and directories as target Sections + // is 4 + assertEquals(4, inodeSubSections.size()); + assertEquals(4, dirSubSections.size()); + + // Expect the sub-section offset and lengths do not overlap and cover a + // continuous range of the file. They should also line up with the parent + ensureSubSectionsAlignWithParent(inodeSubSections, inodeSection); + ensureSubSectionsAlignWithParent(dirSubSections, dirSection); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void ensureSubSectionsAlignWithParent(ArrayList<Section>
subSec, + Section parent) { + // For each sub-section, check its offset + length == the next section + // offset + for (int i=0; i Date: Tue, 13 Aug 2019 16:58:01 +0100 Subject: [PATCH 07/11] Removed whitespace at end of line --- .../hadoop-hdfs/src/main/resources/hdfs-default.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index f0bb8e871cfc8..50d81f144153a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1427,7 +1427,7 @@ 4 The number of threads to use when dfs.image.parallel.load is - enabled. This setting should be less than + enabled. This setting should be less than dfs.image.parallel.target.sections. The optimal number of threads will depend on the hardware and environment. From fb27e281fdaf084c32d1882283648f7243a30ab8 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Tue, 13 Aug 2019 17:28:25 +0100 Subject: [PATCH 08/11] Addressed intitial set of review comments from Wei-Chiu --- .../namenode/FSImageFormatProtobuf.java | 40 ++++++++++--------- .../src/main/resources/hdfs-default.xml | 3 ++ 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index a80335f90750b..58a946d2dc5a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -368,10 +368,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) { * a particular step to be started for once. */ Step currentStep = null; - boolean loadInParallel = - conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, - DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT); - // TODO - check for compression and if enabled disable parallel + boolean loadInParallel = enableParallelSaveAndLoad(conf); ExecutorService executorService = null; ArrayList subSections = @@ -556,6 +553,25 @@ private void loadErasureCodingSection(InputStream in) } } + private static boolean enableParallelSaveAndLoad(Configuration conf) { + boolean loadInParallel = + conf.getBoolean(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT); + boolean compressionEnabled = conf.getBoolean( + DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, + DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT); + + if (loadInParallel) { + if (compressionEnabled) { + LOG.warn("Parallel Image loading and saving is not supported when {}" + + " is set to true. 
Parallel will be disabled.", + DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY); + loadInParallel = false; + } + } + return loadInParallel; + } + public static final class Saver { public static final int CHECK_CANCEL_INTERVAL = 4096; private boolean writeSubSections = false; @@ -690,30 +706,18 @@ long save(File file, FSImageCompression compression) throws IOException { } private void enableSubSectionsIfRequired() { - boolean parallelEnabled = conf.getBoolean( - DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, - DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT); + boolean parallelEnabled = enableParallelSaveAndLoad(conf); int inodeThreshold = conf.getInt( DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT); int targetSections = conf.getInt( DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT); - boolean compressionEnabled = conf.getBoolean( - DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, - DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT); if (parallelEnabled) { - if (compressionEnabled) { - LOG.warn("Parallel Image loading is not supported when {} is set to" + - " true. Parallel loading will be disabled.", - DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY); - writeSubSections = false; - return; - } if (targetSections <= 0) { LOG.warn("{} is set to {}. It must be greater than zero. Setting to" + - "default of {}", + " default of {}", DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, targetSections, DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 50d81f144153a..6decc6a34f594 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1394,6 +1394,9 @@ will be used for an image previously created with sub-sections. If the image contains sub-sections and this is set to false, parallel loading will not be used. + Parallel loading is not compatible with image compression, + so if dfs.image.compress is set to true this setting will be + ignored and no parallel loading will occur. From 997f6a269f602eeaf5a98a6bd4d484052ac1e606 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Wed, 14 Aug 2019 08:56:22 +0100 Subject: [PATCH 09/11] Added a unit test to verify parallel is disabled if image compression is used --- .../hdfs/server/namenode/TestFSImage.java | 112 ++++++++++++------ 1 file changed, 77 insertions(+), 35 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java index 0af1dcccc0fc2..793a749be21c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java @@ -1016,51 +1016,58 @@ private ArrayList
<Section> getSubSectionsOfName(ArrayList<Section>
sections, return subSec; } - @Test - public void testParallelSaveAndLoad() throws IOException { - Configuration conf = new Configuration(); + private MiniDFSCluster createAndLoadParallelFSImage(Configuration conf) + throws IOException { conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY, "true"); conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, "1"); conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY, "4"); conf.set(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, "4"); - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster.Builder(conf).build(); - cluster.waitActive(); - DistributedFileSystem fs = cluster.getFileSystem(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); - // Create 10 directories, each containing 5 files - String baseDir = "/abc/def"; - for (int i=0; i<10; i++) { - Path dir = new Path(baseDir+"/"+i); - for (int j=0; j<5; j++) { - Path f = new Path(dir, Integer.toString(j)); - FSDataOutputStream os = fs.create(f); - os.write(1); - os.close(); - } + // Create 10 directories, each containing 5 files + String baseDir = "/abc/def"; + for (int i=0; i<10; i++) { + Path dir = new Path(baseDir+"/"+i); + for (int j=0; j<5; j++) { + Path f = new Path(dir, Integer.toString(j)); + FSDataOutputStream os = fs.create(f); + os.write(1); + os.close(); } + } - // checkpoint - fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); - fs.saveNamespace(); - fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); - - cluster.restartNameNode(); - cluster.waitActive(); - fs = cluster.getFileSystem(); + // checkpoint + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); - // Ensure all the files created above exist, proving they were loaded - // correctly - for (int i=0; i<10; i++) { - Path dir = new Path(baseDir+"/"+i); - assertTrue(fs.getFileStatus(dir).isDirectory()); - for (int j=0; j<5; j++) { - Path f = new Path(dir, Integer.toString(j)); - assertTrue(fs.exists(f)); - } + cluster.restartNameNode(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + + // Ensure all the files created above exist, proving they were loaded + // correctly + for (int i=0; i<10; i++) { + Path dir = new Path(baseDir+"/"+i); + assertTrue(fs.getFileStatus(dir).isDirectory()); + for (int j=0; j<5; j++) { + Path f = new Path(dir, Integer.toString(j)); + assertTrue(fs.exists(f)); } + } + return cluster; + } + + @Test + public void testParallelSaveAndLoad() throws IOException { + Configuration conf = new Configuration(); + + MiniDFSCluster cluster = null; + try { + cluster = createAndLoadParallelFSImage(conf); // Obtain the image summary section to check the sub-sections // are being correctly created when the image is saved. @@ -1094,6 +1101,41 @@ public void testParallelSaveAndLoad() throws IOException { } } + @Test + public void testNoParallelSectionsWithCompressionEnabled() + throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true); + conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY, + "org.apache.hadoop.io.compress.GzipCodec"); + + MiniDFSCluster cluster = null; + try { + cluster = createAndLoadParallelFSImage(conf); + + // Obtain the image summary section to check the sub-sections + // are being correctly created when the image is saved. + FsImageProto.FileSummary summary = FSImageTestUtil. + getLatestImageSummary(cluster); + ArrayList
sections = Lists.newArrayList( + summary.getSectionsList()); + + ArrayList
inodeSubSections = + getSubSectionsOfName(sections, SectionName.INODE_SUB); + ArrayList
dirSubSections = + getSubSectionsOfName(sections, SectionName.INODE_DIR_SUB); + + // As compression is enabled, there should be no sub-sections in the + // image header + assertEquals(0, inodeSubSections.size()); + assertEquals(0, dirSubSections.size()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + private void ensureSubSectionsAlignWithParent(ArrayList
subSec, Section parent) { // For each sub-section, check its offset + length == the next section From a8957569277c97d327191d301f07c78cfd970475 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Wed, 14 Aug 2019 11:31:57 +0100 Subject: [PATCH 10/11] Addressed second set of review comments from Wei-Chiu --- .../server/namenode/FSImageFormatPBINode.java | 12 ++++++-- .../namenode/FSImageFormatProtobuf.java | 29 ++++++++++++++++--- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index c0d68542b3021..7f3102131d99c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -95,6 +95,8 @@ public final class FSImageFormatPBINode { private static final Logger LOG = LoggerFactory.getLogger(FSImageFormatPBINode.class); + private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000; + // the loader must decode all fields referencing serial number based fields // via to methods with the string table. public final static class Loader { @@ -276,10 +278,13 @@ void loadINodeDirectorySection(InputStream in) throws IOException { if (child.isFile()) { inodeList.add(child); } - if (inodeList.size() >= 1000) { + if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) { addToCacheAndBlockMap(inodeList); inodeList.clear(); } + } else { + LOG.warn("Failed to add the inode {} to the directory {}", + child.getId(), p.getId()); } } @@ -289,10 +294,13 @@ void loadINodeDirectorySection(InputStream in) throws IOException { if (ref.isFile()) { inodeList.add(ref); } - if (inodeList.size() >= 1000) { + if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) { addToCacheAndBlockMap(inodeList); inodeList.clear(); } + } else { + LOG.warn("Failed to add the inode reference {} to the directory {}", + ref.getId(), p.getId()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index 58a946d2dc5a3..3144d4b17cfa6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -324,6 +324,29 @@ private ArrayList getSubSectionsOfName( return subSec; } + /** + * Checks the number of threads configured for parallel loading and + * return an ExecutorService with configured number of threads. If the + * thread count is set to less than 1, it will be reset to the default + * value + * @return ExecutorServie with the correct number of threads + */ + private ExecutorService getParallelExecutorService() { + int threads = conf.getInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY, + DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT); + if (threads < 1) { + LOG.warn("Parallel is enabled and {} is set to {}. 
+            "default value {}", DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
+            threads, DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
+        threads = DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT;
+      }
+      ExecutorService executorService = Executors.newFixedThreadPool(
+          threads);
+      LOG.info("The fsimage will be loaded in parallel using {} threads",
+          threads);
+      return executorService;
+    }
+
     private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
         throws IOException {
       if (!FSImageUtil.checkFileFormat(raFile)) {
@@ -374,9 +397,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
       ArrayList<FileSummary.Section> subSections =
           getAndRemoveSubSections(sections);
       if (loadInParallel) {
-        executorService = Executors.newFixedThreadPool(
-            conf.getInt(DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
-                DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT));
+        executorService = getParallelExecutorService();
       }
 
       for (FileSummary.Section s : sections) {
@@ -726,7 +747,7 @@ private void enableSubSectionsIfRequired() {
       }
       if (inodeThreshold <= 0) {
         LOG.warn("{} is set to {}. It must be greater than zero. Setting to" +
-            "default of {}",
+            " default of {}",
             DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY, inodeThreshold,
             DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);

From 654be0396be0b7fb95ec507df34ab73328181c3b Mon Sep 17 00:00:00 2001
From: S O'Donnell
Date: Wed, 14 Aug 2019 15:22:23 +0100
Subject: [PATCH 11/11] Throw an exception if the loaded inode count is not
 equal to the expected count

---
 .../hdfs/server/namenode/FSImageFormatPBINode.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 7f3102131d99c..d84e8c5b1fc00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -364,13 +364,14 @@ private int loadINodesInSection(InputStream in, Counter counter)
     }
 
-    private void loadINodeSectionHeader(InputStream in, StartupProgress prog,
+    private long loadINodeSectionHeader(InputStream in, StartupProgress prog,
         Step currentStep) throws IOException {
       INodeSection s = INodeSection.parseDelimitedFrom(in);
       fsn.dir.resetLastInodeId(s.getLastInodeId());
       long numInodes = s.getNumInodes();
       LOG.info("Loading " + numInodes + " INodes.");
       prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+      return numInodes;
     }
 
     void loadINodeSectionInParallel(ExecutorService service,
@@ -379,6 +380,7 @@ void loadINodeSectionInParallel(ExecutorService service,
         Step currentStep) throws IOException {
       LOG.info("Loading the INode section in parallel with {} sub-sections",
           sections.size());
+      long expectedInodes = 0;
       CountDownLatch latch = new CountDownLatch(sections.size());
       AtomicInteger totalLoaded = new AtomicInteger(0);
       final CopyOnWriteArrayList<IOException> exceptions =
@@ -389,7 +391,7 @@ void loadINodeSectionInParallel(ExecutorService service,
         InputStream ins = parent.getInputStreamForSection(s, compressionCodec);
         if (i == 0) {
           // The first inode section has a header which must be processed first
-          loadINodeSectionHeader(ins, prog, currentStep);
+          expectedInodes = loadINodeSectionHeader(ins, prog, currentStep);
         }
         service.submit(() -> {
           try {
@@ -418,7 +420,11 @@ void loadINodeSectionInParallel(ExecutorService service,
         LOG.error("{} exceptions occurred loading INodes", exceptions.size());
         throw exceptions.get(0);
       }
-      // TODO - should we fail if total_loaded != total_expected?
+      if (totalLoaded.get() != expectedInodes) {
+        throw new IOException("Expected to load " + expectedInodes + " in " +
+            "parallel, but loaded " + totalLoaded.get() + ". The image may " +
+            "be corrupt.");
+      }
       LOG.info("Completed loading all INode sections. Loaded {} inodes.",
           totalLoaded.get());
     }
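
Note for readers following the series: the concurrency pattern that PATCH 10 and PATCH 11 rely on (a fixed thread pool loading one sub-section per task, a CountDownLatch to wait for completion, a CopyOnWriteArrayList to collect failures, and a final check of the loaded count against the expected count) is summarised in the minimal standalone sketch below. The names used here (ParallelSectionLoaderSketch, SubSection, loadInParallel) are illustrative stand-ins only, not HDFS APIs; the real implementation lives in FSImageFormatPBINode.Loader and FSImageFormatProtobuf.Loader as shown in the patches.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class ParallelSectionLoaderSketch {

  /** Illustrative stand-in for one fsimage sub-section. */
  public interface SubSection {
    /** Loads the sub-section and returns the number of records read. */
    long load() throws IOException;
  }

  /**
   * Loads every sub-section on a fixed thread pool, waits for all tasks,
   * surfaces the first failure, and verifies the total record count.
   */
  public static long loadInParallel(List<SubSection> sections, int threads,
      long expectedRecords) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, threads));
    CountDownLatch latch = new CountDownLatch(sections.size());
    AtomicLong totalLoaded = new AtomicLong(0);
    CopyOnWriteArrayList<IOException> exceptions = new CopyOnWriteArrayList<>();
    try {
      for (SubSection s : sections) {
        pool.submit(() -> {
          try {
            // Each task loads one sub-section and records its contribution.
            totalLoaded.addAndGet(s.load());
          } catch (Exception e) {
            exceptions.add(new IOException(e));
          } finally {
            latch.countDown();
          }
        });
      }
      // Wait until every sub-section task has finished, successfully or not.
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } finally {
      pool.shutdown();
    }
    if (!exceptions.isEmpty()) {
      throw exceptions.get(0);
    }
    if (totalLoaded.get() != expectedRecords) {
      throw new IOException("Expected " + expectedRecords
          + " records but loaded " + totalLoaded.get());
    }
    return totalLoaded.get();
  }
}

As in PATCH 11, a mismatch between the expected and loaded count is treated as a corrupt image rather than being silently ignored.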