[SPARK-14267] [SQL] [PYSPARK] execute multiple Python UDFs within single batch #12057

Closed · wants to merge 7 commits

Conversation

@davies (Contributor) commented Mar 30, 2016

What changes were proposed in this pull request?

This PR supports executing multiple Python UDFs within a single batch and also improves their performance.

>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("double", lambda x: x * 2, IntegerType())
>>> sqlContext.registerFunction("add", lambda x, y: x + y, IntegerType())
>>> sqlContext.sql("SELECT double(add(1, 2)), add(double(2), 1)").explain(True)
== Parsed Logical Plan ==
'Project [unresolvedalias('double('add(1, 2)), None),unresolvedalias('add('double(2), 1), None)]
+- OneRowRelation$

== Analyzed Logical Plan ==
double(add(1, 2)): int, add(double(2), 1): int
Project [double(add(1, 2))#14,add(double(2), 1)#15]
+- Project [double(add(1, 2))#14,add(double(2), 1)#15]
   +- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
      +- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18]
         +- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
            +- OneRowRelation$

== Optimized Logical Plan ==
Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
+- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18]
   +- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
      +- OneRowRelation$

== Physical Plan ==
WholeStageCodegen
:  +- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
:     +- INPUT
+- !BatchPythonEvaluation [add(pythonUDF1#17, 1)], [pythonUDF0#16,pythonUDF1#17,pythonUDF0#18]
   +- !BatchPythonEvaluation [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
      +- Scan OneRowRelation[]
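
For reference, here is a roughly equivalent DataFrame API version of the example above. This is a sketch added for illustration, not part of the original PR; it assumes an existing `sqlContext` and uses `F.lit` for the literal arguments:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# The same two UDFs, defined through the DataFrame API.
double = F.udf(lambda x: x * 2, IntegerType())
add = F.udf(lambda x, y: x + y, IntegerType())

# A one-row DataFrame, analogous to OneRowRelation in the plans above.
df = sqlContext.range(1)
df.select(double(add(F.lit(1), F.lit(2))),
          add(double(F.lit(2)), F.lit(1))).explain(True)

Chained calls such as double(add(...)) are what produce the two stacked EvaluatePython / BatchPythonEvaluation nodes in the plans above: the inner UDFs are evaluated in one batch, and their results feed the outer batch.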

How was this patch tested?

Added new tests.

The following script was used to benchmark 1, 2, and 3 UDFs:

from pyspark.sql import functions as F
from pyspark.sql.types import LongType

df = sqlContext.range(1, 1 << 23, 1, 4)  # ~8M rows in 4 partitions
double = F.udf(lambda x: x * 2, LongType())
print(df.select(double(df.id)).count())
print(df.select(double(df.id), double(df.id + 1)).count())
print(df.select(double(df.id), double(df.id + 1), double(df.id + 2)).count())

Here are the results:

N   Before   After   Speedup
1   22 s     7 s     3.1X
2   38 s     13 s    2.9X
3   58 s     16 s    3.6X

This benchmark ran locally with 4 CPUs. For 3 UDFs, it launched 12 Python worker processes (3 UDFs × 4 concurrent tasks) before this patch and only 4 processes after it. After this patch, evaluating multiple UDFs also uses less memory than before (less buffering).

@SparkQA commented Mar 30, 2016

Test build #54504 has finished for PR 12057 at commit 8e6e5bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -77,22 +77,30 @@ private[spark] case class PythonFunction(
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]])


object PythonRunner {
Contributor comment:

This should be private[spark].

@JoshRosen (Contributor)

Took a quick pass and have a couple of questions to clarify my understanding of this code, since I wasn't fully familiar with our old UDF evaluation strategy (for instance, I was unfamiliar with our neat trick for handling composed Python UDFs).

@davies (Author) commented Mar 31, 2016

@JoshRosen I have addressed your comments and also pushed some changes to improve the performance.

@SparkQA commented Mar 31, 2016

Test build #54579 has finished for PR 12057 at commit 87f4bb4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 31, 2016

Test build #54581 has finished for PR 12057 at commit 8dc1adf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 31, 2016

Test build #54584 has finished for PR 12057 at commit dd71ba9.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 31, 2016

Test build #54602 has finished for PR 12057 at commit 8597bba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • // enable memo iff we serialize the row with schema (schema and class should be memorized)

@SparkQA commented Mar 31, 2016

Test build #54603 has finished for PR 12057 at commit 72a5ec0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

extends Logging {

require(funcs.length == argOffsets.length, "numArgs should have the same length as funcs")
Contributor comment:

numArgs -> argOffsets (the error message should refer to argOffsets, not numArgs)

val dataTypes = new ArrayBuffer[DataType]
val argOffsets = inputs.map { input =>
  input.map { e =>
    if (allInputs.exists(_.semanticEquals(e))) {
Contributor comment:

In the worst case this loop is O(N^2), but N is probably pretty small, so it probably doesn't matter compared to other performance issues impacting Python UDFs.

Author reply:

Agreed.
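
For context, here is a hedged Python sketch of the de-duplication loop under discussion (the real Scala code matches expressions with semanticEquals; plain equality and the dedup_offsets name are illustrative):

def dedup_offsets(inputs):
    # inputs: one list of argument expressions per UDF.
    all_inputs = []   # each distinct expression, sent to Python once
    arg_offsets = []  # per-UDF offsets into all_inputs
    for udf_args in inputs:
        offsets = []
        for e in udf_args:
            if e in all_inputs:  # stands in for semanticEquals
                offsets.append(all_inputs.index(e))
            else:
                all_inputs.append(e)
                offsets.append(len(all_inputs) - 1)
        arg_offsets.append(offsets)
    return all_inputs, arg_offsets

# Two UDFs sharing the input "id": "id" is shipped only once.
print(dedup_offsets([["id"], ["id", "id + 1"]]))
# (['id', 'id + 1'], [[0], [0, 1]])

The membership test plus index lookup is what makes the loop quadratic in the worst case, as noted above.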

@JoshRosen (Contributor)

LGTM pending Jenkins.

@SparkQA commented Mar 31, 2016

Test build #54662 has finished for PR 12057 at commit 876f9f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Author) commented Mar 31, 2016

Merging this into master, thanks!

@asfgit closed this in f0afafd on Mar 31, 2016
asfgit pushed a commit that referenced this pull request May 10, 2017
…repeated arg.

## What changes were proposed in this pull request?

There's a latent corner-case bug in PySpark UDF evaluation: executing a `BatchPythonEvaluation` with a single multi-argument UDF in which _at least one argument value is repeated_ will crash at execution with a confusing error.

This problem was introduced in #12057: the code there has a fast path for handling a "batch UDF evaluation consisting of a single Python UDF", but that branch incorrectly assumes that a single UDF won't have repeated arguments and therefore skips the code for unpacking arguments from the input row (whose schema may not necessarily match the UDF inputs due to de-duplication of repeated arguments which occurred in the JVM before sending UDF inputs to Python).

The fix here is simply to remove this special-casing: it turns out that the code in the "multiple UDFs" branch happens to work for the single-UDF case because Python treats `(x)` as equivalent to `x`, not as a single-argument tuple.
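
A minimal sketch of the two behaviors described above (assuming an existing `sqlContext`; the add_udf name is illustrative, and the crash only reproduces on builds without this fix):

from pyspark.sql import functions as F
from pyspark.sql.types import LongType

# Repeated-argument case: the same column is passed twice, so the JVM
# de-duplicates the UDF inputs before sending rows to Python and the
# input row has fewer fields than the UDF has parameters.
add_udf = F.udf(lambda a, b: a + b, LongType())
df = sqlContext.range(3)
df.select(add_udf(df.id, df.id)).show()  # crashed before this fix

# The Python detail the fix relies on: (x) is just x, while (x,) is a
# one-element tuple, so the "multiple UDFs" code path also covers the
# single-UDF case.
x = 42
assert (x) == x
assert (x,) == (42,)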

## How was this patch tested?

New regression test in the `pyspark.sql.tests` module (tested and confirmed that it fails before my fix).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #17927 from JoshRosen/SPARK-20685.

(cherry picked from commit 8ddbc43)
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
asfgit pushed a commit that referenced this pull request May 10, 2017
…repeated arg. (duplicate of the commit message above)

asfgit pushed a commit that referenced this pull request May 10, 2017
…repeated arg. (duplicate of the commit message above; cherry picked from commit 8ddbc43, signed off by Xiao Li <gatorsmile@gmail.com>)

liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017
…repeated arg. (duplicate of the commit message above)

jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018
…repeated arg. (duplicate of the commit message above; cherry picked from commit 8ddbc43, signed off by Xiao Li <gatorsmile@gmail.com>)