
[Raid] Merge Murali's raid changes into chunk

Summary:
Contains the following changes:
1. DistributedRaidFileSystem.java: cache the parity file information in ExtFsInputStream instead of recomputing it for every read.
2. Decoder.java: invoke the ParallelStreamReader constructor with min(bufSize, limit).
3. ParallelStreamReader.java: have the constructor take a blockSize parameter. This saves about 30 ms when reading 100 KB of data.
4. Decoder.java: copy decoded data with System.arraycopy instead of reading it one byte at a time (see the sketch after this list). This saves about 10 ms when reading 100 KB of data.
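
For context on item 4, here is a minimal, self-contained sketch of the bulk-copy pattern introduced in DecoderInputStream.read(byte[], int, int). It is an illustration only, not the committed code: the class name, field layout, and the refillBuffer() stub are hypothetical simplifications of the buffer/bufferLen/position bookkeeping visible in the diff below.

import java.io.IOException;

/** Sketch: serve reads by bulk-copying from an already-decoded buffer. */
class BulkCopyReadSketch {
  private byte[] buffer;     // decoded bytes produced by the parallel reader
  private int bufferLen;     // number of valid bytes in buffer
  private int position;      // next unread index within buffer

  /** Copies up to len bytes into b starting at off; returns bytes read, or -1 at EOF. */
  int read(byte[] b, int off, int len) throws IOException {
    int numRead = 0;
    while (numRead < len) {
      if (buffer == null || position == bufferLen) {
        refillBuffer();                  // decode the next chunk (resets position)
        if (buffer == null) {            // end of stream
          return numRead > 0 ? numRead : -1;
        }
      }
      // Copy a whole chunk at once instead of one read() call per byte.
      int toCopy = Math.min(bufferLen - position, len - numRead);
      System.arraycopy(buffer, position, b, off, toCopy);
      position += toCopy;
      off += toCopy;
      numRead += toCopy;
    }
    return numRead;
  }

  private void refillBuffer() throws IOException {
    // Hypothetical stand-in for DecoderInputStream's checkBuffer()/init():
    // the real code starts a ParallelStreamReader, decodes the next chunk into
    // buffer, sets bufferLen and position = 0, and sets buffer to null once the
    // read limit is reached. This stub just signals end of stream.
    buffer = null;
  }
}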

Test Plan: ant test

Reviewers: weiyan, hkuang

Reviewed By: weiyan

Task ID: 1125574
Commit 8356192be121910192bf6e91a86a1a09281148f6 (1 parent: f9e0431), committed by dikang with Alex Feinberg on Jun 28, 2012.
@@ -300,6 +300,8 @@ public UnderlyingBlock(Path path, long actualFileOffset,
private final long blockSize;
private final int buffersize;
private final Configuration conf;
+ private Configuration innerConf;
+ private List<ParityFilePair> parityFilePairs;
ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs,
Path path, long fileSize, long blockSize, int buffersize)
@@ -327,6 +329,23 @@ public UnderlyingBlock(Path path, long actualFileOffset,
this.conf = conf;
this.lfs = lfs;
+ // Initialize the "inner" conf, and cache this for all future uses.
+ //Make sure we use DFS and not DistributedRaidFileSystem for unRaid.
+ this.innerConf = new Configuration(conf);
+ Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl",
+ DistributedFileSystem.class);
+ this.innerConf.set("fs.hdfs.impl", clazz.getName());
+ // Disable caching so that a previously cached RaidDfs is not used.
+ this.innerConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+ // load the parity files
+ this.parityFilePairs = new ArrayList<ParityFilePair>();
+ for (Codec codec : Codec.getCodecs()) {
+ ParityFilePair ppair = ParityFilePair.getParityFile(codec,
+ this.path, this.innerConf);
+ this.parityFilePairs.add(ppair);
+ }
+
// Open a stream to the first block.
openCurrentStream();
}
@@ -463,7 +482,7 @@ public synchronized int read(byte[] b, int offset, int len)
nextLocation = 0;
return value;
}
-
+
@Override
public synchronized int read(long position, byte[] b, int offset, int len)
throws IOException {
@@ -473,6 +492,7 @@ public synchronized int read(long position, byte[] b, int offset, int len)
if (currentOffset >= fileSize) {
return -1;
}
+
openCurrentStream();
int limit = Math.min(blockAvailable(), len);
int value;
@@ -616,29 +636,25 @@ private DecoderInputStream getAlternateInputStream(IOException curexp,
long offset,
final long readLimit)
throws IOException{
+
+ // Start offset of block.
+ long corruptOffset = (offset / blockSize) * blockSize;
+
+
+ long fileLen = this.lfs.getFileStatus(path).getLen();
+ long limit = Math.min(readLimit,
+ blockSize - (offset - corruptOffset));
+ limit = Math.min(limit, fileLen);
+
while (nextLocation < Codec.getCodecs().size()) {
try {
int idx = nextLocation++;
Codec codec = Codec.getCodecs().get(idx);
- // Start offset of block.
- long corruptOffset = (offset / blockSize) * blockSize;
- //Make sure we use DFS and not DistributedRaidFileSystem for unRaid.
- Configuration clientConf = new Configuration(conf);
- Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl",
- DistributedFileSystem.class);
- clientConf.set("fs.hdfs.impl", clazz.getName());
- // Disable caching so that a previously cached RaidDfs is not used.
- clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
-
- long fileLen = this.lfs.getFileStatus(path).getLen();
- long limit = Math.min(readLimit,
- blockSize - (offset - corruptOffset));
- limit = Math.min(limit, fileLen);
DecoderInputStream recoveryStream =
- RaidNode.unRaidCorruptInputStream(clientConf, path,
- codec, offset, limit);
+ RaidNode.unRaidCorruptInputStream(innerConf, path,
+ codec, parityFilePairs.get(idx), blockSize, offset, limit);
if (null != recoveryStream) {
return recoveryStream;
@@ -397,7 +397,7 @@ long fixErasedBlockImpl(FileSystem srcFs, Path srcFile, FileSystem parityFs,
private byte[] buffer;
private long bufferLen;
private int position;
- private long numRead = 0;
+ private long streamOffset = 0;
private final Progressable reporter;
private InputStream[] inputs;
@@ -455,11 +455,17 @@ public long getCurrentOffset() {
}
public long getAvailable() {
- return limit - numRead;
+ return limit - streamOffset;
}
- private void checkBuffer() throws IOException {
- if (numRead >= limit) {
+ /**
+ * Will init the required objects, start the parallel reader, and
+ * put the decoding result in buffer in this method.
+ *
+ * @throws IOException
+ */
+ private void init() throws IOException {
+ if (streamOffset >= limit) {
buffer = null;
return;
}
@@ -474,8 +480,7 @@ private void checkBuffer() throws IOException {
if (null == parallelReader) {
- long offsetInBlock = numRead + startOffsetInBlock;
-
+ long offsetInBlock = streamOffset + startOffsetInBlock;
FileStatus srcStat = srcFs.getFileStatus(srcFile);
FileStatus parityStat = parityFs.getFileStatus(parityFile);
StripeReader sReader = StripeReader.getStripeReader(codec, conf,
@@ -492,7 +497,8 @@ private void checkBuffer() throws IOException {
}
assert(parallelReader == null);
- parallelReader = new ParallelStreamReader(reporter, inputs, bufSize,
+ parallelReader = new ParallelStreamReader(reporter, inputs,
+ (int)Math.min(bufSize, limit),
parallelism, boundedBufferCapacity, limit);
parallelReader.start();
}
@@ -514,23 +520,23 @@ private void checkBuffer() throws IOException {
for (int i=0; i<locationsToFix.length; i++) {
if (locationsToFix[i] == erasedLocationToFix) {
buffer = writeBufs[i];
- bufferLen = Math.min(bufSize, limit - numRead);
+ bufferLen = Math.min(bufSize, limit - streamOffset);
position = 0;
break;
}
}
}
}
-
- @Override
- public int read() throws IOException {
- while (numRead <= limit) {
+
+ /**
+ * make sure we have the correct decoding data in the buffer.
+ *
+ * @throws IOException
+ */
+ private void checkBuffer() throws IOException {
+ while (streamOffset <= limit) {
try {
- checkBuffer();
- if (null == parallelReader) {
- return -1;
- }
-
+ init();
break;
} catch (IOException e) {
if (e instanceof TooManyErasedLocations) {
@@ -541,16 +547,24 @@ public int read() throws IOException {
parallelReader.shutdown();
parallelReader = null;
}
- RaidUtils.closeStreams(inputs);
+ if (inputs != null) {
+ RaidUtils.closeStreams(inputs);
+ }
}
}
+ }
+
+ @Override
+ public int read() throws IOException {
+
+ checkBuffer();
if (null == buffer) {
return -1;
}
int result = buffer[position] & 0xff;
position ++;
- numRead ++;
+ streamOffset ++;
currentOffset ++;
return result;
@@ -574,36 +588,42 @@ public int read(byte[] b, int off, int len) throws IOException {
} else if (len == 0) {
return 0;
}
+
+ int numRead = 0;
+ while (numRead < len) {
+ try {
+ checkBuffer();
+ } catch(IOException e) {
+ long delay = System.currentTimeMillis() - startTime;
+ logRaidReconstructionMetrics("FAILURE", 0, codec, delay,
+ erasedLocations.size(), dfsNumRead);
+ throw e;
+ }
- int c = -1;
- try {
- c = read();
- } catch(IOException e) {
- long delay = System.currentTimeMillis() - startTime;
- logRaidReconstructionMetrics("FAILURE", 0, codec, delay,
- erasedLocations.size(), dfsNumRead);
- throw e;
- }
- if (c == -1) {
- return -1;
- }
- b[off] = (byte)c;
- int i = 1;
- try {
- for (; i < len ; i++) {
- c = read();
- if (c == -1) {
- break;
+ if (null == buffer) {
+ if (numRead > 0) {
+ logRaidReconstructionMetrics("SUCCESS", (int)numRead, codec,
+ System.currentTimeMillis() - startTime,
+ erasedLocations.size(), dfsNumRead);
+ return (int)numRead;
}
- b[off + i] = (byte)c;
+ return -1;
}
- } catch (IOException ee) {
+
+ int numBytesToCopy = (int) Math.min(bufferLen - position,
+ len - numRead);
+ System.arraycopy(buffer, position, b, off, numBytesToCopy);
+ position += numBytesToCopy;
+ currentOffset += numBytesToCopy;
+ streamOffset += numBytesToCopy;
+ off += numBytesToCopy;
+ numRead += numBytesToCopy;
}
-
- long delay = System.currentTimeMillis() - startTime;
- logRaidReconstructionMetrics("SUCCESS", i, codec, delay,
+
+ logRaidReconstructionMetrics("SUCCESS", numRead, codec,
+ System.currentTimeMillis() - startTime,
erasedLocations.size(), dfsNumRead);
- return i;
+ return (int)numRead;
}
private void logRaidReconstructionMetrics(
@@ -634,7 +654,9 @@ public void close() throws IOException {
parallelReader.shutdown();
parallelReader = null;
}
- RaidUtils.closeStreams(inputs);
+ if (inputs != null) {
+ RaidUtils.closeStreams(inputs);
+ }
super.close();
}
}
@@ -329,7 +329,8 @@ void encodeStripe(
configureBuffers(blockSize);
int boundedBufferCapacity = 1;
ParallelStreamReader parallelReader = new ParallelStreamReader(
- reporter, blocks, bufSize, parallelism, boundedBufferCapacity, blockSize);
+ reporter, blocks, bufSize,
+ parallelism, boundedBufferCapacity, blockSize);
parallelReader.start();
try {
for (long encoded = 0; encoded < blockSize; encoded += bufSize) {
@@ -22,6 +22,7 @@
import java.io.InputStream;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
@@ -35,6 +36,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
@@ -104,6 +106,7 @@ IOException getException() {
* @param numThreads Number of threads to use for parallelism.
* @param boundedBuffer The queue to place the results in.
*/
+
public ParallelStreamReader(
Progressable reporter,
InputStream[] streams,
@@ -118,11 +121,14 @@ public ParallelStreamReader(
this.streams[i] = streams[i];
if (this.streams[i] instanceof DFSDataInputStream) {
DFSDataInputStream stream = (DFSDataInputStream)this.streams[i];
- if (stream.getAllBlocks().size() == 0) {
+ // in directory raiding, the block size for each input stream
+ // might be different, so we need to determine the endOffset of
+ // each stream by their own block size.
+ List<LocatedBlock> blocks = stream.getAllBlocks();
+ if (blocks.size() == 0) {
this.endOffsets[i] = Long.MAX_VALUE;
} else {
- this.endOffsets[i] = stream.getPos() +
- stream.getAllBlocks().get(0).getBlockSize();
+ this.endOffsets[i] = stream.getPos() + blocks.get(0).getBlockSize();
}
} else {
this.endOffsets[i] = Long.MAX_VALUE;
@@ -1067,7 +1067,7 @@ static private void generateParityFile(Configuration conf, FileStatus stat,
throws IOException {
Path inpath = stat.getPath();
Path outpath = getOriginalParityFile(destPathPrefix, inpath);
- FileSystem outFs = outpath.getFileSystem(conf);
+ FileSystem outFs = inFs;
// If the parity file is already upto-date and source replication is set
// then nothing to do.
@@ -1136,23 +1136,23 @@ static private void generateParityFile(Configuration conf, FileStatus stat,
public static DecoderInputStream unRaidCorruptInputStream(Configuration conf,
- Path srcPath, Codec codec, long corruptOffset,
+ Path srcPath, Codec codec, ParityFilePair parityFilePair,
+ long blockSize,
+ long corruptOffset,
long limit)
throws IOException {
- // Test if parity file exists
- ParityFilePair ppair = ParityFilePair.getParityFile(codec, srcPath, conf);
- if (ppair == null) {
+ // Test if parity file exists
+ if (parityFilePair == null) {
LOG.warn("Could not find " + codec.id + " parity file for " + srcPath);
return null;
}
FileSystem srcFs = srcPath.getFileSystem(conf);
- FileStatus stat = srcFs.getFileStatus(srcPath);
Decoder decoder = new Decoder(conf, codec);
return decoder.generateAlternateStream(srcFs, srcPath,
- ppair.getFileSystem(), ppair.getPath(),
- stat.getBlockSize(), corruptOffset, limit, null);
+ parityFilePair.getFileSystem(), parityFilePair.getPath(),
+ blockSize, corruptOffset, limit, null);
}
private void doHar() throws IOException, InterruptedException {
