[SPARK-37088][PYSPARK][SQL] Writer thread must not access input after task completion listener returns #34245

Closed
wants to merge 10 commits into apache:master from ankurdave:SPARK-33277-thread-join

Conversation

ankurdave
Contributor

@ankurdave ankurdave commented Oct 11, 2021

What changes were proposed in this pull request?

Python UDFs in Spark SQL are run in a separate Python process. The Python process is fed input by a dedicated thread (BasePythonRunner.WriterThread). This writer thread drives the child plan by pulling rows from its output iterator and serializing them across a socket.

When the child exec node is the off-heap vectorized Parquet reader, these rows are backed by off-heap memory. The child node uses a task completion listener to free the off-heap memory at the end of the task, which invalidates the output iterator and any rows it has produced. Since task completion listeners are registered bottom-up and executed in reverse order of registration, this is safe as long as an exec node never accesses its input after its task completion listener has executed.[^1]

The BasePythonRunner task completion listener violates this assumption. It interrupts the writer thread, but does not wait for it to exit. This causes a race condition that can lead to an executor crash:

  1. The Python writer thread is processing a row backed by off-heap memory.
  2. The task finishes, for example because it has reached a row limit.
  3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread, but the writer thread does not check it immediately.
  4. The child plan's task completion listener frees its off-heap memory, invalidating the row that the Python writer thread is processing.
  5. The Python writer thread attempts to access the invalidated row. The use-after-free triggers a segfault that crashes the executor.

This PR fixes the bug by making the BasePythonRunner task completion listener wait for the writer thread to exit before returning. This prevents its input from being invalidated while the thread is running. The sequence of events is now as follows:

  1. The Python writer thread is processing a row backed by off-heap memory.
  2. The task finishes, for example because it has reached a row limit.
  3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread and waits for the writer thread to exit.
  4. The child plan's task completion listener can safely free its off-heap memory without invalidating live rows.

TaskContextImpl previously held a lock while invoking the task completion listeners. This would now cause a deadlock because the writer thread's exception handler calls TaskContextImpl#isCompleted(), which needs to acquire the same lock. To avoid deadlock, this PR modifies TaskContextImpl to release the lock before invoking the listeners, while still maintaining sequential execution of listeners.
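
To make the ordering concrete, here is a minimal, self-contained Scala model of the core of the fix (illustrative only; the names and the listener loop are stand-ins, not the actual BasePythonRunner/TaskContextImpl code):

```scala
// Simplified model: listeners run in reverse registration order, and the
// "stop writer" listener now joins the writer thread before returning, so the
// "free off-heap memory" listener can no longer run while the writer is active.
object WriterJoinSketch {
  def main(args: Array[String]): Unit = {
    @volatile var bufferFreed = false

    val writer = new Thread(() => {
      try {
        while (!Thread.currentThread().isInterrupted) {
          // Stands in for serializing rows backed by the off-heap buffer.
          assert(!bufferFreed, "use-after-free: buffer freed while writer was running")
          Thread.sleep(1)
        }
      } catch {
        case _: InterruptedException => // writer exits promptly once interrupted
      }
    })
    writer.setName("stdout writer (model)")
    writer.start()

    // Registered first (child scan), so it runs last.
    val freeOffHeapMemory = () => { bufferFreed = true }
    // Registered second (Python runner), so it runs first: interrupt AND join.
    val stopWriter = () => { writer.interrupt(); writer.join() }

    // Task completion: invoke listeners in reverse order of registration.
    Seq(freeOffHeapMemory, stopWriter).reverse.foreach(f => f())
    println("writer exited before the buffer was freed")
  }
}
```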

Why are the changes needed?

Without this PR, attempting to use Python UDFs while the off-heap vectorized Parquet reader is enabled (spark.sql.columnVector.offheap.enabled true) can cause executors to segfault.
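
For reference, a sketch of how the affected configuration is enabled (assuming an existing `spark` session, e.g. in spark-shell, and that the conf is settable at the session level; it can also be passed via `--conf` at launch):

```scala
// Enables the off-heap vectorized column readers for the current session.
spark.conf.set("spark.sql.columnVector.offheap.enabled", "true")
```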

Does this PR introduce any user-facing change?

No.

How was this patch tested?

A previous PR reduced the likelihood of encountering this race condition, but did not eliminate it. The accompanying tests were therefore flaky and had to be disabled. This PR eliminates the race condition, allowing us to re-enable these tests. One of the tests, test_pandas_udf_scalar, previously failed 30/1000 times and now always succeeds.

An internal workload previously failed with a segfault about 40% of the time when run with spark.sql.columnVector.offheap.enabled true, and now succeeds 100% of the time.

Footnotes

[^1]: This guarantee was not historically recognized, leading to similar bugs as far back as 2014 (SPARK-1019). The root cause was the lack of a reliably-ordered mechanism for operators to free resources at the end of a task. Such a mechanism (task completion listeners) was added and gradually refined, and we can now make this guarantee explicit. (An alternative approach is to use closeable iterators everywhere, but this would be a major change.)

@ankurdave
Contributor Author

cc @HyukjinKwon @ueshin @cloud-fan

@SparkQA

SparkQA commented Oct 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48573/

@timarmstrong
Contributor

This looks correct from the point of view of avoiding races.

It looks like there were some concerns about deadlocks mentioned in #24699 (comment). Did you take a look at that? It's definitely easy to have subtle deadlocks with any kind of synchronisation in callbacks.

I took a look myself and it looks OK to me but I am new to the code so may be missing something - TaskContextImpl.markTask{Completed,Failed} both seem to drop the TaskContext lock before invoking the listeners.

@SparkQA

SparkQA commented Oct 11, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48573/

@ankurdave
Contributor Author

ankurdave commented Oct 11, 2021

Hmm, thanks for pointing this out. TaskContextImpl.markTask{Completed,Failed} actually do hold the TaskContext lock while invoking the listeners. As a result, I think the following sequence of events can produce a deadlock:

  1. The main thread acquires the lock on TaskContextImpl and begins invoking the task completion listeners.
  2. The main thread interrupts the writer thread and waits for it to exit.
  3. The writer thread handles the InterruptedException. The exception handler calls TaskContextImpl#isCompleted(), which again attempts to acquire the lock on TaskContextImpl, resulting in a deadlock.

We can fix this by releasing the TaskContext lock before invoking the listeners. I'll update the PR with that change and try to write a test to repro the deadlock.
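
The general shape of that change would be something like the following (a simplified sketch with stand-in names, not the actual TaskContextImpl code):

```scala
// Sketch: snapshot the listeners while holding the lock, then release the lock
// before invoking them, so a listener that joins the writer thread cannot
// deadlock against isCompleted().
class ListenerRegistry {
  private val lock = new Object
  private var listeners = List.empty[() => Unit] // newest first
  private var completed = false

  def addListener(f: () => Unit): Unit = lock.synchronized { listeners ::= f }

  // Called from the writer thread's exception handler in the real code.
  def isCompleted: Boolean = lock.synchronized { completed }

  def markCompleted(): Unit = {
    val toInvoke = lock.synchronized {
      completed = true
      listeners
    }
    // The lock is no longer held here, so listeners may call isCompleted()
    // (or block on other threads that do) without deadlocking.
    toInvoke.foreach(f => f()) // newest-first == reverse registration order
  }
}
```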

cc @viirya @zsxwing

@HyukjinKwon
Member

Hm, seems like the test is hanging. Would you mind retriggering https://github.com/ankurdave/spark/runs/3862170994?

@SparkQA

SparkQA commented Oct 12, 2021

Test build #144095 has finished for PR 34245 at commit 926798c.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ankurdave
Contributor Author

ankurdave commented Oct 12, 2021

Looks like the PySpark tests pyspark.sql.tests.test_pandas_udf_scalar and pyspark.tests.test_rdd both timed out after ~6 hours. This makes sense: the deadlock should be easy to repro since we should always hit it when interrupting the writer thread.

I updated the PR to release the TaskContextImpl lock before invoking the listeners.

@SparkQA

SparkQA commented Oct 12, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48635/

@SparkQA

SparkQA commented Oct 12, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48635/

@SparkQA

SparkQA commented Oct 12, 2021

Test build #144157 has finished for PR 34245 at commit e7cc1f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

onCompleteCallbacks += listener
false
}
}
Contributor

@timarmstrong timarmstrong Oct 12, 2021

The API doesn't intend to guarantee any ordering of when the task completion listeners are called AFAICT. I think before this change the implementation ends up guaranteeing that the listeners are called sequentially. So it seems possible that some code could be accidentally depending on that.

This might be overengineering it, but we could have a scheme that avoids the deadlock issues and still guarantees sequential execution of callbacks. At most one thread at any point in time would be responsible for invoking callbacks. If another thread needs to invoke a callback, it either delegates it to the current callback invocation thread or becomes the callback invocation thread itself. This means the callback invocation thread needs to first invoke all of the currently registered callbacks, and then, when it's done with those, check whether any more callbacks have been queued.

I think we could do that by having the callback invocation thread take ownership of the current callbacks list and, after invoking those callbacks, check whether any more have been queued. We'd also need a variable to track whether there's a current callback invocation thread.
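
A minimal sketch of that scheme (hypothetical names, not the actual TaskContextImpl implementation): listeners are queued under a lock, but only one thread at a time drains and invokes them, re-checking the queue after each batch.

```scala
// At most one thread invokes listeners at any time; after draining its batch it
// re-checks for listeners that were queued in the meantime.
class SequentialInvoker {
  private val lock = new Object
  private var pending = List.empty[() => Unit] // newest first
  private var invokerActive = false

  /** Queue a listener; become the invoker only if no other thread currently is. */
  def invokeOrQueue(listener: () => Unit): Unit = {
    val shouldInvoke = lock.synchronized {
      pending ::= listener
      if (invokerActive) false else { invokerActive = true; true }
    }
    if (shouldInvoke) drain()
  }

  /** Invoke queued listeners outside the lock until none remain. */
  private def drain(): Unit = {
    var batch = takeBatch()
    while (batch.nonEmpty) {
      batch.foreach(f => f()) // sequential, reverse registration order
      batch = takeBatch()     // pick up listeners queued during invocation
    }
  }

  private def takeBatch(): List[() => Unit] = lock.synchronized {
    val b = pending
    pending = Nil
    if (b.isEmpty) invokerActive = false // give up the invoker role when done
    b
  }
}
```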

Contributor Author

Hmm, good point that we'd be changing the behavior of this API. It would be nice to preserve the sequential execution behavior, but it does seem pretty complex. I can try implementing it and see whether it's worth it.

Either way, we should probably document and test the behavior more thoroughly. In the current state of the PR, I think the guarantee is something like the following: "Two listeners registered in the same thread will be invoked in reverse order of registration if the task finishes after both are registered. There are no ordering guarantees for listeners registered in different threads, and they may execute concurrently."

Contributor

> There are no ordering guarantees for listeners registered in different threads

I agree. When there are multiple threads I don't think we can define an "order".

Contributor Author

@timarmstrong I implemented your suggestion to ensure sequential execution of listeners - it wasn't too complex after all. I also added tests to verify sequential execution, ordering, and liveness in case of reentrancy.

@SparkQA

SparkQA commented Oct 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48851/

@SparkQA

SparkQA commented Oct 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48851/

@SparkQA

SparkQA commented Oct 18, 2021

Test build #144377 has finished for PR 34245 at commit cd12b46.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Oct 19, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48863/

@SparkQA

SparkQA commented Oct 19, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48863/

@SparkQA

SparkQA commented Oct 19, 2021

Test build #144389 has finished for PR 34245 at commit cd12b46.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

also cc @JoshRosen @Ngone51

@SparkQA

SparkQA commented Oct 19, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48888/

@SparkQA

SparkQA commented Oct 19, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48888/

@SparkQA

SparkQA commented Oct 19, 2021

Test build #144414 has finished for PR 34245 at commit 25b5c0e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@Ngone51 Ngone51 left a comment

LGTM

@cloud-fan
Contributor

thanks, merging to master/3.2!

@cloud-fan cloud-fan closed this in dfca1d1 Oct 21, 2021
cloud-fan pushed a commit that referenced this pull request Oct 21, 2021
… task completion listener returns

### What changes were proposed in this pull request?

Python UDFs in Spark SQL are run in a separate Python process. The Python process is fed input by a dedicated thread (`BasePythonRunner.WriterThread`). This writer thread drives the child plan by pulling rows from its output iterator and serializing them across a socket.

When the child exec node is the off-heap vectorized Parquet reader, these rows are backed by off-heap memory. The child node uses a task completion listener to free the off-heap memory at the end of the task, which invalidates the output iterator and any rows it has produced. Since task completion listeners are registered bottom-up and executed in reverse order of registration, this is safe as long as an exec node never accesses its input after its task completion listener has executed.[^1]

The BasePythonRunner task completion listener violates this assumption. It interrupts the writer thread, but does not wait for it to exit. This causes a race condition that can lead to an executor crash:
1. The Python writer thread is processing a row backed by off-heap memory.
2. The task finishes, for example because it has reached a row limit.
3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread, but the writer thread does not check it immediately.
4. The child plan's task completion listener frees its off-heap memory, invalidating the row that the Python writer thread is processing.
5. The Python writer thread attempts to access the invalidated row. The use-after-free triggers a segfault that crashes the executor.

This PR fixes the bug by making the BasePythonRunner task completion listener wait for the writer thread to exit before returning. This prevents its input from being invalidated while the thread is running. The sequence of events is now as follows:
1. The Python writer thread is processing a row backed by off-heap memory.
2. The task finishes, for example because it has reached a row limit.
3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread and waits for the writer thread to exit.
4. The child plan's task completion listener can safely free its off-heap memory without invalidating live rows.

TaskContextImpl previously held a lock while invoking the task completion listeners. This would now cause a deadlock because the writer thread's exception handler calls `TaskContextImpl#isCompleted()`, which needs to acquire the same lock. To avoid deadlock, this PR modifies TaskContextImpl to release the lock before invoking the listeners, while still maintaining sequential execution of listeners.

[^1]: This guarantee was not historically recognized, leading to similar bugs as far back as 2014 ([SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019?focusedCommentId=13953661&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13953661)). The root cause was the lack of a reliably-ordered mechanism for operators to free resources at the end of a task. Such a mechanism (task completion listeners) was added and gradually refined, and we can now make this guarantee explicit. (An alternative approach is to use closeable iterators everywhere, but this would be a major change.)

### Why are the changes needed?

Without this PR, attempting to use Python UDFs while the off-heap vectorized Parquet reader is enabled (`spark.sql.columnVector.offheap.enabled true`) can cause executors to segfault.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A [previous PR](#30177) reduced the likelihood of encountering this race condition, but did not eliminate it. The accompanying tests were therefore flaky and had to be disabled. This PR eliminates the race condition, allowing us to re-enable these tests. One of the tests, `test_pandas_udf_scalar`, previously failed 30/1000 times and now always succeeds.

An internal workload previously failed with a segfault about 40% of the time when run with `spark.sql.columnVector.offheap.enabled true`, and now succeeds 100% of the time.

Closes #34245 from ankurdave/SPARK-33277-thread-join.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dfca1d1)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

I didn't realize that the linked JIRA ticket is the old one. @ankurdave can you create a new JIRA ticket for this bug? thanks!

@ankurdave
Contributor Author

ankurdave commented Oct 21, 2021

@cloud-fan Thanks! I created https://issues.apache.org/jira/browse/SPARK-37088.

I noticed that test_udf_with_column_vector failed in branch-3.2, seemingly with the same segfault as before. I'll try to repro locally and continue to investigate.

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f2e634990f8, pid=81201, tid=0x00007f2e15aa8700
#
# JRE version: OpenJDK Runtime Environment (8.0_292-b10) (build 1.8.0_292-8u292-b10-0ubuntu1~20.04-b10)
# Java VM: OpenJDK 64-Bit Server VM (25.292-b10 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# J 17300 C2 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext()V (194 bytes) @ 0x00007f2e634990f8 [0x00007f2e63499060+0x98]

@cloud-fan cloud-fan changed the title [SPARK-33277][PYSPARK][SQL] Writer thread must not access input after task completion listener returns [SPARK-37088][PYSPARK][SQL] Writer thread must not access input after task completion listener returns Oct 21, 2021
@ankurdave
Contributor Author

I noticed it occurred on another recent PR as well: #34352 failed in test_pandas_udf_with_column_vector.

I was also able to repro this locally on branch-3.2 using the following commands:

./build/sbt -Phive package
./build/sbt test:compile
seq 100 | parallel -j 8 --halt now,fail=1 'echo {#}; python/run-tests --testnames pyspark.sql.tests.test_udf'

Here's the location of the segfault:

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f2569052836, pid=25950, tid=0x00007f2358fd5700
#
# JRE version: OpenJDK Runtime Environment (8.0_292-b10) (build 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10)
# Java VM: OpenJDK 64-Bit Server VM (25.292-b10 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# v  ~StubRoutines::jlong_disjoint_arraycopy
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

---------------  T H R E A D  ---------------

Current thread (0x00007f24082e7800):  JavaThread "stdout writer for /usr/bin/python3.6" daemon [_thread_in_Java, id=4879, stack(0x00007f2358ed5000,0x00007f2358fd6000)]

siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 0x0000000000004400

Registers:
RAX=0x00000000f9c9f5a0, RBX=0x0000000000004400, RCX=0x0000000000007ff8, RDX=0xfffffffffffff888
RSP=0x00007f2358fd39d0, RBP=0x00007f2358fd39d0, RSI=0x0000000000004400, RDI=0x00000000f9c9f598
R8 =0x0000000000008000, R9 =0x0000000000000000, R10=0x00007f2569052e20, R11=0x0000000000000010
R12=0x0000000000000000, R13=0x0000000000000000, R14=0x0000000000100000, R15=0x00007f24082e7800
RIP=0x00007f2569052836, EFLAGS=0x0000000000010286, CSGSFS=0x002b000000000033, ERR=0x0000000000000006
  TRAPNO=0x000000000000000e

Top of Stack: (sp=0x00007f2358fd39d0)
0x00007f2358fd39d0:   00000000f9c9b990 00007f2569ff90a0
[...]

Instructions: (pc=0x00007f2569052836)
0x00007f2569052816:   48 8b 44 d7 08 48 89 44 d1 08 48 ff c2 75 f1 48
0x00007f2569052826:   33 c0 c9 c3 66 0f 1f 44 00 00 c5 fe 6f 44 d7 c8
0x00007f2569052836:   c5 fe 7f 44 d1 c8 c5 fe 6f 4c d7 e8 c5 fe 7f 4c
0x00007f2569052846:   d1 e8 48 83 c2 08 7e e2 48 83 ea 04 7f 10 c5 fe

Register to memory mapping:

RAX=0x00000000f9c9f5a0 is an oop

[error occurred during error reporting (printing register info), id 0xb]

Stack: [0x00007f2358ed5000,0x00007f2358fd6000],  sp=0x00007f2358fd39d0,  free space=1018k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
v  ~StubRoutines::jlong_disjoint_arraycopy
J 19184 C2 org.apache.spark.unsafe.Platform.copyMemory(Ljava/lang/Object;JLjava/lang/Object;JJ)V (124 bytes) @ 0x00007f2569ff90a0 [0x00007f2569ff9020+0x80]
j  org.apache.spark.sql.execution.vectorized.OffHeapColumnVector.putLongsLittleEndian(II[BI)V+32
j  org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readLongs(ILorg/apache/spark/sql/execution/vectorized/WritableColumnVector;I)V+45
j  org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$LongUpdater.readValues(IILorg/apache/spark/sql/execution/vectorized/WritableColumnVector;Lorg/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader;)V+5
j  org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(Lorg/apache/spark/sql/execution/datasources/parquet/ParquetReadState;Lorg/apache/spark/sql/execution/vectorized/WritableColumnVector;Lorg/apache/spark/sql/execution/vectorized/WritableColumnVector;Lorg/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader;Lorg/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater;)V+260
j  org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(Lorg/apache/spark/sql/execution/datasources/parquet/ParquetReadState;Lorg/apache/spark/sql/execution/vectorized/WritableColumnVector;Lorg/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader;Lorg/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater;)V+7
j  org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(ILorg/apache/spark/sql/execution/vectorized/WritableColumnVector;)V+375
j  org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch()Z+112
j  org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue()Z+13
j  org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext()Z+19
j  org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext()Z+18
j  org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext()Z+8
j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage1;)V+6
J 19200 C2 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext()V (194 bytes) @ 0x00007f256a3a3a5c [0x00007f256a3a3820+0x23c]
J 18817 C2 org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext()Z (31 bytes) @ 0x00007f256c7bb2c0 [0x00007f256c7bb260+0x60]
J 19187 C2 org.apache.spark.ContextAwareIterator.hasNext()Z (38 bytes) @ 0x00007f256a37f1ac [0x00007f256a37f0a0+0x10c]
J 12714 C2 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x00007f256a9cf0a4 [0x00007f256a9cf060+0x44]
J 12714 C2 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x00007f256a9cf0a4 [0x00007f256a9cf060+0x44]
J 19215 C2 scala.collection.Iterator$GroupedIterator.takeDestructively(I)Lscala/collection/Seq; (50 bytes) @ 0x00007f25696b7c38 [0x00007f25696b7ae0+0x158]
J 12032 C1 scala.collection.Iterator$GroupedIterator.go(I)Z (218 bytes) @ 0x00007f25695c144c [0x00007f25695c0fc0+0x48c]
J 10331 C1 scala.collection.Iterator$GroupedIterator.fill()Z (42 bytes) @ 0x00007f256aa58f14 [0x00007f256aa58b20+0x3f4]
J 10330 C1 scala.collection.Iterator$GroupedIterator.hasNext()Z (18 bytes) @ 0x00007f256aa595bc [0x00007f256aa59500+0xbc]
J 12714 C2 scala.collection.Iterator$$anon$10.hasNext()Z (10 bytes) @ 0x00007f256a9cf0a4 [0x00007f256a9cf060+0x44]
J 7864 C2 scala.collection.AbstractIterator.foreach(Lscala/Function1;)V (6 bytes) @ 0x00007f2569ab96f0 [0x00007f2569ab9660+0x90]
J 16662 C1 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(Lscala/collection/Iterator;Ljava/io/DataOutputStream;)V (14 bytes) @ 0x00007f256becae5c [0x00007f256becaa00+0x45c]
j  org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(Ljava/io/DataOutputStream;)V+8
J 16844 C1 org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(Lorg/apache/spark/api/python/BasePythonRunner$WriterThread;)Ljava/lang/Object; (952 bytes) @ 0x00007f256bfabaa4 [0x00007f256bfa05e0+0xb4c4]
J 16843 C1 org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$2220.apply()Ljava/lang/Object; (8 bytes) @ 0x00007f256bf70cfc [0x00007f256bf70c80+0x7c]
J 15649 C1 org.apache.spark.util.Utils$.logUncaughtExceptions(Lscala/Function0;)Ljava/lang/Object; (66 bytes) @ 0x00007f256ba5dcac [0x00007f256ba5dba0+0x10c]
J 16840 C1 org.apache.spark.api.python.BasePythonRunner$WriterThread.run()V (14 bytes) @ 0x00007f256bf9c794 [0x00007f256bf9c340+0x454]
v  ~StubRoutines::call_stub
V  [libjvm.so+0x6b04aa]
V  [libjvm.so+0x6ada8b]
V  [libjvm.so+0x6ae077]
V  [libjvm.so+0x755edb]
V  [libjvm.so+0xb08d2f]
V  [libjvm.so+0xb0a0fa]
V  [libjvm.so+0x990552]
C  [libpthread.so.0+0x76db]  start_thread+0xdb


---------------  P R O C E S S  ---------------

Java Threads: ( => current thread )
  0x00007f24082e3800 JavaThread "Worker Monitor for /usr/bin/python3.6" daemon [_thread_blocked, id=4880, stack(0x00007f234dc45000,0x00007f234dd46000)]
=>0x00007f24082e7800 JavaThread "stdout writer for /usr/bin/python3.6" daemon [_thread_in_Java, id=4879, stack(0x00007f2358ed5000,0x00007f2358fd6000)]
[...]

@ankurdave
Contributor Author

I think I know why this is happening. The task completion listener that closes the vectorized reader is registered lazily in ParquetFileFormat#buildReaderWithPartitionValues(). Since task completion listeners are executed in reverse order of registration, it always runs before the writer thread can be interrupted.

I didn't realize this was the case, and it contradicts the assumption in this PR that task completion listeners are registered bottom-up.

I'll submit a new PR to fix this.
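
To illustrate the ordering problem with a toy model (hypothetical; not the actual Spark code) where listeners run newest-first:

```scala
// Listeners run in reverse registration order, so whoever registers LAST runs FIRST.
object LazyRegistrationSketch {
  def main(args: Array[String]): Unit = {
    var listeners = List.empty[String] // registration log, newest first

    // Expected (bottom-up, eager) registration: scan first, then Python runner.
    listeners ::= "close vectorized reader"   // child scan, registered at setup
    listeners ::= "join Python writer thread" // parent runner, registered later
    println(listeners.mkString(" -> "))       // join writer -> close reader  (safe)

    // Actual (lazy) registration: the reader's close listener is registered on the
    // first read, which happens AFTER the Python runner has already registered.
    listeners = Nil
    listeners ::= "join Python writer thread"
    listeners ::= "close vectorized reader"   // registered lazily, so it runs FIRST
    println(listeners.mkString(" -> "))       // close reader -> join writer  (unsafe)
  }
}
```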

@HyukjinKwon
Member

Hm, IMO the UDF tests (test_udf_with_column_vector) became flaky from this PR:

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f1cf90527b5, pid=49084, tid=0x00007f1ca0bed700
#
# JRE version: OpenJDK Runtime Environment (8.0_292-b10) (build 1.8.0_292-8u292-b10-0ubuntu1~20.04-b10)
# Java VM: OpenJDK 64-Bit Server VM (25.292-b10 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# v  ~StubRoutines::jlong_disjoint_arraycopy
#
# Core dump written. Default location: /__w/spark/spark/core or core.49084
#
# An error report file with more information is saved as:
# /__w/spark/spark/hs_err_pid49084.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

https://github.com/apache/spark/runs/3981850943?check_suite_focus=true

@ankurdave
Contributor Author

Yeah, sorry about that. The flakiness should be fixed by #34369 when we merge that PR.

HyukjinKwon pushed a commit that referenced this pull request Oct 25, 2021
…ner lazily

### What changes were proposed in this pull request?

The previous PR #34245 assumed task completion listeners are registered bottom-up. `ParquetFileFormat#buildReaderWithPartitionValues()` violates this assumption by registering a task completion listener to close its output iterator lazily. Since task completion listeners are executed in reverse order of registration, this listener always runs before other listeners. When the downstream operator contains a Python UDF and the off-heap vectorized Parquet reader is enabled, this results in a use-after-free that causes a segfault.

The fix is to close the output iterator using FileScanRDD's task completion listener.

### Why are the changes needed?

Without this PR, the Python tests introduced in #34245 are flaky ([see details in thread](#34245 (comment))). They intermittently fail with a segfault.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Repeatedly ran one of the Python tests introduced in #34245 using the commands below. Previously, the test was flaky and failed after about 50 runs. With this PR, the test has not failed after 1000 runs.

```sh
./build/sbt -Phive clean package && ./build/sbt test:compile
seq 1000 | parallel -j 8 --halt now,fail=1 'echo {#}; python/run-tests --testnames pyspark.sql.tests.test_udf'
```

Closes #34369 from ankurdave/SPARK-37089.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Oct 25, 2021
…ner lazily

sunchao pushed a commit to sunchao/spark that referenced this pull request Dec 8, 2021
… task completion listener returns

sunchao pushed a commit to sunchao/spark that referenced this pull request Dec 8, 2021
…ner lazily

catalinii pushed a commit to lyft/spark that referenced this pull request Feb 22, 2022
… task completion listener returns

catalinii pushed a commit to lyft/spark that referenced this pull request Feb 22, 2022
…ner lazily

catalinii pushed a commit to lyft/spark that referenced this pull request Mar 4, 2022
… task completion listener returns

catalinii pushed a commit to lyft/spark that referenced this pull request Mar 4, 2022
…ner lazily
