Skip to content

Commit

Permalink
HDFS-7830. DataNode does not release the volume lock when adding a vo…
Browse files Browse the repository at this point in the history
…lume fails. (Lei Xu via Colin P. McCabe)
  • Loading branch information
Colin Patrick Mccabe committed Mar 11, 2015
1 parent a5cf985 commit 5c1036d
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 33 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -1133,6 +1133,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7818. OffsetParam should return the default value instead of throwing
NPE when the value is unspecified. (Eric Payne via wheat9)

HDFS-7830. DataNode does not release the volume lock when adding a volume
fails. (Lei Xu via Colin P. Mccabe)

BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS

HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
Expand Down
Expand Up @@ -672,7 +672,7 @@ public boolean isShared() {
*/
public void lock() throws IOException {
if (isShared()) {
LOG.info("Locking is disabled");
LOG.info("Locking is disabled for " + this.root);
return;
}
FileLock newLock = tryLock();
Expand Down
Expand Up @@ -46,6 +46,7 @@
import javax.management.ObjectName;
import javax.management.StandardMBean;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -375,6 +376,12 @@ private void addVolume(Collection<StorageLocation> dataLocations,
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}

@VisibleForTesting
public FsVolumeImpl createFsVolume(String storageUuid, File currentDir,
StorageType storageType) throws IOException {
return new FsVolumeImpl(this, storageUuid, currentDir, conf, storageType);
}

@Override
public void addVolume(final StorageLocation location,
final List<NamespaceInfo> nsInfos)
Expand All @@ -394,8 +401,8 @@ public void addVolume(final StorageLocation location,
final Storage.StorageDirectory sd = builder.getStorageDirectory();

StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), sd.getCurrentDir(), this.conf, storageType);
final FsVolumeImpl fsVolume =
createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType);
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
ArrayList<IOException> exceptions = Lists.newArrayList();

Expand All @@ -411,6 +418,11 @@ public void addVolume(final StorageLocation location,
}
}
if (!exceptions.isEmpty()) {
try {
sd.unlock();
} catch (IOException e) {
exceptions.add(e);
}
throw MultipleIOException.createIOException(exceptions);
}

Expand Down
Expand Up @@ -37,18 +37,14 @@
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -70,7 +66,6 @@
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -538,31 +533,10 @@ public void testAddVolumeFailures() throws IOException {
private static void assertFileLocksReleased(Collection<String> dirs)
throws IOException {
for (String dir: dirs) {
StorageLocation sl = StorageLocation.parse(dir);
File lockFile = new File(sl.getFile(), Storage.STORAGE_FILE_LOCK);
RandomAccessFile raf = null;
FileChannel channel = null;
FileLock lock = null;
try {
raf = new RandomAccessFile(lockFile, "rws");
channel = raf.getChannel();
lock = channel.tryLock();
assertNotNull(String.format(
"Lock file at %s appears to be held by a different process.",
lockFile.getAbsolutePath()), lock);
} catch (OverlappingFileLockException e) {
fail(String.format("Must release lock file at %s.",
lockFile.getAbsolutePath()));
} finally {
if (lock != null) {
try {
lock.release();
} catch (IOException e) {
LOG.warn(String.format("I/O error releasing file lock %s.",
lockFile.getAbsolutePath()), e);
}
}
IOUtils.cleanup(null, channel, raf);
FsDatasetTestUtil.assertFileLockReleased(dir);
} catch (IOException e) {
LOG.warn(e);
}
}
}
Expand Down
Expand Up @@ -19,13 +19,24 @@

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.Collection;
import java.util.Random;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.io.IOUtils;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

public class FsDatasetTestUtil {

Expand Down Expand Up @@ -72,4 +83,36 @@ public static void stopLazyWriter(DataNode dn) {
FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
}

/**
* Asserts that the storage lock file in the given directory has been
* released. This method works by trying to acquire the lock file itself. If
* locking fails here, then the main code must have failed to release it.
*
* @param dir the storage directory to check
* @throws IOException if there is an unexpected I/O error
*/
public static void assertFileLockReleased(String dir) throws IOException {
StorageLocation sl = StorageLocation.parse(dir);
File lockFile = new File(sl.getFile(), Storage.STORAGE_FILE_LOCK);
try (RandomAccessFile raf = new RandomAccessFile(lockFile, "rws");
FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
assertNotNull(String.format(
"Lock file at %s appears to be held by a different process.",
lockFile.getAbsolutePath()), lock);
if (lock != null) {
try {
lock.release();
} catch (IOException e) {
FsDatasetImpl.LOG.warn(String.format("I/O error releasing file lock %s.",
lockFile.getAbsolutePath()), e);
throw e;
}
}
} catch (OverlappingFileLockException e) {
fail(String.format("Must release lock file at %s.",
lockFile.getAbsolutePath()));
}
}
}
Expand Up @@ -35,11 +35,13 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

Expand All @@ -57,12 +59,17 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -291,4 +298,40 @@ public Object answer(InvocationOnMock invocationOnMock)
assertFalse(volumeList.getVolumes().contains(brokenVolume));
assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
}

@Test
public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset);
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
File badDir = new File(BASE_DIR, "bad");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
.createFsVolume(anyString(), any(File.class), any(StorageType.class));
doThrow(new IOException("Failed to getVolumeMap()"))
.when(mockVolume).getVolumeMap(
anyString(),
any(ReplicaMap.class),
any(RamDiskReplicaLruTracker.class));

Storage.StorageDirectory sd = createStorageDirectory(badDir);
sd.lock();
DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
when(storage.prepareVolume(eq(datanode), eq(badDir),
Matchers.<List<NamespaceInfo>>any()))
.thenReturn(builder);

StorageLocation location = StorageLocation.parse(badDir.toString());
List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (String bpid : BLOCK_POOL_IDS) {
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
}

try {
spyDataset.addVolume(location, nsInfos);
fail("Expect to throw MultipleIOException");
} catch (MultipleIOException e) {
}

FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
}
}

0 comments on commit 5c1036d

Please sign in to comment.