diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java new file mode 100644 index 0000000000000..fd98391dfa3af --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +/** + * LatchLock controls two hierarchical Read/Write locks: + * the topLock and the childLock. + * Typically an operation starts with the topLock already acquired. + * To acquire child lock LatchLock will + * first acquire the childLock, and then release the topLock. + */ +public abstract class LatchLock { + // Interfaces methods to be defined for subclasses + /** @return true topLock is locked for read by any thread */ + protected abstract boolean isReadTopLocked(); + /** @return true topLock is locked for write by any thread */ + protected abstract boolean isWriteTopLocked(); + protected abstract void readTopUnlock(); + protected abstract void writeTopUnlock(); + + protected abstract boolean hasReadChildLock(); + protected abstract void readChildLock(); + protected abstract void readChildUnlock(); + + protected abstract boolean hasWriteChildLock(); + protected abstract void writeChildLock(); + protected abstract void writeChildUnlock(); + + protected abstract LatchLock clone(); + + // Public APIs to use with the class + public void readLock() { + readChildLock(); + readTopUnlock(); + } + + public void readUnlock() { + readChildUnlock(); + } + + public void writeLock() { + writeChildLock(); + writeTopUnlock(); + } + + public void writeUnlock() { + writeChildUnlock(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java new file mode 100644 index 0000000000000..f493402959031 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.NoSuchElementException; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.LightWeightGSet.LinkedElement; + +/** + * An implementation of {@link GSet}, which splits a collection of elements + * into partitions each corresponding to a range of keys. + * + * This class does not support null element. + * + * This class is backed up by LatchLock for hierarchical synchronization. + * + * @param Key type for looking up the elements + * @param Element type, which must be + * (1) a subclass of K, and + * (2) implementing {@link LinkedElement} interface. + */ +@InterfaceAudience.Private +public class PartitionedGSet implements GSet { + + private static final int DEFAULT_PARTITION_CAPACITY = 65536; // 4096; // 5120; // 2048; // 1027; + private static final float DEFAULT_PARTITION_OVERFLOW = 1.8f; + + /** + * An ordered map of contiguous segments of elements. + * Each key in the map represent the smallest key in the mapped segment, + * so that all elements in this segment are >= the mapping key, + * but are smaller then the next key in the map. + * Elements within a partition do not need to be ordered. + */ + private final NavigableMap partitions; + private LatchLock latchLock; + + /** + * The number of elements in the set. + */ + protected volatile int size; + + /** + * A single partition of the {@link PartitionedGSet}. + * Consists of a hash table {@link LightWeightGSet} and a lock, which + * controls access to this partition independently on the other ones. + */ + public class PartitionEntry extends LightWeightGSet { + private final LatchLock partLock; + + PartitionEntry(int defaultPartitionCapacity) { + super(defaultPartitionCapacity); + this.partLock = latchLock.clone(); + } + } + + public PartitionedGSet(final int capacity, + final Comparator comparator, + final LatchLock latchLock) { + this.partitions = new TreeMap(comparator); + this.latchLock = latchLock; + // addNewPartition(rootKey).put(rootKey); + // this.size = 1; + this.size = 0; + LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY); + LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW); + } + + /** + * Creates new empty partition. 
+ * @param key + * @return + */ + public PartitionEntry addNewPartition(final K key) { + Entry lastEntry = partitions.lastEntry(); + PartitionEntry lastPart = null; + if(lastEntry != null) + lastPart = lastEntry.getValue(); + + PartitionEntry newPart = + new PartitionEntry(DEFAULT_PARTITION_CAPACITY); + // assert size == 0 || newPart.partLock.isWriteTopLocked() : + // "Must hold write Lock: key = " + key; + PartitionEntry oldPart = partitions.put(key, newPart); + assert oldPart == null : + "RangeMap already has a partition associated with " + key; + + LOG.debug("Total GSet size = {}", size); + LOG.debug("Number of partitions = {}", partitions.size()); + LOG.debug("Previous partition size = {}", + lastPart == null ? 0 : lastPart.size()); + + return newPart; + } + + @Override + public int size() { + return size; + } + + public PartitionEntry getPartition(final K key) { + Entry partEntry = partitions.floorEntry(key); + if(partEntry == null) { + return null; + } + PartitionEntry part = partEntry.getValue(); + if(part == null) { + throw new IllegalStateException("Null partition for key: " + key); + } + assert size == 0 || part.partLock.isReadTopLocked() || + part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key; + return part; + } + + @Override + public boolean contains(final K key) { + PartitionEntry part = getPartition(key); + if(part == null) { + return false; + } + return part.contains(key); + } + + @Override + public E get(final K key) { + PartitionEntry part = getPartition(key); + if(part == null) { + return null; + } + LOG.debug("get key: {}", key); + // part.partLock.readLock(); + return part.get(key); + } + + @Override + public E put(final E element) { + K key = element; + PartitionEntry part = getPartition(key); + if(part == null) { + throw new HadoopIllegalArgumentException("Illegal key: " + key); + } + assert size == 0 || part.partLock.isWriteTopLocked() || + part.partLock.hasWriteChildLock() : + "Must hold write Lock: key = " + key; + LOG.debug("put key: {}", key); + PartitionEntry newPart = addNewPartitionIfNeeded(part, key); + if(newPart != part) { + newPart.partLock.writeChildLock(); + part = newPart; + } + E result = part.put(element); + if(result == null) { // new element + size++; + LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size); + } else { + LOG.debug("partitionPGSet.put: replaced key {}, size is now {}", + key, size); + } + return result; + } + + private PartitionEntry addNewPartitionIfNeeded( + PartitionEntry curPart, K key) { + if(curPart.size() < DEFAULT_PARTITION_CAPACITY * DEFAULT_PARTITION_OVERFLOW + || curPart.contains(key)) { + return curPart; + } + return addNewPartition(key); + } + + @Override + public E remove(final K key) { + PartitionEntry part = getPartition(key); + if(part == null) { + return null; + } + E result = part.remove(key); + if(result != null) { + size--; + } + return result; + } + + @Override + public void clear() { + LOG.error("Total GSet size = {}", size); + LOG.error("Number of partitions = {}", partitions.size()); + printStats(); + // assert latchLock.hasWriteTopLock() : "Must hold write topLock"; + // SHV May need to clear all partitions? 
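// For context: the assertions in getPartition() and put() above encode the
// intended locking protocol. A caller first holds the global (top) lock, then
// latches down to the partitions it touches via latchWriteLock(), which takes
// the partition (child) locks and then releases the top lock. A rough usage
// sketch (the names "set", "keys" and "element" are hypothetical):
//
//   // caller already holds the top write lock here
//   set.latchWriteLock(keys);   // locks the partitions, then unlocks top lock
//   set.put(element);           // asserts hasWriteChildLock()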
+ partitions.clear(); + size = 0; + } + + private void printStats() { + int partSizeMin = Integer.MAX_VALUE, partSizeAvg = 0, partSizeMax = 0; + long totalSize = 0; + int numEmptyPartitions = 0, numFullPartitions = 0; + Collection parts = partitions.values(); + Set> entries = partitions.entrySet(); + int i = 0; + for(Entry e : entries) { + PartitionEntry part = e.getValue(); + int s = part.size; + if(s == 0) numEmptyPartitions++; + if(s > DEFAULT_PARTITION_CAPACITY) numFullPartitions++; + totalSize += s; + partSizeMin = (s < partSizeMin ? s : partSizeMin); + partSizeMax = (partSizeMax < s ? s : partSizeMax); + Class inodeClass = e.getKey().getClass(); + try { + long[] key = (long[]) inodeClass. + getMethod("getNamespaceKey", int.class).invoke(e.getKey(), 2); + long[] firstKey = new long[key.length]; + if(part.iterator().hasNext()) { + Object first = part.iterator().next(); + long[] firstKeyRef = (long[]) inodeClass.getMethod( + "getNamespaceKey", int.class).invoke(first, 2); + Object parent = inodeClass. + getMethod("getParent").invoke(first); + long parentId = (parent == null ? 0L : + (long) inodeClass.getMethod("getId").invoke(parent)); + for (int j=0; j < key.length; j++) { + firstKey[j] = firstKeyRef[j]; + } + firstKey[0] = parentId; + } + LOG.error("Partition #{}\t key: {}\t size: {}\t first: {}", + i++, key, s, firstKey); // SHV should be info + } catch (NoSuchElementException ex) { + LOG.error("iterator.next() throws NoSuchElementException."); + throw ex; + } catch (Exception ex) { + LOG.error("Cannot find Method getNamespaceKey() in {}", inodeClass); + } + } + partSizeAvg = (int) (totalSize / parts.size()); + LOG.error("Partition sizes: min = {}, avg = {}, max = {}, sum = {}", + partSizeMin, partSizeAvg, partSizeMax, totalSize); + LOG.error("Number of partitions: empty = {}, in-use = {}, full = {}", + numEmptyPartitions, parts.size()-numEmptyPartitions, numFullPartitions); + } + + @Override + public Collection values() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Iterator iterator() { + return new EntryIterator(); + } + + /** + * Iterator over the elements in the set. + * Iterates first by keys, then inside the partition + * corresponding to the key. + * + * Modifications are tracked by the underlying collections. We allow + * modifying other partitions, while iterating through the current one. + */ + private class EntryIterator implements Iterator { + private Iterator keyIterator; + private Iterator partitionIterator; + + // Set partitionIterator to point to the first partition, or set it to null + // when there is no partitions created for this PartitionedGSet. + public EntryIterator() { + keyIterator = partitions.keySet().iterator(); + + if (!keyIterator.hasNext()) { + partitionIterator = null; + return; + } + + K firstKey = keyIterator.next(); + partitionIterator = partitions.get(firstKey).iterator(); + } + + @Override + public boolean hasNext() { + + // Special case: an iterator was created for an empty PartitionedGSet. + // Check whether new partitions have been added since then. 
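// Note that keyIterator and partitionIterator are re-resolved lazily here, so
// partitions created after this iterator was constructed are still visited;
// TestPartitionedGSet#testIteratorAddElementsAfterIteratorCreation below
// exercises exactly this case.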
+ if (partitionIterator == null) { + if (partitions.size() == 0) { + return false; + } else { + keyIterator = partitions.keySet().iterator(); + K nextKey = keyIterator.next(); + partitionIterator = partitions.get(nextKey).iterator(); + } + } + + while(!partitionIterator.hasNext()) { + if(!keyIterator.hasNext()) { + return false; + } + K curKey = keyIterator.next(); + partitionIterator = getPartition(curKey).iterator(); + } + return partitionIterator.hasNext(); + } + + @Override + public E next() { + if (!hasNext()) { + throw new NoSuchElementException("No more elements in this set."); + } + return partitionIterator.next(); + } + } + + public void latchWriteLock(K[] keys) { + // getPartition(parent).partLock.writeChildLock(); + LatchLock pLock = null; + for(K key : keys) { + pLock = getPartition(key).partLock; + pLock.writeChildLock(); + } + assert pLock != null : "pLock is null"; + pLock.writeTopUnlock(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java new file mode 100644 index 0000000000000..9ae772c25deb1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestPartitionedGSet.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.util.LightWeightGSet.LinkedElement; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Testing {@link PartitionedGSet} */ +public class TestPartitionedGSet { + public static final Logger LOG = + LoggerFactory.getLogger(TestPartitionedGSet.class); + private static final int ELEMENT_NUM = 100; + + /** + * Generate positive random numbers for testing. We want to use only positive + * numbers because the smallest partition used in testing is 0. + * + * @param length + * number of random numbers to be generated. + * + * @param randomSeed + * seed to be used for random number generator. 
+ * + * @return + * An array of Integers + */ + private static ArrayList getRandomList(int length, int randomSeed) { + Random random = new Random(randomSeed); + ArrayList list = new ArrayList(length); + for (int i = 0; i < length; i++) { + list.add(random.nextInt(Integer.MAX_VALUE)); + } + return list; + } + + private static class TestElement implements LinkedElement { + private final int val; + private LinkedElement next; + + TestElement(int val) { + this.val = val; + this.next = null; + } + + public int getVal() { + return val; + } + + @Override + public void setNext(LinkedElement next) { + this.next = next; + } + + @Override + public LinkedElement getNext() { + return next; + } + } + + private static class TestElementComparator implements Comparator + { + @Override + public int compare(TestElement e1, TestElement e2) { + if (e1 == null || e2 == null) { + throw new NullPointerException("Cannot compare null elements"); + } + + return e1.getVal() - e2.getVal(); + } + } + + protected ReentrantReadWriteLock topLock = + new ReentrantReadWriteLock(false); + /** + * We are NOT testing any concurrent access to a PartitionedGSet here. + */ + private class NoOpLock extends LatchLock { + private ReentrantReadWriteLock childLock; + + public NoOpLock() { + childLock = new ReentrantReadWriteLock(false); + } + + @Override + protected boolean isReadTopLocked() { + return topLock.getReadLockCount() > 0 || isWriteTopLocked(); + } + + @Override + protected boolean isWriteTopLocked() { + return topLock.isWriteLocked(); + } + + @Override + protected void readTopUnlock() { + topLock.readLock().unlock(); + } + + @Override + protected void writeTopUnlock() { + topLock.writeLock().unlock(); + } + + @Override + protected boolean hasReadChildLock() { + return childLock.getReadLockCount() > 0 || hasWriteChildLock(); + } + + @Override + protected void readChildLock() { + childLock.readLock().lock(); + } + + @Override + protected void readChildUnlock() { + childLock.readLock().unlock(); + } + + @Override + protected boolean hasWriteChildLock() { + return childLock.isWriteLockedByCurrentThread(); + } + + @Override + protected void writeChildLock() { + childLock.writeLock().lock(); + } + + @Override + protected void writeChildUnlock() { + childLock.writeLock().unlock(); + } + + @Override + protected LatchLock clone() { + return new NoOpLock(); + } + } + + /** + * Test iterator for a PartitionedGSet with no partitions. + */ + @Test(timeout=60000) + public void testIteratorForNoPartition() { + PartitionedGSet set = + new PartitionedGSet( + 16, new TestElementComparator(), new NoOpLock()); + + topLock.readLock().lock(); + int count = 0; + Iterator iter = set.iterator(); + while( iter.hasNext() ) { + iter.next(); + count ++; + } + topLock.readLock().unlock(); + Assert.assertEquals(0, count); + } + + /** + * Test iterator for a PartitionedGSet with empty partitions. + */ + @Test(timeout=60000) + public void testIteratorForEmptyPartitions() { + PartitionedGSet set = + new PartitionedGSet( + 16, new TestElementComparator(), new NoOpLock()); + + set.addNewPartition(new TestElement(0)); + set.addNewPartition(new TestElement(1000)); + set.addNewPartition(new TestElement(2000)); + + topLock.readLock().lock(); + int count = 0; + Iterator iter = set.iterator(); + while( iter.hasNext() ) { + iter.next(); + count ++; + } + topLock.readLock().unlock(); + Assert.assertEquals(0, count); + } + + /** + * Test whether the iterator can return the same number of elements as stored + * into the PartitionedGSet. 
+ */ + @Test(timeout=60000) + public void testIteratorCountElements() { + ArrayList list = getRandomList(ELEMENT_NUM, 123); + PartitionedGSet set = + new PartitionedGSet( + 16, new TestElementComparator(), new NoOpLock()); + + set.addNewPartition(new TestElement(0)); + set.addNewPartition(new TestElement(1000)); + set.addNewPartition(new TestElement(2000)); + + topLock.writeLock().lock(); + for (Integer i : list) { + set.put(new TestElement(i)); + } + topLock.writeLock().unlock(); + + topLock.readLock().lock(); + int count = 0; + Iterator iter = set.iterator(); + while( iter.hasNext() ) { + iter.next(); + count ++; + } + topLock.readLock().unlock(); + Assert.assertEquals(ELEMENT_NUM, count); + } + + /** + * Test iterator when it is created before partitions/elements are + * added to the PartitionedGSet. + */ + @Test(timeout=60000) + public void testIteratorAddElementsAfterIteratorCreation() { + PartitionedGSet set = + new PartitionedGSet( + 16, new TestElementComparator(), new NoOpLock()); + + // Create the iterator before partitions are added. + Iterator iter = set.iterator(); + + set.addNewPartition(new TestElement(0)); + set.addNewPartition(new TestElement(1000)); + set.addNewPartition(new TestElement(2000)); + + // Added one element + topLock.writeLock().lock(); + set.put(new TestElement(2500)); + topLock.writeLock().unlock(); + + topLock.readLock().lock(); + int count = 0; + while( iter.hasNext() ) { + iter.next(); + count ++; + } + topLock.readLock().unlock(); + Assert.assertEquals(1, count); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java index da324fb46738a..e8c3feb12cbc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import java.nio.charset.StandardCharsets; import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -69,18 +70,7 @@ static FileStatus mkdirs(FSNamesystem fsn, FSPermissionChecker pc, String src, // create multiple inodes. fsn.checkFsObjectLimit(); - // Ensure that the user can traversal the path by adding implicit - // u+wx permission to all ancestor directories. - INodesInPath existing = - createParentDirectories(fsd, iip, permissions, false); - if (existing != null) { - existing = createSingleDirectory( - fsd, existing, iip.getLastLocalName(), permissions); - } - if (existing == null) { - throw new IOException("Failed to create directory: " + src); - } - iip = existing; + iip = createMissingDirs(fsd, iip, permissions, false); } return fsd.getAuditFileInfo(iip); } finally { @@ -88,6 +78,36 @@ static FileStatus mkdirs(FSNamesystem fsn, FSPermissionChecker pc, String src, } } + static INodesInPath createMissingDirs(FSDirectory fsd, INodesInPath iip, + PermissionStatus permissions, boolean inheritPerms) throws IOException { + PermissionStatus basePerm = inheritPerms ? + iip.getExistingINodes().getLastINode().getPermissionStatus() : + permissions; + // create all missing directories along the path, + // but don't add them to the INodeMap yet + permissions = addImplicitUwx(basePerm, permissions); // SHV !!! 
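// addImplicitUwx() is an existing FSDirMkdirOp helper that is not part of this
// diff; roughly, it keeps the requested owner/group but widens the base
// permission with write+execute for the owner so the created ancestors stay
// traversable, along the lines of:
//
//   FsPermission p = base.getPermission();   // "base" = the base PermissionStatus
//   return new PermissionStatus(perm.getUserName(), perm.getGroupName(),
//       new FsPermission(p.getUserAction().or(FsAction.WRITE_EXECUTE),
//           p.getGroupAction(), p.getOtherAction()));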
+ INode[] missing = createPathDirectories(fsd, iip, permissions); + iip = iip.getExistingINodes(); + if (missing.length == 0) { + return iip; + } + // switch the locks + fsd.getINodeMap().latchWriteLock(iip, missing); + int counter = 0; + // Add missing inodes to the INodeMap + for (INode dir : missing) { + if (counter++ == missing.length - 1) { + //Last folder in the path, use the user given permission + //For MKDIR - refers to the permission given by the user + //For create - refers to the parent directory permission. + permissions = basePerm; + } + iip = addSingleDirectory(fsd, iip, dir, permissions); + assert iip != null : "iip should not be null"; + } + return iip; + } + /** * For a given absolute path, create all ancestors as directories along the * path. All ancestors inherit their parent's permission plus an implicit @@ -132,6 +152,7 @@ private static INodesInPath createParentDirectories(FSDirectory fsd, if (missing == 0) { // full path exists, return parents. existing = iip.getParentINodesInPath(); } else if (missing > 1) { // need to create at least one ancestor dir. + FSNamesystem.LOG.error("missing = " + missing); // Ensure that the user can traversal the path by adding implicit // u+wx permission to all ancestor directories. PermissionStatus basePerm = inheritPerms @@ -143,6 +164,14 @@ private static INodesInPath createParentDirectories(FSDirectory fsd, for (int i = existing.length(); existing != null && i <= last; i++) { byte[] component = iip.getPathComponent(i); existing = createSingleDirectory(fsd, existing, component, perm); + if(existing == null) { + FSNamesystem.LOG.error("unprotectedMkdir returned null for " + + iip.getPath() + " for " + + new String(component, StandardCharsets.US_ASCII) + " i = " + i); + // Somebody already created the parent. Recalculate existing + existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents()); + i = existing.length() - 1; + } } } return existing; @@ -228,5 +257,73 @@ private static INodesInPath unprotectedMkdir(FSDirectory fsd, long inodeId, } return iip; } + + private static INode createDirectoryINode(FSDirectory fsd, + INodesInPath parent, byte[] name, PermissionStatus permission) + throws FileAlreadyExistsException { + assert fsd.hasReadLock(); + assert parent.getLastINode() != null; + if (!parent.getLastINode().isDirectory()) { + throw new FileAlreadyExistsException("Parent path is not a directory: " + + parent.getPath() + " " + DFSUtil.bytes2String(name)); + } + final INodeDirectory dir = new INodeDirectory( + fsd.allocateNewInodeId(), name, permission, now()); + return dir; + } + + /** + * Find-out missing iNodes for the current mkdir OP. 
+ */ + private static INode[] createPathDirectories(FSDirectory fsd, + INodesInPath iip, PermissionStatus perm) + throws IOException { + assert fsd.hasWriteLock(); + INodesInPath existing = iip.getExistingINodes(); + assert existing != null : "existing should not be null"; + int numMissing = iip.length() - existing.length(); + if (numMissing == 0) { // full path exists + return new INode[0]; + } + + // create the missing directories along the path + INode[] missing = new INode[numMissing]; + final int last = iip.length(); +// INode parent = existing.getLastINode(); + for (int i = existing.length(); i < last; i++) { + byte[] component = iip.getPathComponent(i); + missing[i - existing.length()] = + createDirectoryINode(fsd, existing, component, perm); +// missing[i - existing.length()].setParent(parent.asDirectory()); +// parent = missing[i - existing.length()]; + } + return missing; + } + + private static INodesInPath addSingleDirectory(FSDirectory fsd, + INodesInPath existing, INode dir, PermissionStatus perm) + throws IOException { + assert fsd.hasWriteLock(); + INodesInPath iip = fsd.addLastINode(existing, dir, perm.getPermission(), true); + if (iip == null) { + FSNamesystem.LOG.debug("somebody already created {} on path {}", dir, existing.getPath()); + final INodeDirectory parent = existing.getLastINode().asDirectory(); + dir = parent.getChild(dir.getLocalNameBytes(), Snapshot.CURRENT_STATE_ID); + return INodesInPath.append(existing, dir, dir.getLocalNameBytes()); + } + existing = iip; + assert dir.equals(existing.getLastINode()) : "dir is not the last INode"; + + // Directory creation also count towards FilesCreated + // to match count of FilesDeleted metric. + NameNode.getNameNodeMetrics().incrFilesCreated(); + + assert dir.getPermissionStatus().getGroupName() != null : + "GroupName is null for " + existing.getPath(); + String cur = existing.getPath(); + fsd.getEditLog().logMkDir(cur, dir); + NameNode.stateChangeLog.debug("mkdirs: created directory {}", cur); + return existing; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 0d9c6aeeb9c45..96f9907871304 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -228,6 +228,13 @@ static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src, // while chooseTarget() was executing. 
LocatedBlock[] onRetryBlock = new LocatedBlock[1]; INodesInPath iip = fsn.dir.resolvePath(null, src, fileId); + + INode[] missing = new INode[]{iip.getLastINode()}; + INodesInPath existing = iip.getParentINodesInPath(); + FSDirectory fsd = fsn.getFSDirectory(); + // switch the locks + fsd.getINodeMap().latchWriteLock(existing, missing); + FileState fileState = analyzeFileState(fsn, iip, fileId, clientName, previous, onRetryBlock); final INodeFile pendingFile = fileState.inode; @@ -392,8 +399,8 @@ static HdfsFileStatus startFile( } fsn.checkFsObjectLimit(); INodeFile newNode = null; - INodesInPath parent = - FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions); + INodesInPath parent = FSDirMkdirOp.createMissingDirs(fsd, + iip.getParentINodesInPath(), permissions, true); if (parent != null) { iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, replication, blockSize, holder, clientMachine, shouldReplicate, @@ -541,41 +548,22 @@ private static INodesInPath addFile( FSDirectory fsd, INodesInPath existing, byte[] localName, PermissionStatus permissions, short replication, long preferredBlockSize, String clientName, String clientMachine, boolean shouldReplicate, - String ecPolicyName, String storagePolicy) throws IOException { + String ecPolicyName, String storagePolicy) + throws IOException { Preconditions.checkNotNull(existing); long modTime = now(); INodesInPath newiip; fsd.writeLock(); try { - boolean isStriped = false; - ErasureCodingPolicy ecPolicy = null; - byte storagepolicyid = 0; - if (storagePolicy != null && !storagePolicy.isEmpty()) { - BlockStoragePolicy policy = - fsd.getBlockManager().getStoragePolicy(storagePolicy); - if (policy == null) { - throw new HadoopIllegalArgumentException( - "Cannot find a block policy with the name " + storagePolicy); - } - storagepolicyid = policy.getId(); - } - if (!shouldReplicate) { - ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy( - fsd.getFSNamesystem(), ecPolicyName, existing); - if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) { - isStriped = true; - } - } - final BlockType blockType = isStriped ? - BlockType.STRIPED : BlockType.CONTIGUOUS; - final Short replicationFactor = (!isStriped ? replication : null); - final Byte ecPolicyID = (isStriped ? 
ecPolicy.getId() : null); - INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions, - modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize, - storagepolicyid, blockType); - newNode.setLocalName(localName); - newNode.toUnderConstruction(clientName, clientMachine); + INodeFile newNode = createINodeFile(fsd, existing, localName, + permissions, replication, preferredBlockSize, clientName, + clientMachine, shouldReplicate, ecPolicyName, storagePolicy, modTime); + + INode[] missing = new INode[] {newNode}; + // switch the locks + fsd.getINodeMap().latchWriteLock(existing, missing); + newiip = fsd.addINode(existing, newNode, permissions.getPermission()); } finally { fsd.writeUnlock(); @@ -593,6 +581,42 @@ private static INodesInPath addFile( return newiip; } + private static INodeFile createINodeFile(FSDirectory fsd, + INodesInPath existing, byte[] localName, PermissionStatus permissions, + short replication, long preferredBlockSize, String clientName, + String clientMachine, boolean shouldReplicate, String ecPolicyName, + String storagePolicy, long modTime) throws IOException { + boolean isStriped = false; + ErasureCodingPolicy ecPolicy = null; + byte storagepolicyid = 0; + if (storagePolicy != null && !storagePolicy.isEmpty()) { + BlockStoragePolicy policy = + fsd.getBlockManager().getStoragePolicy(storagePolicy); + if (policy == null) { + throw new HadoopIllegalArgumentException( + "Cannot find a block policy with the name " + storagePolicy); + } + storagepolicyid = policy.getId(); + } + if (!shouldReplicate) { + ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy( + fsd.getFSNamesystem(), ecPolicyName, existing); + if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) { + isStriped = true; + } + } + final BlockType blockType = isStriped ? + BlockType.STRIPED : BlockType.CONTIGUOUS; + final Short replicationFactor = (!isStriped ? replication : null); + final Byte ecPolicyID = (isStriped ? 
ecPolicy.getId() : null); + INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions, + modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize, + storagepolicyid, blockType); + newNode.setLocalName(localName); + newNode.toUnderConstruction(clientName, clientMachine); + return newNode; + } + private static FileState analyzeFileState( FSNamesystem fsn, INodesInPath iip, long fileId, String clientName, ExtendedBlock previous, LocatedBlock[] onRetryBlock) @@ -687,6 +711,14 @@ static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc, } checkBlock(fsn, last); INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId); + + assert (iip.getLastINode() instanceof INodeFile); + INode[] missing = new INode[] {iip.getLastINode()}; + INodesInPath existing = iip.getParentINodesInPath(); + // switch the locks + FSDirectory fsd = fsn.getFSDirectory(); + fsd.getINodeMap().latchWriteLock(existing, missing); + return completeFileInternal(fsn, iip, holder, ExtendedBlock.getLocalBlock(last), fileId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 9ff54e6f2b752..8ddf0ffa9e857 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -17,7 +17,12 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import java.util.HashMap; +import java.util.Iterator; +import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.util.GSet; +import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -160,6 +165,8 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) { private final int contentCountLimit; // max content summary counts per run private final long contentSleepMicroSec; private final INodeMap inodeMap; // Synchronized by dirLock + // Temp InodeMap used when loading an FS image. + private HashMap tempInodeMap; private long yieldCount = 0; // keep track of lock yield count. private int quotaInitThreads; @@ -317,7 +324,12 @@ public enum DirOp { FSDirectory(FSNamesystem ns, Configuration conf) throws IOException { this.inodeId = new INodeId(); rootDir = createRoot(ns); - inodeMap = INodeMap.newInstance(rootDir); + inodeMap = INodeMap.newInstance(rootDir, ns); + tempInodeMap = new HashMap<>(1000); + + // add rootDir to inodeMapTemp. 
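// The temporary map is keyed by inode id rather than by namespace key: while
// the inode section of an FSImage is being loaded, parents are not wired up
// yet, so the partitioned INodeMap key (which depends on the parent) cannot be
// computed. Inodes are staged here and moved into the real INodeMap by
// moveInodes() once parents have been set (see the FSImage change below).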
+ tempInodeMap.put(rootDir.getId(), rootDir); + this.isPermissionEnabled = conf.getBoolean( DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT); @@ -1476,6 +1488,23 @@ public INodeMap getINodeMap() { return inodeMap; } + final void addToTempInodeMap(INode inode) { + if (inode instanceof INodeWithAdditionalFields) { + LOG.debug("addToTempInodeMap: id={}, inodeMapTemp.size={}", + inode.getId(), tempInodeMap.size()); + tempInodeMap.put(inode.getId(), (INodeWithAdditionalFields) inode); + if (!inode.isSymlink()) { + final XAttrFeature xaf = inode.getXAttrFeature(); + addEncryptionZone((INodeWithAdditionalFields) inode, xaf); + StoragePolicySatisfyManager spsManager = + namesystem.getBlockManager().getSPSManager(); + if (spsManager != null && spsManager.isEnabled()) { + addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf); + } + } + } + } + /** * This method is always called with writeLock of FSDirectory held. */ @@ -1543,6 +1572,36 @@ public final void addRootDirToEncryptionZone(XAttrFeature xaf) { addEncryptionZone(rootDir, xaf); } + /** + * After the inodes are set properly (set the parent for each inode), we move + * them from INodeMapTemp to INodeMap. + */ + void moveInodes() throws IOException { + long count=0, totalInodes = tempInodeMap.size(); + LOG.debug("inodeMapTemp={}", tempInodeMap); + + for (Map.Entry e: tempInodeMap.entrySet()) { + INodeWithAdditionalFields n = (INodeWithAdditionalFields)e.getValue(); + + LOG.debug("populate {}-th inode: id={}, fullpath={}", + count, n.getId(), n.getFullPathName()); + + inodeMap.put(n); + count++; + } + + if (count != totalInodes) { + String msg = String.format("moveInodes: expected to move %l inodes, " + + "but moved %l inodes", totalInodes, count); + throw new IOException(msg); + } + + //inodeMap.show(); + tempInodeMap.clear(); + assert(tempInodeMap.isEmpty()); + tempInodeMap = null; + } + /** * This method is always called with writeLock of FSDirectory held. 
*/ @@ -1860,6 +1919,17 @@ FSPermissionChecker getPermissionChecker() } } + public INode getInode(INode inode) { + return inodeMap.get(inode); + } + public INode getInodeFromTempINodeMap(long id) { + LOG.debug("getInodeFromTempINodeMap: id={}, TempINodeMap.size={}", + id, tempInodeMap.size()); + if (id < INodeId.ROOT_INODE_ID) + return null; + + return tempInodeMap.get(id); + } @VisibleForTesting FSPermissionChecker getPermissionChecker(String fsOwner, String superGroup, UserGroupInformation ugi) throws AccessControlException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index f7749ce7e231b..0f0024e9d66b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -177,18 +177,23 @@ protected FSImage(Configuration conf, void format(FSNamesystem fsn, String clusterId, boolean force) throws IOException { - long fileCount = fsn.getFilesTotal(); - // Expect 1 file, which is the root inode - Preconditions.checkState(fileCount == 1, - "FSImage.format should be called with an uninitialized namesystem, has " + - fileCount + " files"); - NamespaceInfo ns = NNStorage.newNamespaceInfo(); - LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID()); - ns.clusterID = clusterId; - - storage.format(ns); - editLog.formatNonFileJournals(ns, force); - saveFSImageInAllDirs(fsn, 0); + fsn.readLock(); + try { + long fileCount = fsn.getFilesTotal(); + // Expect 1 file, which is the root inode + Preconditions.checkState(fileCount == 1, + "FSImage.format should be called with an uninitialized namesystem, has " + + fileCount + " files"); + NamespaceInfo ns = NNStorage.newNamespaceInfo(); + LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID()); + ns.clusterID = clusterId; + + storage.format(ns); + editLog.formatNonFileJournals(ns, force); + saveFSImageInAllDirs(fsn, 0); + } finally { + fsn.readUnlock(); + } } /** @@ -756,6 +761,16 @@ LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) { "above for more info."); } prog.endPhase(Phase.LOADING_FSIMAGE); + + /* + * loadEdits always sets the parent of an inode before adding the inode to + * inodeMap. So, it is safe to move inodes from inodeMapTemp to inodeMap + * before loadEdits. 
+ */ + FSDirectory dir = target.getFSDirectory(); + dir.moveInodes(); + LOG.info("LOADING_FSIMAGE: loaded {} inodes into inodeMap", + dir.getINodeMap().size()); if (!rollingRollback) { prog.beginPhase(Phase.LOADING_EDITS); @@ -771,6 +786,8 @@ LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) { needToSave = false; } editLog.setNextTxId(lastAppliedTxId + 1); + LOG.info("LOADING_EDITS: loaded {} inodes into inodeMap", + dir.getINodeMap().size()); return needToSave; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 0a69c99cab810..fe37b8239a88f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -276,9 +276,10 @@ void loadINodeDirectorySection(InputStream in) throws IOException { if (e == null) { break; } - INodeDirectory p = dir.getInode(e.getParent()).asDirectory(); + INodeDirectory p = + dir.getInodeFromTempINodeMap(e.getParent()).asDirectory(); for (long id : e.getChildrenList()) { - INode child = dir.getInode(id); + INode child = dir.getInodeFromTempINodeMap(id); if (!addToParent(p, child)) { LOG.warn("Failed to add the inode {} to the directory {}", child.getId(), p.getId()); @@ -382,6 +383,7 @@ private int loadINodesInSection(InputStream in, Counter counter) if (p == null) { break; } + LOG.debug("loadINodesInSection: cntr={}, inode={}", cntr, p.getId()); if (p.getId() == INodeId.ROOT_INODE_ID) { synchronized(this) { loadRootINode(p); @@ -389,7 +391,7 @@ private int loadINodesInSection(InputStream in, Counter counter) } else { INode n = loadINode(p); synchronized(this) { - dir.addToInodeMap(n); + dir.addToTempInodeMap(n); } fillUpInodeList(inodeList, n); } @@ -761,7 +763,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException { DirEntry.newBuilder().setParent(n.getId()); for (INode inode : children) { // Error if the child inode doesn't exist in inodeMap - if (dir.getInode(inode.getId()) == null) { + if (dir.getInode(inode) == null) { FSImage.LOG.error( "FSImageFormatPBINode#serializeINodeDirectorySection: " + "Dangling child pointer found. 
Missing INode in " + @@ -812,6 +814,7 @@ void serializeINodeSection(OutputStream out) throws IOException { Iterator iter = inodesMap.getMapIterator(); while (iter.hasNext()) { INodeWithAdditionalFields n = iter.next(); + LOG.debug("i = {}, save inode: {}", i, n); save(out, n); ++i; if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 7ccaae9773e1f..96324e3f50a54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1753,7 +1753,7 @@ public void readUnlock(String opName) { public void readUnlock(String opName, Supplier lockReportInfoSupplier) { - this.fsLock.readUnlock(opName, lockReportInfoSupplier); + this.fsLock.readUnlock(opName, lockReportInfoSupplier, true); } @Override @@ -1786,7 +1786,8 @@ public void writeUnlock(String opName, @Override public boolean hasWriteLock() { - return this.fsLock.isWriteLockedByCurrentThread(); + return this.fsLock.isWriteLockedByCurrentThread() || + fsLock.hasWriteChildLock(); } @Override public boolean hasReadLock() { @@ -1801,6 +1802,10 @@ public int getWriteHoldCount() { return this.fsLock.getWriteHoldCount(); } + public FSNamesystemLock getFSLock() { + return this.fsLock; + } + /** Lock the checkpoint lock */ public void cpLock() { this.cpLock.lock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java index b4f479fa93c89..aa54008c360d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs.server.namenode; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -29,6 +32,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.namenode.INodeMap.INodeMapLock; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.log.LogThrottlingHelper; import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation; @@ -129,6 +133,32 @@ public Long initialValue() { private static final String OVERALL_METRIC_NAME = "Overall"; + private final ThreadLocal> partitionLocks = + new ThreadLocal>() { + @Override + public Collection initialValue() { + return new ArrayList(); + } + }; + + void addChildLock(INodeMapLock lock) { + partitionLocks.get().add(lock); + } + + boolean removeChildLock(INodeMapLock lock) { + return partitionLocks.get().remove(lock); + } + + boolean hasWriteChildLock() { + Iterator iter = partitionLocks.get().iterator(); + // FSNamesystem.LOG.debug("partitionLocks.size = {}", partitionLocks.get().size()); + while(iter.hasNext()) { + if(iter.next().hasWriteChildLock()) + return true; + } + return false; + } + FSNamesystemLock(Configuration conf, 
MutableRatesWithAggregation detailedHoldTimeMetrics) { this(conf, detailedHoldTimeMetrics, new Timer()); @@ -180,11 +210,29 @@ public void readUnlock(String opName) { public void readUnlock(String opName, Supplier lockReportInfoSupplier) { + readUnlock(opName, lockReportInfoSupplier, true); + } + + public void readUnlock(String opName, + Supplier lockReportInfoSupplier, + boolean unlockChildren) { final boolean needReport = coarseLock.getReadHoldCount() == 1; final long readLockIntervalNanos = timer.monotonicNowNanos() - readLockHeldTimeStampNanos.get(); final long currentTimeMs = timer.now(); - coarseLock.readLock().unlock(); + + if(getReadHoldCount() > 0) { // Current thread holds the lock + // Unlock the top FSNamesystemLock + coarseLock.readLock().unlock(); + } + + if(unlockChildren) { // Also unlock and remove children locks + Iterator iter = partitionLocks.get().iterator(); + while(iter.hasNext()) { + iter.next().readChildUnlock(); + iter.remove(); + } + } if (needReport) { addMetric(opName, readLockIntervalNanos, false); @@ -252,7 +300,7 @@ public void writeLockInterruptibly() throws InterruptedException { * FSNamesystemLock#writeUnlock(String, boolean, Supplier)} */ public void writeUnlock() { - writeUnlock(OP_NAME_OTHER, false, null); + writeUnlock(OP_NAME_OTHER, false, null, true); } /** @@ -262,7 +310,7 @@ public void writeUnlock() { * @param opName Operation name. */ public void writeUnlock(String opName) { - writeUnlock(opName, false, null); + writeUnlock(opName, false, null, true); } /** @@ -274,7 +322,7 @@ public void writeUnlock(String opName) { */ public void writeUnlock(String opName, Supplier lockReportInfoSupplier) { - writeUnlock(opName, false, lockReportInfoSupplier); + writeUnlock(opName, false, lockReportInfoSupplier, true); } /** @@ -286,7 +334,7 @@ public void writeUnlock(String opName, * for long time will be logged in logs and metrics. */ public void writeUnlock(String opName, boolean suppressWriteLockReport) { - writeUnlock(opName, suppressWriteLockReport, null); + writeUnlock(opName, suppressWriteLockReport, null, true); } /** @@ -297,8 +345,9 @@ public void writeUnlock(String opName, boolean suppressWriteLockReport) { * for long time will be logged in logs and metrics. * @param lockReportInfoSupplier The info shown in the lock report */ - private void writeUnlock(String opName, boolean suppressWriteLockReport, - Supplier lockReportInfoSupplier) { + public void writeUnlock(String opName, boolean suppressWriteLockReport, + Supplier lockReportInfoSupplier, + boolean unlockChildren) { final boolean needReport = !suppressWriteLockReport && coarseLock .getWriteHoldCount() == 1 && coarseLock.isWriteLockedByCurrentThread(); final long writeLockIntervalNanos = @@ -329,7 +378,18 @@ private void writeUnlock(String opName, boolean suppressWriteLockReport, longestWriteLockHeldInfo = new LockHeldInfo(); } - coarseLock.writeLock().unlock(); + if(this.isWriteLockedByCurrentThread()) { // Current thread holds the lock + // Unlock the top FSNamesystemLock + coarseLock.writeLock().unlock(); + } + + if(unlockChildren) { // Unlock and remove children locks + Iterator iter = partitionLocks.get().iterator(); + while(iter.hasNext()) { + iter.next().writeChildUnlock(); + iter.remove(); + } + } if (needReport) { addMetric(opName, writeLockIntervalNanos, true); @@ -355,7 +415,25 @@ public int getReadHoldCount() { public int getWriteHoldCount() { return coarseLock.getWriteHoldCount(); } - + + /** + * Queries if the write lock is held by any thread. 
+ * @return {@code true} if any thread holds the write lock and + * {@code false} otherwise + */ + public boolean isReadLocked() { + return coarseLock.getReadLockCount() > 0 || isWriteLocked(); + } + + /** + * Queries if the write lock is held by any thread. + * @return {@code true} if any thread holds the write lock and + * {@code false} otherwise + */ + public boolean isWriteLocked() { + return coarseLock.isWriteLocked(); + } + public boolean isWriteLockedByCurrentThread() { return coarseLock.isWriteLockedByCurrentThread(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index daff95c373911..42e462e44417a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -47,6 +47,7 @@ import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -577,6 +578,43 @@ public final String getLocalName() { return name == null? null: DFSUtil.bytes2String(name); } + private long[] namespaceKey; + + /** + * Key of an INode. + * Defines partitioning of INodes in the INodeMap. + * + * @param level how many levels to be included in the key + * @return + */ + public long[] getNamespaceKey(int level) { + if(namespaceKey == null) { // generate the namespace key + long[] buf = new long[level]; + INode cur = this; + for(int l = 0; l < level; l++) { + long curId = (cur == null) ? 0L : cur.getId(); + buf[level - l - 1] = curId; + cur = (cur == null) ? null : cur.parent; + } + buf[0] = indexOf(buf); + namespaceKey = buf; + } + return namespaceKey; + } + + private final static long LARGE_PRIME = 512927357; + public static long indexOf(long[] key) { + if(key[key.length-1] == INodeId.ROOT_INODE_ID) { + return key[0]; + } + long idx = LARGE_PRIME * key[0]; + idx = (idx ^ (idx >> 32)) & (INodeMap.NUM_RANGES_STATIC -1); + return idx; + } + + /** + * Key of a snapshot Diff Element + */ @Override public final byte[] getKey() { return getLocalNameBytes(); @@ -636,7 +674,7 @@ public byte[][] getPathComponents() { @Override public String toString() { - return getLocalName(); + return getLocalName() + ": " + Arrays.toString(namespaceKey); } @VisibleForTesting diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java index f35949fdcdbed..0fdda96448f1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java @@ -17,44 +17,202 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import java.nio.charset.StandardCharsets; +import java.util.Comparator; import java.util.Iterator; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.util.GSet; +import org.apache.hadoop.util.LatchLock; import org.apache.hadoop.util.LightWeightGSet; - -import 
org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.util.PartitionedGSet; /** * Storing all the {@link INode}s and maintaining the mapping between INode ID * and INode. */ public class INodeMap { - - static INodeMap newInstance(INodeDirectory rootDir) { - // Compute the map capacity by allocating 1% of total memory - int capacity = LightWeightGSet.computeCapacity(1, "INodeMap"); - GSet map = - new LightWeightGSet<>(capacity); - map.put(rootDir); - return new INodeMap(map); + static final int NAMESPACE_KEY_DEPTH = 2; + static final int NUM_RANGES_STATIC = 256; // power of 2 + + public static class INodeKeyComparator implements Comparator { + INodeKeyComparator() { + FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH); + } + + @Override + public int compare(INode i1, INode i2) { + if (i1 == null || i2 == null) { + throw new NullPointerException("Cannot compare null INodes"); + } + long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH); + long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH); + for(int l = 0; l < NAMESPACE_KEY_DEPTH; l++) { + if(key1[l] == key2[l]) continue; + return (key1[l] < key2[l] ? -1 : 1); + } + return 0; + } + } + + /** + * INodeKeyComparator with Hashed Parent + * + */ + public static class HPINodeKeyComparator implements Comparator { + HPINodeKeyComparator() { + FSDirectory.LOG.info("Namespace key depth = {}", NAMESPACE_KEY_DEPTH); + } + + @Override + public int compare(INode i1, INode i2) { + if (i1 == null || i2 == null) { + throw new NullPointerException("Cannot compare null INodes"); + } + long[] key1 = i1.getNamespaceKey(NAMESPACE_KEY_DEPTH); + long[] key2 = i2.getNamespaceKey(NAMESPACE_KEY_DEPTH); + long key1_0 = INode.indexOf(key1); + long key2_0 = INode.indexOf(key2); + if(key1_0 != key2_0) + return (key1_0 < key2_0 ? -1 : 1); + for(int l = 1; l < NAMESPACE_KEY_DEPTH; l++) { + if(key1[l] == key2[l]) continue; + return (key1[l] < key2[l] ? -1 : 1); + } + return 0; + } + } + + public static class INodeIdComparator implements Comparator { + @Override + public int compare(INode i1, INode i2) { + if (i1 == null || i2 == null) { + throw new NullPointerException("Cannot compare null INodesl"); + } + long id1 = i1.getId(); + long id2 = i2.getId(); + return id1 < id2 ? -1 : id1 == id2 ? 
0 : 1; + } + } + + public class INodeMapLock extends LatchLock { + private ReentrantReadWriteLock childLock; + + INodeMapLock() { + this(null); + } + + private INodeMapLock(ReentrantReadWriteLock childLock) { + assert namesystem != null : "namesystem is null"; + this.childLock = childLock; + } + + @Override + protected boolean isReadTopLocked() { + return namesystem.getFSLock().isReadLocked(); + } + + @Override + protected boolean isWriteTopLocked() { + return namesystem.getFSLock().isWriteLocked(); + } + + @Override + protected void readTopUnlock() { + namesystem.getFSLock().readUnlock("INodeMap", null, false); + } + + @Override + protected void writeTopUnlock() { + namesystem.getFSLock().writeUnlock("INodeMap", false, null, false); + } + + @Override + protected boolean hasReadChildLock() { + return this.childLock.getReadHoldCount() > 0 || hasWriteChildLock(); + } + + @Override + protected void readChildLock() { + // LOG.info("readChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName()); + this.childLock.readLock().lock(); + namesystem.getFSLock().addChildLock(this); + // LOG.info("readChildLock: done"); + } + + @Override + protected void readChildUnlock() { + // LOG.info("readChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName()); + this.childLock.readLock().unlock(); + // LOG.info("readChildUnlock: done"); + } + + @Override + protected boolean hasWriteChildLock() { + return this.childLock.isWriteLockedByCurrentThread() || namesystem + .getFSLock().hasWriteChildLock(); + } + + @Override + protected void writeChildLock() { + // LOG.info("writeChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName()); + this.childLock.writeLock().lock(); + namesystem.getFSLock().addChildLock(this); + // LOG.info("writeChildLock: done"); + } + + @Override + protected void writeChildUnlock() { + // LOG.info("writeChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName()); + this.childLock.writeLock().unlock(); + // LOG.info("writeChildUnlock: done"); + } + + @Override + protected LatchLock clone() { + return new INodeMapLock(new ReentrantReadWriteLock(false)); // not fair + } + } + + static INodeMap newInstance(INodeDirectory rootDir, + FSNamesystem ns) { + return new INodeMap(rootDir, ns); } /** Synchronized by external lock. */ private final GSet map; - + private FSNamesystem namesystem; + public Iterator getMapIterator() { return map.iterator(); } - private INodeMap(GSet map) { - Preconditions.checkArgument(map != null); - this.map = map; + private INodeMap(INodeDirectory rootDir, FSNamesystem ns) { + this.namesystem = ns; + // Compute the map capacity by allocating 1% of total memory + int capacity = LightWeightGSet.computeCapacity(1, "INodeMap"); + this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(), + new INodeMapLock()); + + // Pre-populate initial empty partitions + PartitionedGSet pgs = + (PartitionedGSet) map; + PermissionStatus perm = new PermissionStatus( + "", "", new FsPermission((short) 0)); + for(int p = 0; p < NUM_RANGES_STATIC; p++) { + INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID, + "range key".getBytes(StandardCharsets.UTF_8), perm, 0); + key.setParent(new INodeDirectory((long)p, null, perm, 0)); + pgs.addNewPartition(key); + } + + map.put(rootDir); } - + /** * Add an {@link INode} into the {@link INode} map. Replace the old value if * necessary. 
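For context, the partition key used above is the two-level "namespace key" this patch adds to INode (getNamespaceKey()/indexOf()): with NAMESPACE_KEY_DEPTH == 2, the key of a non-root inode is {fold(parentId), inodeId}, where the parent id is folded into one of NUM_RANGES_STATIC (256) ranges. A minimal sketch of the derivation, with hypothetical inode ids:

    long parentId = 16386L, inodeId = 16387L;   // hypothetical ids
    long idx = 512927357L * parentId;           // INode.LARGE_PRIME
    idx = (idx ^ (idx >> 32)) & (256 - 1);      // mask to NUM_RANGES_STATIC - 1
    long[] namespaceKey = { idx, inodeId };

INodeKeyComparator orders these keys lexicographically, so children of the same directory sort together and normally land in the same partition.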
@@ -88,48 +246,58 @@ public int size() {
    * such {@link INode} in the map.
    */
   public INode get(long id) {
-    INode inode = new INodeWithAdditionalFields(id, null, new PermissionStatus(
-        "", "", new FsPermission((short) 0)), 0, 0) {
-
-      @Override
-      void recordModification(int latestSnapshotId) {
-      }
+    PartitionedGSet<INode, INodeWithAdditionalFields> pgs =
+        (PartitionedGSet<INode, INodeWithAdditionalFields>) map;
+    /*
+     * Convert a long inode id into an INode object. We only need to compare
+     * two inodes by inode id. So, it can be any type of INode object.
+     */
+    INode inode = new INodeDirectory(id, null,
+        new PermissionStatus("", "", new FsPermission((short) 0)), 0);
+
+    /*
+     * Iterate all partitions of PGSet and return the INode.
+     * Just for fallback.
+     */
+    PermissionStatus perm =
+        new PermissionStatus("", "", new FsPermission((short) 0));
+    // TODO: create a static array, to avoid creation of keys each time.
+    for (int p = 0; p < NUM_RANGES_STATIC; p++) {
+      INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
+          "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
+      key.setParent(new INodeDirectory((long)p, null, perm, 0));
+      PartitionedGSet.PartitionEntry e = pgs.getPartition(key);
 
-      @Override
-      public void destroyAndCollectBlocks(ReclaimContext reclaimContext) {
-        // Nothing to do
+      if (e.contains(inode)) {
+        return (INode) e.get(inode);
       }
+    }
 
-      @Override
-      public QuotaCounts computeQuotaUsage(
-          BlockStoragePolicySuite bsps, byte blockStoragePolicyId,
-          boolean useCache, int lastSnapshotId) {
-        return null;
-      }
+    return null;
+  }
 
-      @Override
-      public ContentSummaryComputationContext computeContentSummary(
-          int snapshotId, ContentSummaryComputationContext summary) {
-        return null;
-      }
-
-      @Override
-      public void cleanSubtree(
-          ReclaimContext reclaimContext, int snapshotId, int priorSnapshotId) {
-      }
+  public INode get(INode inode) {
 
-      @Override
-      public byte getStoragePolicyID(){
-        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      }
+    /*
+     * Check whether the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of parent
+     * dirs.
+     */
+    int i = NAMESPACE_KEY_DEPTH - 1;
+    INode tmpInode = inode;
+    while (i > 0 && tmpInode.getParent() != null) {
+      tmpInode = tmpInode.getParent();
+      i--;
+    }
 
-      @Override
-      public byte getLocalStoragePolicyID() {
-        return HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-      }
-    };
-
-    return map.get(inode);
+    /*
+     * If the Inode has (NAMESPACE_KEY_DEPTH - 1) levels of parent dirs,
+     * use map.get(); else, fall back to get the INode based on its id.
+     */
+    if (i == 0) {
+      return map.get(inode);
+    } else {
+      return get(inode.getId());
+    }
   }
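The TODO in get(long id) above suggests building the probe keys once rather than on every call. One possible shape, sketched as a fragment that would live inside INodeMap with its existing imports, and assuming the cached keys are treated as immutable (RANGE_KEYS and buildRangeKeys are hypothetical names, not part of the patch; the construction mirrors the constructor's pre-population loop):

// Sketch only: cache the NUM_RANGES_STATIC probe keys instead of rebuilding
// them on every get(long). Names are hypothetical, not part of the patch.
private static final INodeDirectory[] RANGE_KEYS = buildRangeKeys();

private static INodeDirectory[] buildRangeKeys() {
  PermissionStatus perm =
      new PermissionStatus("", "", new FsPermission((short) 0));
  INodeDirectory[] keys = new INodeDirectory[NUM_RANGES_STATIC];
  for (int p = 0; p < NUM_RANGES_STATIC; p++) {
    INodeDirectory key = new INodeDirectory(INodeId.ROOT_INODE_ID,
        "range key".getBytes(StandardCharsets.UTF_8), perm, 0);
    key.setParent(new INodeDirectory((long) p, null, perm, 0));
    keys[p] = key;  // reused read-only by every lookup
  }
  return keys;
}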
 
   /**
@@ -138,4 +306,27 @@ public byte getLocalStoragePolicyID() {
   public void clear() {
     map.clear();
   }
+
+  public void latchWriteLock(INodesInPath iip, INode[] missing) {
+    assert namesystem.hasReadLock() : "must have namesystem lock";
+    assert iip.length() > 0 : "INodesInPath has 0 length";
+    if(!(map instanceof PartitionedGSet)) {
+      return;
+    }
+    // Locks partitions along the path starting from the first existing parent
+    // Locking is in the hierarchical order
+    INode[] allINodes = new INode[Math.min(1, iip.length()) + missing.length];
+    allINodes[0] = iip.getLastINode();
+    System.arraycopy(missing, 0, allINodes, 1, missing.length);
+    /*
+    // Locks all the partitions along the path in the hierarchical order
+    INode[] allINodes = new INode[iip.length() + missing.length];
+    INode[] existing = iip.getINodesArray();
+    System.arraycopy(existing, 0, allINodes, 0, existing.length);
+    System.arraycopy(missing, 0, allINodes, existing.length, missing.length);
+    */
+
+    ((PartitionedGSet<INode, INodeWithAdditionalFields>)
+        map).latchWriteLock(allINodes);
+  }
 }
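To show where latchWriteLock(iip, missing) is meant to sit in a write path, here is a hedged sketch of a mkdir-like caller (resolvePath and newDirectories are hypothetical helpers; edit-log, permission and unlock details are elided, and the real call sites are not part of this hunk): the caller resolves the path under the global lock, then latches the partitions covering the last existing inode and the inodes about to be created before touching the map.

// Sketch only: a mkdir-like caller latching partitions before mutating the
// INode map. resolvePath() and newDirectories() are hypothetical helpers.
void createMissingDirs(FSNamesystem namesystem, String path) throws IOException {
  namesystem.writeLock();                        // global lock, as today
  INodesInPath iip = resolvePath(path);          // existing part of the path
  INode[] missing = newDirectories(iip, path);   // directories to be created
  // Swap the global lock for the partition locks covering the affected keys;
  // unrelated subtrees stay available to other writers.
  namesystem.getFSDirectory().getINodeMap().latchWriteLock(iip, missing);
  for (INode dir : missing) {
    namesystem.getFSDirectory().getINodeMap().put(dir);
  }
  // ... link the new inodes to their parent and release the child locks.
}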
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d813375e2748f..498211924b8b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -186,6 +186,7 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -1997,6 +1998,7 @@ public static void setNameNodeLogLevel(Level level) {
     GenericTestUtils.setLogLevel(NameNode.LOG, level);
     GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+    GenericTestUtils.setLogLevel(GSet.LOG, level);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSMkdirs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSMkdirs.java
index e19f3281e207d..bae98cb625a6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSMkdirs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSMkdirs.java
@@ -25,6 +25,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -152,4 +154,55 @@ public void testMkdirRpcNonCanonicalPath() throws IOException {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testMkDirsWithRestart() throws IOException {
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    try {
+      Path dir1 = new Path("/mkdir-1");
+      Path file1 = new Path(dir1, "file1");
+      Path deleteDir = new Path("/deleteDir");
+      Path deleteFile = new Path(dir1, "deleteFile");
+      // Create a dir in root dir, should succeed
+      assertTrue(dfs.mkdir(dir1, FsPermission.getDefault()));
+      dfs.mkdir(deleteDir, FsPermission.getDefault());
+      assertTrue(dfs.exists(deleteDir));
+      dfs.delete(deleteDir, true);
+      assertTrue(!dfs.exists(deleteDir));
+
+      DFSTestUtil.writeFile(dfs, file1, "hello world");
+      DFSTestUtil.writeFile(dfs, deleteFile, "hello world");
+      int totalFiles = getFileCount(dfs);
+      // Before deletion there are 2 files
+      assertTrue("Incorrect file count", 2 == totalFiles);
+      dfs.delete(deleteFile, false);
+      totalFiles = getFileCount(dfs);
+      // After deletion, left with 1 file
+      assertTrue("Incorrect file count", 1 == totalFiles);
+
+      cluster.restartNameNodes();
+      dfs = cluster.getFileSystem();
+      assertTrue(dfs.exists(dir1));
+      assertTrue(!dfs.exists(deleteDir));
+      assertTrue(dfs.exists(file1));
+      totalFiles = getFileCount(dfs);
+      assertTrue("Incorrect file count", 1 == totalFiles);
+    } finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
+
+  private int getFileCount(DistributedFileSystem dfs) throws IOException {
+    RemoteIterator<LocatedFileStatus> fileItr =
+        dfs.listFiles(new Path("/"), true);
+    int totalFiles = 0;
+    while (fileItr.hasNext()) {
+      fileItr.next();
+      totalFiles++;
+    }
+    return totalFiles;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
index a7cf68b10168f..2b5f04982ba47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
@@ -1313,6 +1313,10 @@ public void testFileIdMismatch() throws IOException {
         fail();
       } catch(FileNotFoundException e) {
         FileSystem.LOG.info("Caught Expected FileNotFoundException: ", e);
+      } catch (AssertionError ae) {
+        // FSDirWriteFileOp#completeFile throws an AssertionError if the given
+        // id/node is not an instance of INodeFile.
+        FileSystem.LOG.info("Caught Expected AssertionError: ", ae);
       }
     } finally {
       IOUtils.closeStream(dfs);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index b32f8fe759d1e..0b2103243f7f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -1017,23 +1017,38 @@ public void testInodeReplacement() throws Exception {
       final Path dir = new Path("/dir");
       hdfs.mkdirs(dir);
-      INodeDirectory dirNode = getDir(fsdir, dir);
-      INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
 
       // set quota to dir, which leads to node replacement
       hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
-      dirNode = getDir(fsdir, dir);
-      assertTrue(dirNode.isWithQuota());
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        assertTrue(dirNode.isWithQuota());
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
 
       hdfs.setQuota(dir, -1, -1);
-      dirNode = getDir(fsdir, dir);
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
     } finally {
       if (cluster != null) {
         cluster.shutdown();