Skip to content

Commit

Permalink
Revert "HDDS-1373. KeyOutputStream, close after write request fails a…
Browse files Browse the repository at this point in the history
…fter retries, runs into IllegalArgumentException..(#729)"

This reverts commit df2ae27.
  • Loading branch information
bshashikant committed Apr 17, 2019
1 parent df2ae27 commit 082f1e0
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 694 deletions.
Expand Up @@ -102,10 +102,4 @@ public static ExcludeList getFromProtoBuf(
}); });
return excludeList; return excludeList;
} }

public void clear() {
datanodes.clear();
containerIds.clear();
pipelineIds.clear();
}
} }
Expand Up @@ -295,66 +295,60 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
throws IOException { throws IOException {
int succeededAllocates = 0; int succeededAllocates = 0;
while (len > 0) { while (len > 0) {
try { if (streamEntries.size() <= currentStreamIndex) {
if (streamEntries.size() <= currentStreamIndex) { Preconditions.checkNotNull(omClient);
Preconditions.checkNotNull(omClient); // allocate a new block, if a exception happens, log an error and
// allocate a new block, if a exception happens, log an error and // throw exception to the caller directly, and the write fails.
// throw exception to the caller directly, and the write fails.
try {
allocateNewBlock(currentStreamIndex);
succeededAllocates += 1;
} catch (IOException ioe) {
LOG.error("Try to allocate more blocks for write failed, already "
+ "allocated " + succeededAllocates
+ " blocks for this write.");
throw ioe;
}
}
// in theory, this condition should never violate due the check above
// still do a sanity check.
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);

// length(len) will be in int range if the call is happening through
// write API of blockOutputStream. Length can be in long range if it comes
// via Exception path.
int writeLen = Math.min((int) len, (int) current.getRemaining());
long currentPos = current.getWrittenDataLength();
try { try {
if (retry) { allocateNewBlock(currentStreamIndex);
current.writeOnRetry(len); succeededAllocates += 1;
} else {
current.write(b, off, writeLen);
offset += writeLen;
}
} catch (IOException ioe) { } catch (IOException ioe) {
// for the current iteration, totalDataWritten - currentPos gives the LOG.error("Try to allocate more blocks for write failed, already "
// amount of data already written to the buffer + "allocated " + succeededAllocates + " blocks for this write.");

throw ioe;
// In the retryPath, the total data to be written will always be equal
// to or less than the max length of the buffer allocated.
// The len specified here is the combined sum of the data length of
// the buffers
Preconditions.checkState(!retry || len <= streamBufferMaxSize);
int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
writeLen = retry ? (int) len : dataWritten;
// In retry path, the data written is already accounted in offset.
if (!retry) {
offset += writeLen;
}
LOG.debug("writeLen {}, total len {}", writeLen, len);
handleException(current, currentStreamIndex, ioe);
} }
if (current.getRemaining() <= 0) { }
// since the current block is already written close the stream. // in theory, this condition should never violate due the check above
handleFlushOrClose(StreamAction.FULL); // still do a sanity check.
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);

// length(len) will be in int range if the call is happening through
// write API of blockOutputStream. Length can be in long range if it comes
// via Exception path.
int writeLen = Math.min((int)len, (int) current.getRemaining());
long currentPos = current.getWrittenDataLength();
try {
if (retry) {
current.writeOnRetry(len);
} else {
current.write(b, off, writeLen);
offset += writeLen;
}
} catch (IOException ioe) {
// for the current iteration, totalDataWritten - currentPos gives the
// amount of data already written to the buffer

// In the retryPath, the total data to be written will always be equal
// to or less than the max length of the buffer allocated.
// The len specified here is the combined sum of the data length of
// the buffers
Preconditions.checkState(!retry || len <= streamBufferMaxSize);
int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
writeLen = retry ? (int) len : dataWritten;
// In retry path, the data written is already accounted in offset.
if (!retry) {
offset += writeLen;
} }
len -= writeLen; LOG.debug("writeLen {}, total len {}", writeLen, len);
off += writeLen; handleException(current, currentStreamIndex, ioe);
} catch (Exception e) { }
markStreamClosed(); if (current.getRemaining() <= 0) {
throw e; // since the current block is already written close the stream.
handleFlushOrClose(StreamAction.FULL);
} }
len -= writeLen;
off += writeLen;
} }
} }


