Skip to content
Permalink
Browse files

[SPARK-23233][PYTHON] Reset the cache in asNondeterministic to set de…

…terministic properly

## What changes were proposed in this pull request?

Reproducer:

```python
from pyspark.sql.functions import udf
f = udf(lambda x: x)
spark.range(1).select(f("id"))  # cache JVM UDF instance.
f = f.asNondeterministic()
spark.range(1).select(f("id"))._jdf.logicalPlan().projectList().head().deterministic()
```

It should return `False` but the current master returns `True`. Seems it's because we cache the JVM UDF instance and then we reuse it even after setting `deterministic` disabled once it's called.

## How was this patch tested?

Manually tested. I am not sure if I should add the test with a lot of JVM accesses with the intetnal stuff .. Let me know if anyone feels so. I will add.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20409 from HyukjinKwon/SPARK-23233.

(cherry picked from commit 3227d14)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
  • Loading branch information...
HyukjinKwon authored and gatorsmile committed Jan 27, 2018
1 parent 65600bf commit 3b6fc286d105ae7de737c46e50cf941e6831ab98
Showing with 16 additions and 0 deletions.
  1. +13 −0 python/pyspark/sql/tests.py
  2. +3 −0 python/pyspark/sql/udf.py
@@ -435,6 +435,19 @@ def test_nondeterministic_udf2(self):
pydoc.render_doc(random_udf1)
pydoc.render_doc(udf(lambda x: x).asNondeterministic)

def test_nondeterministic_udf3(self):
# regression test for SPARK-23233
from pyspark.sql.functions import udf
f = udf(lambda x: x)
# Here we cache the JVM UDF instance.
self.spark.range(1).select(f("id"))
# This should reset the cache to set the deterministic status correctly.
f = f.asNondeterministic()
# Check the deterministic status of udf.
df = self.spark.range(1).select(f("id"))
deterministic = df._jdf.logicalPlan().projectList().head().deterministic()
self.assertFalse(deterministic)

def test_nondeterministic_udf_in_aggregate(self):
from pyspark.sql.functions import udf, sum
import random
@@ -181,6 +181,9 @@ def asNondeterministic(self):
.. versionadded:: 2.3
"""
# Here, we explicitly clean the cache to create a JVM UDF instance
# with 'deterministic' updated. See SPARK-23233.
self._judf_placeholder = None
self.deterministic = False
return self

0 comments on commit 3b6fc28

Please sign in to comment.
You can’t perform that action at this time.