
[SPARK-45814][CONNECT][SQL]Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak #43691

Closed
wants to merge 4 commits

Conversation

@xieshuaihu commented Nov 7, 2023

What changes were proposed in this pull request?

Make ArrowBatchIterator implement AutoCloseable and have ArrowConverters.createEmptyArrowBatch() call close() to avoid a memory leak.
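For illustration, here is a minimal, self-contained sketch of the pattern, not Spark's actual code: the BatchIterator class, its eagerly allocated 4 KiB buffer, and the Demo driver are hypothetical stand-ins. Like ArrowBatchIterator, the iterator grabs off-heap Arrow memory in its constructor, so a caller that never drains it must call close() explicitly:

```scala
import org.apache.arrow.memory.{BufferAllocator, RootAllocator}

// Hypothetical stand-in for ArrowBatchIterator: Arrow buffers are allocated
// eagerly in the constructor, so they leak unless the caller invokes close().
class BatchIterator(parent: BufferAllocator)
    extends Iterator[Array[Byte]] with AutoCloseable {
  private val allocator = parent.newChildAllocator("batch", 0, Long.MaxValue)
  private val buf = allocator.buffer(4096) // stands in for the vectors ArrowWriter.create allocates
  private var consumed = false

  override def hasNext: Boolean = !consumed
  override def next(): Array[Byte] = { consumed = true; Array.emptyByteArray }
  override def close(): Unit = { buf.close(); allocator.close() }
}

object Demo extends App {
  val root = new RootAllocator(Long.MaxValue)
  val it = new BatchIterator(root)
  try it.next() finally it.close() // without close(), buf stays allocated forever
  assert(root.getAllocatedMemory == 0)
  root.close()
}
```

In the actual patch, the single next() call in createEmptyArrowBatch is wrapped so that close() runs in a finally block on the driver, where there is no TaskContext completion listener to do the cleanup.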

Why are the changes needed?

ArrowConverters.createEmptyArrowBatch doesn't call super.hasNext, so if TaskContext.get returns None, the memory allocated in ArrowBatchIterator is leaked.

In Spark Connect, createEmptyArrowBatch is called in SparkConnectPlanner (https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2558) and SparkConnectPlanExecution (https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L224), which causes a long-running driver to consume all of the off-heap memory specified by -XX:MaxDirectMemorySize.

This is the exception stack:

```
org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
	at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
	at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
	at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
	at org.apache.arrow.vector.BaseValueVector.allocFixedDataAndValidityBufs(BaseValueVector.java:192)
	at org.apache.arrow.vector.BaseFixedWidthVector.allocateBytes(BaseFixedWidthVector.java:338)
	at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:308)
	at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:273)
	at org.apache.spark.sql.execution.arrow.ArrowWriter$.$anonfun$create$1(ArrowWriter.scala:44)
	at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
	at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
	at scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:103)
	at org.apache.spark.sql.execution.arrow.ArrowWriter$.create(ArrowWriter.scala:43)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.<init>(ArrowConverters.scala:93)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.<init>(ArrowConverters.scala:138)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:231)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.createEmptyArrowBatch(ArrowConverters.scala:229)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2481)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2426)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:189)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:189)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:176)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:178)
	at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:175)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:188)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:228)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 1069547799, max: 1073741824)
	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:845)
	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:774)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:721)
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:696)
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
	at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:180)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:137)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:181)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
	at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
	... 37 more
```

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manually tested

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions bot added the SQL label Nov 7, 2023
@xieshuaihu changed the title [SPARK-45814][CONNECT][CORE]Make ArrowConverters.createEmptyArrowBatch call hasNext to avoid memory leak [SPARK-45814][CONNECT][SQL]Make ArrowConverters.createEmptyArrowBatch call hasNext to avoid memory leak Nov 7, 2023
@xieshuaihu (Author)

cc @HyukjinKwon @dongjoon-hyun

@dongjoon-hyun (Member)

cc @sunchao, too

@LuciferYang (Contributor)

hmm... it's a little strange to call super.hasNext in next. How about making ArrowBatchIterator implement AutoCloseable and closing ArrowBatchWithSchemaIterator in a finally block when TaskContext.get is null?

@xieshuaihu (Author)

@LuciferYang This PR is updated; please review again.

@cfmcgrady (Contributor)

The PR title/description should be updated.

Review thread on the changed hunk in ArrowConverters.scala:

```scala
      Iterator.empty, schema, 0L, 0L,
      timeZoneId, errorOnDuplicatedFieldNames, TaskContext.get()) {
    override def hasNext: Boolean = true
  }.next()
}
val emptyBatch = batches.next()
```
A reviewer (Contributor) suggested:
```scala
Utils.tryWithSafeFinally {
  batches.next()
} {
  if (TaskContext.get() == null) { // If taskContext is not null, it can be successfully closed, right?
    batches.close()
  }
}
```
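For context, Utils.tryWithSafeFinally runs its second block as a finally and, if both blocks throw, attaches the finally-block exception to the original one as suppressed, so a failing close() here cannot mask an allocation error raised by next().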

@xieshuaihu (Author):

That's right.

@xieshuaihu changed the title [SPARK-45814][CONNECT][SQL]Make ArrowConverters.createEmptyArrowBatch call hasNext to avoid memory leak [SPARK-45814][CONNECT][SQL]Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak Nov 8, 2023
@LuciferYang (Contributor) left a comment:

+1, LGTM if tests pass

@hvanhovell (Contributor) left a comment:

LGTM - good catch!

@xieshuaihu (Author)

It seems that this test failure is unrelated to this PR.

@LuciferYang (Contributor)

Could you re-trigger the failed one? Making all tasks green is safer. @xieshuaihu

@xieshuaihu (Author)

@LuciferYang all UTs passed

LuciferYang pushed a commit that referenced this pull request Nov 9, 2023

[SPARK-45814][CONNECT][SQL]Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak

Closes #43691 from xieshuaihu/spark-45814.

Authored-by: xieshuaihu <xieshuaihu@agora.io>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit c128f81)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
@LuciferYang (Contributor)

Merged into master and branch-3.5. Thanks @xieshuaihu @hvanhovell @dongjoon-hyun @cfmcgrady

This patch conflicts with branch-3.4. Could you submit a separate PR for branch-3.4? @xieshuaihu

@xieshuaihu (Author)

I will submit a new PR @LuciferYang

@LuciferYang (Contributor)

Thanks @xieshuaihu ~

@HyukjinKwon (Member) left a comment:

LGTM

Inline comment on this hunk of the patch:

```scala
} {
  // If taskContext is null, `batches.close()` should be called to avoid memory leak.
  if (TaskContext.get() == null) {
    batches.close()
```
@HyukjinKwon (Member):
Can we actually add a small test for this? This memory leak should be easy to test by checking whether close() is actually invoked by createEmptyArrowBatch.
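For what it's worth, a sketch of what such a test could look like, with two assumptions flagged: the createEmptyArrowBatch(schema, timeZoneId, errorOnDuplicatedFieldNames) signature is inferred from the reviewed hunks above, and the code is assumed to sit under org.apache.spark.sql so that the private[sql] ArrowUtils.rootAllocator is accessible:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils

// On the driver TaskContext.get() is null, which is exactly the path that leaked.
assert(TaskContext.get() == null)

val before = ArrowUtils.rootAllocator.getAllocatedMemory
val schema = StructType(Seq(StructField("id", IntegerType)))
ArrowConverters.createEmptyArrowBatch(schema, "UTC", errorOnDuplicatedFieldNames = true)
// With the fix, close() runs in the finally block and every buffer is freed.
assert(ArrowUtils.rootAllocator.getAllocatedMemory == before, "Arrow memory leaked")
```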
