HDFS-8734. Erasure Coding: fix one cell need two packets. Contributed by Walter Su.
Jing9 committed Jul 14, 2015
1 parent 6ff957b commit 0a93712
Showing 4 changed files with 41 additions and 55 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -350,3 +350,6 @@


    HDFS-8702. Erasure coding: update BlockManager.blockHasEnoughRacks(..) logic
    for striped block. (Kai Sasaki via jing9)

+   HDFS-8734. Erasure Coding: fix one cell need two packets. (Walter Su via
+   jing9)
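For readers skimming the diff, the gist of the change: a packet can hold slightly fewer chunks than a striping cell, and because the striped stream used to flush its packet at every cell boundary, the tail chunks of each cell went out in a nearly empty second packet. After this commit each streamer keeps its own partially filled packet across cell boundaries and only enqueues it when the packet itself is full. A minimal, self-contained sketch of that idea follows (hypothetical names, not the real DFSStripedOutputStream code):

```java
// Sketch only: cells rotate round-robin across streamers, each streamer keeps its own
// partially filled packet, and a packet is enqueued only when it is full -- never just
// because a cell boundary was reached.
class StripedWriteSketch {
  static class Packet {
    final int maxChunks;
    int numChunks;
    Packet(int maxChunks) { this.maxChunks = maxChunks; }
    boolean isFull() { return numChunks == maxChunks; }
  }

  private final Packet[] currentPackets;   // one in-flight packet per streamer
  private final int chunksPerPacket;
  private int current;                     // index of the streamer being written to

  StripedWriteSketch(int numStreamers, int chunksPerPacket) {
    this.currentPackets = new Packet[numStreamers];
    this.chunksPerPacket = chunksPerPacket;
  }

  /** Append one checksum chunk to the current streamer's packet. */
  void writeChunk() {
    Packet p = currentPackets[current];
    if (p == null) {
      p = currentPackets[current] = new Packet(chunksPerPacket);
    }
    p.numChunks++;
    if (p.isFull()) {                      // enqueue only when the packet is full
      enqueue(p);
      currentPackets[current] = null;
    }
  }

  /** Called at a cell boundary: move on without flushing the half-filled packet. */
  void nextCell() { current = (current + 1) % currentPackets.length; }

  private void enqueue(Packet p) { /* hand the packet to the streamer's queue */ }
}
```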
@@ -419,7 +419,7 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,


      currentPacket.writeChecksum(checksum, ckoff, cklen);
      currentPacket.writeData(b, offset, len);
-     currentPacket.incNumChunks(1);
+     currentPacket.incNumChunks();
      streamer.incBytesCurBlock(len);


      // If packet is full, enqueue it for transmission
@@ -259,10 +259,10 @@ synchronized int getNumChunks() {
  }

  /**
-  * increase the number of chunks by n
+  * increase the number of chunks by one
   */
- synchronized void incNumChunks(int n) {
-   numChunks += n;
+ synchronized void incNumChunks() {
+   numChunks++;
  }


  /**
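As context for the two hunks above, a simplified stand-in for the packet's chunk accounting (hypothetical class, not the real DFSPacket): callers now always add exactly one chunk per writeChunk call, which is why incNumChunks no longer takes a count.

```java
// Simplified stand-in for the chunk accounting shown above (not the real DFSPacket).
class PacketChunkCounter {
  private final int maxChunks;   // capacity decided when the packet is created
  private int numChunks;         // chunks written so far

  PacketChunkCounter(int maxChunks) { this.maxChunks = maxChunks; }

  /** Increase the number of chunks by one; every writeChunk call adds one chunk. */
  synchronized void incNumChunks() { numChunks++; }

  synchronized int getNumChunks() { return numChunks; }

  /** The writer enqueues the packet once it holds maxChunks chunks. */
  synchronized boolean isFull() { return numChunks >= maxChunks; }
}
```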
@@ -254,6 +254,7 @@ private void flipDataBuffers() {
  private final CellBuffers cellBuffers;
  private final RawErasureEncoder encoder;
  private final List<StripedDataStreamer> streamers;
+ private final DFSPacket[] currentPackets; // current Packet of each streamer


  /** Size of each striping cell, must be a multiple of bytesPerChecksum */
  private final int cellSize;
@@ -301,6 +302,7 @@ ExtendedBlock getBlock() {
      s.add(streamer);
    }
    streamers = Collections.unmodifiableList(s);
+   currentPackets = new DFSPacket[streamers.size()];
    setCurrentStreamer(0);
  }


@@ -316,9 +318,18 @@ private synchronized StripedDataStreamer getCurrentStreamer() {
    return (StripedDataStreamer)streamer;
  }

- private synchronized StripedDataStreamer setCurrentStreamer(int i) {
-   streamer = streamers.get(i);
+ private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
+     throws IOException {
+   // backup currentPacket for current streamer
+   int oldIdx = streamers.indexOf(streamer);
+   if (oldIdx >= 0) {
+     currentPackets[oldIdx] = currentPacket;
+   }
+
+   streamer = streamers.get(newIdx);
+   currentPacket = currentPackets[newIdx];
    adjustChunkBoundary();
+
    return getCurrentStreamer();
  }
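The backup/restore step in setCurrentStreamer is the heart of the fix. Here is the same pattern in isolation (hypothetical names), showing why the backup matters: without it, switching streamers at a cell boundary would lose the half-filled packet, and that streamer's next cell would have to open a brand-new one.

```java
// The save/restore pattern from setCurrentStreamer, sketched with hypothetical types.
class PerStreamerPacketState<P> {
  private final Object[] savedPackets;   // one saved-packet slot per streamer
  private int currentIdx = -1;           // no streamer selected yet
  private P currentPacket;               // packet of the currently selected streamer

  PerStreamerPacketState(int numStreamers) {
    this.savedPackets = new Object[numStreamers];
  }

  /** Switch streamers, parking the old streamer's packet and resuming the new one's. */
  @SuppressWarnings("unchecked")
  synchronized P switchTo(int newIdx) {
    if (currentIdx >= 0) {
      savedPackets[currentIdx] = currentPacket;   // back up the old streamer's packet
    }
    currentIdx = newIdx;
    currentPacket = (P) savedPackets[newIdx];     // may be null if nothing was pending
    return currentPacket;
  }
}
```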


@@ -366,41 +377,6 @@ private void handleStreamerFailure(String err,
    currentPacket = null;
  }


- /**
-  * Generate packets from a given buffer. This is only used for streamers
-  * writing parity blocks.
-  *
-  * @param byteBuffer the given buffer to generate packets
-  * @param checksumBuf the checksum buffer
-  * @return packets generated
-  * @throws IOException
-  */
- private List<DFSPacket> generatePackets(
-     ByteBuffer byteBuffer, byte[] checksumBuf) throws IOException{
-   List<DFSPacket> packets = new ArrayList<>();
-   assert byteBuffer.hasArray();
-   getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0,
-       byteBuffer.remaining(), checksumBuf, 0);
-   int ckOff = 0;
-   while (byteBuffer.remaining() > 0) {
-     DFSPacket p = createPacket(packetSize, chunksPerPacket,
-         getCurrentStreamer().getBytesCurBlock(),
-         getCurrentStreamer().getAndIncCurrentSeqno(), false);
-     int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
-     int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
-         maxBytesToPacket: byteBuffer.remaining();
-     int chunks = (toWrite - 1) / bytesPerChecksum + 1;
-     int ckLen = chunks * getChecksumSize();
-     p.writeChecksum(checksumBuf, ckOff, ckLen);
-     ckOff += ckLen;
-     p.writeData(byteBuffer, toWrite);
-     getCurrentStreamer().incBytesCurBlock(toWrite);
-     p.incNumChunks(chunks);
-     packets.add(p);
-   }
-   return packets;
- }

  @Override
  protected synchronized void writeChunk(byte[] bytes, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
@@ -413,11 +389,6 @@ protected synchronized void writeChunk(byte[] bytes, int offset, int len,
    if (!current.isFailed()) {
      try {
        super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
-
-       // cell is full and current packet has not been enqueued,
-       if (cellFull && currentPacket != null) {
-         enqueueCurrentPacketFull();
-       }
      } catch(Exception e) {
        handleStreamerFailure("offset=" + offset + ", length=" + len, e);
      }
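The surrounding try/catch in this hunk reflects the per-streamer failure handling: a chunk is only written through a streamer that has not already failed, and an exception marks that streamer as failed rather than aborting the whole stripe. A small hedged sketch of that pattern (hypothetical interface, not the actual StripedDataStreamer API):

```java
// Per-streamer failure isolation, sketched with a hypothetical Streamer interface.
class FailureIsolationSketch {
  interface Streamer {
    boolean isFailed();
    void markFailed(String reason, Exception cause);
    void writeChunk(byte[] b, int off, int len) throws Exception;
  }

  /** Write through one streamer; a failure is recorded but does not stop the others. */
  void writeThrough(Streamer current, byte[] b, int off, int len) {
    if (current.isFailed()) {
      return;                                    // skip a streamer that already failed
    }
    try {
      current.writeChunk(b, off, len);
    } catch (Exception e) {
      current.markFailed("offset=" + off + ", length=" + len, e);
    }
  }
}
```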
@@ -581,10 +552,14 @@ void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
    final long oldBytes = current.getBytesCurBlock();
    if (!current.isFailed()) {
      try {
-       for (DFSPacket p : generatePackets(buffer, checksumBuf)) {
-         getCurrentStreamer().waitAndQueuePacket(p);
+       DataChecksum sum = getDataChecksum();
+       sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
+       for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
+         int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
+         int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
+         super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
+             getChecksumSize());
        }
-       endBlock();
      } catch(Exception e) {
        handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
      }
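The new writeParity body reuses the ordinary writeChunk path instead of building packets by hand. A minimal sketch of that loop follows; ChunkSink is a hypothetical stand-in for the inherited writeChunk call, and checksumBuf is assumed to already hold the chunked checksums computed above.

```java
import java.nio.ByteBuffer;

// Sketch of the parity chunk loop: feed the buffer through the normal writeChunk
// path one bytesPerChecksum-sized chunk at a time, with matching checksum offsets.
class ParityChunkLoop {
  interface ChunkSink {
    void writeChunk(byte[] b, int off, int len, byte[] sum, int sumOff, int sumLen);
  }

  static void writeParityBuffer(ByteBuffer buffer, byte[] checksumBuf,
      int bytesPerChecksum, int checksumSize, ChunkSink sink) {
    int len = buffer.remaining();            // buffer is assumed to be array-backed
    for (int i = 0; i < len; i += bytesPerChecksum) {
      int chunkLen = Math.min(bytesPerChecksum, len - i);
      int ckOffset = i / bytesPerChecksum * checksumSize;
      sink.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset, checksumSize);
    }
  }
}
```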
@@ -628,16 +603,13 @@ protected synchronized void closeImpl() throws IOException {
      // flush from all upper layers
      try {
        flushBuffer();
-       if (currentPacket != null) {
-         enqueueCurrentPacket();
-       }
+       // if the last stripe is incomplete, generate and write parity cells
+       writeParityCellsForLastStripe();
+       enqueueAllCurrentPackets();
      } catch(Exception e) {
        handleStreamerFailure("closeImpl", e);
      }

-     // if the last stripe is incomplete, generate and write parity cells
-     writeParityCellsForLastStripe();
-
      for (int i = 0; i < numAllBlocks; i++) {
        final StripedDataStreamer s = setCurrentStreamer(i);
        if (!s.isFailed()) {
@@ -667,4 +639,15 @@ protected synchronized void closeImpl() {
      setClosed();
    }
  }
+
+ private void enqueueAllCurrentPackets() throws IOException {
+   int idx = streamers.indexOf(getCurrentStreamer());
+   for(int i = 0; i < streamers.size(); i++) {
+     setCurrentStreamer(i);
+     if (currentPacket != null) {
+       enqueueCurrentPacket();
+     }
+   }
+   setCurrentStreamer(idx);
+ }
}
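Finally, the close path: buffered data is flushed first, parity for a possibly incomplete last stripe is written next, and only then is every streamer's pending packet enqueued. A hedged sketch of that ordering (method bodies are placeholders; names follow the diff above):

```java
import java.io.IOException;

// Close-time ordering suggested by the closeImpl hunk above (bodies are placeholders).
class CloseFlowSketch {
  void closeImpl() throws IOException {
    flushBuffer();                    // push any buffered user data through writeChunk
    writeParityCellsForLastStripe();  // parity for a short (incomplete) final stripe
    enqueueAllCurrentPackets();       // send each streamer's partially filled packet
  }

  void flushBuffer() throws IOException { /* placeholder */ }
  void writeParityCellsForLastStripe() throws IOException { /* placeholder */ }
  void enqueueAllCurrentPackets() throws IOException { /* placeholder */ }
}
```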
