[SPARK-45814][CONNECT][SQL] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak

### What changes were proposed in this pull request?

Make `ArrowBatchIterator` implement `AutoCloseable` and `ArrowConverters.createEmptyArrowBatch()` call close() to avoid memory leak.

### Why are the changes needed?

`ArrowConverters.createEmptyArrowBatch` never calls `super.hasNext` (its anonymous subclass overrides `hasNext` to always return `true`), so the cleanup in the base class never runs. If `TaskContext.get` also returns null, no task-completion listener is registered, and the memory allocated in `ArrowBatchIterator` is leaked.

In Spark Connect, `createEmptyArrowBatch` is called in [SparkConnectPlanner](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2558) and [SparkConnectPlanExecution](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L224), which causes a long-running driver to eventually exhaust all of the off-heap memory allowed by `-XX:MaxDirectMemorySize`.
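To make the failure mode concrete, here is a distilled, self-contained sketch of the leaky pattern (hypothetical names; the real code allocates through an Arrow `BufferAllocator` rather than this counter):

```scala
// Stand-in for the Arrow allocator; `outstanding` counts unreleased allocations.
object AllocTracker { var outstanding = 0 }

class LeakyBatchIterator extends Iterator[Array[Byte]] {
  AllocTracker.outstanding += 1                      // "allocates" on construction
  def close(): Unit = AllocTracker.outstanding -= 1

  // Cleanup runs only when exhaustion is observed through this method...
  override def hasNext: Boolean = { close(); false }
  override def next(): Array[Byte] = Array.emptyByteArray
}

// ...but createEmptyArrowBatch overrides hasNext to always return true, so the
// base-class cleanup never runs; with no TaskContext, no completion listener
// exists to release the allocation either:
val it = new LeakyBatchIterator { override def hasNext: Boolean = true }
it.next()
assert(AllocTracker.outstanding == 1)                // one leaked allocation per call
```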

This is the resulting exception stack trace:
```
org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
	at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
	at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
	at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
	at org.apache.arrow.vector.BaseValueVector.allocFixedDataAndValidityBufs(BaseValueVector.java:192)
	at org.apache.arrow.vector.BaseFixedWidthVector.allocateBytes(BaseFixedWidthVector.java:338)
	at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:308)
	at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:273)
	at org.apache.spark.sql.execution.arrow.ArrowWriter$.$anonfun$create$1(ArrowWriter.scala:44)
	at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
	at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
	at scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:103)
	at org.apache.spark.sql.execution.arrow.ArrowWriter$.create(ArrowWriter.scala:43)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.<init>(ArrowConverters.scala:93)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.<init>(ArrowConverters.scala:138)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:231)
	at org.apache.spark.sql.execution.arrow.ArrowConverters$.createEmptyArrowBatch(ArrowConverters.scala:229)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2481)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2426)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:189)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:189)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:176)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:178)
	at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:175)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:188)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:228)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 1069547799, max: 1073741824)
	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:845)
	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:774)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:721)
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:696)
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
	at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:180)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:137)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:181)
	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
	at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
	... 37 more
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43691 from xieshuaihu/spark-45814.

Authored-by: xieshuaihu <xieshuaihu@agora.io>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit c128f81)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
xieshuaihu authored and LuciferYang committed Nov 9, 2023
1 parent 85fbb3a commit 35d0061
Showing 1 changed file with 18 additions and 7 deletions.
```
@@ -80,7 +80,7 @@ private[sql] object ArrowConverters extends Logging {
       maxRecordsPerBatch: Long,
       timeZoneId: String,
       errorOnDuplicatedFieldNames: Boolean,
-      context: TaskContext) extends Iterator[Array[Byte]] {
+      context: TaskContext) extends Iterator[Array[Byte]] with AutoCloseable {
 
     protected val arrowSchema =
       ArrowUtils.toArrowSchema(schema, timeZoneId, errorOnDuplicatedFieldNames)
@@ -93,13 +93,11 @@ private[sql] object ArrowConverters extends Logging {
     protected val arrowWriter = ArrowWriter.create(root)
 
     Option(context).foreach {_.addTaskCompletionListener[Unit] { _ =>
-      root.close()
-      allocator.close()
+      close()
     }}
 
     override def hasNext: Boolean = rowIter.hasNext || {
-      root.close()
-      allocator.close()
+      close()
       false
     }
@@ -124,6 +122,11 @@ private[sql] object ArrowConverters extends Logging {
 
       out.toByteArray
     }
+
+    override def close(): Unit = {
+      root.close()
+      allocator.close()
+    }
   }
 
   private[sql] class ArrowBatchWithSchemaIterator(
@@ -226,11 +229,19 @@ private[sql] object ArrowConverters extends Logging {
       schema: StructType,
       timeZoneId: String,
       errorOnDuplicatedFieldNames: Boolean): Array[Byte] = {
-    new ArrowBatchWithSchemaIterator(
+    val batches = new ArrowBatchWithSchemaIterator(
       Iterator.empty, schema, 0L, 0L,
       timeZoneId, errorOnDuplicatedFieldNames, TaskContext.get) {
       override def hasNext: Boolean = true
-    }.next()
+    }
+    Utils.tryWithSafeFinally {
+      batches.next()
+    } {
+      // If taskContext is null, `batches.close()` should be called to avoid memory leak.
+      if (TaskContext.get() == null) {
+        batches.close()
+      }
+    }
   }
```
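A note on the `TaskContext.get() == null` guard: on executors the context is non-null and the task-completion listener registered in the constructor already calls `close()`, so closing again at the call site could release the allocator twice; the guard limits the explicit `close()` to the driver path, where no listener exists. For reference, a hypothetical standalone equivalent of `Utils.tryWithSafeFinally` (assuming Spark's usual semantics: the second block always runs as a `finally`, and an exception it throws is attached as suppressed rather than masking the primary one):

```scala
def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
  var primary: Throwable = null
  try {
    block
  } catch {
    case t: Throwable =>
      primary = t
      throw t
  } finally {
    // Never let a failure during cleanup hide the original error.
    try {
      finallyBlock
    } catch {
      case t: Throwable =>
        if (primary != null) primary.addSuppressed(t) else throw t
    }
  }
}
```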
