diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 9e08e1a95d..80f9b75d17 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -59,6 +59,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; @@ -103,6 +104,9 @@ public final class RecordAccumulator { /** The chunked allocation manager factory, stored for explicit native memory release. */ private final ChunkedAllocationManager.ChunkedFactory chunkedFactory; + /** Guard to make {@link #destroyResources()} idempotent. */ + private final AtomicBoolean resourcesDestroyed = new AtomicBoolean(false); + /** The pool of lazily created arrow {@link ArrowWriter}s for arrow log write batch. */ private final ArrowWriterPool arrowWriterPool; @@ -960,16 +964,29 @@ public ReadyCheckResult( } } - /** Close this accumulator and force all the record buffers to be drained. */ + /** Close this accumulator to reject new appends. */ public void close() { closed = true; + } + /** + * Destroy all resources held by this accumulator including the Arrow writer pool and buffer + * allocator. + * + *

<p>This must only be called after the sender thread has fully exited and no more drain + * operations will occur. Otherwise, draining batches may attempt to recycle Arrow writers using + * an already-closed allocator, causing "Accounted size went negative" errors. + + *

<p>This method is idempotent: subsequent calls after the first are no-ops. + */ + @VisibleForTesting + public void destroyResources() { + if (!resourcesDestroyed.compareAndSet(false, true)) { + return; + } writerBufferPool.close(); arrowWriterPool.close(); - // Release all the memory segments. - bufferAllocator.releaseBytes(bufferAllocator.getAllocatedMemory()); bufferAllocator.close(); - // Release native memory held by the chunked allocation manager factory. chunkedFactory.close(); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java index 5fc5ba2746..7b456d4bd9 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java @@ -167,6 +167,8 @@ public void run() { } } + destroyResources(); + // TODO if force close failed, add logic to abort incomplete batches. LOG.debug("Shutdown of Fluss write sender I/O thread has completed."); } @@ -643,4 +645,18 @@ public void initiateClose() { accumulator.close(); running = false; } + + /** + * Releases all resources held by the accumulator (Arrow writer pool, buffer allocator, etc.). + + *

<p>In production, this is called at the end of {@link #run()} after both the main loop and + * the shutdown drain loop have completed, so no further drain operations will touch the Arrow + * allocator. + + *

<p>This method is idempotent: repeated calls are harmless because the underlying {@link + * RecordAccumulator#destroyResources()} tolerates multiple invocations. + */ + void destroyResources() { + accumulator.destroyResources(); + } + } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java index cd59e8bdfc..2c4d30e96a 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java @@ -43,6 +43,7 @@ import org.apache.fluss.server.tablet.TestTabletServerGateway; import org.apache.fluss.utils.clock.SystemClock; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -101,6 +102,11 @@ public void setup() { sender = setupWithIdempotenceState(); } + @AfterEach + public void teardown() throws Exception { + sender.destroyResources(); + } + @Test void testSimple() throws Exception { long offset = 0; @@ -171,6 +177,39 @@ void testInitWriterIdRequest() throws Exception { assertThat(idempotenceManager.writerId()).isEqualTo(0L); } + /** + * Verifies the two-phase close prevents the shutdown race condition. Previously, + * initiateClose() destroyed the Arrow BufferAllocator while the sender's drain loop was still + * running, causing "Accounted size went negative". With the fix, initiateClose() only rejects + * new appends; resource destruction is deferred to destroyResources(). + */ + @Test + void testCloseSenderBeforeAccumulatorDrain() throws Exception { + // Append a record so there is an undrained batch. + appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> {}); + + // Sender begins to prepare write data. 
+ Cluster clusterSnapshot = metadataUpdater.getCluster(); + RecordAccumulator.ReadyCheckResult readyCheckResult = accumulator.ready(clusterSnapshot); + assertThat(readyCheckResult.readyNodes).isNotEmpty(); + + // Close the accumulator before it drains — this is the race window. + sender.initiateClose(); + + // Drain after the sender is closed. Before the fix this threw "Accounted size went + // negative" because the Arrow + // allocator was closed while the drain was still running. + Map> drained = + accumulator.drain(clusterSnapshot, readyCheckResult.readyNodes, MAX_REQUEST_SIZE); + assertThat(drained).isNotEmpty(); + sender.runOnce(); + assertThat(accumulator.hasUnDrained()).isFalse(); + + // even double destroy is still safe. + sender.destroyResources(); + sender.destroyResources(); + } + @Test void testCanRetryWithoutIdempotence() throws Exception { // do a successful retry.