Skip to content

Commit

Permalink
Revert "HDFS-7915. The DataNode can sometimes allocate a ShortCircuit…
Browse files Browse the repository at this point in the history
…Shm slot and fail to tell the DFSClient about it because of a network error (cmccabe)" (jenkins didn't run yet)

This reverts commit 5aa892e.
  • Loading branch information
Colin Patrick Mccabe committed Mar 14, 2015
1 parent 5aa892e commit 32741cf
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 178 deletions.
3 changes: 0 additions & 3 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -1177,9 +1177,6 @@ Release 2.7.0 - UNRELEASED
HDFS-7722. DataNode#checkDiskError should also remove Storage when error
is found. (Lei Xu via Colin P. McCabe)

HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and
fail to tell the DFSClient about it because of a network error (cmccabe)

Release 2.6.1 - UNRELEASED

INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
Expand Down Expand Up @@ -71,23 +69,12 @@
public class BlockReaderFactory implements ShortCircuitReplicaCreator {
static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);

public static class FailureInjector {
public void injectRequestFileDescriptorsFailure() throws IOException {
// do nothing
}
}

@VisibleForTesting
static ShortCircuitReplicaCreator
createShortCircuitReplicaInfoCallback = null;

private final DFSClient.Conf conf;

/**
* Injects failures into specific operations during unit tests.
*/
private final FailureInjector failureInjector;

