Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
Expand Down Expand Up @@ -103,6 +104,9 @@ public final class RecordAccumulator {
/** The chunked allocation manager factory, stored for explicit native memory release. */
private final ChunkedAllocationManager.ChunkedFactory chunkedFactory;

/** Guard to make {@link #destroyResources()} idempotent. */
private final AtomicBoolean resourcesDestroyed = new AtomicBoolean(false);

/** The pool of lazily created arrow {@link ArrowWriter}s for arrow log write batch. */
private final ArrowWriterPool arrowWriterPool;

Expand Down Expand Up @@ -960,16 +964,29 @@ public ReadyCheckResult(
}
}

/** Close this accumulator and force all the record buffers to be drained. */
/** Close this accumulator to reject new appends. */
public void close() {
    // Only flips the rejection flag; already-buffered batches stay drainable.
    // Resource release is deferred to destroyResources() — see its javadoc for why
    // the two steps must be separated (sender drain loop may still be running here).
    closed = true;
}

/**
 * Destroy all resources held by this accumulator including the Arrow writer pool and buffer
 * allocator.
 *
 * <p>This must only be called after the sender thread has fully exited and no more drain
 * operations will occur. Otherwise, draining batches may attempt to recycle Arrow writers using
 * an already-closed allocator, causing "Accounted size went negative" errors.
 *
 * <p>This method is idempotent: subsequent calls after the first are no-ops.
 */
@VisibleForTesting
public void destroyResources() {
    // compareAndSet wins for exactly one caller, making the release sequence below run
    // at most once even when sender shutdown and test teardown race on this method.
    if (!resourcesDestroyed.compareAndSet(false, true)) {
        return;
    }
    // Order matters: close the pools that hand out buffers/writers before tearing down
    // the allocator that backs them.
    writerBufferPool.close();
    arrowWriterPool.close();
    // Release all the memory segments still accounted to the allocator, so that the
    // subsequent close() does not fail on outstanding allocations.
    bufferAllocator.releaseBytes(bufferAllocator.getAllocatedMemory());
    bufferAllocator.close();
    // Release native memory held by the chunked allocation manager factory.
    chunkedFactory.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public void run() {
}
}

destroyResources();

// TODO if force close failed, add logic to abort incomplete batches.
LOG.debug("Shutdown of Fluss write sender I/O thread has completed.");
}
Expand Down Expand Up @@ -643,4 +645,18 @@ public void initiateClose() {
accumulator.close();
running = false;
}

Comment thread
loserwang1024 marked this conversation as resolved.
/**
* Releases all resources held by the accumulator (Arrow writer pool, buffer allocator, etc.).
*
* <p>In production, this is called at the end of {@link #run()} after both the main loop and
* the shutdown drain loop have completed, so no further drain operations will touch the Arrow
* allocator.
*
* <p>This method is idempotent: repeated calls are harmless because the underlying {@link
* RecordAccumulator#destroyResources()} tolerates multiple invocations.
*/
void destroyResources() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing @VisibleForTesting

accumulator.destroyResources();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.fluss.server.tablet.TestTabletServerGateway;
import org.apache.fluss.utils.clock.SystemClock;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -101,6 +102,11 @@ public void setup() {
sender = setupWithIdempotenceState();
}

@AfterEach
public void teardown() throws Exception {
    // Release the Arrow writer pool and buffer allocator after every test. Safe even for
    // tests that already called destroyResources() themselves, since the method is idempotent.
    sender.destroyResources();
}

@Test
void testSimple() throws Exception {
long offset = 0;
Expand Down Expand Up @@ -171,6 +177,39 @@ void testInitWriterIdRequest() throws Exception {
assertThat(idempotenceManager.writerId()).isEqualTo(0L);
}

/**
 * Verifies the two-phase close prevents the shutdown race condition. Previously,
 * initiateClose() destroyed the Arrow BufferAllocator while the sender's drain loop was still
 * running, causing "Accounted size went negative". With the fix, initiateClose() only rejects
 * new appends; resource destruction is deferred to destroyResources().
 */
@Test
void testCloseSenderBeforeAccumulatorDrain() throws Exception {
    // Append a record so there is an undrained batch.
    appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> {});

    // Sender begins to prepare write data.
    Cluster clusterSnapshot = metadataUpdater.getCluster();
    RecordAccumulator.ReadyCheckResult readyCheckResult = accumulator.ready(clusterSnapshot);
    assertThat(readyCheckResult.readyNodes).isNotEmpty();

    // Close the accumulator before it drains — this is the race window.
    sender.initiateClose();

    // Drain after the sender is closed. Before the fix this threw "Accounted size went
    // negative" because the Arrow allocator was closed while the drain was still running.
    Map<Integer, List<ReadyWriteBatch>> drained =
            accumulator.drain(clusterSnapshot, readyCheckResult.readyNodes, MAX_REQUEST_SIZE);
    assertThat(drained).isNotEmpty();
    sender.runOnce();
    assertThat(accumulator.hasUnDrained()).isFalse();

    // Destroying twice must also be safe: destroyResources() is idempotent.
    sender.destroyResources();
    sender.destroyResources();
}

@Test
void testCanRetryWithoutIdempotence() throws Exception {
// do a successful retry.
Expand Down