From 914580934c566cd18019035b244f82006868bd7b Mon Sep 17 00:00:00 2001
From: Walter Su
Date: Tue, 19 May 2015 14:59:23 +0800
Subject: [PATCH] HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker. Contributed by Rakesh R.

---
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt      |  5 ++
 .../hdfs/server/datanode/BPOfferService.java  |  1 +
 .../erasurecode/ErasureCodingWorker.java      | 59 +++++++++----------
 3 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 939ba89b7e745..1e7dbeaec11ad 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -218,6 +218,8 @@
 
     HDFS-8367. BlockInfoStriped uses EC schema. (Kai Sasaki via Kai Zheng)
 
+    HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549)
+
     HDFS-8417. Erasure Coding: Pread failed to read data starting from
     not-first stripe. (Walter Su via jing9)
@@ -228,3 +230,6 @@
 
     HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking
     queue configurable in DFSStripedOutputStream. (Li Bo)
+
+    HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker.
+    (Rakesh R via waltersu4549)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 6606d0bbc7fde..d77b36dc1e02c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -728,6 +728,7 @@ assert getBlockPoolId().equals(bp) :
       LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");
       Collection<BlockECRecoveryInfo> ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks();
       dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index a1c0f722ce3a5..4723e9fca3bce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -62,7 +62,6 @@
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
@@ -88,12 +87,12 @@
  * commands.
  */
 public final class ErasureCodingWorker {
-  private final Log LOG = DataNode.LOG;
+  private static final Log LOG = DataNode.LOG;
 
   private final DataNode datanode;
-  private Configuration conf;
+  private final Configuration conf;
 
-  private ThreadPoolExecutor STRIPED_READ_TRHEAD_POOL;
+  private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final int STRIPED_READ_THRESHOLD_MILLIS;
   private final int STRIPED_READ_BUFFER_SIZE;
 
@@ -121,7 +120,10 @@ private RawErasureDecoder newDecoder() {
   }
 
   private void initializeStripedReadThreadPool(int num) {
-    STRIPED_READ_TRHEAD_POOL = new ThreadPoolExecutor(1, num, 60,
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using striped reads; pool threads=" + num);
+    }
+    STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
         TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
         new Daemon.DaemonFactory() {
           private final AtomicInteger threadIndex = new AtomicInteger(0);
@@ -141,7 +143,7 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
             super.rejectedExecution(runnable, e);
           }
         });
-    STRIPED_READ_TRHEAD_POOL.allowCoreThreadTimeOut(true);
+    STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
   }
 
   /**
@@ -231,23 +233,23 @@ private class ReconstructAndTransferBlock implements Runnable {
 
     // sources
     private final short[] liveIndices;
-    private DatanodeInfo[] sources;
+    private final DatanodeInfo[] sources;
 
-    private List<StripedReader> stripedReaders;
+    private final List<StripedReader> stripedReaders;
 
     // targets
-    private DatanodeInfo[] targets;
-    private StorageType[] targetStorageTypes;
+    private final DatanodeInfo[] targets;
+    private final StorageType[] targetStorageTypes;
 
-    private short[] targetIndices;
-    private ByteBuffer[] targetBuffers;
+    private final short[] targetIndices;
+    private final ByteBuffer[] targetBuffers;
 
-    private Socket[] targetSockets;
-    private DataOutputStream[] targetOutputStreams;
-    private DataInputStream[] targetInputStreams;
+    private final Socket[] targetSockets;
+    private final DataOutputStream[] targetOutputStreams;
+    private final DataInputStream[] targetInputStreams;
 
-    private long[] blockOffset4Targets;
-    private long[] seqNo4Targets;
+    private final long[] blockOffset4Targets;
+    private final long[] seqNo4Targets;
 
     private final int WRITE_PACKET_SIZE = 64 * 1024;
     private DataChecksum checksum;
@@ -257,11 +259,11 @@ private class ReconstructAndTransferBlock implements Runnable {
     private int bytesPerChecksum;
     private int checksumSize;
 
-    private CachingStrategy cachingStrategy;
+    private final CachingStrategy cachingStrategy;
 
-    private Map<Future<Void>, Integer> futures = new HashMap<>();
-    private CompletionService<Void> readService =
-        new ExecutorCompletionService<>(STRIPED_READ_TRHEAD_POOL);
+    private final Map<Future<Void>, Integer> futures = new HashMap<>();
+    private final CompletionService<Void> readService =
+        new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);
 
     ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
       ECSchema schema = recoveryInfo.getECSchema();
@@ -277,7 +279,8 @@ private class ReconstructAndTransferBlock implements Runnable {
 
       Preconditions.checkArgument(liveIndices.length >= dataBlkNum,
           "No enough live striped blocks.");
-      Preconditions.checkArgument(liveIndices.length == sources.length);
+      Preconditions.checkArgument(liveIndices.length == sources.length,
+          "liveBlockIndices and source dns should match");
 
       targets = recoveryInfo.getTargetDnInfos();
       targetStorageTypes = recoveryInfo.getTargetStorageTypes();
@@ -336,7 +339,6 @@ public void run() {
         if (nsuccess < dataBlkNum) {
           String error = "Can't find minimum sources required by " +
               "recovery, block id: " + blockGroup.getBlockId();
-          LOG.warn(error);
           throw new IOException(error);
         }
 
@@ -358,7 +360,6 @@ public void run() {
         boolean[] targetsStatus = new boolean[targets.length];
         if (initTargetStreams(targetsStatus) == 0) {
           String error = "All targets are failed.";
-          LOG.warn(error);
           throw new IOException(error);
         }
 
@@ -372,7 +373,6 @@ public void run() {
           if (nsuccess < dataBlkNum) {
             String error = "Can't read data from minimum number of sources " +
                 "required by recovery, block id: " + blockGroup.getBlockId();
-            LOG.warn(error);
             throw new IOException(error);
           }
 
@@ -385,7 +385,6 @@ public void run() {
         // step3: transfer data
         if (transferData2Targets(targetsStatus) == 0) {
           String error = "Transfer failed for all targets.";
-          LOG.warn(error);
           throw new IOException(error);
         }
 
@@ -906,11 +905,11 @@ private int initTargetStreams(boolean[] targetsStatus) {
   }
 
   private class StripedReader {
-    short index;
-    BlockReader blockReader;
-    ByteBuffer buffer;
+    private final short index;
+    private BlockReader blockReader;
+    private ByteBuffer buffer;
 
-    public StripedReader(short index) {
+    private StripedReader(short index) {
       this.index = index;
     }
   }