[SPARK-24334] Fix race condition in ArrowPythonRunner causes unclean shutdown of Arrow memory allocator

## What changes were proposed in this pull request?

There is a race condition when closing the Arrow VectorSchemaRoot and Allocator in the writer thread of ArrowPythonRunner.

The race results in a memory leak exception when closing the allocator. This patch removes the closing routine from the TaskCompletionListener and makes the writer thread responsible for cleaning up the Arrow memory.
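
To make the ownership argument concrete, here is a minimal, self-contained sketch (plain Scala with a hypothetical `Resource` and thread wiring, not Spark's actual `ArrowPythonRunner` code) contrasting listener-driven cleanup with cleanup owned by the writer thread; the real change is in the diff further down.

```
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for VectorSchemaRoot/BufferAllocator: writing after close
// is the failure mode that surfaces as an unclean shutdown / "Memory leaked" error.
final class Resource {
  private val closed = new AtomicBoolean(false)
  def write(): Unit =
    if (closed.get()) throw new IllegalStateException("write after close")
  def close(): Unit = closed.set(true)
}

object CleanupOwnership {
  // Racy shape: a task-completion callback (run on another thread) closes the
  // resource while the writer thread may still be writing to it.
  def listenerOwnedCleanup(resource: Resource, addCompletionListener: Runnable => Unit): Thread = {
    addCompletionListener(() => resource.close())
    new Thread(() => (1 to 100000).foreach(_ => resource.write()))
  }

  // Fixed shape: the writer thread is the only thread that touches the resource,
  // and it closes it in its own finally block once writing is done or fails.
  def writerOwnedCleanup(resource: Resource): Thread =
    new Thread(() => {
      try {
        (1 to 100000).foreach(_ => resource.write())
      } finally {
        resource.close()
      }
    })
}
```

In the actual patch the same idea is expressed with `Utils.tryWithSafeFinally`: the Arrow writes go in the first block and `root.close()` / `allocator.close()` in the second.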

This issue can be reproduced with the following test:

```
def test_memory_leak(self):
    from pyspark.sql.functions import pandas_udf, col, PandasUDFType, array, lit, explode

    # Have all data in a single executor thread so it can trigger the race condition more easily
    with self.sql_conf({'spark.sql.shuffle.partitions': 1}):
        df = self.spark.range(0, 1000)
        df = df.withColumn('id', array([lit(i) for i in range(0, 300)])) \
               .withColumn('id', explode(col('id'))) \
               .withColumn('v', array([lit(i) for i in range(0, 1000)]))

        @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
        def foo(pdf):
            xxx  # intentionally undefined; the UDF error fails the task and exercises the cleanup path
            return pdf

        result = df.groupby('id').apply(foo)

        with QuietTest(self.sc):
            with self.assertRaises(py4j.protocol.Py4JJavaError) as context:
                result.count()
            self.assertTrue('Memory leaked' not in str(context.exception))
```

Note: Because of the race condition, this test cannot reproduce the issue reliably, so it is not added to the test suite.

## How was this patch tested?

Because of the race condition, the bug cannot easily be covered by a unit test. So far it has only been observed with large amounts of data. This patch is currently tested manually.

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21397 from icexelloss/SPARK-24334-arrow-memory-leak.
icexelloss authored and HyukjinKwon committed May 28, 2018
1 parent d440699 commit 672209f
Showing 1 changed file with 18 additions and 11 deletions.
```
@@ -70,19 +70,13 @@ class ArrowPythonRunner(
       val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
       val allocator = ArrowUtils.rootAllocator.newChildAllocator(
         s"stdout writer for $pythonExec", 0, Long.MaxValue)
 
       val root = VectorSchemaRoot.create(arrowSchema, allocator)
-      val arrowWriter = ArrowWriter.create(root)
-
-      context.addTaskCompletionListener { _ =>
-        root.close()
-        allocator.close()
-      }
-
-      val writer = new ArrowStreamWriter(root, null, dataOut)
-      writer.start()
-
       Utils.tryWithSafeFinally {
+        val arrowWriter = ArrowWriter.create(root)
+        val writer = new ArrowStreamWriter(root, null, dataOut)
+        writer.start()
+
         while (inputIterator.hasNext) {
           val nextBatch = inputIterator.next()
 
@@ -94,8 +88,21 @@ class ArrowPythonRunner(
           writer.writeBatch()
           arrowWriter.reset()
         }
-      } {
+        // end writes footer to the output stream and doesn't clean any resources.
+        // It could throw exception if the output stream is closed, so it should be
+        // in the try block.
         writer.end()
+      } {
+        // If we close root and allocator in TaskCompletionListener, there could be a race
+        // condition where the writer thread keeps writing to the VectorSchemaRoot while
+        // it's being closed by the TaskCompletion listener.
+        // Closing root and allocator here is cleaner because root and allocator is owned
+        // by the writer thread and is only visible to the writer thread.
+        //
+        // If the writer thread is interrupted by TaskCompletionListener, it should either
+        // (1) in the try block, in which case it will get an InterruptedException when
+        // performing io, and goes into the finally block or (2) in the finally block,
+        // in which case it will ignore the interruption and close the resources.
+        root.close()
+        allocator.close()
       }
```
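
For readers unfamiliar with the `Utils.tryWithSafeFinally { ... } { ... }` shape in the diff, here is a simplified analogue (a sketch of the assumed behavior, not Spark's source): a curried try/finally in which a failure thrown by the cleanup block does not mask an exception already thrown by the main block.

```
object TrySafelySketch {
  // Simplified analogue of Spark's internal Utils.tryWithSafeFinally (assumed behavior):
  // run `block`, always run `finallyBlock`, and if both throw, keep the block's
  // exception as the primary one and attach the cleanup failure as suppressed.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        case t: Throwable if original != null => original.addSuppressed(t)
        case t: Throwable => throw t
      }
    }
  }
}
```

In the patched writer thread, the first block performs the Arrow writes plus `writer.end()`, and the second block closes `root` and `allocator`, so cleanup always runs on the writer thread and a cleanup failure cannot hide the original write error.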
