Skip to content

Commit

Permalink
HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. Contr…
Browse files Browse the repository at this point in the history
…ibuted by Yi Liu.
  • Loading branch information
zhe-thoughts authored and Zhe Zhang committed May 26, 2015
1 parent 220ca96 commit 6616de2
Show file tree
Hide file tree
Showing 14 changed files with 1,377 additions and 55 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
Expand Up @@ -175,3 +175,6 @@

HDFS-7672. Handle write failure for stripping blocks and refactor the
existing code in DFSStripedOutputStream and StripedDataStreamer. (szetszwo)

HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks.
(Yi Liu via Zhe Zhang)
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.util.DataChecksum;

/**
* A BlockReader is responsible for reading a single block
Expand Down Expand Up @@ -99,4 +100,9 @@ public interface BlockReader extends ByteBufferReadable {
* supported.
*/
ClientMmap getClientMmap(EnumSet<ReadOption> opts);

/**
* @return The DataChecksum used by the read block
*/
DataChecksum getDataChecksum();
}
Expand Up @@ -738,4 +738,9 @@ void forceAnchorable() {
void forceUnanchorable() {
replica.getSlot().makeUnanchorable();
}

@Override
public DataChecksum getDataChecksum() {
return checksum;
}
}
Expand Up @@ -732,4 +732,9 @@ public boolean isShortCircuit() {
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}

@Override
public DataChecksum getDataChecksum() {
return checksum;
}
}
Expand Up @@ -369,6 +369,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads";
public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20;
public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 256 * 1024;
public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis";
public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s
public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";
Expand Down
Expand Up @@ -37,7 +37,7 @@
****************************************************************/

@InterfaceAudience.Private
class DFSPacket {
public class DFSPacket {
public static final long HEART_BEAT_SEQNO = -1L;
private static long[] EMPTY = new long[0];
private final long seqno; // sequence number of buffer in block
Expand Down Expand Up @@ -80,7 +80,7 @@ class DFSPacket {
* @param checksumSize the size of checksum
* @param lastPacketInBlock if this is the last packet
*/
DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
int checksumSize, boolean lastPacketInBlock) {
this.lastPacketInBlock = lastPacketInBlock;
this.numChunks = 0;
Expand Down Expand Up @@ -114,7 +114,7 @@ synchronized void writeData(byte[] inarray, int off, int len)
dataPos += len;
}

synchronized void writeData(ByteBuffer inBuffer, int len)
public synchronized void writeData(ByteBuffer inBuffer, int len)
throws ClosedChannelException {
checkBuffer();
len = len > inBuffer.remaining() ? inBuffer.remaining() : len;
Expand All @@ -135,7 +135,7 @@ synchronized void writeData(ByteBuffer inBuffer, int len)
* @param len the length of checksums to write
* @throws ClosedChannelException
*/
synchronized void writeChecksum(byte[] inarray, int off, int len)
public synchronized void writeChecksum(byte[] inarray, int off, int len)
throws ClosedChannelException {
checkBuffer();
if (len == 0) {
Expand All @@ -154,7 +154,7 @@ synchronized void writeChecksum(byte[] inarray, int off, int len)
* @param stm
* @throws IOException
*/
synchronized void writeTo(DataOutputStream stm) throws IOException {
public synchronized void writeTo(DataOutputStream stm) throws IOException {
checkBuffer();

final int dataLen = dataPos - dataStart;
Expand Down
Expand Up @@ -505,4 +505,9 @@ public boolean isShortCircuit() {
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}

@Override
public DataChecksum getDataChecksum() {
return checksum;
}
}
Expand Up @@ -474,4 +474,9 @@ public boolean isShortCircuit() {
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
return null;
}

@Override
public DataChecksum getDataChecksum() {
return checksum;
}
}
Expand Up @@ -235,6 +235,33 @@ public long getMaxLockedMemory() {
return maxLockedMemory;
}

/**
* Returns true if connect to datanode via hostname
*
* @return boolean true if connect to datanode via hostname
*/
public boolean getConnectToDnViaHostname() {
return connectToDnViaHostname;
}

/**
* Returns socket timeout
*
* @return int socket timeout
*/
public int getSocketTimeout() {
return socketTimeout;
}

/**
* Returns socket write timeout
*
* @return int socket write timeout
*/
public int getSocketWriteTimeout() {
return socketWriteTimeout;
}

/**
* Returns the SaslPropertiesResolver configured for use with
* DataTransferProtocol, or null if not configured.
Expand Down
Expand Up @@ -1161,7 +1161,8 @@ void startDataNode(Configuration conf,
saslClient = new SaslDataTransferClient(dnConf.conf,
dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
ecWorker = new ErasureCodingWorker(conf); // Initialize ErasureCoding worker
// Initialize ErasureCoding worker
ecWorker = new ErasureCodingWorker(conf, this);
}

/**
Expand Down Expand Up @@ -1226,6 +1227,10 @@ public static String generateUuid() {
return UUID.randomUUID().toString();
}

public SaslDataTransferClient getSaslClient() {
return saslClient;
}

/**
* Verify that the DatanodeUuid has been initialized. If this is a new
* datanode then we generate a new Datanode Uuid and persist it to disk.
Expand Down Expand Up @@ -1488,7 +1493,7 @@ public DatanodeRegistration getDNRegistrationForBP(String bpid)
/**
* Creates either NIO or regular depending on socketWriteTimeout.
*/
protected Socket newSocket() throws IOException {
public Socket newSocket() throws IOException {
return (dnConf.socketWriteTimeout > 0) ?
SocketChannel.open().socket() : new Socket();
}
Expand Down Expand Up @@ -2143,11 +2148,8 @@ public void run() {
//
// Header info
//
Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
if (isBlockTokenEnabled) {
accessToken = blockPoolTokenSecretManager.generateToken(b,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
}
Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));

long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
Expand Down Expand Up @@ -2214,14 +2216,27 @@ public void run() {
}
}

/***
* Use BlockTokenSecretManager to generate block token for current user.
*/
public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
EnumSet<AccessMode> mode) throws IOException {
Token<BlockTokenIdentifier> accessToken =
BlockTokenSecretManager.DUMMY_TOKEN;
if (isBlockTokenEnabled) {
accessToken = blockPoolTokenSecretManager.generateToken(b, mode);
}
return accessToken;
}

/**
* Returns a new DataEncryptionKeyFactory that generates a key from the
* BlockPoolTokenSecretManager, using the block pool ID of the given block.
*
* @param block for which the factory needs to create a key
* @return DataEncryptionKeyFactory for block's block pool ID
*/
DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
final ExtendedBlock block) {
return new DataEncryptionKeyFactory() {
@Override
Expand Down

0 comments on commit 6616de2

Please sign in to comment.