[SPARK-27870][SQL][PySpark] Flush batch timely for pandas UDF (for improving pandas UDFs pipeline) #24734
Conversation
```scala
val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
assert(outputTypes == actualDataTypes, "Invalid schema from pandas_udf: " +
  s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
batch.rowIterator.asScala
```
Some explanation here: the master implementation has an issue. The member `private var currentIter = ...` is computed when the returned iterator is created, and computing `currentIter` requires reading the first element of `columnarBatchIter`. But note that this block of code is called from `EvalPythonExec.doExecute`; we should not read the iterator there, only construct it. Reading the iterator should start only after the whole iterator pipeline has been constructed.
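The eager-versus-lazy distinction can be illustrated with a small Python analogue (illustrative names only, not the actual Scala code):

```python
def make_rows_eager(batches):
    """Problematic pattern: pulls the first batch at construction time."""
    current = next(batches, None)  # upstream work runs right here
    def gen():
        nonlocal current
        while current is not None:
            yield from current
            current = next(batches, None)
    return gen()

def make_rows_lazy(batches):
    """Safe pattern: defers all reads until the consumer iterates."""
    def gen():
        for batch in batches:
            yield from batch
    return gen()

log = []
def tracked_batches():
    for i in range(3):
        log.append(i)              # records when a batch is computed
        yield [i * 10, i * 10 + 1]

it = make_rows_eager(tracked_batches())
print(log)   # [0] -- batch 0 was computed before anyone asked for a row

log.clear()
it2 = make_rows_lazy(tracked_batches())
print(log)   # [] -- nothing is computed until iteration starts
```

Both iterators produce the same rows; only the time at which upstream work happens differs, which is what matters inside `doExecute`.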
If I understand this correctly, you are saying that when the `Iterator` is constructed, the first batch of rows is read, and that is wrong? I think you are right about that.
This is a separate issue that could be merged separately. I created https://issues.apache.org/jira/browse/SPARK-27968 and I will submit a PR to fix it:
Test build #105887 has finished for PR 24734 at commit
Test build #105904 has finished for PR 24734 at commit
@WeichenXu123 I'm not sure that adding calls to `flush` will help performance here. This is usually only done when one side is waiting on data that is too small to fill the buffer. In ArrowEvalPythonExec, it will continuously write until the buffer is full, so Python will always be able to read data. So I don't see where either side is waiting and a `flush` would help. Maybe you can show some benchmarks?
@@ -87,6 +87,7 @@ class ArrowPythonRunner(
```scala
arrowWriter.finish()
writer.writeBatch()
dataOut.flush()
```
This block of code will write to `dataOut` until it is blocked because the write buffer is full. I think adding a `flush` here will only hurt performance.
@BryanCutler The issue is that without a flush, the reader side might not even see the data. I think we need something like an async flush here.

You mean the reader in Python? Could you explain a little more about how that would happen? The way it looks to me, Scala continuously writes data and is only blocked when the write buffer is full, meaning Python will always have data to read and a flush would not help.

If a buffered writer is used, it doesn't flush data to the output stream immediately. So the reader on the other side, even if it tries to read from the stream at all times, cannot see the data until the buffered writer decides to flush.

Right, with a buffered writer, the data is written once the buffer is full. With both the Scala and Python writers, batches are written continuously in a loop, blocking only when the buffer is full and the other side hasn't read the previous data. If by chance a single batch doesn't fill the write buffer, the next batch is written, and so on until the buffer is full. The only time the buffer could go unfilled is the final batch, and then the stream is closed, which calls a flush. Are you thinking of the case where an entire batch does not completely fill the write buffer?
The performance issue (in the master code) occurs at the beginning. Suppose the batch size is small and the write buffer can hold 100 batches; then the first 100 batches cannot be pipelined when running UDFs. Let's take an example: suppose there are 3 pandas UDFs (UDF1, UDF2, UDF3) pipelined. On my PR, it will be:

On the master code, it will be (suppose the write buffer can hold 100 batches):

We can see that on the master code, the downstream UDFs (UDF2 and UDF3) lag 100 batches behind in the pipeline, so performance is bad for the first 200 batches. This case typically matters: in some machine learning scenarios, for real-time prediction, we need the batch size to be small, and inside the UDF we run code on a GPU that consumes a lot of time, so it is worth parallelizing these UDF computations as much as possible. Thanks for your review!
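The lag arithmetic above can be put into a back-of-the-envelope model (the numbers are illustrative, not from the benchmarks):

```python
def total_time_s(n_batches, n_stages, per_batch_s, stage_lag_batches):
    """Idealized linear pipeline: each stage runs stage_lag_batches behind
    the previous one, and the run ends when the last stage finishes."""
    return (n_batches + (n_stages - 1) * stage_lag_batches) * per_batch_s

# 3 pipelined UDFs, 300 batches, 1 s of UDF work per batch:
timely_flush = total_time_s(300, 3, 1.0, 1)    # 1-batch lag   -> 302.0 s
buffered     = total_time_s(300, 3, 1.0, 100)  # 100-batch lag -> 500.0 s
print(timely_flush, buffered)  # 302.0 500.0
```

Under this model the buffer-induced lag adds `(stages - 1) * lag` batch-times of latency, which is exactly the "downstream UDFs lag 100 batches behind" effect described above.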
If this is about performance, are there benchmarking results before/after?
@felixcheung The performance matters when the batch size is small and the UDF does heavy computation. With my PR, the first 100 batches' computation will consume about 100 * 3s. Typical scenario:

That's the difference. Thanks!
What about sending large batches to Python, then taking smaller slices of each batch before sending them for inference? PyArrow supports zero-copy slices. In your use case you make 100 write calls to Python where 1 would be enough, and that adds some overhead.
"Making batch size small" is the ML realtime inference requirement. This is another issue. Such as, input data coming from a stream, we need deal with coming data as soon as possible instead of accumulating a large batch. Under the case "we already choose small batch size", this PR improve the pipeline performance. Thanks! |
BTW, is the buffer size 65536 bytes? So the issue is that we have to wait until 65536 bytes are filled? Why don't we simply add a config to control the buffer size then?
Yes, I think this is the right approach if there is too much latency between endpoints. There is a config for the Scala side.
@BryanCutler But the Python-side write buffer size is hardcoded... and I have doubts about the Scala side. We can discuss two cases:
@WeichenXu123 this change might improve perf for your case, where a pandas_udf takes a massive amount of time on small data, but my concern is that it could have a negative effect on other cases where IO is the main bottleneck. This PR changes IO for all pandas_udfs, not just scalar ones. Could you post a simple benchmark script and run some numbers?
@BryanCutler Sure, I will benchmark. And I can change it so that only scalar UDFs do per-batch flushing.
Can't we add a configuration to control this buffer size? See spark/python/pyspark/daemon.py lines 57 to 58 (in cc613b5), and presumably spark/python/pyspark/java_gateway.py line 181 (in 3d6066e).
Test build #106081 has finished for PR 24734 at commit
Thanks for posting some benchmarks @WeichenXu123. I ran them and got similar results, and it doesn't look like this would have a negative impact. For the pipeline case, I did try changing the buffer size and saw the same improvement as with this PR. I still think that is the better solution, because it's cleaner and would also help non-pandas UDFs, but I'm not against this PR if you want to continue this way. BTW, when changing the buffer size, you don't have to guess the batch size. Any write that is larger than the buffer will just be written immediately. This is effectively a flush, but better, because that would first copy the memory to the buffer and then wait for another call to
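The claim that an over-sized write bypasses the buffer can be checked with Python's own buffered writer (a standalone sketch, not Spark code):

```python
import io

class RecordingRaw(io.RawIOBase):
    """Raw sink that records the size of each low-level write it receives."""
    def __init__(self):
        super().__init__()
        self.writes = []
    def writable(self):
        return True
    def write(self, b):
        self.writes.append(len(bytes(b)))
        return len(b)

raw = RecordingRaw()
buffered = io.BufferedWriter(raw, buffer_size=8)
buffered.write(b"ab")       # smaller than the buffer: no raw write yet
buffered.write(b"x" * 100)  # larger than the buffer: pending bytes are
                            # flushed, then the big write goes straight through
print(raw.writes)           # [2, 100]
```

So with a small enough buffer, every large batch reaches the socket immediately without any explicit `flush` call.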
@BryanCutler yes, I also considered tuning the buffer size, but one of my concerns is that users may mix ML prediction with other simple UDFs (such as data preprocessing). For example, for data from Spark streaming, we first run a simple UDF, then a pipelined complex UDF (which does ML prediction); if we tune the buffer size globally to be small, it will hurt the first, simple UDF's performance. Thanks!
Test build #106262 has finished for PR 24734 at commit
Had an offline discussion with @WeichenXu123, and I removed

An additional flush doesn't hurt 1) and 3), because writing a large batch (bigger than the buffer size) would trigger a flush already. It would greatly help 4) by reducing the latency. It does add more flush calls in 2), at a rate no higher than 10/second. This is significant only if, without it, the flush frequency would be much less than 10/second. Given the default buffer size of 65536, that means a data volume of less than ~1 MB/s to keep the flush frequency under 10/second. Additional flushes to a local TCP socket at this rate don't hurt much.
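The 10/second and ~1 MB/s figures can be checked with quick arithmetic (a sketch; the constants are the defaults quoted above):

```python
buffer_size = 65536      # default write buffer, in bytes
max_timed_flushes = 10   # at most one timed flush per 0.1 s

# A buffer-full flush already happens every buffer_size bytes, so the timed
# flush only adds calls when the throughput stays below this threshold:
threshold = buffer_size * max_timed_flushes
print(threshold)              # 655360 bytes/s
print(threshold / 1_000_000)  # ~0.66 MB/s, i.e. under the ~1 MB/s figure
```

Above that throughput the buffer fills faster than the 0.1 s interval, and the timed flush is a no-op in practice.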
```diff
-    while (nextBatch.hasNext) {
-      arrowWriter.write(nextBatch.next())
+    var lastFlushTime = System.currentTimeMillis()
+    inputIterator.foreach { batch =>
```
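A minimal Python sketch of the throttled-flush loop in this hunk (`write_batch` and `flush` are hypothetical stand-ins, and a monotonic clock is used for the interval):

```python
import time

FLUSH_INTERVAL_S = 0.1  # minimum spacing between timed flushes (per the PR)

def write_batches(batches, write_batch, flush):
    """Write every batch, flushing at most once per FLUSH_INTERVAL_S,
    plus a final flush so the tail of the stream is delivered."""
    last_flush = time.monotonic()
    for batch in batches:
        write_batch(batch)
        now = time.monotonic()
        if now - last_flush > FLUSH_INTERVAL_S:
            flush()
            last_flush = now
    flush()

# Count the calls with simple stand-ins for the Arrow writer and the stream:
written, flushes = [], []
write_batches(range(5), written.append, lambda: flushes.append(None))
print(written)       # [0, 1, 2, 3, 4]
print(len(flushes))  # at least 1 (the final flush); more if batches are slow
```

The final unconditional flush matters: without it, any batches written since the last timed flush could sit in the buffer until the stream is closed.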
I made a small refactor here to simplify the impl.
Test build #106274 has finished for PR 24734 at commit
LGTM
Thanks! Merged to master.
cc @HyukjinKwon @BryanCutler Feel free to submit the follow-up PR if you have any concern.
Sorry, I still don't get why we need to flush instead of setting no buffer. Did we try #24734 (comment)?

Buffer size 0:

```shell
echo "
buf_size = 0
import socket
import os
import time
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 12345))
server_socket.listen(0)
sock, addr = server_socket.accept()
infile = os.fdopen(os.dup(sock.fileno()), 'rb', buf_size)
print('got %s' % repr(infile.read(5 * 8)))
time.sleep(10)
" > server.py
echo "
buf_size = 0
import socket
import os
import time
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 12345))
outfile = os.fdopen(os.dup(sock.fileno()), 'wb', buf_size)
outfile.write(b'Hello ')
print('sent %s' % repr(b'Hello '))
time.sleep(10)
" > client.py
```

Buffer size 65536:

```shell
echo "
buf_size = 65536
import socket
import os
import time
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 54321))
server_socket.listen(0)
sock, addr = server_socket.accept()
infile = os.fdopen(os.dup(sock.fileno()), 'rb', buf_size)
print('got %s' % repr(infile.read(5 * 8)))
time.sleep(10)
" > server.py
echo "
buf_size = 65536
import socket
import os
import time
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 54321))
outfile = os.fdopen(os.dup(sock.fileno()), 'wb', buf_size)
outfile.write(b'Hello ')
print('sent %s' % repr(b'Hello '))
time.sleep(10)
" > client.py
```

Run these in two separate terminals:

```shell
python server.py
python client.py
```
```scala
arrowWriter.finish()
writer.writeBatch()
val currentTime = System.currentTimeMillis()
```
Why do we use milliseconds? See https://github.com/databricks/scala-style-guide#misc_currentTimeMillis_vs_nanoTime
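A Python analogue of the same advice: measure intervals with a monotonic clock, since wall-clock time can jump.

```python
import time

# System.currentTimeMillis() is wall-clock time: NTP adjustments can make it
# jump, even backwards, so an interval measured with it can be wrong or
# negative. System.nanoTime() (like Python's time.monotonic()) is a clock
# that never goes backwards, so it is the right tool for elapsed time.
start = time.monotonic()
time.sleep(0.05)
elapsed = time.monotonic() - start
print(elapsed > 0)  # True; a wall-clock delta could in principle be negative
```

This is exactly why the throttled-flush interval check is safer on a monotonic clock.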
```python
stream.flush()
# Expect buffer size 4 bytes + buffer data 1 byte.
self.assertEqual(len(wrapped.buffer), 5, "flush should work")
stream.close()
```
try / finally

Can we unset …? Can you guys give me a test case so that I can redo this with the buffer-size approach?

I am going to open a PR to revert this, for trackability, in case the alternative is accepted.
… first row

## What changes were proposed in this pull request?

The issue is fixed in apache#24734, but that PR might take longer to merge.

## How was this patch tested?

It should pass existing unit tests.

Closes apache#24816 from mengxr/SPARK-27968.
Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
…proving pandas UDFs pipeline)

## What changes were proposed in this pull request?

Flush batches timely for pandas UDFs. This could improve performance when multiple pandas UDF plans are pipelined. When each batch is flushed in time, downstream pandas UDFs get pipelined as soon as possible, and the pipeline helps hide the downstream UDFs' computation time. For example: when the first UDF starts computing on batch-3, the second pipelined UDF can start computing on batch-2, and the third pipelined UDF can start computing on batch-1. If we do not flush each batch in time, the downstream UDFs' pipeline will lag behind too much, which may increase the total processing time.

I add a flush at two places:

* The JVM process feeds data into the Python worker: on the JVM side, when one batch is written, flush it.
* The JVM process reads data from the Python worker output: on the Python worker side, when one batch is written, flush it.

Without a flush, the default buffer size for both is 65536. Especially in the ML case, in order to make real-time predictions, we make the batch size very small. That buffer size is too large for this case, which causes the downstream pandas UDF pipeline to lag behind too much.

### Note

* This is only applied to pandas scalar UDFs.
* Do not flush for every batch: the minimum interval between two flushes is 0.1 second. This avoids too-frequent flushing when the batch size is small. It works like:

```python
last_flush_time = time.time()
for batch in iterator:
    writer.write_batch(batch)
    flush_time = time.time()
    if self.flush_timely and (flush_time - last_flush_time > 0.1):
        stream.flush()
        last_flush_time = flush_time
```

## How was this patch tested?

### Benchmark to make sure the flush does not cause a performance regression

#### Test code:

```python
numRows = ...
batchSize = ...
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', str(batchSize))
df = spark.range(1, numRows + 1, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    return x + 10

beg_time = time.time()
result = df.select(sum(fp1('a'))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

#### Test Result:

| params | Consume time (Before) | Consume time (After) |
| ------ | --------------------- | -------------------- |
| numRows=100000000, batchSize=10000 | 23.43s | 24.64s |
| numRows=100000000, batchSize=1000 | 36.73s | 34.50s |
| numRows=10000000, batchSize=100 | 35.67s | 32.64s |
| numRows=1000000, batchSize=10 | 33.60s | 32.11s |
| numRows=100000, batchSize=1 | 33.36s | 31.82s |

### Benchmark pipelined pandas UDFs

#### Test code:

```python
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

@pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

#### Test Result:

**Before**: consume time: 63.57s
**After**: consume time: 32.43s

**So the PR improves performance by making downstream UDFs get pipelined early.**

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes apache#24734 from WeichenXu123/improve_pandas_udf_pipeline.
Lead-authored-by: WeichenXu <weichen.xu@databricks.com>
Co-authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
… (for improving pandas UDFs pipeline)"

## What changes were proposed in this pull request?

This PR reverts apache@9c4eb99 for the reasons below:

1. An alternative was not considered properly: apache#24734 (comment), apache#24734 (comment), apache#24734 (comment). I opened PR apache#24826.
2. apache@9c4eb99 introduced timely flushing, whose behaviour is somewhat hacky, and the timing isn't guaranteed either (in case each batch takes longer to process).
3. For pipelining smaller batches, it looks better to allow configuring the buffer size rather than adding another factor that triggers a flush.

## How was this patch tested?

N/A

Closes apache#24827 from HyukjinKwon/revert-flush.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…r Pandas UDFs

## What changes were proposed in this pull request?

This PR is an alternative approach to apache#24734. It fixes two things:

1. Respects `spark.buffer.size` in Python workers.
2. Adds a runtime buffer size configuration for Pandas UDFs, `spark.sql.pandas.udf.buffer.size` (which falls back to `spark.buffer.size`).

## How was this patch tested?

Manually tested:

```python
import time
from pyspark.sql.functions import *

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

@pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

```
consume time: 62.68265891075134
```

```python
import time
from pyspark.sql.functions import *

spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '1')
spark.conf.set('spark.sql.pandas.udf.buffer.size', '4')
df = spark.range(1, 31, numPartitions=1).select(col('id').alias('a'))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    print("run fp1")
    time.sleep(1)
    return x + 100

@pandas_udf("int", PandasUDFType.SCALAR)
def fp2(x, y):
    print("run fp2")
    time.sleep(1)
    return x + y

beg_time = time.time()
result = df.select(sum(fp2(fp1('a'), col('a')))).head()
print("result: " + str(result[0]))
print("consume time: " + str(time.time() - beg_time))
```

```
consume time: 34.00594782829285
```

Closes apache#24826 from HyukjinKwon/SPARK-27870.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
What changes were proposed in this pull request?
Flush batch timely for pandas UDF.
This could improve performance when multiple pandas UDF plans are pipelined.
When each batch is flushed in time, downstream pandas UDFs get pipelined as soon as possible, and the pipeline helps hide the downstream UDFs' computation time. For example:
When the first UDF starts computing on batch-3, the second pipelined UDF can start computing on batch-2, and the third pipelined UDF can start computing on batch-1.
If we do not flush each batch in time, the downstream UDFs' pipeline will lag behind too much, which may increase the total processing time.
I add a flush at two places:
Without a flush, the default buffer size for both is 65536. Especially in the ML case, in order to make real-time predictions, we make the batch size very small. That buffer size is too large for this case, which causes the downstream pandas UDF pipeline to lag behind too much.
Note
How was this patch tested?
Benchmark to make sure the flush does not cause a performance regression
Test code:
Test Result:
Benchmark pipelined pandas UDF
Test code:
Test Result:
Before: consume time: 63.57s
After: consume time: 32.43s
So the PR improves performance by making downstream UDFs get pipelined early.
Please review https://spark.apache.org/contributing.html before opening a pull request.