diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 2b9129598903f..93c3162f352ac 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -350,3 +350,6 @@ HDFS-8702. Erasure coding: update BlockManager.blockHasEnoughRacks(..) logic for striped block. (Kai Sasaki via jing9) + + HDFS-8734. Erasure Coding: fix one cell need two packets. (Walter Su via + jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index f41044b299ef7..9e201ad5d921a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -419,7 +419,7 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, currentPacket.writeChecksum(checksum, ckoff, cklen); currentPacket.writeData(b, offset, len); - currentPacket.incNumChunks(1); + currentPacket.incNumChunks(); streamer.incBytesCurBlock(len); // If packet is full, enqueue it for transmission diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java index 2698de39c268f..a26e35e7c586b 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@ -259,10 +259,10 @@ synchronized int getNumChunks() { } /** - * increase the number of chunks by n + * increase the number of chunks by one */ - synchronized void incNumChunks(int n) { - numChunks += n; + synchronized void incNumChunks() { + numChunks++; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index e6de714abafc4..a4bb49de3f62b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -254,6 +254,7 @@ private void flipDataBuffers() { private final CellBuffers cellBuffers; private final RawErasureEncoder encoder; private final List streamers; + private final DFSPacket[] currentPackets; // current Packet of each streamer /** Size of each striping cell, must be a multiple of bytesPerChecksum */ private final int cellSize; @@ -301,6 +302,7 @@ ExtendedBlock getBlock() { s.add(streamer); } streamers = Collections.unmodifiableList(s); + currentPackets = new DFSPacket[streamers.size()]; setCurrentStreamer(0); } @@ -316,9 +318,18 @@ private synchronized StripedDataStreamer getCurrentStreamer() { return (StripedDataStreamer)streamer; } - private synchronized StripedDataStreamer setCurrentStreamer(int i) { - streamer = streamers.get(i); + private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) + throws IOException { + // backup currentPacket for current streamer + int oldIdx = streamers.indexOf(streamer); + if (oldIdx >= 0) { + currentPackets[oldIdx] = currentPacket; + } + + streamer = streamers.get(newIdx); + currentPacket = currentPackets[newIdx]; adjustChunkBoundary(); + return getCurrentStreamer(); } @@ -366,41 +377,6 @@ private void handleStreamerFailure(String err, currentPacket = null; } - /** - * Generate packets from a given buffer. This is only used for streamers - * writing parity blocks. - * - * @param byteBuffer the given buffer to generate packets - * @param checksumBuf the checksum buffer - * @return packets generated - * @throws IOException - */ - private List generatePackets( - ByteBuffer byteBuffer, byte[] checksumBuf) throws IOException{ - List packets = new ArrayList<>(); - assert byteBuffer.hasArray(); - getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0, - byteBuffer.remaining(), checksumBuf, 0); - int ckOff = 0; - while (byteBuffer.remaining() > 0) { - DFSPacket p = createPacket(packetSize, chunksPerPacket, - getCurrentStreamer().getBytesCurBlock(), - getCurrentStreamer().getAndIncCurrentSeqno(), false); - int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum; - int toWrite = byteBuffer.remaining() > maxBytesToPacket ? - maxBytesToPacket: byteBuffer.remaining(); - int chunks = (toWrite - 1) / bytesPerChecksum + 1; - int ckLen = chunks * getChecksumSize(); - p.writeChecksum(checksumBuf, ckOff, ckLen); - ckOff += ckLen; - p.writeData(byteBuffer, toWrite); - getCurrentStreamer().incBytesCurBlock(toWrite); - p.incNumChunks(chunks); - packets.add(p); - } - return packets; - } - @Override protected synchronized void writeChunk(byte[] bytes, int offset, int len, byte[] checksum, int ckoff, int cklen) throws IOException { @@ -413,11 +389,6 @@ protected synchronized void writeChunk(byte[] bytes, int offset, int len, if (!current.isFailed()) { try { super.writeChunk(bytes, offset, len, checksum, ckoff, cklen); - - // cell is full and current packet has not been enqueued, - if (cellFull && currentPacket != null) { - enqueueCurrentPacketFull(); - } } catch(Exception e) { handleStreamerFailure("offset=" + offset + ", length=" + len, e); } @@ -581,10 +552,14 @@ void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf final long oldBytes = current.getBytesCurBlock(); if (!current.isFailed()) { try { - for (DFSPacket p : generatePackets(buffer, checksumBuf)) { - getCurrentStreamer().waitAndQueuePacket(p); + DataChecksum sum = getDataChecksum(); + sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0); + for (int i = 0; i < len; i += sum.getBytesPerChecksum()) { + int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i); + int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize(); + super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset, + getChecksumSize()); } - endBlock(); } catch(Exception e) { handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e); } @@ -628,16 +603,13 @@ protected synchronized void closeImpl() throws IOException { // flush from all upper layers try { flushBuffer(); - if (currentPacket != null) { - enqueueCurrentPacket(); - } + // if the last stripe is incomplete, generate and write parity cells + writeParityCellsForLastStripe(); + enqueueAllCurrentPackets(); } catch(Exception e) { handleStreamerFailure("closeImpl", e); } - // if the last stripe is incomplete, generate and write parity cells - writeParityCellsForLastStripe(); - for (int i = 0; i < numAllBlocks; i++) { final StripedDataStreamer s = setCurrentStreamer(i); if (!s.isFailed()) { @@ -667,4 +639,15 @@ protected synchronized void closeImpl() throws IOException { setClosed(); } } + + private void enqueueAllCurrentPackets() throws IOException { + int idx = streamers.indexOf(getCurrentStreamer()); + for(int i = 0; i < streamers.size(); i++) { + setCurrentStreamer(i); + if (currentPacket != null) { + enqueueCurrentPacket(); + } + } + setCurrentStreamer(idx); + } }