12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
```diff
@@ -15,12 +15,12 @@ antlr4-runtime/4.13.1//antlr4-runtime-4.13.1.jar
 aopalliance-repackaged/3.0.6//aopalliance-repackaged-3.0.6.jar
 arpack/3.1.1//arpack-3.1.1.jar
 arpack_combined_all/0.1//arpack_combined_all-0.1.jar
-arrow-compression/18.3.0//arrow-compression-18.3.0.jar
-arrow-format/18.3.0//arrow-format-18.3.0.jar
-arrow-memory-core/18.3.0//arrow-memory-core-18.3.0.jar
-arrow-memory-netty-buffer-patch/18.3.0//arrow-memory-netty-buffer-patch-18.3.0.jar
-arrow-memory-netty/18.3.0//arrow-memory-netty-18.3.0.jar
-arrow-vector/18.3.0//arrow-vector-18.3.0.jar
+arrow-compression/19.0.0//arrow-compression-19.0.0.jar
+arrow-format/19.0.0//arrow-format-19.0.0.jar
+arrow-memory-core/19.0.0//arrow-memory-core-19.0.0.jar
+arrow-memory-netty-buffer-patch/19.0.0//arrow-memory-netty-buffer-patch-19.0.0.jar
+arrow-memory-netty/19.0.0//arrow-memory-netty-19.0.0.jar
+arrow-vector/19.0.0//arrow-vector-19.0.0.jar
 audience-annotations/0.12.0//audience-annotations-0.12.0.jar
 avro-ipc/1.12.1//avro-ipc-1.12.1.jar
 avro-mapred/1.12.1//avro-mapred-1.12.1.jar
```
2 changes: 1 addition & 1 deletion pom.xml
```diff
@@ -231,7 +231,7 @@
     ./python/pyspark/sql/pandas/utils.py, ./python/packaging/classic/setup.py,
     ./python/packaging/client/setup.py, and ./python/packaging/connect/setup.py too.
   -->
-  <arrow.version>18.3.0</arrow.version>
+  <arrow.version>19.0.0</arrow.version>
   <ammonite.version>3.0.8</ammonite.version>
   <jjwt.version>0.13.0</jjwt.version>
```
```diff
@@ -239,13 +239,16 @@ private[sql] class SparkResult[T](
         throw new IllegalStateException(
           s"Expected $expectedNumRows rows in arrow batch but got $numRecordsInBatch.")
       }
+      val messagesInBatch = messages.result()
```
Contributor Author:
After making this modification, all tests pass. However, I haven't yet examined the specific changes in version 19.0.0 that necessitated it, so for now I can't confirm whether version 18.3.0 is truly issue-free or whether problems simply haven't been uncovered yet. Let's hold off for a while; if this turns out to be a pre-existing issue, I will submit a PR to fix it first.

Personally, I'm more inclined to think that this is a pre-existing issue.

@LuciferYang (Contributor Author) commented on Mar 17, 2026:
1. Problem Statement

After upgrading arrow-java from 18.3.0 to 19.0.0, Spark Connect client tests (e.g. CatalogSuite, DataFrameTableValuedFunctionsSuite) fail in afterAll() when allocator.close() is called:

java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding buffers allocated (12).

The stack trace points to:

SparkResult.processResponses()
  → MessageIterator.next()
    → MessageSerializer.deserializeRecordBatch()
      → ArrowBuf.slice()

2. Root Cause

2.1 Pre-existing bug in SparkResult.processResponses()

In SparkResult.processResponses(), when a deserialized Arrow batch contains 0 rows (numRecordsInBatch == 0), the ArrowMessage objects are neither stored in resultMap nor closed:

```scala
// Before fix
if (numRecordsInBatch > 0) {
  numRecords += numRecordsInBatch
  resultMap.put(nextResultIndex, (reader.bytesRead, messages.result()))
  nextResultIndex += 1
  // ...
}
// When numRecordsInBatch == 0: messages.result() is silently dropped — no close()
```

SparkResultCloseable.close() only releases messages stored in resultMap. Empty-batch messages fall through and their underlying Arrow buffers are never released.

2.2 Arrow GH-343 made it observable

Arrow-Java GH-343 fixed offset buffer IPC serialization for empty vectors (valueCount == 0). This fix, included in v19.0.0, changed the IPC body size of empty batches from 0 bytes to a non-zero value, which turned the previously-silent Spark bug into a visible allocator failure.

The relevant commits between v18.3.0 and v19.0.0:

| Commit | Scope |
| --- | --- |
| 0f8a0808f (PR #967) | Fix ListVector / LargeListVector offset buffer when valueCount == 0 |
| 77df3ecb2 (PR #989) | Fix BaseVariableWidthVector / BaseLargeVariableWidthVector offset buffer when valueCount == 0 |

What changed: when valueCount == 0, setReaderAndWriterIndex() previously set offsetBuffer.writerIndex(0), making readableBytes() == 0 and writing 0 bytes to the IPC stream. The Arrow spec requires that offset buffers always contain at least one entry [0], so GH-343 changed this to offsetBuffer.writerIndex(OFFSET_WIDTH), making readableBytes() == 4.

| Version | setReaderAndWriterIndex() when valueCount == 0 | IPC body size |
| --- | --- | --- |
| v18.3.0 | offsetBuffer.writerIndex(0), so readableBytes() == 0 | 0 bytes |
| v19.0.0 | offsetBuffer.writerIndex(OFFSET_WIDTH = 4), so readableBytes() == 4 | > 0 bytes |
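To make the size difference concrete, here is a small stdlib-only sketch. OffsetBufferSketch and its two helper methods are illustrative models of the byte accounting described above, not Arrow's real API; the only facts taken from the source are OFFSET_WIDTH = 4 and the 0-vs-4-byte behavior for an empty vector.

```java
// Toy model (not Arrow's real API) of how many offset-buffer bytes a
// variable-width vector contributes to the IPC body.
public class OffsetBufferSketch {
    static final int OFFSET_WIDTH = 4; // bytes per 32-bit offset entry

    // Pre-GH-343 (v18.3.0): writerIndex stays at 0 for an empty vector,
    // so readableBytes() == 0 and nothing is written to the stream.
    static int bodyBytesOld(int valueCount) {
        return valueCount == 0 ? 0 : (valueCount + 1) * OFFSET_WIDTH;
    }

    // Post-GH-343 (v19.0.0): the spec-mandated leading [0] entry is always
    // written, so even an empty vector contributes OFFSET_WIDTH bytes.
    static int bodyBytesNew(int valueCount) {
        return (valueCount + 1) * OFFSET_WIDTH;
    }

    public static void main(String[] args) {
        System.out.println("empty, v18.3.0: " + bodyBytesOld(0)); // 0 bytes
        System.out.println("empty, v19.0.0: " + bodyBytesNew(0)); // 4 bytes
        // Non-empty vectors were already serialized correctly in both versions.
        System.out.println("3 values, both: " + bodyBytesOld(3) + " / " + bodyBytesNew(3));
    }
}
```

The jump from 0 to 4 bytes is exactly what turns `allocator.buffer(0)` into `allocator.buffer(n > 0)` on the client side.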

3. Detailed Causal Chain

| Step | v18.3.0 | v19.0.0 |
| --- | --- | --- |
| 1. Server serializes empty batch (valueCount = 0) | Offset buffer writes 0 bytes → IPC body = 0 bytes | Offset buffer writes 4+ bytes → IPC body > 0 bytes |
| 2. Client calls readMessageBody(in, bodyLength, allocator) | allocator.buffer(0) → returns singleton getEmpty(), backed by EmptyReferenceManager (not tracked by allocator) | allocator.buffer(bodyLength > 0) → allocates real ArrowBuf (tracked by allocator) |
| 3. deserializeRecordBatch calls body.slice() per field buffer | Slices share EmptyReferenceManager; retain()/release() are no-ops | Slices share real BufferLedger; retain() increments refcount |
| 4. ArrowRecordBatch constructor calls retain() per slice | No-op | Refcount increases |
| 5. body.getReferenceManager().release() | No-op | Refcount decreases by 1, but slices still hold references |
| 6. ArrowRecordBatch.close() never called (Spark bug) | No impact — empty buffers are untracked | Buffer leak — refcount > 0, tracked buffers remain |
| 7. allocator.close() | Succeeds — no outstanding tracked buffers | Throws IllegalStateException |

Key mechanism: BaseAllocator.buffer(0) returns untracked empty buffer

```java
// BaseAllocator.java
public ArrowBuf buffer(final long initialRequestSize, BufferManager manager) {
    if (initialRequestSize == 0) {
        return getEmpty();  // singleton, EmptyReferenceManager — not tracked
    }
    // ... allocate real buffer — tracked by allocator
}
```

In v18.3.0, the empty-batch IPC body is 0 bytes, so allocator.buffer(0) returns getEmpty() (untracked). All downstream slice(), retain(), and release() calls are no-ops, and the missing close() in SparkResult is harmless.

In v19.0.0, the empty-batch IPC body is > 0 bytes, so allocator.buffer(n) returns a real, tracked buffer. The missing close() becomes a real off-heap memory leak.
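The tracked-vs-untracked asymmetry in the causal chain can be modeled in a few lines of plain Java. ToyAllocator, Buf, and the outstanding counter below are hypothetical stand-ins for BaseAllocator, ArrowBuf, and the BufferLedger bookkeeping; the sketch only reproduces the mechanism described above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal model of an allocator that hands out an untracked empty
// singleton for size 0 and tracked, refcounted buffers otherwise.
public class ToyAllocator {
    private final AtomicLong outstanding = new AtomicLong();

    final class Buf {
        private final AtomicLong refCnt = new AtomicLong(1);
        private final boolean tracked;
        private Buf(boolean tracked) { this.tracked = tracked; }
        Buf slice() { retain(); return this; }  // slices share the refcount
        void retain() { if (tracked) refCnt.incrementAndGet(); }
        void release() {
            if (tracked && refCnt.decrementAndGet() == 0) outstanding.decrementAndGet();
        }
    }

    private final Buf empty = new Buf(false);   // like getEmpty(): never tracked

    Buf buffer(long size) {
        if (size == 0) return empty;
        outstanding.incrementAndGet();          // like BufferLedger registration
        return new Buf(true);
    }

    void close() {
        long n = outstanding.get();
        if (n != 0) throw new IllegalStateException(
            "Allocator[ROOT] closed with outstanding buffers allocated (" + n + ").");
    }

    public static void main(String[] args) {
        // v18.3.0 path: 0-byte body -> untracked buffer; a missed close is harmless.
        ToyAllocator a = new ToyAllocator();
        ToyAllocator.Buf e = a.buffer(0);
        e.slice();       // record-batch slice that is never released (the Spark bug)
        e.release();     // the body buffer itself is released
        a.close();       // succeeds: nothing was ever tracked
        System.out.println("v18.3.0 path: close OK");

        // v19.0.0 path: non-zero body -> tracked buffer; the same missed
        // close leaves a positive refcount behind.
        ToyAllocator b = new ToyAllocator();
        ToyAllocator.Buf body = b.buffer(4);
        body.slice();    // retained by the record batch, never closed
        body.release();  // body released, but the slice still holds a reference
        try {
            b.close();
        } catch (IllegalStateException ex) {
            System.out.println("v19.0.0 path: " + ex.getMessage());
        }
    }
}
```

Running the sketch reproduces the reported failure mode: the untracked path closes cleanly, while the tracked path throws the same "outstanding buffers allocated" IllegalStateException.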

4. Does v18.3.0 have an actual memory leak?

No. Under v18.3.0, empty batches cause no memory leak at all:

  • Off-heap memory: Zero leak. allocator.buffer(0) returns the pre-allocated singleton empty buffer. No additional off-heap memory is allocated for empty batches, so there is nothing to leak.
  • Java heap objects: The orphaned ArrowRecordBatch / ArrowBuf wrapper objects hold no strong references after processResponses() returns, and are collected by normal GC.
  • Allocator tracking: EmptyReferenceManager is a no-op singleton. The allocator never registers these buffers, so allocator.close() sees no outstanding allocations.

The bug in SparkResult is logically present in v18.3.0, but it is structurally impossible to cause any resource leak because the entire empty buffer path — from allocation through slicing to reference counting — operates on untracked no-op objects.

Under v19.0.0 without the fix, the situation is different:

  • allocator.buffer(bodyLength > 0) allocates real off-heap memory.
  • ArrowRecordBatch is never close()-d, so the BufferLedger refcount never reaches 0.
  • ArrowBuf has no finalizer or Cleaner, so GC of the Java wrapper does not decrement the off-heap refcount.
  • The off-heap memory is permanently leaked until allocator.close() detects and reports it.

5. The Fix

When numRecordsInBatch == 0, the deserialized ArrowMessage objects are explicitly closed. This calls ArrowRecordBatch.close(), which invokes release() on each sliced buffer, allowing the BufferLedger refcount to reach 0 and the off-heap memory to be freed.
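The shape of the fix can be sketched in plain Java. ArrowMessage and closedCount here are hypothetical stand-ins for the real Scala types in SparkResult.processResponses; the point is only that the empty-batch branch now closes each message instead of dropping it.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the fixed control flow: messages from an empty batch are
// closed explicitly instead of being silently dropped.
public class CloseEmptyBatch {
    interface ArrowMessage extends AutoCloseable { @Override void close(); }

    static int closedCount = 0;

    public static void main(String[] args) {
        List<ArrowMessage> messagesInBatch = new ArrayList<>();
        messagesInBatch.add(() -> closedCount++);
        messagesInBatch.add(() -> closedCount++);

        int numRecordsInBatch = 0;  // an empty Arrow batch
        if (numRecordsInBatch > 0) {
            // non-empty batch: store (bytesRead, messagesInBatch) in resultMap;
            // SparkResultCloseable.close() releases these later
        } else {
            // the fix: release each message now so the sliced buffers'
            // refcounts can reach 0 and the off-heap memory is freed
            messagesInBatch.forEach(ArrowMessage::close);
        }
        System.out.println("closed " + closedCount + " messages");
    }
}
```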

Contributor Author:
After conducting research, I believe this issue has no material impact on version 18.3.0, so I personally prefer to address it with a fix in the current PR.

```diff
       // Skip the entire result if it is empty.
       if (numRecordsInBatch > 0) {
         numRecords += numRecordsInBatch
-        resultMap.put(nextResultIndex, (reader.bytesRead, messages.result()))
+        resultMap.put(nextResultIndex, (reader.bytesRead, messagesInBatch))
         nextResultIndex += 1
         nonEmpty |= true
         stop |= stopOnFirstNonEmptyResponse
+      } else {
+        messagesInBatch.foreach(_.close())
       }
     }
   }
```