/**
* The file name, for logging and debugging purposes.
*/
Expand Down Expand Up @@ -182,7 +169,6 @@ public void injectRequestFileDescriptorsFailure() throws IOException {

public BlockReaderFactory(DFSClient.Conf conf) {
this.conf = conf;
this.failureInjector = conf.brfFailureInjector;
this.remainingCacheTries = conf.nCachedConnRetry;
}

Expand Down Expand Up @@ -532,12 +518,11 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
SlotId slotId = slot == null ? null : slot.getSlotId();
new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true);
new Sender(out).requestShortCircuitFds(block, token, slotId, 1);
DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in));
DomainSocket sock = peer.getDomainSocket();
failureInjector.injectRequestFileDescriptorsFailure();
switch (resp.getStatus()) {
case SUCCESS:
byte buf[] = new byte[1];
Expand All @@ -547,13 +532,8 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
try {
ExtendedBlockId key =
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
LOG.trace("Sending receipt verification byte for slot " + slot);
sock.getOutputStream().write(0);
}
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
Time.monotonicNow(), slot);
return new ShortCircuitReplicaInfo(replica);
} catch (IOException e) {
// This indicates an error reading from disk, or a format error. Since
// it's not a socket communication problem, we return null rather than
Expand All @@ -565,6 +545,7 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
}
}
return new ShortCircuitReplicaInfo(replica);
case ERROR_UNSUPPORTED:
if (!resp.hasShortCircuitAccessVersion()) {
LOG.warn("short-circuit read access is disabled for " +
Expand Down
Expand Up @@ -337,8 +337,6 @@ public static class Conf {
final long shortCircuitCacheStaleThresholdMs;

final long keyProviderCacheExpiryMs;
public BlockReaderFactory.FailureInjector brfFailureInjector =
new BlockReaderFactory.FailureInjector();

public Conf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
Expand Down
Expand Up @@ -138,13 +138,10 @@ public void transferBlock(final ExtendedBlock blk,
* to use no slot id.
* @param maxVersion Maximum version of the block data the client
* can understand.
* @param supportsReceiptVerification True if the client supports
* receipt verification.
*/
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException;
SlotId slotId, int maxVersion) throws IOException;

/**
* Release a pair of short-circuit FDs requested earlier.
Expand Down
Expand Up @@ -186,7 +186,7 @@ private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
try {
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()),
slotId, proto.getMaxVersion(), true);
slotId, proto.getMaxVersion());
} finally {
if (traceScope != null) traceScope.close();
}
Expand Down
Expand Up @@ -181,16 +181,14 @@ public void transferBlock(final ExtendedBlock blk,
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException {
SlotId slotId, int maxVersion) throws IOException {
OpRequestShortCircuitAccessProto.Builder builder =
OpRequestShortCircuitAccessProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(
blk, blockToken)).setMaxVersion(maxVersion);
if (slotId != null) {
builder.setSlotId(PBHelper.convert(slotId));
}
builder.setSupportsReceiptVerification(supportsReceiptVerification);
OpRequestShortCircuitAccessProto proto = builder.build();
send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
}
Expand Down
Expand Up @@ -22,8 +22,6 @@
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import static org.apache.hadoop.util.Time.now;

Expand Down Expand Up @@ -293,83 +291,64 @@ public void run() {
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> token,
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException {
SlotId slotId, int maxVersion) throws IOException {
updateCurrentThreadName("Passing file descriptors for block " + blk);
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
FileInputStream fis[] = null;
SlotId registeredSlotId = null;
boolean success = false;
try {
if (peer.getDomainSocket() == null) {
throw new IOException("You cannot pass file descriptors over " +
"anything but a UNIX domain socket.");
}
if (slotId != null) {
boolean isCached = datanode.data.
isCached(blk.getBlockPoolId(), blk.getBlockId());
datanode.shortCircuitRegistry.registerSlot(
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
}
try {
if (peer.getDomainSocket() == null) {
throw new IOException("You cannot pass file descriptors over " +
"anything but a UNIX domain socket.");
}
if (slotId != null) {
boolean isCached = datanode.data.
isCached(blk.getBlockPoolId(), blk.getBlockId());
datanode.shortCircuitRegistry.registerSlot(
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
registeredSlotId = slotId;
}
fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
Preconditions.checkState(fis != null);
bld.setStatus(SUCCESS);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
} catch (ShortCircuitFdsVersionException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
bld.setMessage(e.getMessage());
} catch (ShortCircuitFdsUnsupportedException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setMessage(e.getMessage());
} catch (InvalidToken e) {
bld.setStatus(ERROR_ACCESS_TOKEN);
bld.setMessage(e.getMessage());
} catch (IOException e) {
bld.setStatus(ERROR);
bld.setMessage(e.getMessage());
} finally {
if ((fis == null) && (slotId != null)) {
datanode.shortCircuitRegistry.unregisterSlot(slotId);
}
}
bld.setStatus(SUCCESS);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
} catch (ShortCircuitFdsVersionException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
bld.setMessage(e.getMessage());
} catch (ShortCircuitFdsUnsupportedException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setMessage(e.getMessage());
} catch (InvalidToken e) {
bld.setStatus(ERROR_ACCESS_TOKEN);
bld.setMessage(e.getMessage());
} catch (IOException e) {
bld.setStatus(ERROR);
bld.setMessage(e.getMessage());
}
try {
bld.build().writeDelimitedTo(socketOut);
if (fis != null) {
FileDescriptor fds[] = new FileDescriptor[fis.length];
for (int i = 0; i < fds.length; i++) {
fds[i] = fis[i].getFD();
}
byte buf[] = new byte[1];
if (supportsReceiptVerification) {
buf[0] = (byte)USE_RECEIPT_VERIFICATION.getNumber();
} else {
buf[0] = (byte)DO_NOT_USE_RECEIPT_VERIFICATION.getNumber();
}
DomainSocket sock = peer.getDomainSocket();
sock.sendFileDescriptors(fds, buf, 0, buf.length);
if (supportsReceiptVerification) {
LOG.trace("Reading receipt verification byte for " + slotId);
int val = sock.getInputStream().read();
if (val < 0) {
throw new EOFException();
}
} else {
LOG.trace("Receipt verification is not enabled on the DataNode. " +
"Not verifying " + slotId);
}
success = true;
byte buf[] = new byte[] { (byte)0 };
peer.getDomainSocket().
sendFileDescriptors(fds, buf, 0, buf.length);
}
} finally {
if ((!success) && (registeredSlotId != null)) {
LOG.info("Unregistering " + registeredSlotId + " because the " +
"requestShortCircuitFdsForRead operation failed.");
datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
}
if (ClientTraceLog.isInfoEnabled()) {
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
.getBlockPoolId());
BlockSender.ClientTraceLog.info(String.format(
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
" blockid: %s, srvID: %s, success: %b",
blk.getBlockId(), dnR.getDatanodeUuid(), success));
blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null)
));
}
if (fis != null) {
IOUtils.cleanup(LOG, fis);
Expand Down
Expand Up @@ -30,7 +30,6 @@
import java.util.Iterator;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -84,7 +83,7 @@ public class ShortCircuitRegistry {

private static final int SHM_LENGTH = 8192;

public static class RegisteredShm extends ShortCircuitShm
private static class RegisteredShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler {
private final String clientName;
private final ShortCircuitRegistry registry;
Expand Down Expand Up @@ -384,14 +383,4 @@ public void shutdown() {
}
IOUtils.closeQuietly(watcher);
}

public static interface Visitor {
void accept(HashMap<ShmId, RegisteredShm> segments,
HashMultimap<ExtendedBlockId, Slot> slots);
}

@VisibleForTesting
public synchronized void visit(Visitor visitor) {
visitor.accept(segments, slots);
}
}
11 changes: 0 additions & 11 deletions hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
Expand Up @@ -179,12 +179,6 @@ message OpRequestShortCircuitAccessProto {
* The shared memory slot to use, if we are using one.
*/
optional ShortCircuitShmSlotProto slotId = 3;

/**
* True if the client supports verifying that the file descriptor has been
* sent successfully.
*/
optional bool supportsReceiptVerification = 4 [default = false];
}

message ReleaseShortCircuitAccessRequestProto {
Expand Down Expand Up @@ -236,11 +230,6 @@ enum Status {
IN_PROGRESS = 12;
}

enum ShortCircuitFdResponse {
DO_NOT_USE_RECEIPT_VERIFICATION = 0;
USE_RECEIPT_VERIFICATION = 1;
}

message PipelineAckProto {
required sint64 seqno = 1;
repeated uint32 reply = 2;
Expand Down

0 comments on commit 32741cf

Please sign in to comment.