
Fix a hang for Pandas UDFs on DB 13.3[databricks] #9833

Merged: 13 commits merged into NVIDIA:branch-24.02 on Nov 30, 2023

Conversation

@firestarman (Collaborator) commented on Nov 22, 2023

fix #9493
fix #9844

The Python runner uses two separate threads to write and read data with the Python worker processes; however, on DB 13.3 it becomes single-threaded, meaning reading and writing run on the same thread.
Now the first read always happens before the first write, but the original BatchQueue blocks on the first read until the first write is done, so it waits forever.

Changes made:

  1. Update the BatchQueue to support asking for a batch on demand instead of waiting until one is inserted into the queue. This eliminates the ordering requirement between reading and writing (see the sketch below).
  2. Introduce a new class named BatchProducer to work with the new BatchQueue to support peeking the row count on demand for the read side.
  3. Apply this new BatchQueue to the relevant plans.
  4. Update GpuArrowPythonRunner to support writing one batch at a time for the single-threaded model.
  5. Found an issue with PythonUDAF and RunningWindowFunctionExec; it may be a bug specific to DB 13.3.
  6. Other small refactors.

These changes also fix other related failures.
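
To make the hang and the fix in item 1 concrete, here is a minimal sketch, assuming a simplified Batch type and a hypothetical SimpleBatchQueue; the plugin's real BatchQueue also handles spillable batches and coordination with the producer side. The point is only that a peek from the reader can pull a batch itself instead of blocking until the writer inserts one.

    import scala.collection.mutable

    final case class Batch(numRows: Int)

    // Sketch: a queue shared by the writer (Python input side) and the reader (result
    // combination side). When the reader peeks and the queue is empty, the queue pulls
    // from the upstream iterator itself instead of waiting for the writer.
    final class SimpleBatchQueue(upstream: Iterator[Batch]) {
      private val queue = mutable.Queue.empty[Batch]

      /** Writer side: cache a batch that was pulled and sent to Python. */
      def add(b: Batch): Unit = synchronized { queue.enqueue(b) }

      /** Reader side: peek the row count of the next batch, pulling one if needed. */
      def peekBatchNumRows(): Int = synchronized {
        if (queue.isEmpty && upstream.hasNext) {
          queue.enqueue(upstream.next()) // the reader no longer waits for the writer
        }
        queue.head.numRows
      }

      def remove(): Batch = synchronized { queue.dequeue() }
    }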

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman changed the title from "Fix a hang for Pandas UDFs on DB 13.3" to "Fix a hang for Pandas UDFs on DB 13.3[databricks]" on Nov 22, 2023
@firestarman (Collaborator, Author)

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

@firestarman (Collaborator, Author)

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

@firestarman (Collaborator, Author)

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

@firestarman (Collaborator, Author)

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

@firestarman (Collaborator, Author)

build

@sameerz added the "bug (Something isn't working)" label on Nov 27, 2023

@firestarman (Collaborator, Author)

build

}
pending = None
while (curNumRows < targetNumRows) {
val cb = inputBatchQueue.remove()
Collaborator

Is this going to potentially cause upstream processing to happen? If so, then buf should be a buffer of SpillableColumnarBatch instead of ColumnarBatch.

Collaborator Author

I don't think so. This happens when I change to use the new BatchProducer, even on Spark 31X.
Inside this inputBatchQueue, we do store SpillableColumnarBatches.

Collaborator

Okay I see now that it is peekBatchNumRows that can pull from the upstream iterator. I guess the current code is correct, but it is not that intuitive.


override def next(): ColumnarBatch = {
val numRows = inputBatchQueue.peekBatchSize
val numRows = inputBatchQueue.peekBatchNumRows()
Collaborator

If we have a pending for the input rows, shouldn't they be included in the calculation for the number of rows we want to read from python? But then I don't know what setMinReadTargetNumRows really does. The only way this code appears to work is if that is the exact number of rows that has to be returned and pending is never set to anything.

Collaborator Author

setMinReadTargetNumRows is just setMinReadTargetBatchSize renamed. It is actually about the number of rows, so I changed the name.

If we have a pending for the input rows, shouldn't they be included in the calculation for the number of rows we want to read from python?

Yes, we do this all the time; I did not change this behavior. The row count (originally the targetBatchSize) is also used by the JNI Arrow reader to try to get the next batch from the Python side.

            val table =
              withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ =>
                arrowReader.getNextIfAvailable(minReadTargetBatchSize)
              }

https://github.com/NVIDIA/spark-rapids/blob/branch-24.02/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuPythonArrowShims.scala#L131

Collaborator Author

Even when we set this target row count (or the original batch size), we can still get a larger row count from the Python side after switching to the new BatchProducer. So the "concatInputBatch" method is introduced to make the next input batch to be combined have the same row count as the Python output.

Look at the code here: https://github.com/rapidsai/cudf/blob/branch-24.02/java/src/main/native/src/TableJni.cpp#L471.
It seems possible that we can get more rows than expected: the batch is appended first, and only then is the row count computed and compared, so a batch that overflows the target has already been appended.
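
For context, a rough sketch of what such a concat-to-target step has to do, assuming a simplified RowBatch type with hypothetical slice semantics; the real concatInputBatch works on ColumnarBatch, concatenates the pieces on the GPU, and handles spilling, none of which is shown here:

    import scala.collection.mutable

    // Hypothetical stand-in for a batch that reports its row count and can be sliced by rows.
    final case class RowBatch(numRows: Int) {
      def slice(fromRow: Int, untilRow: Int): RowBatch = RowBatch(untilRow - fromRow)
    }

    final class InputConcat(inputBatchQueue: mutable.Queue[RowBatch]) {
      private var pending: Option[RowBatch] = None

      /** Collect cached input batches covering exactly `targetNumRows` rows. */
      def concatInputBatch(targetNumRows: Int): Seq[RowBatch] = {
        val buf = mutable.ArrayBuffer[RowBatch]()
        var curNumRows = 0

        // Consume one batch, splitting it if it would overshoot the target.
        def take(cb: RowBatch): Unit = {
          val needed = targetNumRows - curNumRows
          if (cb.numRows > needed) {
            buf += cb.slice(0, needed)
            pending = Some(cb.slice(needed, cb.numRows)) // leftover for the next call
            curNumRows = targetNumRows
          } else {
            buf += cb
            curNumRows += cb.numRows
          }
        }

        val leftOver = pending
        pending = None
        leftOver.foreach(take)
        while (curNumRows < targetNumRows) {
          take(inputBatchQueue.dequeue()) // the real code pulls cached input batches here
        }
        buf.toSeq // the real code concatenates these pieces into a single ColumnarBatch
      }
    }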

Collaborator

Looking at the single threaded patch I can see it serializing some data to a buffer and then writing that out/refilling the buffer as the channel is marked as writable. So I can see it writing out more than a single batch of data at once to the python process, but so did the multi-threaded setup from before. So then either the old code had a bug in it, the python code changed, or we are using this code in more situations than we did before.

I think it has to be the latter because looking at the open source patches I don't see any place where python code changes. But then again this might be a databricks specific change.

I am just trying to understand why this change is happening when it was not needed before.

* the IteratorBatchQueue.
* The cached batches will be used for later combination each time getting
* a result batch from the Python side.
* It will transform the input batch by "cacheConverter" and cache the result.
Collaborator

I am really confused by this. So we modify the data twice. Why? We have a queue and pending. Batches from the queue are returned by remove, but pending comes from next? It feels like what you really want is a way to split an input iterator into two, and then provide different operations that can be done on the two. The interface here is the confusing part for me. I would much rather see us have an object that takes the input and the two converters and returns two iterators.

Also, this is always used with the CombiningIterator. Should we just put all of that together and have a single API that takes an input iterator and two separate functions that process that input in different ways, and returns a single iterator that combines the two results after doing the processing?
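
For reference, a minimal sketch of the shape suggested here, with hypothetical names, coarse-grained locking, and no spill handling; it is not the interface the PR ends up with, just an illustration of "one input plus two converters in, two iterators out":

    import scala.collection.mutable

    // Sketch: split one input iterator into two converted streams. Each pull from the input
    // feeds both sides, and each side buffers whatever the other consumer has not taken yet.
    class SplittingProducer[A, B, C](input: Iterator[A], toB: A => B, toC: A => C) {
      private val bufB = mutable.Queue.empty[B]
      private val bufC = mutable.Queue.empty[C]

      // Must be called while holding this object's lock.
      private def pullOne(): Boolean = {
        if (input.hasNext) {
          val a = input.next()
          bufB.enqueue(toB(a))
          bufC.enqueue(toC(a))
          true
        } else {
          false
        }
      }

      val firstIterator: Iterator[B] = new Iterator[B] {
        def hasNext: Boolean = SplittingProducer.this.synchronized(bufB.nonEmpty || pullOne())
        def next(): B = SplittingProducer.this.synchronized(bufB.dequeue())
      }

      val secondIterator: Iterator[C] = new Iterator[C] {
        def hasNext: Boolean = SplittingProducer.this.synchronized(bufC.nonEmpty || pullOne())
        def next(): C = SplittingProducer.this.synchronized(bufC.dequeue())
      }
    }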

Collaborator Author

Should we just put all of that together and have a single API that takes an input iterator, and two separate functions that will process that input in different ways.

No. CombiningIterator depends on the Python runner, so it cannot be an input to the Python runner, and one of the two data pipelines is used as the runner's input.

Collaborator Author

I removed this BatchProducer and updated the original BatchQueue to support asking for a batch instead of waiting until the writer inserts one while writing a batch to Python. I hope it causes less confusion.

The BatchProducer played the roles of both the Python input iterator and the original BatchQueue, so it looked like the data was modified twice. There were two data pipelines in this class: the iterator-like one was used as the Python input, and the queue-like one replaced the original BatchQueue. I just put the two pipelines into one class, because they need to know about each other when pulling batches from the input iterator.

With either this BatchProducer or the updated BatchQueue, the main goal is to eliminate the ordering requirement between reading and writing. Our original BatchQueue requires the first write to be done before a read by waiting on a lock, but the single-threaded Python runner starts the first write after the first read, so a hang comes up.

The ordering requirement is eliminated by pulling in a batch when the cache queue is empty and the reader triggers a row-count peek. Now both the reader and the writer can trigger pulling a batch from the input iterator. This is different from what we did before: previously the batch pulling was only triggered by the writer, and the pulled batches were added to the queue for the later row-count peek and result combination by the reader, so the reader could not proceed if the writer had not written any batch.

With this PR, the reader can proceed even if the writer has not written any batch, because the reader triggers batch pulling to get a batch for the row-count peek. That batch is cached in the BatchQueue by the reader for later result combination, and it is also cached by the writer for the following "next" call.

@pytest.mark.skipif(not (is_databricks_runtime() and is_spark_341()), reason="only for DB13.3")
@pytest.mark.parametrize('data_gen', [int_gen], ids=idfn)
@pytest.mark.parametrize('window', [running_win_param], ids=['Unbounded_Preceding'])
def test_window_aggregate_udf_on_cpu(data_gen, window):
Collaborator

Why does this test even exist? Is it supposed to detect when Databricks fixes their bug? If so, it will just end up as an xpass and we will ignore it. If you want that, we need to verify that it fails in the way we expect and fail the test if it does not.

Collaborator Author

I just wanted to show that this is likely a bug on DB 13.3.
I decided to remove it.

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman (Collaborator, Author)

build

@firestarman (Collaborator, Author)

@revans2 Could you take a look again?

@revans2 (Collaborator) left a comment

I think generally this looks good, but given how large this change is and how late it is in 23.12, I would feel much better if we put this into 24.02 and either disable the Python UDF support in 23.12 or disable all of DB 13.3 support in 23.12.


val rColumns = GpuColumnVector.extractColumns(rBatch).map(_.incRefCount())
new ColumnarBatch(lColumns ++ rColumns, lBatch.numRows())
private def concatInputBatch(targetNumRows: Int): ColumnarBatch = {
withResource(mutable.ArrayBuffer[ColumnarBatch]()) { buf =>
Collaborator

Can you file a follow-on issue for us to look into adding a retry block here? We are building up a list of columnar batches, even though it is likely to only be one. And we are also potentially spilling input data while we do it. None of this code has retry in it.

Collaborator Author

I updated it in this PR since it now targets 24.02.

@@ -170,63 +170,154 @@ class RebatchingRoundoffIterator(
}
}

/** A listener to work with the BatchQueue. */
trait BatchQueuePeekListener {
Collaborator

Can we have a follow-on issue to come back and try to remove the custom traits that extend Iterator but no longer act like an Iterator? I don't like Iterators that don't behave like a real iterator and need special calls to make them behave properly. Either it should not be an Iterator, or it is an Iterator and does not need special calls to keep it working properly.

Collaborator Author

I updated it in this PR since it now targets 24.02.

Collaborator Author

I introduced BatchProducer again to support the peek operation for the BatchQueue, replacing the listener mechanism. The BatchProducer accepts an iterator as the input and supports accessing the batches from the input through a new iterator returned by the asIterator method.
Now BatchProducer is not an iterator itself, but it can produce a pure iterator.
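
A rough, self-contained sketch of that final shape, extending the earlier queue sketch with the writer-side iterator. The names are hypothetical and the plugin's spill handling is omitted: the producer itself is not an Iterator, asIterator hands out a pure Iterator for the Python-runner input, each pulled batch is also cached for the reader side, and the reader's peek can pull from the input too, so neither side has to go first.

    import scala.collection.mutable

    final case class Batch(numRows: Int)

    final class Producer(input: Iterator[Batch]) {
      private val readerCache = mutable.Queue.empty[Batch] // for row-count peek / result combination
      private val writerCache = mutable.Queue.empty[Batch] // pending input for the Python runner

      // Must be called while holding this object's lock; feeds both sides at once.
      private def pullOne(): Boolean = {
        if (input.hasNext) {
          val b = input.next()
          readerCache.enqueue(b)
          writerCache.enqueue(b)
          true
        } else {
          false
        }
      }

      /** A pure Iterator view used as the Python runner's input (the writer side). */
      def asIterator: Iterator[Batch] = new Iterator[Batch] {
        def hasNext: Boolean = Producer.this.synchronized(writerCache.nonEmpty || pullOne())
        def next(): Batch = Producer.this.synchronized(writerCache.dequeue())
      }

      /** Reader side: peek the row count of the next cached batch, pulling one if needed. */
      def peekBatchNumRows(): Int = synchronized {
        if (readerCache.isEmpty) {
          pullOne()
        }
        readerCache.head.numRows
      }

      def removeCached(): Batch = synchronized { readerCache.dequeue() }
    }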

@firestarman changed the base branch from branch-23.12 to branch-24.02 on November 29, 2023 00:46
@firestarman (Collaborator, Author)

retarget to 24.02

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman (Collaborator, Author)

build

pendingInput = Some(second)
}

val ret = GpuSubPartitionHashJoin.concatSpillBatchesAndClose(buf.toSeq)
Collaborator

nit: can we move this out of GpuSubPartitionHashJoin and into a util of some kind. It just feels odd that we are calling into join for something that has nothing to do with a join.

Collaborator Author

I did it in this PR: #9902

@firestarman merged commit 6d35e46 into NVIDIA:branch-24.02 on Nov 30, 2023 (37 checks passed)
@firestarman deleted the fix-9493 branch on November 30, 2023 01:12
ttnghia added a commit to ttnghia/spark-rapids that referenced this pull request Dec 1, 2023
ttnghia added a commit to ttnghia/spark-rapids that referenced this pull request Dec 1, 2023
razajafri pushed a commit to razajafri/spark-rapids that referenced this pull request Jan 25, 2024
razajafri added a commit that referenced this pull request Jan 26, 2024
* Download Maven from apache.org archives (#10225)

Fixes #10224 

Replace broken install using apt by downloading Maven from apache.org.

Signed-off-by: Gera Shegalov <gera@apache.org>

* Fix a hang for Pandas UDFs on DB 13.3[databricks] (#9833)

* Fix a potential data corruption for Pandas UDF (#9942)

This PR moves the BatchQueue into the DataProducer to share the same lock as the output iterator returned by asIterator, and makes the batch movement from the input iterator to the batch queue an atomic operation, eliminating the race when appending batches to the queue.

* Do some refactor for the Python UDF code to try to reduce duplicate code. (#9902)

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Fixed 330db Shims to Adopt the PythonRunner Changes [databricks] (#10232)

This PR removes the old 330db shims in favor of the new Shims, similar to the one in 341db. 

**Tests:**
Ran udf_test.py on Databricks 11.3 and they all passed. 

fixes #10228 

---------

Signed-off-by: raza jafri <rjafri@nvidia.com>

---------

Signed-off-by: Gera Shegalov <gera@apache.org>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: raza jafri <rjafri@nvidia.com>
Co-authored-by: Gera Shegalov <gera@apache.org>
Co-authored-by: Liangcai Li <firestarmanllc@gmail.com>
Labels: bug (Something isn't working)
Projects: None yet
5 participants