[SPARK-22978] [PySpark] Register Vectorized UDFs for SQL Statement #20171
Conversation
Test build #85747 has finished for PR 20171 at commit
Test build #85748 has finished for PR 20171 at commit
Test build #85750 has finished for PR 20171 at commit
Test build #85754 has finished for PR 20171 at commit
retest this please
Test build #85759 has finished for PR 20171 at commit
Test build #85764 has finished for PR 20171 at commit
The result is wrong. cc @icexelloss @BryanCutler @ueshin @cloud-fan Should we issue an exception in this case? Just opened a JIRA: https://issues.apache.org/jira/browse/SPARK-22980
I think that's because we expect Pandas's
python/pyspark/sql/catalog.py
Outdated
if hasattr(f, 'asNondeterministic'):
    udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
                              evalType=PythonEvalType.SQL_BATCHED_UDF,
                              evalType=f.evalType,
I haven't started to review yet as it looks WIP, but let's not forget to fail fast when it's not a PythonEvalType.SQL_BATCHED_UDF, as we discussed.
when it's not a PythonEvalType.SQL_BATCHED_UDF
->
when it's neither a PythonEvalType.SQL_BATCHED_UDF nor a PythonEvalType.SQL_PANDAS_SCALAR_UDF, right?
Yup, I think that's right.
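A minimal sketch of that fail-fast check, using the PythonEvalType constants from pyspark.rdd that appear elsewhere in this PR; the helper name is hypothetical, not the PR's actual code:

```Python
from pyspark.rdd import PythonEvalType

def _check_eval_type(f):
    # Hypothetical helper: reject anything that is neither a row-at-a-time UDF
    # nor a scalar pandas UDF before it gets registered for SQL.
    supported = (PythonEvalType.SQL_BATCHED_UDF, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    if f.evalType not in supported:
        raise ValueError(
            "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
```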
python/pyspark/sql/tests.py
Outdated
from pyspark.rdd import PythonEvalType
import random
randomPandasUDF = pandas_udf(
    lambda x: random.randint(6, 6) + x, StringType()).asNondeterministic()
The UDF's returnType doesn't match the returnType passed to registerFunction; what's the expected behavior in this case?
good question, also cc @ueshin
How about the following strategy? (A code sketch follows the list.)
- Make the default value for returnType None.
- If returnType is None for a Python function, use StringType, the same as the current default value.
- If returnType is None for a UDF, use the UDF's returnType; otherwise respect the user-specified returnType (but with a warning?).
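A rough sketch of that strategy, purely for illustration; the helper and its placement are hypothetical, not the code in this PR:

```Python
from pyspark.sql.types import StringType

def _resolve_return_type(f, returnType=None):
    # Hypothetical helper illustrating the three rules above.
    if hasattr(f, 'returnType'):
        # f is already a UDF: reuse its own returnType when none is given,
        # otherwise respect the user-specified value (possibly with a warning).
        return f.returnType if returnType is None else returnType
    # f is a plain Python function: fall back to the current default, StringType.
    return returnType if returnType is not None else StringType()
```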
I'm not quite sure about 3. I think the return type is a property of the defined UDF, not a register-time thing. So if users are registering a UDF (not a Python function), they should not be allowed to specify the returnType parameter.
That sounds good to me, too.
Sounds good to me too. Another alternative is to have a registerUDF(name, udf) instead of having registerFunction work with both lambda functions and UDFs. That way we don't have the confusing situation with the returnType arg.
sounds good.
python/pyspark/sql/tests.py
Outdated
def test_register_vectorized_udf_basic(self):
    from pyspark.sql.functions import pandas_udf
    from pyspark.rdd import PythonEvalType
    twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
The name is wrong: there is only one arg.
twoArgsPandasUDF -> two_args_pandas_udf too.
python/pyspark/sql/tests.py
Outdated
def test_register_vectorized_udf_basic(self):
    from pyspark.sql.functions import pandas_udf
    from pyspark.rdd import PythonEvalType
    twoArgsPandasUDF = pandas_udf(lambda x: len(x), IntegerType())
x.str.len() instead of len(x)?
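For context, a scalar pandas UDF receives a whole pandas.Series per batch, so len(x) gives the batch size while x.str.len() gives per-row lengths. A minimal sketch, assuming pandas and PyArrow are installed; the data and column name are made up:

```Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

# x is a pandas.Series of strings; .str.len() computes the length of each element.
str_len = pandas_udf(lambda x: x.str.len(), IntegerType())

df = spark.createDataFrame([("ab",), ("abcd",)], ["s"])
df.select(str_len("s").alias("n")).show()  # rows: 2 and 4
```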
Let me close this PR and open a new PR to introduce a new function
Test build #86100 has finished for PR 20171 at commit
Test build #86101 has finished for PR 20171 at commit
Looks fine to me otherwise.
python/pyspark/sql/catalog.py
Outdated
        PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
    raise ValueError(
        "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
if returnType is not None and returnType != f.returnType:
Could we just simply exclude returnType != f.returnType? I think the return type could be a string too, so this case might fail. I just double checked:

from pyspark.rdd import PythonEvalType
from pyspark.sql.functions import pandas_udf, col, expr
original_add = pandas_udf(lambda x, y: x + y, "integer")
spark.udf.register("add", original_add, "integer")

ValueError: Invalid returnType: the provided returnType (integer) is inconsistent with the returnType (IntegerType) of the provided f. When the provided f is a UDF, returnType is not needed.
I did not get your point. If we just check returnType != f.returnType, it will fail, because None != f.returnType is always true.
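For reference, the normalization eventually adopted in catalog.py (shown further down) parses a string returnType into a DataType before comparing, and only compares when a value was actually given. A standalone sketch of why that avoids the "integer" vs. IntegerType mismatch, assuming an active SparkSession since the datatype parser needs the JVM:

```Python
from pyspark.sql import SparkSession
from pyspark.sql.types import DataType, IntegerType, _parse_datatype_string

spark = SparkSession.builder.getOrCreate()  # the datatype parser needs an active session

return_type = "integer"  # user-supplied string form
if return_type is not None and not isinstance(return_type, DataType):
    return_type = _parse_datatype_string(return_type)  # -> IntegerType()

# After normalization, the check no longer trips over string vs. DataType:
print(return_type == IntegerType())  # True
```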
python/pyspark/sql/context.py
Outdated
>>> from pyspark.sql.types import IntegerType
>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
>>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
>>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
Would it be better to keep sqlContext.registerFunction as it was? The documentation will show the examples for the SQLContext.registerFunction API.
sqlContext has been deprecated since 2.0. SparkSession should be the default entry point. Here, the example is just to show the way we recommend to users.
In that case, we should replace sqlContext with spark. It's for testing purposes too, as these examples are actually run. Also, we should leave a note that it's an alias for udf.register, together with a warnings.warn so that an IDE can detect deprecated methods and users see the warning. If we just want an exactly identical doc, we can simply reassign __doc__ as suggested by @ueshin and @icexelloss. The simplest way is just to leave it as it was.
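A minimal sketch of the __doc__ reassignment idea mentioned above, with purely illustrative function names:

```Python
import warnings

def register(name, f, returnType=None):
    """Register a Python function or a user-defined function for use in SQL."""
    ...  # canonical implementation would live here

def registerFunction(name, f, returnType=None):
    # Deprecated alias: emit a warning so IDEs and users notice, then delegate.
    warnings.warn("Deprecated in 2.3.0. Use spark.udf.register instead.",
                  DeprecationWarning)
    return register(name, f, returnType)

# Reuse the canonical docstring instead of maintaining two copies.
registerFunction.__doc__ = register.__doc__
```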
Maybe you can submit a separate PR for it? For testing purposes, we should do it in a test suite instead of using the doc.
Doctests are for testing purposes too. I intended to do this in a separate PR, which is why I suggested leaving it as it was.
I mean it doesn't completely cover the concern:
sqlContext has been deprecated since 2.0. SparkSession should be the default entry point.
and this change doesn't completely address it either. If it's meant to be separate, we'd better leave this change out. What I was wondering is why this concern gets partially fixed here when the rest goes into a separate PR.
I will do it in this PR.
I'd rather we just leave it as it was.
I can revert it back if you want to take it.
Sure, I want to take it. Thanks.
python/pyspark/sql/catalog.py
Outdated
| "Invalid returnType: the provided returnType (%s) is inconsistent with " | ||
| "the returnType (%s) of the provided f. When the provided f is a UDF, " | ||
| "returnType is not needed." % (returnType, f.returnType)) | ||
| registerUDF = UserDefinedFunction(f.func, returnType=f.returnType, name=name, |
registerUDF -> register_udf
What is the naming convention in PySpark? Three different styles are found. Pretty confusing.
python/pyspark/sql/catalog.py
Outdated
registerUDF = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
                                  evalType=f.evalType,
                                  deterministic=f.deterministic)
returnUDF = f
returnUDF -> return_udf
python/pyspark/sql/tests.py
Outdated
self.assertEqual(row[0], 5)
self.assertEqual(row[0], u'5')

def test_udf_using_registerFunction_incompatibleTypes(self):
How about test_udf_registration_return_type_mismatch?
python/pyspark/sql/tests.py
Outdated
from pyspark.sql.functions import pandas_udf
from pyspark.rdd import PythonEvalType
import random
randomPandasUDF = pandas_udf(
randomPandasUDF -> random_pandas_udf
python/pyspark/sql/tests.py
Outdated
    lambda x: random.randint(6, 6) + x, IntegerType()).asNondeterministic()
self.assertEqual(randomPandasUDF.deterministic, False)
self.assertEqual(randomPandasUDF.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
nondeterministicPandasUDF = self.spark.catalog.registerFunction(
nondeterministicPandasUDF -> nondeterministic_pandas_udf
python/pyspark/sql/tests.py
Outdated
    [StructField('id', LongType()),
     StructField('v1', DoubleType())]),
    PandasUDFType.GROUP_MAP
)
We could simplify this to:
foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUP_MAP)
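For reference, a GROUP_MAP pandas UDF is meant to be used with groupby().apply() rather than registered for SQL; a minimal usage sketch of the simplified form, assuming pandas and PyArrow are installed and with made-up data:

```Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

# The schema can be given as a DDL string ("id long") instead of a StructType.
foo_udf = pandas_udf(lambda pdf: pdf, "id long", PandasUDFType.GROUP_MAP)

df = spark.createDataFrame([(1,), (2,), (2,)], ["id"])
df.groupby("id").apply(foo_udf).show()  # identity transform applied per group
```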
Test build #86121 has finished for PR 20171 at commit
python/pyspark/sql/context.py
Outdated
>>> from pyspark.sql.types import IntegerType
>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
>>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType())
>>> newRandom_udf = sqlContext.udf.register("random_udf", random_udf)
newRandom_udf -> new_random_udf.
I know it's a bit confusing. It's because we started out having the same names in the API. Similar things also apply to R. We follow PEP 8 with a few exceptions; names should use underscores in general where possible. There's an example to refer to, threading.py in Python, which happened to be in a similar situation to ours.
I am fine with the naming convention. Do we have a style recommendation for it?
What do you mean by a style recommendation?
Ah, do you maybe literally mean something like the Scala style guide? It's basically PEP 8.
I just checked http://spark.apache.org/contributing.html. It is already documented there.
python/pyspark/sql/catalog.py
Outdated
| "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF") | ||
| if returnType is not None and not isinstance(returnType, DataType): | ||
| returnType = _parse_datatype_string(returnType) | ||
| if returnType is not None and returnType != f.returnType: |
I mean we can simply always throw an exception if returnType is given (not None) but f is a UDF. I thought we were trying to resemble an overload for register(name, f).
Why did you cc other guys @gatorsmile?
I am not sure which one is better.
I think we are trying to avoid setting returnType at register time. The current way apparently allows taking returnType when the types match. Also, the suggestion resembles the overloading of the Scala version we talked about. I would like to get this into 2.3 and push forward.
Could you maybe elaborate on why you are not sure? Let me try to explain it.
I am not saying we should have the same message. I am trying to persuade you to throw an error in this case.
Is it common in our current PySpark impl?
I might be missing something, but I think it's okay to take the returnType parameter optionally if the value is the same as the UDF's.
An optional value is okay, but I mean it's better to throw an exception. I am not seeing the advantage of supporting this optionally. @ueshin, do you think it's better to support this case?
I am less sure of the point of supporting returnType with a UDF when we disallow changing it. It causes confusion: we allow it, but then if the type is different, we issue an exception.
Is it more important to allow this corner case than to make the API clear, as if we had def register(name, f) # for UDF alone? That way we also keep it clear that returnType is disallowed at register time.
I see what you mean. Now I'm neutral, but leaning slightly to your side.
... def add_one(x):
...     return x + 1
...
>>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
Is there a reason to assign the result to an underscore placeholder? It might seem confusing to users if it's not required.
This is to avoid generating the random hex value returned by PySpark in the doctest output. You can try spark.udf.register("add_one", add_one).
With the underscore placeholder, we can remove # doctest: +SKIP.
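To illustrate, in doctest form; the repr line is only indicative of the hex-address output being avoided:

```Python
>>> spark.udf.register("add_one", add_one)       # doctest: +SKIP
<...UserDefinedFunction object at 0x...>
>>> _ = spark.udf.register("add_one", add_one)   # repr is discarded, nothing to match
```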
python/pyspark/sql/catalog.py
Outdated
>>> from pyspark.sql.types import IntegerType
>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
>>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType())
>>> newRandom_udf = spark.udf.register("random_udf", random_udf)
nit: new_random_udf?
Test build #86165 has finished for PR 20171 at commit
Test build #86166 has finished for PR 20171 at commit
LGTM except for some comments. Will those be addressed in a separate PR?
In addition to a name and the function itself, the return type can be optionally specified.
When the return type is not given it defaults to a string and conversion will automatically
be done. For any other return type, the produced object must match the specified type.
:func:`spark.udf.register` is an alias for :func:`spark.catalog.registerFunction`.
:func:`spark.catalog.registerFunction` is an alias for :func:`spark.udf.register`?
In addition to a name and the function itself, the return type can be optionally specified.
When the return type is not given it defaults to a string and conversion will automatically
be done. For any other return type, the produced object must match the specified type.
:func:`spark.udf.register` is an alias for :func:`sqlContext.registerFunction`.
:func:`sqlContext.registerFunction` is an alias for :func:`spark.udf.register`?
original_add = pandas_udf(lambda x, y: x + y, IntegerType())
self.assertEqual(original_add.deterministic, True)
self.assertEqual(original_add.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
new_add = self.spark.catalog.registerFunction("add1", original_add)
spark.udf.register instead of spark.catalog.registerFunction?
with QuietTest(self.sc):
    with self.assertRaisesRegexp(ValueError, 'f must be either SQL_BATCHED_UDF or '
                                             'SQL_PANDAS_SCALAR_UDF'):
        self.spark.catalog.registerFunction("foo_udf", foo_udf)
ditto.
LGTM too.
1) When f is a Python function, `returnType` defaults to a string. The produced object must
match the specified type. 2) When f is a :class:`UserDefinedFunction`, Spark uses the return
type of the given UDF as the return type of the registered UDF. The input parameter
`returnType` is None by default. If given by users, the value must be None.
I think we should simply say that setting a data type for returnType is disallowed, rather than that None should be set.
                                       deterministic=f.deterministic)
if returnType is not None:
    raise TypeError(
        "Invalid returnType: None is expected when f is a UserDefinedFunction, "
Here too, I think we should say that returnType is disallowed when f is a UserDefinedFunction.
Will try to handle the doc and minor stuff soon, within a few days. Seems it might be a bit more tricky than I thought.
Merged to master and branch-2.3.
## What changes were proposed in this pull request?
Register Vectorized UDFs for SQL Statement. For example,
```Python
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> pandas_udf("integer", PandasUDFType.SCALAR)
... def add_one(x):
... return x + 1
...
>>> _ = spark.udf.register("add_one", add_one)
>>> spark.sql("SELECT add_one(id) FROM range(3)").collect()
[Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
```
## How was this patch tested?
Added test cases
Author: gatorsmile <gatorsmile@gmail.com>
Closes #20171 from gatorsmile/supportVectorizedUDF.
(cherry picked from commit b85eb94)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>