Expand All @@ -371,7 +365,7 @@ private void discardPreallocatedBlocks(long containerID,
// pre allocated blocks available. // pre allocated blocks available.


// This will be called only to discard the next subsequent unused blocks // This will be called only to discard the next subsequent unused blocks
// in the streamEntryList. // in the sreamEntryList.
if (streamIndex < streamEntries.size()) { if (streamIndex < streamEntries.size()) {
ListIterator<BlockOutputStreamEntry> streamEntryIterator = ListIterator<BlockOutputStreamEntry> streamEntryIterator =
streamEntries.listIterator(streamIndex); streamEntries.listIterator(streamIndex);
Expand Down Expand Up @@ -404,20 +398,6 @@ private void removeEmptyBlocks() {
} }
} }
} }

private void cleanup() {
if (excludeList != null) {
excludeList.clear();
excludeList = null;
}
if (bufferPool != null) {
bufferPool.clearBufferPool();
}

if (streamEntries != null) {
streamEntries.clear();
}
}
/** /**
* It performs following actions : * It performs following actions :
* a. Updates the committed length at datanode for the current stream in * a. Updates the committed length at datanode for the current stream in
Expand All @@ -438,7 +418,8 @@ private void handleException(BlockOutputStreamEntry streamEntry,
closedContainerException = checkIfContainerIsClosed(t); closedContainerException = checkIfContainerIsClosed(t);
} }
PipelineID pipelineId = null; PipelineID pipelineId = null;
long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength(); long totalSuccessfulFlushedData =
streamEntry.getTotalAckDataLength();
//set the correct length for the current stream //set the correct length for the current stream
streamEntry.setCurrentPosition(totalSuccessfulFlushedData); streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = computeBufferData(); long bufferedDataLen = computeBufferData();
Expand Down Expand Up @@ -469,8 +450,8 @@ private void handleException(BlockOutputStreamEntry streamEntry,
if (closedContainerException) { if (closedContainerException) {
// discard subsequent pre allocated blocks from the streamEntries list // discard subsequent pre allocated blocks from the streamEntries list
// from the closed container // from the closed container
discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null, discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
streamIndex + 1); null, streamIndex + 1);
} else { } else {
// In case there is timeoutException or Watch for commit happening over // In case there is timeoutException or Watch for commit happening over
// majority or the client connection failure to the leader in the // majority or the client connection failure to the leader in the
Expand All @@ -494,11 +475,6 @@ private void handleException(BlockOutputStreamEntry streamEntry,
} }
} }


private void markStreamClosed() {
cleanup();
closed = true;
}

private void handleRetry(IOException exception, long len) throws IOException { private void handleRetry(IOException exception, long len) throws IOException {
RetryPolicy.RetryAction action; RetryPolicy.RetryAction action;
try { try {
Expand Down Expand Up @@ -610,46 +586,40 @@ private void handleFlushOrClose(StreamAction op) throws IOException {
return; return;
} }
while (true) { while (true) {
try { int size = streamEntries.size();
int size = streamEntries.size(); int streamIndex =
int streamIndex = currentStreamIndex >= size ? size - 1 : currentStreamIndex;
currentStreamIndex >= size ? size - 1 : currentStreamIndex; BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
BlockOutputStreamEntry entry = streamEntries.get(streamIndex); if (entry != null) {
if (entry != null) { try {
try { Collection<DatanodeDetails> failedServers = entry.getFailedServers();
Collection<DatanodeDetails> failedServers = // failed servers can be null in case there is no data written in the
entry.getFailedServers(); // stream
// failed servers can be null in case there is no data written in the if (failedServers != null && !failedServers.isEmpty()) {
// stream excludeList.addDatanodes(failedServers);
if (failedServers != null && !failedServers.isEmpty()) { }
excludeList.addDatanodes(failedServers); switch (op) {
} case CLOSE:
switch (op) { entry.close();
case CLOSE: break;
case FULL:
if (entry.getRemaining() == 0) {
entry.close(); entry.close();
break; currentStreamIndex++;
case FULL:
if (entry.getRemaining() == 0) {
entry.close();
currentStreamIndex++;
}
break;
case FLUSH:
entry.flush();
break;
default:
throw new IOException("Invalid Operation");
} }
} catch (IOException ioe) { break;
handleException(entry, streamIndex, ioe); case FLUSH:
continue; entry.flush();
break;
default:
throw new IOException("Invalid Operation");
} }
} catch (IOException ioe) {
handleException(entry, streamIndex, ioe);
continue;
} }
break;
} catch (Exception e) {
markStreamClosed();
throw e;
} }
break;
} }
} }


Expand Down Expand Up @@ -688,7 +658,7 @@ public void close() throws IOException {
} catch (IOException ioe) { } catch (IOException ioe) {
throw ioe; throw ioe;
} finally { } finally {
cleanup(); bufferPool.clearBufferPool();
} }
} }


Expand Down
Expand Up @@ -189,7 +189,6 @@ public void testBufferCaching() throws Exception {
// flush ensures watchForCommit updates the total length acknowledged // flush ensures watchForCommit updates the total length acknowledged
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());


Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit // now close the stream, It will update the ack length after watchForCommit
key.close(); key.close();


Expand All @@ -209,7 +208,7 @@ public void testBufferCaching() throws Exception {
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }


Expand Down Expand Up @@ -264,7 +263,6 @@ public void testFlushChunk() throws Exception {
// Now do a flush. This will flush the data and update the flush length and // Now do a flush. This will flush the data and update the flush length and
// the map. // the map.
key.flush(); key.flush();
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
// flush is a sync call, all pending operations will complete // flush is a sync call, all pending operations will complete
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Expand Down Expand Up @@ -304,7 +302,7 @@ public void testFlushChunk() throws Exception {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3, Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount()); metrics.getTotalOpCount());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }


Expand Down Expand Up @@ -399,7 +397,6 @@ public void testMultiChunkWrite() throws Exception {
metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock)); metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
Assert.assertEquals(totalOpCount + 3, Assert.assertEquals(totalOpCount + 3,
metrics.getTotalOpCount()); metrics.getTotalOpCount());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }


Expand Down Expand Up @@ -457,7 +454,6 @@ public void testMultiChunkWrite2() throws Exception {
blockOutputStream.getCommitIndex2flushedDataMap().size()); blockOutputStream.getCommitIndex2flushedDataMap().size());


Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
key.close(); key.close();
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Expand All @@ -475,7 +471,7 @@ public void testMultiChunkWrite2() throws Exception {
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }


Expand Down Expand Up @@ -540,7 +536,6 @@ public void testFullBufferCondition() throws Exception {
// Now do a flush. This will flush the data and update the flush length and // Now do a flush. This will flush the data and update the flush length and
// the map. // the map.
key.flush(); key.flush();
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
Expand Down Expand Up @@ -575,7 +570,7 @@ public void testFullBufferCondition() throws Exception {
.assertEquals(0, blockOutputStream.getBufferPool().computeBufferData()); .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }


Expand Down Expand Up @@ -643,7 +638,6 @@ public void testWriteWithExceedingMaxBufferLimit() throws Exception {
// Now do a flush. This will flush the data and update the flush length and // Now do a flush. This will flush the data and update the flush length and
// the map. // the map.
key.flush(); key.flush();
Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
Assert.assertEquals(pendingWriteChunkCount, Assert.assertEquals(pendingWriteChunkCount,
metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)); metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
Assert.assertEquals(pendingPutBlockCount, Assert.assertEquals(pendingPutBlockCount,
Expand Down Expand Up @@ -679,7 +673,7 @@ public void testWriteWithExceedingMaxBufferLimit() throws Exception {
metrics.getTotalOpCount()); metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength()); Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap()); Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
validateData(keyName, data1); validateData(keyName, data1);
} }


Expand Down

0 comments on commit 082f1e0

Please sign in to comment.