[SPARK-35344][PYTHON] Support creating a Column of numpy literals in pandas API on Spark #32955
Conversation
Test build #139943 has finished for PR 32955 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #140092 has finished for PR 32955 at commit
Test build #140095 has finished for PR 32955 at commit
Force-pushed from 8876e6d to 623cc1d
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #140104 has finished for PR 32955 at commit
@xinrong-databricks, I think we can just support this natively all across PySpark. Can you add an input converter here (spark/python/pyspark/sql/types.py, lines 1608 to 1622 at 20750a3)?
class DatetimeConverter(object):
    def can_convert(self, obj):
        return isinstance(obj, datetime.datetime)

    def convert(self, obj, gateway_client):
        Timestamp = JavaClass("java.sql.Timestamp", gateway_client)
        seconds = (calendar.timegm(obj.utctimetuple()) if obj.tzinfo
                   else time.mktime(obj.timetuple()))
        t = Timestamp(int(seconds) * 1000)
        t.setNanos(obj.microsecond * 1000)
        return t

# datetime is a subclass of date, we should register DatetimeConverter first
register_input_converter(DatetimeConverter())
register_input_converter(DateConverter())
Also, I think we can simplify it by item() (https://stackoverflow.com/a/11389998/2438480) with np.generic type check (https://numpy.org/doc/stable/reference/arrays.scalars.html).
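For reference, a minimal sketch of that suggestion (the `as_python_scalar` helper name is illustrative, not an existing PySpark function): every numpy scalar type derives from `np.generic`, and `.item()` converts it to the nearest Python builtin.

```python
import numpy as np

def as_python_scalar(value):
    # Every numpy scalar (np.int64, np.float32, np.bool_, ...) derives from
    # np.generic, and .item() converts it to the closest Python builtin.
    if isinstance(value, np.generic):
        return value.item()
    return value

print(type(as_python_scalar(np.int64(1))))      # <class 'int'>
print(type(as_python_scalar(np.float64(1.5))))  # <class 'float'>
print(type(as_python_scalar(3)))                # unchanged: <class 'int'>
```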
cc @mengxr too FYI
Certainly, I am working on this.
Adding an input converter might be tricky in this case.
Py4J hands back an instance of java.lang.Integer as a Python int rather than a JavaObject, but the return value of `convert` is expected to be a JavaObject.
Unfortunately, there is also no wrapper for numeric values in the java.sql package (analogous to java.sql.Timestamp), and instantiating a JavaObject directly from a value does not seem possible given its constructor.
Do you have insights on that by any chance? @HyukjinKwon @ueshin
Please find my pseudo-code attached:

# JavaClass and register_input_converter come from Py4J (as used in pyspark/sql/types.py).
from py4j.java_gateway import JavaClass
from py4j.protocol import register_input_converter

class SomeConverter(object):
    def can_convert(self, obj):
        import numpy as np
        return isinstance(obj, np.generic)

    def convert(self, obj, gateway_client):
        Integer = JavaClass("java.lang.Integer", gateway_client)
        # Py4J unboxes java.lang.Integer, so this comes back as a plain `int`,
        # not a JavaObject.
        return Integer.valueOf(obj.item())
    ...

register_input_converter(SomeConverter())

And the exception stack trace looks like:
Traceback (most recent call last):
File "/Users/xinrong.meng/spark/python/pyspark/pandas/tests/data_type_ops/test_udt_ops.py", line 43, in test
print(F.lit(np.int64(1)))
File "/Users/xinrong.meng/spark/python/pyspark/sql/functions.py", line 100, in lit
return col if isinstance(col, Column) else _invoke_function("lit", col)
File "/Users/xinrong.meng/spark/python/pyspark/sql/functions.py", line 58, in _invoke_function
return Column(jf(*args))
File "/miniconda2/envs/pyspark-dev-pd-1.1.5/lib/python3.9/site-packages/py4j/java_gateway.py", line 1313, in __call__
temp_arg._detach()
AttributeError: 'int' object has no attribute '_detach'
Just found something: https://www.py4j.org/advanced_topics.html#boxing
I am wondering if we may support creating a Column of numpy literal value in pandas-on-Spark first. We might need more research on supporting that in PySpark.
I'm fine with the current implementation since it sounds like the converter is difficult.
I'd leave this to @HyukjinKwon.
Okie, I am fine as is.
Thank you!
Force-pushed from 623cc1d to c13414e
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #140158 has finished for PR 32955 at commit
Force-pushed from c13414e to 23982c1
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #140287 has finished for PR 32955 at commit
Force-pushed from 23982c1 to 73c43eb
Thanks! Merging to master.
@xinrong-databricks Please file follow-up tickets if needed. Thanks!
Thanks @ueshin! I will file follow-up tickets.
This is more a bug fix or safeguard. After argument conversion (via the `Converter.convert` interface in Py4J), the returned argument might not be a plain `JavaObject`. For example, a `JavaObject` wrapping `java.lang.Integer` is converted to `int` automatically; see also #163. However, the current codebase requires it to be a `JavaObject` by assuming a `_detach` method exists (used to garbage-collect the instance). In fact, calling a Java method with these Python primitives is valid in Py4J, so it makes sense to allow returning primitive types from `Converter.convert`. Therefore, this PR proposes to call `_detach` only when it exists, and delegates the type checking to the actual method invocation, which is consistent with calling ordinary JVM methods via Py4J. I tested manually, and will add an integration test on the PySpark side. It's a bit tricky to add a unit test, so it will be tested together with PySpark. See also apache/spark#32955 (comment)
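A minimal sketch of the safeguard described above. `detach_if_java_object` is an illustrative helper, not the actual Py4J code, which applies the same check inline where converted temporary arguments are detached in `py4j/java_gateway.py`:

```python
def detach_if_java_object(temp_arg):
    # Converted arguments are usually JavaObject instances, but a Converter may
    # legitimately return a Python primitive (e.g. int), which has no JVM-side
    # reference to release. Guarding on _detach avoids the AttributeError above.
    if hasattr(temp_arg, "_detach"):
        temp_arg._detach()

detach_if_java_object(5)  # a plain int is now simply ignored instead of raising
```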
What changes were proposed in this pull request?
This PR proposes to support creating a Column of numpy literal values in pandas-on-Spark. It consists mainly of three changes:
- Adjust the `lit` function defined in `pyspark.pandas.spark.functions` to support numpy literal input.
- Replace `F.lit` by `SF.lit`, that is, use the `lit` function defined in `pyspark.pandas.spark.functions` rather than the one defined in `pyspark.sql.functions`, to allow creating columns out of numpy literals.
- Adjust the `isin` method accordingly.

Non-goal:
Other pandas-on-Spark APIs that pass literals to PySpark internally (e.g. the `to_replace` parameter in the `replace` API) may have the same limitation. This PR doesn't aim to adjust all of them; it adjusts `isin` only, because this PR was inspired by that case (AttributeError: 'numpy.int64' object has no attribute '_get_object_id', databricks/koalas#2161).
Why are the changes needed?
Spark (the `lit` function defined in `pyspark.sql.functions`) doesn't support creating a Column out of a numpy literal value. So the `lit` function defined in `pyspark.pandas.spark.functions` is adjusted to support that in pandas-on-Spark.
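As a rough illustration (not necessarily the exact code that was merged), the adjusted helper could unwrap numpy scalars with `.item()` before delegating to `pyspark.sql.functions.lit`:

```python
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.column import Column


def lit(literal) -> Column:
    # numpy scalars subclass np.generic; .item() unwraps them to plain Python
    # values that the stock F.lit already supports.
    if isinstance(literal, np.generic):
        return F.lit(literal.item())
    return F.lit(literal)
```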
Does this PR introduce any user-facing change?
Yes.
Before:
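(The original snippet was not preserved; this is a plausible reconstruction based on the error reported in databricks/koalas#2161.)

```python
import numpy as np
import pyspark.pandas as ps

psser = ps.Series([1, 2, 3])
psser.isin([np.int64(1)])
# AttributeError: 'numpy.int64' object has no attribute '_get_object_id'
```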
After:
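(Likewise a reconstruction; exact output not verified.)

```python
import numpy as np
import pyspark.pandas as ps

psser = ps.Series([1, 2, 3])
psser.isin([np.int64(1)])
# 0     True
# 1    False
# 2    False
# dtype: bool
```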
How was this patch tested?
Unit tests.
Keyword: SPARK-35337