
BUG: Add temporary struct col in pyspark backend to ensure that UDFs are e… #2657

Merged

Conversation

Contributor

@icexelloss icexelloss commented Mar 3, 2021

…xecuted once

What change is proposed

Spark sometimes executes the UDF multiple times if:

  • The output of the UDF is a struct type
  • The struct column is not explicitly assigned to the Spark DataFrame

This change ensures that the struct column is always assigned to the Spark DataFrame before destructuring, to guarantee exactly-once execution.

How is this tested

Added a test verifying exactly-once execution under test_vectorized_udf.py.
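The hazard and the fix can be sketched without Spark (all names below are illustrative, not from the PR): projecting each field of a struct-returning UDF independently re-evaluates the UDF, while binding its result to a temporary column first evaluates it exactly once.

```python
calls = {"n": 0}

def make_struct(v):
    # Stand-in for a vectorized UDF that returns a struct of two fields.
    calls["n"] += 1
    return {"col1": v + 1.0, "col2": v + 2.0}

# Naive destructuring: each field projection re-runs the producer,
# mirroring Spark re-executing the UDF once per selected struct field.
col1 = make_struct(1.0)["col1"]
col2 = make_struct(1.0)["col2"]
assert calls["n"] == 2

# The idea behind the fix: bind the struct result to a temporary
# column first, then select fields from it, so it runs exactly once.
calls["n"] = 0
tmp_struct = make_struct(1.0)
col1, col2 = tmp_struct["col1"], tmp_struct["col2"]
assert calls["n"] == 1
```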

@icexelloss icexelloss force-pushed the pyspark-udf-struct-selection branch from 6d0c322 to 30b05c3 Compare March 4, 2021 00:01
Contributor

@jreback jreback left a comment


test for this? only on spark?

@jreback jreback added the expressions (Issues or PRs related to the expression API) and window functions (Issues or PRs related to window functions) labels Mar 4, 2021
@icexelloss icexelloss added the pyspark (The Apache PySpark backend) label and removed the expressions and window functions labels Mar 4, 2021
@icexelloss icexelloss changed the title Add temporary struct col in pyspark backend to ensure that UDFs are e… [BUG] Add temporary struct col in pyspark backend to ensure that UDFs are e… Mar 4, 2021
@icexelloss icexelloss changed the title [BUG] Add temporary struct col in pyspark backend to ensure that UDFs are e… BUG: Add temporary struct col in pyspark backend to ensure that UDFs are e… Mar 4, 2021
@icexelloss
Contributor Author

@jreback Added tests. The change is for Spark only, and the new tests run for both pandas and Spark.

@jreback jreback added this to the Next release milestone Mar 4, 2021
ci/deps/pyspark.yml (thread resolved)
output_type=dt.Struct(['col1', 'col2'], [dt.double, dt.double]),
)
def add_one_struct_exact_once(v):
print(v)
Contributor


extra print

Contributor Author


Aha my bad. Removed.

print(v)
key = v.iloc[0]
path = Path(f"{tempdir}/{key}")
assert not path.exists()
Contributor


why are you writing things?

Contributor Author


This is basically to create a side effect. If this function runs a second time, it will hit the side effect and fail.
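The side-effect trick can be sketched in plain Python (the helper name and temp-dir handling below are illustrative; the PR's actual test lives in test_vectorized_udf.py):

```python
import tempfile
from pathlib import Path

def run_once_checked(key, tempdir, fn, *args):
    # Write a marker file keyed by the input; a second invocation with
    # the same key finds the marker and fails, flagging re-execution.
    path = Path(tempdir) / str(key)
    assert not path.exists(), f"executed more than once for key {key!r}"
    path.touch()
    return fn(*args)

with tempfile.TemporaryDirectory() as d:
    first = run_once_checked("k1", d, lambda x: x + 1, 41)
    try:
        run_once_checked("k1", d, lambda x: x + 1, 41)
        reran = True
    except AssertionError:
        reran = False
```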

Contributor


ok cool (may want to document this in the future)


import pyspark

if LooseVersion(pyspark.__version__) < LooseVersion("3.1.1"):
Contributor


hmm not sure i love this here

can you create a pytest decorator and use it here, similar to this: https://github.com/pandas-dev/pandas/blob/master/pandas/util/_test_decorators.py#L200 (you can use skipif, but have to do the import too)

Contributor Author


Ha let me try

Contributor Author


Added a min_spark_version marker.
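The marker presumably reduces to a version comparison like the inline LooseVersion check quoted above. A minimal sketch of the predicate such a pytest marker could wrap (the helper names are illustrative; a plain numeric tuple parse is used here instead of the deprecated distutils LooseVersion):

```python
def _parse(version: str) -> tuple:
    # Numeric-prefix parse; sufficient for plain "X.Y.Z" version strings.
    return tuple(int(p) for p in version.split(".")[:3])

def spark_version_at_least(installed: str, minimum: str) -> bool:
    # Predicate a pytest skipif marker could wrap: True when the
    # installed PySpark version meets the required minimum.
    return _parse(installed) >= _parse(minimum)

# Sketch of how the marker might gate a test module:
# pytestmark = pytest.mark.skipif(
#     not spark_version_at_least(pyspark.__version__, "3.1.1"),
#     reason="requires PySpark >= 3.1.1",
# )
```

Tuple comparison avoids the classic string-compare pitfall where "3.10" sorts before "3.9".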


@jreback jreback merged commit 98d4089 into ibis-project:master Mar 5, 2021
@jreback
Contributor

jreback commented Mar 5, 2021

thanks @icexelloss

@cpcloud cpcloud removed this from the Next release milestone Jan 7, 2022