Skip to content

Commit

Permalink
HDFS-8856. Make LeaseManager#countPath O(1). (Contributed by Arpit Ag…
Browse files Browse the repository at this point in the history
…arwal)
  • Loading branch information
arp7 committed Aug 7, 2015
1 parent 8572a5a commit 6d4eee7
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 23 deletions.
2 changes: 2 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -767,6 +767,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8815. DFS getStoragePolicy implementation using single RPC call HDFS-8815. DFS getStoragePolicy implementation using single RPC call
(Surendra Singh Lilhore via vinayakumarb) (Surendra Singh Lilhore via vinayakumarb)


HDFS-8856. Make LeaseManager#countPath O(1). (Arpit Agarwal)

OPTIMIZATIONS OPTIMIZATIONS


HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
Expand Down
Expand Up @@ -254,7 +254,9 @@ void doCheckpoint() throws IOException {
try { try {
backupNode.namesystem.setImageLoaded(); backupNode.namesystem.setImageLoaded();
if(backupNode.namesystem.getBlocksTotal() > 0) { if(backupNode.namesystem.getBlocksTotal() > 0) {
backupNode.namesystem.setBlockTotal(); long completeBlocksTotal =
backupNode.namesystem.getCompleteBlocksTotal();
backupNode.namesystem.setBlockTotal(completeBlocksTotal);
} }
bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid); bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
if (!backupNode.namesystem.isRollingUpgrade()) { if (!backupNode.namesystem.isRollingUpgrade()) {
Expand Down
Expand Up @@ -1042,9 +1042,10 @@ void startCommonServices(Configuration conf, HAContext haContext) throws IOExcep
assert safeMode != null && !isPopulatingReplQueues(); assert safeMode != null && !isPopulatingReplQueues();
StartupProgress prog = NameNode.getStartupProgress(); StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE); prog.beginPhase(Phase.SAFEMODE);
long completeBlocksTotal = getCompleteBlocksTotal();
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS, prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
getCompleteBlocksTotal()); completeBlocksTotal);
setBlockTotal(); setBlockTotal(completeBlocksTotal);
blockManager.activate(conf); blockManager.activate(conf);
} finally { } finally {
writeUnlock(); writeUnlock();
Expand Down Expand Up @@ -4686,12 +4687,12 @@ public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
/** /**
* Set the total number of blocks in the system. * Set the total number of blocks in the system.
*/ */
public void setBlockTotal() { public void setBlockTotal(long completeBlocksTotal) {
// safeMode is volatile, and may be set to null at any time // safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode; SafeModeInfo safeMode = this.safeMode;
if (safeMode == null) if (safeMode == null)
return; return;
safeMode.setBlockTotal((int) getCompleteBlocksTotal()); safeMode.setBlockTotal((int) completeBlocksTotal);
} }


/** /**
Expand Down Expand Up @@ -4723,13 +4724,14 @@ public long getNumActiveClients() {
/** /**
* Get the total number of COMPLETE blocks in the system. * Get the total number of COMPLETE blocks in the system.
* For safe mode only complete blocks are counted. * For safe mode only complete blocks are counted.
* This is invoked only during NN startup and checkpointing.
*/ */
private long getCompleteBlocksTotal() { public long getCompleteBlocksTotal() {
// Calculate number of blocks under construction // Calculate number of blocks under construction
long numUCBlocks = 0; long numUCBlocks = 0;
readLock(); readLock();
numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
try { try {
numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
return getBlocksTotal() - numUCBlocks; return getBlocksTotal() - numUCBlocks;
} finally { } finally {
readUnlock(); readUnlock();
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
Expand Down Expand Up @@ -128,15 +129,13 @@ synchronized long getNumUnderConstructionBlocks() {


/** @return the number of leases currently in the system */ /** @return the number of leases currently in the system */
@VisibleForTesting @VisibleForTesting
public synchronized int countLease() {return sortedLeases.size();} public synchronized int countLease() {
return sortedLeases.size();
}


/** @return the number of paths contained in all leases */ /** @return the number of paths contained in all leases */
synchronized int countPath() { synchronized long countPath() {
int count = 0; return leasesById.size();
for (Lease lease : sortedLeases) {
count += lease.getFiles().size();
}
return count;
} }


/** /**
Expand Down Expand Up @@ -280,7 +279,9 @@ public int hashCode() {
return holder.hashCode(); return holder.hashCode();
} }


private Collection<Long> getFiles() { return files; } private Collection<Long> getFiles() {
return Collections.unmodifiableCollection(files);
}


String getHolder() { String getHolder() {
return holder; return holder;
Expand Down
Expand Up @@ -17,19 +17,28 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;


import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;


import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.junit.rules.Timeout;


import java.util.ArrayList; import java.util.ArrayList;


import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;


public class TestLeaseManager { public class TestLeaseManager {
@Rule
public Timeout timeout = new Timeout(300000);

@Test @Test
public void testRemoveLeases() throws Exception { public void testRemoveLeases() throws Exception {
FSNamesystem fsn = mock(FSNamesystem.class); FSNamesystem fsn = mock(FSNamesystem.class);
Expand All @@ -52,14 +61,9 @@ public void testRemoveLeases() throws Exception {
* leases, the Namenode does't enter an infinite loop while holding the FSN * leases, the Namenode does't enter an infinite loop while holding the FSN
* write lock and thus become unresponsive * write lock and thus become unresponsive
*/ */
@Test (timeout=1000) @Test
public void testCheckLeaseNotInfiniteLoop() { public void testCheckLeaseNotInfiniteLoop() {
FSDirectory dir = Mockito.mock(FSDirectory.class); LeaseManager lm = new LeaseManager(makeMockFsNameSystem());
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.isRunning()).thenReturn(true);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
Mockito.when(fsn.getFSDirectory()).thenReturn(dir);
LeaseManager lm = new LeaseManager(fsn);


//Make sure the leases we are going to add exceed the hard limit //Make sure the leases we are going to add exceed the hard limit
lm.setLeasePeriod(0, 0); lm.setLeasePeriod(0, 0);
Expand All @@ -73,4 +77,49 @@ public void testCheckLeaseNotInfiniteLoop() {
//Initiate a call to checkLease. This should exit within the test timeout //Initiate a call to checkLease. This should exit within the test timeout
lm.checkLeases(); lm.checkLeases();
} }


@Test
public void testCountPath() {
LeaseManager lm = new LeaseManager(makeMockFsNameSystem());

lm.addLease("holder1", 1);
assertThat(lm.countPath(), is(1L));

lm.addLease("holder2", 2);
assertThat(lm.countPath(), is(2L));
lm.addLease("holder2", 2); // Duplicate addition
assertThat(lm.countPath(), is(2L));

assertThat(lm.countPath(), is(2L));

// Remove a couple of non-existing leases. countPath should not change.
lm.removeLease("holder2", stubInodeFile(3));
lm.removeLease("InvalidLeaseHolder", stubInodeFile(1));
assertThat(lm.countPath(), is(2L));

INodeFile file = stubInodeFile(1);
lm.reassignLease(lm.getLease(file), file, "holder2");
assertThat(lm.countPath(), is(2L)); // Count unchanged on reassign

lm.removeLease("holder2", stubInodeFile(2)); // Remove existing
assertThat(lm.countPath(), is(1L));
}

private static FSNamesystem makeMockFsNameSystem() {
FSDirectory dir = mock(FSDirectory.class);
FSNamesystem fsn = mock(FSNamesystem.class);
when(fsn.isRunning()).thenReturn(true);
when(fsn.hasWriteLock()).thenReturn(true);
when(fsn.getFSDirectory()).thenReturn(dir);
return fsn;
}

private static INodeFile stubInodeFile(long inodeId) {
PermissionStatus p = new PermissionStatus(
"dummy", "dummy", new FsPermission((short) 0777));
return new INodeFile(
inodeId, "/foo".getBytes(), p, 0L, 0L,
BlockInfo.EMPTY_ARRAY, (short) 1, 1L);
}
} }

0 comments on commit 6d4eee7

Please sign in to comment.