[SPARK-34799][PYTHON][SQL] Return User-defined types from Pandas UDF #31735
Conversation
ok to test
jenkins retest this please
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Kubernetes integration test status success
Test build #135744 has finished for PR 31735 at commit
Test build #135740 has finished for PR 31735 at commit
@HyukjinKwon would you mind taking a look?
@eddyxu Could you please check the failed test? https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135740/consoleText:
@attilapiros Thanks for pointing this out. Looking.
Kubernetes integration test starting
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test status failure
Test build #135773 has finished for PR 31735 at commit
@@ -89,9 +90,35 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]

     columnarBatchIter.flatMap { batch =>
       val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
-      assert(outputTypes == actualDataTypes, "Invalid schema from pandas_udf: " +
-        s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
+      assert(plainSchema(outputTypes) == plainSchema(actualDataTypes),
I think we wouldn't need to call plainSchema(actualDataTypes), because an Arrow schema cannot contain PySpark's UDT?
makes sense.
@@ -54,6 +54,9 @@ class ArrowPythonRunner(
         "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
         s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")

+  /** This is a private key */
+  private val PANDAS_UDF_RETURN_TYPE_JSON = "spark.sql.execution.pandas.udf.return.type.json"
Can we avoid sending this together with configurations?
Hi, do you suggest that we not send the schema via conf, or that we not send the schema at all?
I see the schema as valuable on the worker for two purposes:
1) It can amortize the overhead of checking each row of a returned pandas.{Series/DataFrame} for its schema. By detecting whether we have a UDT in the schema before running the @pandas_udf, we can avoid invoking the expensive code path for the existing plain-schema case.
2) It will be useful for passing UDT into pandas_udf: since the wire format is a pyarrow schema, the wire data needs to be reconstructed before being sent into the pandas_udf in the worker.
Also, as you suggested below, this schema can be used to generate a function that avoids type dispatch when doing the UDT->struct conversion.
An alternative implementation could be:
// Write the conf entries, then the return-type schema JSON as one extra field
dataOut.writeInt(conf.size)
for ((k, v) <- conf) {
  PythonRDD.writeUTF(k, dataOut)
  PythonRDD.writeUTF(v, dataOut)
}
PythonRDD.writeUTF(schemaJson, dataOut)
PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
Do we have a concern about wire compatibility here? IIUC worker.py should be deployed together with ArrowEvalPythonExec, so it might not be an issue.
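For context, a worker-side read that mirrors the write order in the snippet above might look like the following sketch. read_conf_and_schema is a hypothetical name; read_int and UTF8Deserializer are the existing helpers in pyspark.serializers.

from pyspark.serializers import UTF8Deserializer, read_int

utf8_deserializer = UTF8Deserializer()

def read_conf_and_schema(infile):
    # Mirrors the Scala write order: entry count, then key/value pairs,
    # then one extra UTF-8 field carrying the return-type schema as JSON.
    conf = {}
    for _ in range(read_int(infile)):
        key = utf8_deserializer.loads(infile)
        value = utf8_deserializer.loads(infile)
        conf[key] = value
    schema_json = utf8_deserializer.loads(infile)
    return conf, schema_json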
@@ -183,6 +212,21 @@ def create_array(s, t):
             raise e
         return array

+def to_plain_struct(cell):
The performance here will be very bad. We should create a function based on the type, and avoid type-dispatching for every value.
Let me see what I can do.
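A minimal sketch of that suggestion, assuming PySpark's DataType classes; make_converter is a hypothetical name, not code from this PR. The isinstance dispatch runs once per column, so the per-value path is a plain function call.

from pyspark.sql.types import ArrayType, UserDefinedType

def make_converter(dt):
    # Build the UDT -> plain-SQL-value converter up front from the return type.
    if isinstance(dt, UserDefinedType):
        return dt.serialize
    if isinstance(dt, ArrayType) and isinstance(dt.elementType, UserDefinedType):
        serialize = dt.elementType.serialize
        return lambda cell: [serialize(v) for v in cell]
    return None  # no UDT involved; the caller can skip conversion entirely

# usage per column: conv = make_converter(dt); s = s.apply(conv) if conv else s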
import org.apache.spark.sql.util.ArrowUtils
Let's remove all these unreleased changes
will do
@@ -24,9 +24,10 @@ import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
and avoid wildcard import
👌
@eddyxu, how does it work with regular Python UDF? Looks like the performance here will be very bad. Can you do a quick benchmark? cc @BryanCutler and @ueshin too FYI
Thanks for the reviews, @HyukjinKwon. TL;DR: this PR should not introduce a performance regression for the non-UDT case.
Performance-wise, I will do some benchmarks and look into a way to erase UDT without much type dispatching.
Test build #135775 has finished for PR 31735 at commit
I am sorry I wasn't clear. I meant to compare how it works in regular Python UDFs and the pandas UDFs. It would be great if we can use a similar approach for both. Furthermore, we will probably have to do it for
BTW, thanks for working on this :-) @eddyxu.
Kubernetes integration test starting
Kubernetes integration test status success

arrs = []
@maropu Here is the refactored impl:
- use create_arrs_names
- use dt: Optional[DataType] to separate the logic
- move down "Make input conform"
Hm, can't we extract this additional UDT code outside of _create_batch? I meant it like this:
if <series has udt>:
    series = _preprocess_for_udt(series)
_create_batch(series)
Yes, this part can be improved
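A runnable rendering of that pseudocode, assuming the UDT handling can live entirely outside _create_batch; the helper names are hypothetical, not the PR's final code.

from pyspark.sql.types import ArrayType, UserDefinedType

def _series_has_udt(dt):
    # The two shapes discussed in this PR: a top-level UDT, or an array of UDTs.
    return isinstance(dt, UserDefinedType) or (
        isinstance(dt, ArrayType) and isinstance(dt.elementType, UserDefinedType))

def _preprocess_for_udt(series, dt):
    if isinstance(dt, UserDefinedType):
        return series.apply(dt.serialize)
    serialize = dt.elementType.serialize
    return series.apply(lambda cell: [serialize(v) for v in cell])

# call site, leaving _create_batch itself untouched:
# if _series_has_udt(dt):
#     series = _preprocess_for_udt(series, dt)
# batch = _create_batch(series)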
Kubernetes integration test starting
Kubernetes integration test status failure
@eddyxu I wrote a UDT with Timestamp, but failed to make it work. See the demo PR: eddyxu#4. For ExampleBox, serializing to a list works fine. But for ExamplePointWithTimeUDT, to make
Do we need to make UDT with Timestamp work in this PR? How about postponing it to another JIRA ticket? @maropu What's your opinion? I do not want to make this PR too complicated and hard to review.
Test build #136390 has started for PR 31735 at commit
Kubernetes integration test starting
Kubernetes integration test status success
"but got: %s" % str(type(s))) | ||
if isinstance(dt, DataType): | ||
type_not_match = "dt must be instance of StructType when t is pyarrow struct" | ||
assert isinstance(dt, StructType), type_not_match |
"dt must be StructType as t is pyarrow struct"
Remove temp variable type_not_match
with the above shortened error msg. (will change it after the next round of code review)
done
Test build #136412 has started for PR 31735 at commit
Could you take another round of code review? @HyukjinKwon @maropu @ueshin GitHub Actions are queued and waiting; I will improve this PR based on your review, together with the known nits from my own review.
Kubernetes integration test starting
sgtm. I think it is better to make the PR simpler, so how about focusing on supporting a simple case (
# SPARK-34799
def test_user_defined_types_in_array(self):
    @pandas_udf(ArrayType(ExamplePointUDT()))
Could you add tests for @pandas_udf(ArrayType(ExamplePointUDT(), False))? It seems it doesn't work well.
postponed
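For reference, the requested case would look roughly like this sketch; build_points is a hypothetical name, the ExamplePoint classes are assumed to come from pyspark.testing.sqlutils, and containsNull=False is the combination reported above as not working.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType
from pyspark.testing.sqlutils import ExamplePoint, ExamplePointUDT

@pandas_udf(ArrayType(ExamplePointUDT(), containsNull=False))
def build_points(v: pd.Series) -> pd.Series:
    # every cell is a non-null list of UDT instances
    return pd.Series([[ExamplePoint(float(x), float(x))] for x in v])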
    s = s.apply(dt.serialize)
elif isinstance(dt, ArrayType) and isinstance(dt.elementType, UserDefinedType):
    udt = dt.elementType
    s = s.apply(lambda x: [udt.serialize(f) for f in x])
Could you add an assert to check if dt is UDT or Array(UDT)?
This part is extracted, see https://github.com/apache/spark/pull/32026/files
-def create_array(s, t):
+def create_array(s, t: pa.DataType, dt: Optional[DataType] = None):
     mask = s.isnull()
It looks like we don't use type hints for internal funcs.
toPandas and createDataFrame are supported in the latest commits; see dca35df. I just learned about the SPIP vote by you: http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-SPIP-Support-pandas-API-layer-on-PySpark-td30996.html
I wonder whether there is any overlap between this PR and the SPIP. I'm new to PySpark; my previous experience/contributions to Apache Spark mainly focused on YARN/SQL. If there are any overlaps or conflicts, could you give feedback?
Thanks for your reply and suggestion. Supporting
And I will try to submit a good and small/minimal first split PR for you to review, with a better and cleaner implementation of
Here is my plan:
Test build #136757 has finished for PR 31735 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Hello, this looks like very good work. I'm having some trouble reading the code -- is there a possibility that these UDTs could leverage https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extension-types when they're in Pandas/Python to skip a costly conversion to Object that currently happens?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?

This PR allows returning user-defined types (UDT) from a Pandas UDF.

It converts a UDT into its corresponding UDT.sqlType / StructType before sending the results from the PySpark worker to the JVM. On the JVM side, it relaxes the schema-checking requirements so that Spark SQL considers a UDT compatible with its sqlType.

Why are the changes needed?
We have use cases that build UDTs to present the semantic meaning of results. We use pandas UDFs because certain computations (e.g., model inference) require expensive initialization, which makes the iterator-based Pandas UDF a desirable implementation, for example:
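A sketch of that motivating pattern, assuming the ExamplePoint/ExamplePointUDT test classes from pyspark.testing.sqlutils; load_model and the model itself are illustrative stand-ins, not code from this PR.

from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.testing.sqlutils import ExamplePoint, ExamplePointUDT

def load_model():
    # stand-in for an expensive initialization, e.g. loading model weights
    return lambda v: ExamplePoint(float(v), float(v))

@pandas_udf(ExamplePointUDT())
def predict(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = load_model()  # paid once per task, amortized over all batches
    for batch in batches:
        # with this PR, the returned UDT cells are serialized to the UDT's
        # sqlType (a struct) before the results are sent back to the JVM
        yield batch.apply(model)

# usage: df.select(predict("value"))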
Does this PR introduce any user-facing change?

Users can now specify a UDT in pandas_udf's returnType, as in the example above.

How was this patch tested?
This patch includes 3 tests returning UDTs in different forms.