Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ public void write(int b) throws IOException {

private void writeChunkIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
LOG.debug("WriteChunk from write(), buffer = {}", currentBuffer);
clientMetrics.getWriteChunksDuringWrite().incr();
writeChunk(currentBuffer);
updateWriteChunkLength();
Expand Down Expand Up @@ -427,8 +426,6 @@ private void allocateNewBufferIfNeeded() throws IOException {
try {
currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
currentBufferRemaining = currentBuffer.remaining();
LOG.debug("Allocated new buffer {}, used = {}, capacity = {}", currentBuffer,
bufferPool.getNumberOfUsedBuffers(), bufferPool.getCapacity());
} catch (InterruptedException e) {
handleInterruptedException(e, false);
}
Expand Down Expand Up @@ -460,10 +457,6 @@ public synchronized void writeOnRetry(long len) throws IOException {
// the BufferPool. For each pending buffers in the BufferPool, we sequentially flush it and wait synchronously.

List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Retrying write length {} on target blockID {}, {} buffers", this, len, blockID,
allocatedBuffers.size());
}
Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize());
int count = 0;
while (len > 0) {
Expand All @@ -474,7 +467,6 @@ public synchronized void writeOnRetry(long len) throws IOException {
writtenDataLength += writeLen;
updateWriteChunkLength();
updatePutBlockLength();
LOG.debug("Write chunk on retry buffer = {}", buffer);
CompletableFuture<PutBlockResult> putBlockFuture;
if (allowPutBlockPiggybacking) {
putBlockFuture = writeChunkAndPutBlock(buffer, false);
Expand Down Expand Up @@ -533,17 +525,14 @@ private CompletableFuture<Void> watchForCommit(long commitIndex) {
throw new FlushRuntimeException(e);
}

LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
final long start = Time.monotonicNowNanos();
return sendWatchForCommit(commitIndex)
.thenAccept(this::checkReply)
.exceptionally(e -> {
throw new FlushRuntimeException(setIoException(e));
})
.whenComplete((r, e) -> {
LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex);
clientMetrics.getHsyncWatchForCommitNs().add(Time.monotonicNowNanos() - start);
});
.whenComplete((r, e) ->
clientMetrics.getHsyncWatchForCommitNs().add(Time.monotonicNowNanos() - start));
}

private void checkReply(XceiverClientReply reply) {
Expand Down Expand Up @@ -613,10 +602,6 @@ CompletableFuture<PutBlockResult> executePutBlock(boolean close,
}
return e;
}, responseExecutor).exceptionally(e -> {
if (LOG.isDebugEnabled()) {
LOG.debug("putBlock failed for blockID {} with exception {}",
blockID, e.getLocalizedMessage());
}
CompletionException ce = new CompletionException(e);
setIoException(ce);
throw ce;
Expand Down Expand Up @@ -666,7 +651,6 @@ private void writeChunk(ChunkBuffer buffer) throws IOException {

private CompletableFuture<PutBlockResult> writeChunkAndPutBlock(ChunkBuffer buffer, boolean close)
throws IOException {
LOG.debug("WriteChunk and Putblock from flush, buffer={}", buffer);
writeChunkCommon(buffer);
return writeChunkToContainer(buffer.duplicate(0, buffer.position()), true, close);
}
Expand Down Expand Up @@ -699,12 +683,10 @@ protected void handleFlush(boolean close) throws IOException {
private void handleFlushInternal(boolean close)
throws IOException, InterruptedException, ExecutionException {
checkOpen();
LOG.debug("Start handleFlushInternal close={}", close);
CompletableFuture<Void> toWaitFor = captureLatencyNs(clientMetrics.getHsyncSynchronizedWorkNs(),
() -> handleFlushInternalSynchronized(close));

if (toWaitFor != null) {
LOG.debug("Waiting for flush");
try {
long startWaiting = Time.monotonicNowNanos();
toWaitFor.get();
Expand All @@ -716,7 +698,6 @@ private void handleFlushInternal(boolean close)
throw ex;
}
}
LOG.debug("Flush done.");
}

if (close) {
Expand Down Expand Up @@ -773,8 +754,6 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo
// data since latest flush - we need to send the "EOF" flag
updatePutBlockLength();
putBlockResultFuture = executePutBlock(true, true);
} else {
LOG.debug("Flushing without data");
}
if (putBlockResultFuture != null) {
recordWatchForCommitAsync(putBlockResultFuture);
Expand Down Expand Up @@ -825,11 +804,6 @@ public IOException setIoException(Throwable e) {
if (ioe == null) {
IOException exception = new IOException(EXCEPTION_MSG + e.toString(), e);
ioException.compareAndSet(null, exception);
LOG.debug("Exception: for block ID: " + blockID, e);
} else {
LOG.debug("Previous request had already failed with {} " +
"so subsequent request also encounters " +
"Storage Container Exception {}", ioe, e);
}
return getIoException();
}
Expand Down Expand Up @@ -916,11 +890,6 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(

long flushPos = totalWriteChunkLength;

if (LOG.isDebugEnabled()) {
LOG.debug("Writing chunk {} length {} at offset {}",
chunkInfo.getChunkName(), effectiveChunkSize, offset);
}

final ChunkInfo previous = previousChunkInfo.getAndSet(chunkInfo);
final long expectedOffset = previous == null ? 0
: chunkInfo.getChunkName().equals(previous.getChunkName()) ?
Expand Down Expand Up @@ -952,7 +921,6 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
Objects.requireNonNull(byteBufferList, "byteBufferList == null");

blockData = containerBlockData.build();
LOG.debug("piggyback chunk list {}", blockData);

if (supportIncrementalChunkList) {
// remove any chunks in the containerBlockData list.
Expand Down Expand Up @@ -982,7 +950,6 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
}, responseExecutor).exceptionally(e -> {
String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
" into block " + blockID;
LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
CompletionException ce = new CompletionException(msg, e);
setIoException(ce);
throw ce;
Expand Down Expand Up @@ -1010,13 +977,6 @@ private void handleSuccessfulPutBlock(
.equals(responseBlockID.getContainerBlockID()));
// updates the bcsId of the block
blockID.set(responseBlockID);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " flushLength "
+ flushPos + " numBuffers " + byteBufferList.size()
+ " blockID " + blockID + " bufferPool size " + bufferPool
.getSize());
}
// for standalone protocol, logIndex will always be 0.
updateCommitInfo(asyncReply, byteBufferList);
}
Expand All @@ -1039,12 +999,6 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
// So remove it.
removeLastPartialChunk();
chunk.rewind();
LOG.debug("Adding chunk pos {} limit {} remaining {}." +
"lastChunkBuffer pos {} limit {} remaining {} lastChunkOffset = {}",
chunk.position(), chunk.limit(), chunk.remaining(),
lastChunkBuffer.position(), lastChunkBuffer.limit(),
lastChunkBuffer.remaining(), lastChunkOffset);

// Append the chunk to the last chunk buffer.
// if the resulting size exceeds limit (4MB),
// drop the full chunk and keep the rest.
Expand All @@ -1060,8 +1014,6 @@ private void updateBlockDataForWriteChunk(ChunkBuffer chunk)
appendLastChunkBuffer(chunk, remainingBufferSize,
chunk.remaining() - remainingBufferSize);
}
LOG.debug("after append, lastChunkBuffer={} lastChunkOffset={}",
lastChunkBuffer, lastChunkOffset);

updateBlockDataWithLastChunkBuffer();
}
Expand All @@ -1070,7 +1022,6 @@ private void updateBlockDataWithLastChunkBuffer()
throws OzoneChecksumException {
// create chunk info for lastChunkBuffer
ChunkInfo lastChunkInfo = createChunkInfo(lastChunkOffset);
LOG.debug("lastChunkInfo = {}", lastChunkInfo);
long lastChunkSize = lastChunkInfo.getLen();
addToBlockData(lastChunkInfo);
// Set ByteBuffer limit to capacity, pos to 0. Does not erase data
Expand All @@ -1087,17 +1038,13 @@ private void updateBlockDataWithLastChunkBuffer()

private void appendLastChunkBuffer(ChunkBuffer chunkBuffer, int offset,
int length) {
LOG.debug("copying to last chunk buffer offset={} length={}",
offset, length);
int pos = 0;
int uncopied = length;
for (ByteBuffer bb : chunkBuffer.asByteBufferList()) {
if (pos + bb.remaining() >= offset) {
int copyStart = offset < pos ? 0 : offset - pos;
int copyLen = Math.min(uncopied, bb.remaining());
try {
LOG.debug("put into last chunk buffer start = {} len = {}",
copyStart, copyLen);
int origPos = bb.position();
int origLimit = bb.limit();
bb.position(copyStart).limit(copyStart + copyLen);
Expand Down Expand Up @@ -1167,11 +1114,9 @@ private boolean isFullChunk(ChunkInfo chunkInfo) {
}

private void addToBlockData(ChunkInfo revisedChunkInfo) {
LOG.debug("containerBlockData chunk: {}", containerBlockData);
if (containerBlockData.getChunksCount() > 0) {
ChunkInfo lastChunk = containerBlockData.getChunks(
containerBlockData.getChunksCount() - 1);
LOG.debug("revisedChunkInfo chunk: {}", revisedChunkInfo);
Preconditions.checkState(lastChunk.getOffset() + lastChunk.getLen() ==
revisedChunkInfo.getOffset(),
"lastChunk.getOffset() + lastChunk.getLen() " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A bounded pool implementation that provides {@link ChunkBuffer}s. This pool allows allocating and releasing
Expand All @@ -42,8 +40,6 @@
* wait until a allocated buffer is released.
*/
public class BufferPool {
private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);

private static final BufferPool EMPTY = new BufferPool(0, 0);
private final int bufferSize;
private final int capacity;
Expand Down Expand Up @@ -90,7 +86,6 @@ public ChunkBuffer allocateBuffer(int increment) throws InterruptedException {
"Total created buffer must not exceed capacity.");

while (allocated.size() == capacity) {
LOG.debug("Allocation needs to wait the pool is at capacity (allocated = capacity = {}).", capacity);
notFull.await();
}
// Get a buffer to allocate, preferably from the released ones.
Expand All @@ -99,16 +94,13 @@ public ChunkBuffer allocateBuffer(int increment) throws InterruptedException {
allocated.add(buffer);
currentBuffer = buffer;

LOG.debug("Allocated new buffer {}, number of used buffers {}, capacity {}.",
buffer, allocated.size(), capacity);
return buffer;
} finally {
lock.unlock();
}
}

void releaseBuffer(ChunkBuffer buffer) {
LOG.debug("Releasing buffer {}", buffer);
lock.lock();
try {
Preconditions.assertTrue(removeByIdentity(allocated, buffer), "Releasing unknown buffer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,13 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;

/**
* Test for {@link BufferPool}.
*/
class TestBufferPool {

@BeforeAll
static void init() {
GenericTestUtils.setLogLevel(BufferPool.class, Level.DEBUG);
}

@Test
void testBufferPool() throws Exception {
testBufferPool(BufferPool.empty());
Expand All @@ -63,11 +54,13 @@ void testBufferPoolConcurrently() throws Exception {
assertAllocationBlockedUntilReleased(pool, buffers);
}

private void assertAllocationBlockedUntilReleased(BufferPool pool, Deque<ChunkBuffer> buffers) throws Exception {
private void assertAllocationBlockedUntilReleased(BufferPool pool, Deque<ChunkBuffer> buffers)
throws Exception {
// As the pool is full, allocation will need to wait until a buffer is released.
assertFull(pool);

LogCapturer logCapturer = LogCapturer.captureLogs(BufferPool.class);
assertEquals(buffers.size(), pool.getAllocatedBuffers().size());

AtomicReference<ChunkBuffer> allocated = new AtomicReference<>();
AtomicBoolean allocatorStarted = new AtomicBoolean();
Thread allocator = new Thread(() -> {
Expand All @@ -90,18 +83,16 @@ private void assertAllocationBlockedUntilReleased(BufferPool pool, Deque<ChunkBu
throw new RuntimeException(e);
}
}

releaser.start();
allocator.join();
assertEquals(buffers.size() + 1, pool.getAllocatedBuffers().size());
assertEquals(toRelease, allocated.get());
assertTrue(logCapturer.getOutput().contains("Allocation needs to wait the pool is at capacity"));
Copy link
Copy Markdown
Contributor

@amaliujia amaliujia Jun 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are losing testing coverage if you remove this line.

We might be able to do the following above

assertTrue(pool.allocated.size() == pool.capacity)
allocated.set(pool.allocateBuffer(0));

However I am not completely sure.

To be safe, we can keep this type of verification. Other removals over LOG_DEBUG look ok.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd avoid validation through log messages and prefer to do it via actual state check

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a check if pool is full at the beginning of the function:
assertFull(pool);

}

private void assertAllocationBlocked(BufferPool pool) throws Exception {
// As the pool is full, new allocation will be blocked interruptably if no allocated buffer is released.
assertFull(pool);

LogCapturer logCapturer = LogCapturer.captureLogs(BufferPool.class);
AtomicBoolean allocatorStarted = new AtomicBoolean();
AtomicBoolean interrupted = new AtomicBoolean(false);
Thread allocator = new Thread(() -> {
Expand All @@ -128,7 +119,6 @@ private void assertAllocationBlocked(BufferPool pool) throws Exception {
allocator.interrupt();
allocator.join();
assertTrue(interrupted.get());
assertTrue(logCapturer.getOutput().contains("Allocation needs to wait the pool is at capacity"));
}

private static void testBufferPool(final int capacity, final int bufferSize) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private IOUtils() {
* Close the Closeable objects and <b>ignore</b> any {@link Throwable} or
* null pointers. Must only be used for cleanup in exception handlers.
*
* @param logger the log to record problems to at debug level. Can be
* @param logger the log to record problems to at warn level. Can be
* null.
* @param closeables the objects to close
*/
Expand All @@ -58,7 +58,7 @@ public static void cleanupWithLogger(Logger logger, AutoCloseable... closeables)
c.close();
} catch (Throwable e) {
if (logger != null) {
logger.debug("Exception in closing {}", c, e);
logger.warn("Exception in closing {}", c, e);
}
}
}
Expand Down