HDFS-7496. Fix FsVolume removal race conditions on the DataNode by reference-counting the volume instances (lei via cmccabe)
Colin Patrick Mccabe committed Jan 21, 2015
1 parent 889ab07 commit b7f4a31
Showing 24 changed files with 717 additions and 220 deletions.
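The race being fixed: a hot-swap volume removal could tear an FsVolume down while a BlockReceiver or BlockSender was still writing or reading on it. The commit closes the race by pinning the volume with a reference count for the duration of each I/O, and by making removal wait until the count drains. A minimal sketch of the pattern, with illustrative names (CountedVolume is a stand-in, not the real FsVolumeImpl):

    import java.io.Closeable;
    import java.nio.channels.ClosedChannelException;

    // Illustrative stand-in for a reference-counted volume.
    class CountedVolume {
      private int refCount = 0;
      private boolean closed = false;

      // Pin the volume; fails once removal has started.
      synchronized Closeable obtainReference() throws ClosedChannelException {
        if (closed) {
          throw new ClosedChannelException();
        }
        refCount++;
        return this::release;
      }

      private synchronized void release() {
        refCount--;
        notifyAll();                 // wake a pending remove()
      }

      // Called by the hot-swap removal path; waits for in-flight I/O.
      synchronized void remove() throws InterruptedException {
        closed = true;               // no new references from here on
        while (refCount > 0) {
          wait();
        }
        // ... safe to tear down the volume here ...
      }
    }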
@@ -26,7 +26,6 @@
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
@@ -49,10 +48,8 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@@ -125,6 +122,8 @@ class BlockReceiver implements Closeable {

private boolean syncOnClose;
private long restartBudget;
+/** the handler holding the reference to the volume that the block receiver writes to */
+private final ReplicaHandler replicaHandler;

/**
* for replaceBlock response
@@ -179,48 +178,50 @@ class BlockReceiver implements Closeable {
// Open local disk out
//
if (isDatanode) { //replication or move
-replicaInfo = datanode.data.createTemporary(storageType, block);
+replicaHandler = datanode.data.createTemporary(storageType, block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
-replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
+replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
datanode.notifyNamenodeReceivingBlock(
-block, replicaInfo.getStorageUuid());
+block, replicaHandler.getReplica().getStorageUuid());
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
-replicaInfo = datanode.data.recoverRbw(
+replicaHandler = datanode.data.recoverRbw(
block, newGs, minBytesRcvd, maxBytesRcvd);
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
-replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
-block, replicaInfo.getStorageUuid());
+block, replicaHandler.getReplica().getStorageUuid());
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
-replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
-block, replicaInfo.getStorageUuid());
+block, replicaHandler.getReplica().getStorageUuid());
break;
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
-replicaInfo = datanode.data.createTemporary(storageType, block);
+replicaHandler =
+datanode.data.createTemporary(storageType, block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
}
}
+replicaInfo = replicaHandler.getReplica();
this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
datanode.getDnConf().dropCacheBehindWrites :
cachingStrategy.getDropBehind();
@@ -339,6 +340,9 @@ public void close() throws IOException {
finally{
IOUtils.closeStream(out);
}
+if (replicaHandler != null) {
+IOUtils.cleanup(null, replicaHandler);
+}
if (measuredFlushTime) {
datanode.metrics.addFlushNanos(flushTotalNanos);
}
@@ -950,15 +954,12 @@ private Checksum computePartialChunkCrc(long blkoff, long ckoff)
//
byte[] buf = new byte[sizePartialChunk];
byte[] crcbuf = new byte[checksumSize];
-ReplicaInputStreams instr = null;
-try {
-instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
+try (ReplicaInputStreams instr =
+datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);

// open meta file and read in crc value computed earlier
IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
-} finally {
-IOUtils.closeStream(instr);
}

// compute crc of partial chunk from data read in the block file.
@@ -1244,28 +1245,7 @@ public void run() {

if (lastPacketInBlock) {
// Finalize the block and close the block file
-try {
-finalizeBlock(startTime);
-} catch (ReplicaNotFoundException e) {
-// Verify that the exception is due to volume removal.
-FsVolumeSpi volume;
-synchronized (datanode.data) {
-volume = datanode.data.getVolume(block);
-}
-if (volume == null) {
-// ReplicaInfo has been removed due to the corresponding data
-// volume has been removed. Don't need to check disk error.
-LOG.info(myString
-+ ": BlockReceiver is interrupted because the block pool "
-+ block.getBlockPoolId() + " has been removed.", e);
-sendAckUpstream(ack, expected, totalAckTimeNanos, 0,
-Status.OOB_INTERRUPTED);
-running = false;
-receiverThread.interrupt();
-continue;
-}
-throw e;
-}
+finalizeBlock(startTime);
}

sendAckUpstream(ack, expected, totalAckTimeNanos,
@@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@@ -143,6 +144,8 @@ class BlockSender implements java.io.Closeable {

/** The file descriptor of the block being sent */
private FileDescriptor blockInFd;
+/** The reference to the volume where the block is located */
+private FsVolumeReference volumeRef;

// Cache-management related fields
private final long readaheadLength;
@@ -257,6 +260,9 @@ class BlockSender implements java.io.Closeable {
this.transferToAllowed = datanode.getDnConf().transferToAllowed &&
(!is32Bit || length <= Integer.MAX_VALUE);

+// Obtain a reference before reading data
+this.volumeRef = datanode.data.getVolume(block).obtainReference();

/*
* (corruptChecksumOK, meta_file_exist): operation
* True, True: will verify checksum
@@ -420,6 +426,10 @@ public void close() throws IOException {
blockIn = null;
blockInFd = null;
}
+if (volumeRef != null) {
+IOUtils.cleanup(null, volumeRef);
+volumeRef = null;
+}
// throw IOException if there is any
if (ioe != null) {
throw ioe;
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;

import java.io.Closeable;
import java.io.IOException;

/**
* This class includes a replica being actively written and the reference to
* the fs volume where this replica is located.
*/
public class ReplicaHandler implements Closeable {
private final ReplicaInPipelineInterface replica;
private final FsVolumeReference volumeReference;

public ReplicaHandler(
ReplicaInPipelineInterface replica, FsVolumeReference reference) {
this.replica = replica;
this.volumeReference = reference;
}

@Override
public void close() throws IOException {
if (this.volumeReference != null) {
volumeReference.close();
}
}

public ReplicaInPipelineInterface getReplica() {
return replica;
}
}
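Since ReplicaHandler is Closeable, a writer path can hold it for the lifetime of the write and release the volume with try-with-resources. A hedged usage sketch (dataset, storageType, and block stand in for the FsDatasetSpi, StorageType, and ExtendedBlock values a real caller already has; BlockReceiver itself keeps the handler as a field and closes it in close()):

    // Create an rbw replica and pin its volume in one step.
    try (ReplicaHandler handler = dataset.createRbw(storageType, block, false)) {
      ReplicaInPipelineInterface replica = handler.getReplica();
      // ... stream packet data into the replica; the volume cannot be
      // removed while the handler's FsVolumeReference is open ...
    }  // close() releases the volume reference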
@@ -43,6 +43,7 @@
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -198,7 +199,7 @@ public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
-public ReplicaInPipelineInterface createTemporary(StorageType storageType,
+public ReplicaHandler createTemporary(StorageType storageType,
ExtendedBlock b) throws IOException;

/**
@@ -208,7 +209,7 @@ public ReplicaInPipelineInterface createTemporary(StorageType storageType,
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
-public ReplicaInPipelineInterface createRbw(StorageType storageType,
+public ReplicaHandler createRbw(StorageType storageType,
ExtendedBlock b, boolean allowLazyPersist) throws IOException;

/**
@@ -221,7 +222,7 @@ public ReplicaInPipelineInterface createRbw(StorageType storageType,
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
-public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
+public ReplicaHandler recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException;

/**
@@ -241,7 +242,7 @@ public ReplicaInPipelineInterface convertTemporaryToRbw(
* @return the meta info of the replica which is being written to
* @throws IOException
*/
-public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS,
+public ReplicaHandler append(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException;

/**
@@ -254,8 +255,8 @@ public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS,
* @return the meta info of the replica which is being written to
* @throws IOException
*/
-public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS,
-long expectedBlockLen) throws IOException;
+public ReplicaHandler recoverAppend(
+ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;

/**
* Recover a failed pipeline close
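On the dataset side, the ordering matters: pin the volume before creating the replica on it, and drop the pin if creation fails, so a failed writer never blocks removal. A hypothetical implementation sketch (chooseVolume and createRbwOnVolume are made-up helpers, not actual FsDatasetImpl methods):

    @Override
    public ReplicaHandler createRbw(StorageType storageType, ExtendedBlock b,
        boolean allowLazyPersist) throws IOException {
      FsVolumeSpi volume = chooseVolume(storageType, b.getNumBytes());
      FsVolumeReference ref = volume.obtainReference();  // pin before any disk I/O
      try {
        ReplicaInPipelineInterface replica = createRbwOnVolume(volume, b);
        return new ReplicaHandler(replica, ref);         // caller closes the handler
      } catch (IOException e) {
        IOUtils.cleanup(null, ref);                      // release the pin on failure
        throw e;
      }
    }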
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;

import java.io.Closeable;
import java.io.IOException;

/**
* This is the interface for holding a reference count as an AutoCloseable resource.
* It increases the reference count by one in the constructor, and decreases
* the reference count by one in {@link #close()}.
*
* <pre>
* {@code
* try (FsVolumeReference ref = volume.obtainReference()) {
* // Do IOs on the volume
* volume.createRbw(...);
* ...
* }
* }
* </pre>
*/
public interface FsVolumeReference extends Closeable {
/**
* Decrease the reference count of the volume by one.
* @throws IOException in practice this never throws an IOException.
*/
@Override
public void close() throws IOException;

/** Returns the underlying volume object */
public FsVolumeSpi getVolume();
}
@@ -19,13 +19,23 @@

import java.io.File;
import java.io.IOException;
+import java.nio.channels.ClosedChannelException;

import org.apache.hadoop.hdfs.StorageType;

/**
* This is an interface for the underlying volume.
*/
public interface FsVolumeSpi {
+/**
+ * Obtain a reference object that increases the reference count of the
+ * volume by one.
+ *
+ * It is the caller's responsibility to close the {@link FsVolumeReference}
+ * to decrease the reference count on the volume.
+ */
+FsVolumeReference obtainReference() throws ClosedChannelException;

/** @return the StorageUuid of the volume */
public String getStorageID();

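A reader that races with removal sees the loss as a ClosedChannelException from obtainReference() rather than as a half-removed volume. A hedged caller-side sketch (volume is a hypothetical local from the surrounding lookup code):

    FsVolumeReference ref;
    try {
      ref = volume.obtainReference();    // may fail if removal already started
    } catch (ClosedChannelException e) {
      // The volume disappeared between lookup and pinning; surface it so
      // the client can fall back to another replica.
      throw new IOException("Volume " + volume.getStorageID() + " was removed", e);
    }
    try {
      // ... safe to read block data from the volume here ...
    } finally {
      ref.close();                       // drop the pin
    }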
@@ -30,9 +30,12 @@
public class ReplicaInputStreams implements Closeable {
private final InputStream dataIn;
private final InputStream checksumIn;
+private final FsVolumeReference volumeRef;

/** Create an object with a data input stream and a checksum input stream. */
-public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd) {
+public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd,
+FsVolumeReference volumeRef) {
+this.volumeRef = volumeRef;
this.dataIn = new FileInputStream(dataFd);
this.checksumIn = new FileInputStream(checksumFd);
}
@@ -51,5 +54,6 @@ public InputStream getChecksumIn() {
public void close() {
IOUtils.closeStream(dataIn);
IOUtils.closeStream(checksumIn);
+IOUtils.cleanup(null, volumeRef);
}
}
