Skip to content

[client] release the resource of record accumulator only when sender thread is stopped.#3102

Open
loserwang1024 wants to merge 2 commits intoapache:mainfrom
loserwang1024:fix-sender-close
Open

[client] release the resource of record accumulator only when sender thread is stopped.#3102
loserwang1024 wants to merge 2 commits intoapache:mainfrom
loserwang1024:fix-sender-close

Conversation

@loserwang1024
Copy link
Copy Markdown
Contributor

Purpose

Linked issue: close #3101

Brief change log

Tests

API and Format

Documentation

Copy link
Copy Markdown

@utafrali utafrali left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The core fix is correct: splitting close() (reject appends) from destroyResources() (free Arrow resources) and calling the latter only after all drain loops have finished cleanly resolves the 'Accounted size went negative' race. The main issues are an unused public getter that leaks internal state, missing lifecycle documentation on Sender.destroyResources() that hides a latent double-close risk, and a regression test with no assertion.

Comment thread fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java Outdated
Comment thread fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java Outdated
@loserwang1024 loserwang1024 force-pushed the fix-sender-close branch 2 times, most recently from b1738fa to 107c3b4 Compare April 17, 2026 07:31
…thread is stopped.

# Conflicts:
#	fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@loserwang1024
Copy link
Copy Markdown
Contributor Author

@platinumhamburg ,CC

* <p>This method is idempotent: subsequent calls after the first are no-ops.
*/
public void destroyResources() {
if (resourcesDestroyed) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

volatile guarantees visibility but does not guarantee atomicity of the check + set. Two threads can both read false, pass the guard, and both enter the destruction path. Arrow's BufferAllocator.close() is not idempotent and will throw IllegalStateException on a second invocation.

The Javadoc claims "This method is idempotent: subsequent calls after the first are no-ops", but the implementation cannot honor this guarantee under concurrent access.

Suggested fix:

private final AtomicBoolean resourcesDestroyed = new AtomicBoolean(false);

public void destroyResources() {
    if (!resourcesDestroyed.compareAndSet(false, true)) {
        return;
    }
    writerBufferPool.close();
    arrowWriterPool.close();
    bufferAllocator.close();
    chunkedFactory.close();
}

Copy link
Copy Markdown
Contributor

@platinumhamburg platinumhamburg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @loserwang1024, I left some comments.

* <p>This method is idempotent: repeated calls are harmless because the underlying {@link
* RecordAccumulator#destroyResources()} tolerates multiple invocations.
*/
void destroyResources() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing @VisibleForTesting

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

java.lang.IllegalArgumentException: Accounted size went negative.

3 participants