HDFS-8188. Erasure coding: refactor client-related code to sync with HDFS-8082 and HDFS-8169. Contributed by Zhe Zhang.
zhe-thoughts authored and Zhe Zhang committed May 26, 2015
1 parent dfba46a commit 922631f
Showing 6 changed files with 55 additions and 36 deletions.
HdfsClientConfigKeys.java
@@ -177,6 +177,18 @@ interface HedgedRead {
int THREADPOOL_SIZE_DEFAULT = 0;
}

/** dfs.client.read.striped configuration properties */
interface StripedRead {
String PREFIX = Read.PREFIX + "striped.";

String THREADPOOL_SIZE_KEY = PREFIX + "threadpool.size";
/**
* With default 6+3 schema, each normal read could span 6 DNs. So this
* default value accommodates 3 read streams
*/
int THREADPOOL_SIZE_DEFAULT = 18;
}

/** dfs.http.client configuration properties */
interface HttpClient {
String PREFIX = "dfs.http.client.";
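The new StripedRead block defines dfs.client.read.striped.threadpool.size with a default of 18 threads: under the default 6+3 schema each normal read can span 6 datanodes, so the default accommodates roughly 3 concurrent read streams. Below is a minimal sketch, not part of the patch, of how a client could tune the key; it assumes HdfsClientConfigKeys is importable as org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class StripedReadConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Double the striped-read pool, e.g. for a client issuing many
    // parallel reads of erasure-coded files (assumed tuning, not a
    // recommendation from the patch).
    conf.setInt(HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, 36);
    int size = conf.getInt(
        HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
        HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT);
    System.out.println("dfs.client.read.striped.threadpool.size = " + size);
  }
}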
LocatedStripedBlock.java
@@ -20,7 +20,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;

import java.util.Arrays;

@@ -43,14 +42,6 @@ public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
System.arraycopy(indices, 0, blockIndices, 0, indices.length);
}

public LocatedStripedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages,
int[] indices, long startOffset, boolean corrupt) {
this(b, DatanodeStorageInfo.toDatanodeInfos(storages),
DatanodeStorageInfo.toStorageIDs(storages),
DatanodeStorageInfo.toStorageTypes(storages), indices,
startOffset, corrupt, EMPTY_LOCS);
}

@Override
public String toString() {
return getClass().getSimpleName() + "{" + getBlock()
DFSClient.java
@@ -382,21 +382,12 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
dfsClientConf);

if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize());
}
numThreads = conf.getInt(
DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE,
DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
if (numThreads <= 0) {
LOG.warn("The value of "
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE
+ " must be greater than 0. The current setting is " + numThreads
+ ". Reset it to the default value "
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
numThreads =
DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE;
}
this.initThreadsNumForStripedReads(numThreads);
this.initThreadsNumForHedgedReads(dfsClientConf.
getHedgedReadThreadpoolSize());
}

this.initThreadsNumForStripedReads(dfsClientConf.
getStripedReadThreadpoolSize());
this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
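With the validation moved into DfsClientConf (next file), DFSClient now asks the conf object for both pool sizes: the hedged-read pool is still created only when its size is positive, while the striped-read pool is always created. The body of initThreadsNumForStripedReads is not shown in this diff; the following is only an assumed sketch of such an initializer, a bounded pool of daemon worker threads, not the actual implementation.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not the DFSClient code from this commit.
final class StripedReadPoolSketch {
  static ThreadPoolExecutor create(int numThreads) {
    final AtomicInteger count = new AtomicInteger(0);
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        numThreads, numThreads, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        new ThreadFactory() {
          @Override
          public Thread newThread(Runnable r) {
            // Daemon threads so an idle pool never blocks client shutdown.
            Thread t = new Thread(r, "stripedRead-" + count.getAndIncrement());
            t.setDaemon(true);
            return t;
          }
        });
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}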
DfsClientConf.java
@@ -38,6 +38,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;

import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -101,6 +102,8 @@ public class DfsClientConf {
private final long hedgedReadThresholdMillis;
private final int hedgedReadThreadpoolSize;

private final int stripedReadThreadpoolSize;

public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf);
@@ -191,7 +194,7 @@ public DfsClientConf(Configuration conf) {
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
hdfsBlocksMetadataEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
fileBlockStorageLocationsNumThreads = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
@@ -215,6 +218,13 @@ public DfsClientConf(Configuration conf) {
hedgedReadThreadpoolSize = conf.getInt(
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);

stripedReadThreadpoolSize = conf.getInt(
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT);
Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
" must be greater than 0.");
}

private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -491,6 +501,13 @@ public int getHedgedReadThreadpoolSize() {
return hedgedReadThreadpoolSize;
}

/**
* @return the stripedReadThreadpoolSize
*/
public int getStripedReadThreadpoolSize() {
return stripedReadThreadpoolSize;
}

/**
* @return the shortCircuitConf
*/
@@ -744,4 +761,4 @@ public String confAsString() {
return builder.toString();
}
}
}
}
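Centralizing the setting in DfsClientConf also changes the failure mode: instead of DFSClient logging a warning and falling back to the default for a non-positive value, construction now fails fast through Preconditions.checkArgument. A hedged usage example, assuming DfsClientConf is importable as org.apache.hadoop.hdfs.client.impl.DfsClientConf (illustrative only, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

public class StripedReadConfValidationExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A non-positive pool size is now rejected at construction time.
    conf.setInt(HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, 0);
    try {
      new DfsClientConf(conf);
    } catch (IllegalArgumentException e) {
      // Message: "The value of dfs.client.read.striped.threadpool.size
      // must be greater than 0."
      System.err.println(e.getMessage());
    }
  }
}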
BlockManager.java
@@ -874,7 +874,7 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
blk);
return new LocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
false);
} else {
assert blk instanceof BlockInfoContiguousUnderConstruction;
@@ -883,13 +883,8 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
blk);
return new LocatedBlock(eb, storages, pos, false);
return newLocatedBlock(eb, storages, pos, false);
}
final BlockInfoContiguousUnderConstruction uc =
(BlockInfoContiguousUnderConstruction) blk;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return newLocatedBlock(eb, storages, pos, false);
}

// get block locations
@@ -932,7 +927,7 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) {
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return blockIndices == null ?
newLocatedBlock(eb, machines, pos, isCorrupt) :
new LocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
}

/** Create a LocatedBlocks. */
Expand Down Expand Up @@ -3514,7 +3509,7 @@ boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
if (pendingReplicationBlocksCount == 0 &&
underReplicatedBlocksCount == 0) {
LOG.info("Node {} is dead and there are no under-replicated" +
" blocks or blocks pending replication. Safe to decommission.",
" blocks or blocks pending replication. Safe to decommission.",
node);
return true;
}
@@ -3920,6 +3915,18 @@ public static LocatedBlock newLocatedBlock(
null);
}

public static LocatedStripedBlock newLocatedStripedBlock(
ExtendedBlock b, DatanodeStorageInfo[] storages,
int[] indices, long startOffset, boolean corrupt) {
// startOffset is unknown
return new LocatedStripedBlock(
b, DatanodeStorageInfo.toDatanodeInfos(storages),
DatanodeStorageInfo.toStorageIDs(storages),
DatanodeStorageInfo.toStorageTypes(storages),
indices, startOffset, corrupt,
null);
}

/**
* This class is used internally by {@link this#computeRecoveryWorkForBlocks}
* to represent a task to recover a block through replication or erasure
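Dropping the DatanodeStorageInfo-based constructor from LocatedStripedBlock and adding this static factory keeps the server-only DatanodeStorageInfo type out of the client-side protocol class: BlockManager converts the storages to DatanodeInfo, storage-ID, and storage-type arrays before the block is handed to clients. An illustrative caller, not part of the patch, with import paths assumed from the surrounding hunks:

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;

// Hypothetical NameNode-side helper; real call sites are the
// createLocatedBlock() overloads shown above.
final class LocatedStripedBlockFactoryExample {
  static LocatedStripedBlock locate(ExtendedBlock blk,
      DatanodeStorageInfo[] storages, int[] blockIndices, long startOffset) {
    // The factory performs the DatanodeStorageInfo -> DatanodeInfo/
    // storage-ID/storage-type conversion internally.
    return BlockManager.newLocatedStripedBlock(
        blk, storages, blockIndices, startOffset, false /* not corrupt */);
  }
}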
TestStripedINodeFile.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
@@ -45,7 +46,7 @@ public class TestStripedINodeFile {
"userName", null, FsPermission.getDefault());

private static INodeFile createStripedINodeFile() {
return new INodeFile(INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
return new INodeFile(HdfsConstantsClient.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
null, (short)0, 1024L, HdfsConstants.COLD_STORAGE_POLICY_ID);
}

