
[SPARK-43684][SPARK-43685][SPARK-43686][SPARK-43691][CONNECT][PS] Fix (NullOps|NumOps).(eq|ne) for Spark Connect. #41514

Closed
wants to merge 12 commits into from

Conversation

itholic
Contributor

@itholic itholic commented Jun 8, 2023

What changes were proposed in this pull request?

This PR proposes to fix `NullOps.(eq|ne)` and `NumOps.(eq|ne)` for the pandas API on Spark with Spark Connect.

This includes SPARK-43684, SPARK-43685, SPARK-43686 and SPARK-43691 at once, because they all require similar modifications in a single file.

This PR also introduces a new util function `_is_extension_dtypes` to check whether the given object has an extension dtype, and applies it to all related functions.

Why are the changes needed?

The reason is that the pandas API on Spark with Spark Connect behaves differently from pandas, as shown below:

For ne:

>>> pser = pd.Series([1.0, 2.0, np.nan])
>>> psser = ps.from_pandas(pser)
>>> pser.ne(pser)
0    False
1    False
2     True
dtype: bool
>>> psser.ne(psser)
0    False
1    False
2     None
dtype: bool

We expect `True` for the non-equal case, but Spark Connect returns `None`. So we should cast `None` to `True` for `ne`.

For eq:

>>> pser = pd.Series([1.0, 2.0, np.nan])
>>> psser = ps.from_pandas(pser)
>>> pser.eq(pser)
0     True
1     True
2    False
dtype: bool
>>> psser.eq(psser)
0     True
1     True
2     None
dtype: bool

We expect `False` for the non-equal case, but Spark Connect returns `None`. So we should cast `None` to `False` for `eq`.
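The casting described above can be sketched in plain Python. This is an illustrative sketch only — the function name and the list-based representation of a comparison result are made up for clarity, not taken from the pyspark.pandas implementation:

```python
# Illustrative sketch: Spark's __eq__/__ne__ return NULL when an operand is
# NULL, while pandas returns a plain bool. The fix fills those NULLs with the
# boolean pandas would have produced: True for `ne`, False for `eq`.
from typing import List, Optional


def fill_comparison_result(values: List[Optional[bool]], op: str) -> List[bool]:
    fill = op == "ne"  # None -> True for `ne`, None -> False for `eq`
    return [fill if v is None else v for v in values]


# What Spark Connect returns today vs. what pandas expects:
print(fill_comparison_result([False, False, None], "ne"))  # [False, False, True]
print(fill_comparison_result([True, True, None], "eq"))    # [True, True, False]
```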

Does this PR introduce any user-facing change?

Yes, NullOps.eq, NullOps.ne, NumOps.eq, NumOps.ne are now working as expected on Spark Connect.

How was this patch tested?

Uncommented the existing UTs and tested manually against vanilla PySpark.

… (NullOps|NumOps).(eq|ne) for Spark Connect.
@github-actions github-actions bot added the SQL label Jun 9, 2023
@@ -219,6 +219,10 @@ def _is_boolean_type(right: Any) -> bool:
)


def _is_extension_dtypes(object: Any) -> bool:
Member
can you explain the fix please?

Contributor Author

@itholic itholic Jun 13, 2023

Yeah, basically this function simply checks whether the given object has an extension dtype.

Looking at the changes in num_ops.py, binary operations need to be handled differently when either the left or the right operand has an extension dtype. Without this util function, the code:

if isinstance(getattr(left, "dtype", None), extension_dtypes) or isinstance(
    getattr(right, "dtype", None), extension_dtypes
):
    ...

would have to be added to every function.

So I added this utility function to simplify the code, because I believe that repeating such code in every function would compromise the readability of the code base.

Of course, we could just do the check manually in every function instead of adding a new util. I have no strong preference, though.

def eq(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
    try:
        _sanitize_list_like(right)
    except TypeError:
Member

Can you update your PR description and add a bit of docstring to `_is_extension_dtypes`? I can't follow why we need the try-except here but not in `ne`.

Contributor Author

Because currently only `eq` properly supports list-like objects.

Added some comments and test for NullOps as well.

@itholic itholic marked this pull request as draft June 13, 2023 02:29
@itholic itholic changed the title [SPARK-43684][SPARK-43685][SPARK-43686][SPARK-43691][CONNECT][PS] Fix (NullOps|NumOps).(eq|ne) for Spark Connect. [WIP][SPARK-43684][SPARK-43685][SPARK-43686][SPARK-43691][CONNECT][PS] Fix (NullOps|NumOps).(eq|ne) for Spark Connect. Jun 13, 2023
@itholic itholic changed the title [WIP][SPARK-43684][SPARK-43685][SPARK-43686][SPARK-43691][CONNECT][PS] Fix (NullOps|NumOps).(eq|ne) for Spark Connect. [WIP][SPARK-43684][SPARK-43685][SPARK-43686][SPARK-43691][SPARK-44033][CONNECT][PS] Fix (NullOps|NumOps).(eq|ne) for Spark Connect. Jun 13, 2023
@itholic itholic changed the title [WIP][SPARK-43684][SPARK-43685][SPARK-43686][SPARK-43691][SPARK-44033][CONNECT][PS] Fix (NullOps|NumOps).(eq|ne) for Spark Connect. [WIP][SPARK-43684][SPARK-43685][SPARK-43686][SPARK-43691][CONNECT][PS] Fix (NullOps|NumOps).(eq|ne) for Spark Connect. Jun 13, 2023
@itholic itholic marked this pull request as ready for review June 13, 2023 04:18
@itholic itholic changed the title [WIP][SPARK-43684][SPARK-43685][SPARK-43686][SPARK-43691][CONNECT][PS] Fix (NullOps|NumOps).(eq|ne) for Spark Connect. [SPARK-43684][SPARK-43685][SPARK-43686][SPARK-43691][CONNECT][PS] Fix (NullOps|NumOps).(eq|ne) for Spark Connect. Jun 13, 2023

        if is_remote():
            from pyspark.sql.connect.column import Column as ConnectColumn

            Column = ConnectColumn
        else:
            Column = PySparkColumn  # type: ignore[assignment]
-       return column_op(getattr(Column, func_name))
+       result = column_op(getattr(Column, func_name))(left, right)
+       # It works as expected on extension dtype, so we don't need to call `fillna` for this case.
Member

What's "works as expected"?

Contributor Author

I mean that it works the same as pandas, without any additional computation such as `fillna`, for extension dtypes as shown below:

>>> pser = pd.Series([1.0, 2.0, np.nan], dtype="Float64")
>>> psser = ps.from_pandas(pser)
>>> pser.eq(pser)
0    True
1    True
2    <NA>
dtype: boolean
>>> psser.eq(psser)
0    True
1    True
2    <NA>
dtype: boolean
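That parity can be checked directly in pandas. The snippet below is a standalone check (not code from this PR) showing that pandas itself keeps `<NA>` in comparison results for nullable extension dtypes, which is why the null-filling must be skipped for them:

```python
import pandas as pd

# For the nullable Float64 extension dtype, pandas propagates <NA> through
# comparisons instead of producing False/True, so filling nulls in this case
# would make pandas-on-Spark diverge from pandas.
pser = pd.Series([1.0, 2.0, None], dtype="Float64")
result = pser.eq(pser)
print(result.dtype)    # boolean
print(result.iloc[2])  # <NA>
```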


def ne(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
    _sanitize_list_like(right)
    return pyspark_column_op("__ne__", left, right, fillna=True)
Member

Why fillna=True ..?

Contributor Author

@itholic itholic Jun 14, 2023

For ne:

>>> pser = pd.Series([1.0, 2.0, np.nan])
>>> psser = ps.from_pandas(pser)
>>> pser.ne(pser)
0    False
1    False
2     True
dtype: bool
>>> psser.ne(psser)
0    False
1    False
2     None
dtype: bool

We expect `True` for the non-equal case, but Spark Connect returns `None`. So we cast `None` to `True` for `ne`.

For eq:

>>> pser = pd.Series([1.0, 2.0, np.nan])
>>> psser = ps.from_pandas(pser)
>>> pser.eq(pser)
0     True
1     True
2    False
dtype: bool
>>> psser.eq(psser)
0     True
1     True
2     None
dtype: bool

We expect `False` for the non-equal case, but Spark Connect returns `None`. So we cast `None` to `False` for `eq`.

@HyukjinKwon
Member

Merged to master.

zhengruifeng pushed a commit that referenced this pull request Jun 15, 2023
…arityTests.test_series_eq`

### What changes were proposed in this pull request?

This PR proposes to enable `OpsOnDiffFramesEnabledSlowParityTests.test_series_eq` for Spark Connect.

The root cause is fixed from #41514

### Why are the changes needed?

To improve the test coverage for pandas API on Spark with Spark Connect.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Reusing the existing UT.

Closes #41582 from itholic/SPARK-43684-followup.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
czxm pushed a commit to czxm/spark that referenced this pull request Jun 19, 2023
… `(NullOps|NumOps).(eq|ne)` for Spark Connect

Closes apache#41514 from itholic/SPARK-43684.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
czxm pushed a commit to czxm/spark that referenced this pull request Jun 19, 2023
…arityTests.test_series_eq`

@itholic itholic deleted the SPARK-43684 branch November 20, 2023 01:35