[SPARK-25811][PySpark] Raise a proper error when unsafe cast is detected by PyArrow #22807
Conversation
Test build #97925 has finished for PR 22807 at commit
Thanks @viirya, looks good so far! Were you thinking of putting in a Spark config to toggle the safe flag here or in a follow-up? I think we really need that because prior to v0.11.0 it allowed unsafe casts, but now it raises an error by default.
python/pyspark/sql/tests.py
Outdated
udf_boolean = df.select(['A']).withColumn('udf', udf('A'))

# Since 0.11.0, PyArrow supports the feature to raise an error for unsafe cast.
if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"):
I checked how 0.8.0 works and it does raise an error for something like overflows, but not for truncation like in this test. Can you also add a check for overflow that doesn't depend on the version?
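(A hedged sketch of such a version-independent overflow check, assuming an active SparkSession named spark; the test actually added in this PR may differ:)

import pandas as pd
from pyspark.sql.functions import pandas_udf

df = spark.createDataFrame(pd.DataFrame({'id': [1]}))
# 128 does not fit into a signed byte, so any supported PyArrow version should reject the conversion.
overflow_udf = pandas_udf(lambda x: pd.Series([128] * len(x)), 'byte')
try:
    df.select(overflow_udf('id')).collect()
except Exception as e:
    print("overflow detected:", e)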
Ok, sure.
BTW, let's bump up the minimal required PyArrow and Pandas versions if possible in 3.0 :-)
@BryanCutler Do you mean this same udf raises an error like an overflow? I tried with 0.8.0 but it doesn't. Am I missing something?
I think I was talking about something like an integer overflow. In pyarrow 0.8.0 it will raise an error:
In [10]: pa.Array.from_pandas(pd.Series([128]), type=pa.int8())
---------------------------------------------------------------------------
ArrowInvalid Traceback (most recent call last)
<ipython-input-10-49026b548de3> in <module>()
----> 1 pa.Array.from_pandas(pd.Series([128]), type=pa.int8())
...
ArrowInvalid: Integer value out of bounds
but in 0.11.1 with safe=False, it will allow this:
In [11]: pa.Array.from_pandas(pd.Series([128]), type=pa.int8(), safe=False)
Out[11]:
<pyarrow.lib.Int8Array object at 0x7f49eebb0f48>
[
-128
]
So I think I was saying you could add a test that makes sure the default behavior is to raise an error on overflow.
It seems like casts from float to integral types always work without an error in pyarrow < 0.11:
>>> pa.Array.from_pandas(pd.Series([128.0]), type=pa.int8())
<pyarrow.lib.Int8Array object at 0x11c42d940>
[
-128
]
Yeah but in pyarrow 0.11.0+ you'd see an error:
>>> import pandas as pd
>>> import pyarrow as pa
>>> pa.__version__
'0.11.1'
>>> pa.Array.from_pandas(pd.Series([128.0]), type=pa.int8())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Floating point value truncated
>>> pa.Array.from_pandas(pd.Series([128.0]), type=pa.int8(), safe=False)
<pyarrow.lib.Int8Array object at 0x7f3ee1a4b868>
[
-128
]
Yeah. I was thinking about how we should handle this behavior change.
We will have the behavior change anyway regardless of the config, right?
- safe=True: we can't use nullable integral types
- safe=False: we can't detect an integer overflow
I think so. safe=False does the type conversion anyway, even on an overflow.
If using safe=False by default, it is possible to break users' code without any notification. If using safe=True by default, it still breaks users' code, but there is an error message, so users should know what happened.
@BryanCutler Thanks for looking at this! Yeah, this is WIP work for early review, and I will add a config to toggle the safe flag.
-            return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)
+            try:
+                array = pa.Array.from_pandas(s, mask=mask, type=t, safe=True)
+            except pa.ArrowException as e:
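(A minimal sketch of the intent of this change: catch Arrow's unsafe-cast error and re-raise something more instructive for pandas UDF users. The message wording below is an assumption, not necessarily the exact text merged:)

try:
    array = pa.Array.from_pandas(s, mask=mask, type=t, safe=True)
except pa.ArrowException as e:
    # Re-raise with a hint about the unsafe cast and how to disable the check.
    error_msg = ("Exception thrown when converting pandas.Series (%s) to Arrow array (%s). "
                 "It can be caused by an unsafe cast such as an overflow or float truncation. "
                 "The safe type check can be disabled via "
                 "spark.sql.execution.pandas.arrowSafeTypeConversion.")
    raise RuntimeError(error_msg % (s.dtype, t), e)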
@BryanCutler Now it catches ArrowException.
Sorry for not pushing this for a while. Hope we can get it in soon.
Test build #100917 has finished for PR 22807 at commit
Do we have a proper way to get a Spark config value on the executor side, like in the serializers here? I found that in
Test build #100918 has finished for PR 22807 at commit
So since pyarrow 0.11, we won't allow nullable integral types by default?
@ueshin Ideally we should provide a config to turn off this check. But I'm wondering if there is a proper way to obtain the config value in the serializers (#22807 (comment)). Do you have some suggestions?
@viirya We can use
@ueshin Thanks. I've noticed that way, but was wondering if it is the only way to do it. I will use it to set the safe check.
Test build #100926 has finished for PR 22807 at commit
Test build #100931 has finished for PR 22807 at commit
Test build #100935 has finished for PR 22807 at commit
Thanks for getting back to this @viirya! Looks pretty good; I just think maybe parse the runner_conf earlier and use a safe flag in the ArrowStreamPandasSerializer. Also, I'm a little concerned about the default value that might break a lot of people's code... but also if it is false by default, then it will allow overflows when it didn't before, so I'm not sure what's best.
python/pyspark/worker.py
Outdated
@@ -253,7 +253,7 @@ def read_udfs(pickleSer, infile, eval_type):
     # NOTE: if timezone is set here, that implies respectSessionTimeZone is True
     timezone = runner_conf.get("spark.sql.session.timeZone", None)
-    ser = ArrowStreamPandasSerializer(timezone)
+    ser = ArrowStreamPandasSerializer(timezone, runner_conf)
I think it's slightly better to parse the runner_conf for the needed options here and pass in the required flags like we do with timezone, instead of passing around the runner_conf.
Ok. I passed in the required flag now.
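(For reference, a minimal sketch of what parsing the option in worker.py's read_udfs could look like; the safecheck variable name is an assumption:)

timezone = runner_conf.get("spark.sql.session.timeZone", None)
safecheck = runner_conf.get(
    "spark.sql.execution.pandas.arrowSafeTypeConversion", "true").lower() == "true"
# Pass a plain flag to the serializer instead of the whole runner_conf dict.
ser = ArrowStreamPandasSerializer(timezone, safecheck)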
python/pyspark/sql/session.py
Outdated
        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)],
-                                 timezone)
+                                 timezone, runner_conf)
Here, it doesn't really make sense to use a runner_conf, so it would be better to just pass in the flag.
python/pyspark/serializers.py
Outdated
return pa.Array.from_pandas(s, mask=mask, type=t, safe=False)

enabledArrowSafeTypeCheck = \
    runner_conf.get("spark.sql.execution.pandas.arrowSafeTypeConversion", "true") == 'true'
Might want to tack on .lower() to ensure you are checking lower case.
res = df.select(int_f(col('int')))
self.assertEquals(df.collect(), res.collect())
with self.sql_conf({
        "spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
This and other tests fail if arrowSafeTypeConversion=True?
Yes, please see @ueshin's comment #22807 (comment).
I see, it's because of the NULL values.
  "when detecting unsafe type conversion. When false, disabling Arrow's type " +
  "check and do type conversions anyway.")
  .booleanConf
  .createWithDefault(true)
If enabling by default causes a lot of the current tests to fail, it might also do the same for users - maybe we should disable by default?
As you said, if it is false by default, it allows overflow. cc @ueshin @HyukjinKwon What do you think? Which is the better default value?
I'd favor true by default. Do we even need this flag? As in many such cases, I'm just not clear a) how a user would find this option and b) when they would disable it. Disabling allows something to continue that is going to also fail or give an incorrect answer, right?
I think the big issue with this is when NULL values are introduced in an integer column. Pandas will automatically convert these to floating-points to represent the NULLs, then when Arrow casts it back to integer, it will raise an error due to truncation - I don't think Arrow checks the actual values, but maybe it should? For example, with safe=True:
>>> pa.Array.from_pandas(pd.Series([1, None]), type=pa.int32(), safe=True)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Floating point value truncated
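(For illustration only, a hedged sketch of the value-level check hinted at above; this is not something Arrow or this PR does. It shows that the floats pandas produces for a nullable integer column could, in principle, be cast back losslessly:)

import numpy as np
import pandas as pd

s = pd.Series([1, None])                      # pandas upcasts to float64: [1.0, NaN]
vals = s.dropna()
print(bool((vals == np.floor(vals)).all()))   # True: this particular float -> int cast loses nothing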
Does Spark check for these types of errors with standard udfs, when not using Arrow?
Thanks @kszucs
I would have left some comments about that JIRA here though, rather than making everybody click and read the whole issue. So, the NULL -> Integer issue will be fixed in Arrow 0.12.0?
Do you mean leaving comments in this SQL config doc?
No .. I was referring to #22807 (comment) ..
> So, NULL -> Integer issue will be fixed in Arrow 0.12.0?

Yes, I verified this and ran the unit tests on the release.
val PANDAS_ARROW_SAFE_TYPE_CONVERSION =
  buildConf("spark.sql.execution.pandas.arrowSafeTypeConversion")
    .internal()
    .doc("When true, enabling Arrow do safe type conversion check when converting" +
maybe reword to "When true, Arrow will perform safe type conversion when converting " +
  buildConf("spark.sql.execution.pandas.arrowSafeTypeConversion")
    .internal()
    .doc("When true, enabling Arrow do safe type conversion check when converting" +
      "Pandas.Series to Arrow Array during serialization. Arrow will raise errors " +
nit: don't capitalize Array
Fixed.
    .internal()
    .doc("When true, enabling Arrow do safe type conversion check when converting" +
      "Pandas.Series to Arrow Array during serialization. Arrow will raise errors " +
      "when detecting unsafe type conversion. When false, disabling Arrow's type " +
it might be good to elaborate what is an unsafe conversion, e.g. overflow
Added.
Thanks @BryanCutler for continuing to review this. I addressed the above comments. We need to decide the default config value.
Test build #100958 has finished for PR 22807 at commit
So, in these situations, we agree some error should occur. The current one is an overflow error -- right? Or does it silently overflow? Non-Arrow UDFs seem to do the latter. Based on that, I'm not clear the Arrow behavior should be different, especially if it already matches that behavior and is what Arrow users expect. No, this isn't a case for a flag. That is a failure to decide, punted to the user.
@srowen we discussed adding this flag a while ago when pyarrow 0.11.1 introduced this option, which seemed to change the default behavior in PySpark. Our intention was to preserve the default behavior in PySpark and provide the user a config to change it if needed. It was only recently we learned there was a behavior change either way the config is set. It does seem like there is a bug in Arrow that is affecting this too, https://issues.apache.org/jira/browse/ARROW-4258, although we still should make a patch for v0.11.1.
@BryanCutler @srowen Based on the summary, if we set the config value to false, I think we should also add some words to the migration guide about the behavior change. What do you think?
To be clear, if the new setting is false, then that enables the 'silent' behavior? That seems like the right default. Yes, it can't hurt to note this in the release notes ('release-notes' label in JIRA and 'Docs text').
I think a default setting of false and a note in the migration guide sound like the best option.
Changed the default config value to false and added a note to the migration guide. Please take a look when you have time. Thanks.
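(As an aside, a minimal usage sketch, assuming an active SparkSession named spark: with the new default of false, the strict check has to be enabled explicitly.)

spark.conf.set("spark.sql.execution.pandas.arrowSafeTypeConversion", "true")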
Looks good pending tests
Test build #101473 has finished for PR 22807 at commit
@@ -197,6 +197,66 @@ def foofoo(x, y):
                ).collect
            )

    def test_pandas_udf_detect_unsafe_type_conversion(self):
        from distutils.version import LooseVersion
        from pyspark.sql.functions import pandas_udf
nit: after the unit test reorg, the pandas_udf import is at the top so it's not needed here or in the other test.
Removed it. Thanks!
LGTM except for just one minor nit
</th>
</tr>
</table>
Looks good, thanks for adding this!
</tr>
<tr>
<th>
<b>version > 0.11.0, arrowSafeTypeConversion=true</b>
Just a quick question (don't be blocked by this). So, do we plan to use true in the near future? It looks like it was made false by default to prevent behaviour changes, and in particular because of the NULL -> integer issue (which is fixed in Arrow 0.12.0).
Sounds like we can make it true and remove this configuration when we set the minimal Arrow version to 0.12.0. Am I correct?
I think this config can be kept even after we set the minimal Arrow version to 0.12.0. If anything goes wrong, users can still disable the type check.
For now, there isn't consistent behavior across integer overflow and floating-point truncation. Either true or false causes a behavior change. It is false by default to make it consistent with non-Arrow UDFs.
If we are going to change it to true in the future, isn't that a behavior change again?
Hmhmm .. yea .. Good point about consistency with the regular UDFs. I was thinking of 1. removing such configurations eventually (personally I don't like the bunch of configurations we have currently ..), 2. making all UDFs expect the exact types (<- not sure yet, it needs discussion). Yes, we can talk later ..
Yeah, I think the config should stay too since it is only able to be set through pyarrow
Test build #101501 has finished for PR 22807 at commit
retest this please.
# Disabling Arrow safe type check.
with self.sql_conf({
        "spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
    df.select(['A']).withColumn('udf', udf('A')).collect()
nit: Can we assert the result too?
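(A hedged sketch of what that assertion could look like; the expected values are an assumption since they depend on the udf defined in this test:)

result = df.select(['A']).withColumn('udf', udf('A')).collect()
self.assertEqual(len(result), 3)  # 3 input rows; the 'udf' column holds the unsafely cast values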
values = [1.0] * 3
pdf = pd.DataFrame({'A': values})
df = self.spark.createDataFrame(pdf).repartition(1)
sorry if I missed something. Why should we repartition?
I thought I wrote this to make sure all values are in a single partition so it matches the length of the returned Pandas Series. This can be simplified. We can do this when we touch the code here next time.
import pandas as pd
import pyarrow as pa

df = self.spark.range(0, 1)
nit: spark.range(0).
# Overflow cast causes an error.
with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
    with self.assertRaisesRegexp(Exception,
                                 "Integer value out of bounds"):
Looks like it can be inlined.
LGTM too
Test build #101506 has finished for PR 22807 at commit
Merged to master.
Thanks guys! @HyukjinKwon I will make a follow-up to address the above minor comments. Thanks.
Eh, it's okie. Minor is minor. Let's fix it later when we touch that code.
@HyukjinKwon ok. no problem.
Thanks @viirya!
[SPARK-25811][PySpark] Raise a proper error when unsafe cast is detected by PyArrow

## What changes were proposed in this pull request?

Since 0.11.0, PyArrow supports raising an error for unsafe casts ([PR](apache/arrow#2504)). We should use it to raise a proper error for pandas udf users when such a cast is detected.

Added a SQL config `spark.sql.execution.pandas.arrowSafeTypeConversion` to disable the Arrow safe type check.

## How was this patch tested?

Added tests and tested manually.

Closes apache#22807 from viirya/SPARK-25811.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
Since 0.11.0, PyArrow supports raising an error for unsafe casts (PR). We should use it to raise a proper error for pandas udf users when such a cast is detected.
Added a SQL config spark.sql.execution.pandas.arrowSafeTypeConversion to disable the Arrow safe type check.

How was this patch tested?
Added tests and tested manually.