
[SPARK-37491][PYTHON] Fix Series.asof for unsorted values #35191

Closed
wants to merge 14 commits

Conversation

pralabhkumar
Contributor

@pralabhkumar pralabhkumar commented Jan 13, 2022

What changes were proposed in this pull request?

Fix Series.asof when the values of the series are not sorted.

Before

>>> import pandas as pd
>>> from pyspark import pandas as ps
>>> import numpy as np
>>> pser = pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
>>> psser = ps.from_pandas(pser)
>>> psser.asof([5, 25])
5     NaN
25    2.0
Name: Koalas, dtype: float64

>>> pser = pd.Series([4, np.nan, np.nan, 2], index=[10, 20, 30, 40], name="Koalas")
>>> psser = ps.from_pandas(pser)
>>> psser.asof([5, 100])
5      NaN
100    4.0

After

>>> import pandas as pd
>>> from pyspark import pandas as ps
>>> import numpy as np
>>> pser = pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
>>> psser = ps.from_pandas(pser)
>>> psser.asof([5, 25])
5     NaN
25    1.0
Name: Koalas, dtype: float64

>>> pser = pd.Series([4, np.nan, np.nan, 2], index=[10, 20, 30, 40], name="Koalas")
>>> psser = ps.from_pandas(pser)
>>> psser.asof([5, 100])
5      NaN
100    2.0

Why are the changes needed?

There is a bug in Series.asof when the series is not sorted.

Does this PR introduce any user-facing change?

Yes, users will see behavior exactly matching pandas.

How was this patch tested?

unit tests
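For reference, a minimal parity-style check in the spirit of those tests (a sketch, not the exact test code; the real tests compare against pandas with self.assert_eq in the pandas-on-Spark test suite):

import numpy as np
import pandas as pd
from pyspark import pandas as ps

# Same data as the Before/After examples above.
pser = pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
psser = ps.from_pandas(pser)

# With the fix, both return NaN for 5 and 1.0 for 25.
print(pser.asof([5, 25]))
print(psser.asof([5, 25]))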

@pralabhkumar
Contributor Author

@itholic @HyukjinKwon Please review. I have added details in the JIRA about the approach.

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

@pralabhkumar sorry do you mind rebasing and syncing to the latest master? Seems like something went wrong in CI.

Member

@Yikun Yikun left a comment


Yep, just FYI, here is the original CI link: https://github.com/pralabhkumar/spark/runs/4801311278

.withColumn("identifier", col("values.identifier"))
.withColumn("value", col("values.Koalas"))
.drop("values")
.na.drop(subset="value")
Member

@Yikun Yikun Jan 14, 2022


BTW, there was a complaint in the lint CI; it looks like a bug in the type hint of na.drop. I submitted the fix in #35201.

You could just use dropna(subset="value") instead of .na.drop(subset="value") as a workaround, and I also think dropna is more reasonable and simpler here.
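For reference, a tiny self-contained illustration of the two equivalent spellings (the DataFrame below is sample data, not the frame from this PR):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1, 2.0), (2, None)], ["identifier", "value"])

# Both drop the row whose "value" is null; only the type hint of
# na.drop was the issue flagged by the lint CI.
via_na_drop = sdf.na.drop(subset="value")
via_dropna = sdf.dropna(subset="value")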

Contributor Author

Thanks for the comment, I'll do the same.

@pralabhkumar
Contributor Author

@HyukjinKwon Will rebase and sync to latest master.

HyukjinKwon pushed a commit that referenced this pull request Jan 17, 2022
### What changes were proposed in this pull request?
Fix drop subset inline type hint

### Why are the changes needed?
it should be the same as `DataFrame.dropna`:
https://github.com/apache/spark/blob/90003398745bfee78416074ed786e986fcb2c8cd/python/pyspark/sql/dataframe.py#L2359

See also: #35191 (comment)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #35201 from Yikun/SPARK-36885-FOLLOWUP.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@itholic
Contributor

itholic commented Jan 17, 2022

Could you update the PR description with a Before & After example? e.g. #34931

@pralabhkumar
Contributor Author

Could you update the PR description with a Before & After example? e.g. #34931

Done
@itholic

@itholic
Contributor

itholic commented Jan 17, 2022

Thanks! Let me leave some comments after taking deeper look tomorrow.

Btw, you can make the code example prettier with the python keyword on the fenced code block as below, just FYI :-)

[Screenshot of the comment source: the example wrapped in a ```python fenced code block]

this renders as:

import pandas as pd
from pyspark import pandas as ps
import numpy as np
pser = pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
psser = ps.from_pandas(pser)
psser.asof([5, 25])
5     NaN
25    2.0
Name: Koalas, dtype: float64

pser = pd.Series([4, np.nan, np.nan, 2], index=[10, 20, 30, 40], name="Koalas")
psser = ps.from_pandas(pser)
psser.asof([5, 100])

5      NaN
100    4.0

@pralabhkumar
Contributor Author

@itholic, please let me know your review comments.

Contributor

@itholic itholic left a comment


Can you add some more comments for each step to make the code more understandable?

It's hard to track each sdf.

Maybe Series.argsort is a good example: https://github.com/apache/spark/blob/master/python/pyspark/pandas/series.py#L5612-L5680

@@ -5228,22 +5228,62 @@ def asof(self, where: Union[Any, List]) -> Union[Scalar, "Series"]:
where = [where]
index_scol = self._internal.index_spark_columns[0]
index_type = self._internal.spark_type_for(index_scol)
from pyspark.sql.functions import struct, lit, explode, col, row_number
Contributor


pyspark.sql.functions is already imported as F. I think we can just reuse it.

@@ -2071,6 +2071,18 @@ def test_asof(self):
with ps.option_context("compute.eager_check", False):
self.assert_eq(psser.asof(20), 4.0)

pser = pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
psser = ps.from_pandas(pser)
self.assert_eq(psser.asof([5, 25]), pser.asof([5, 25]))
Contributor


How about psser.asof([25, 25])? It might fail.

F.when(
index_scol <= SF.lit(index).cast(index_type),
struct(
lit(column_prefix_constant + str(index)).alias("identifier"),
Contributor


Since where can contain the same value, the column name here can be duplicated.

index_scol <= SF.lit(index).cast(index_type),
struct(
lit(column_prefix_constant + str(index)).alias("identifier"),
self.spark.column.alias("Koalas"),
Contributor


Do we have to use an alias here, instead of just using the existing column name?

At least, the alias "Koalas" looks a bit weird to me here.

Contributor Author


Changed the alias name. It was required to alias the column so that I can easily refer to it later on.

@pralabhkumar
Contributor Author

pralabhkumar commented Jan 20, 2022

@itholic Thanks for the comments, working on it.

@pralabhkumar
Contributor Author

@itholic Addressed the review comments, please review

@itholic
Contributor

itholic commented Jan 27, 2022

@pralabhkumar Sorry for the delay. I've been busy for a couple of days. Will take a closer look soon 🙏

@pralabhkumar
Contributor Author

@itholic Please review the PR. Thanks for your time.

@pralabhkumar
Contributor Author

@itholic Gentle ping

@itholic
Contributor

itholic commented Feb 2, 2022

@pralabhkumar Just came back from vacation. Will leave the comment very soon! 🙏

Comment on lines 5238 to 5244
F.when(
index_scol <= SF.lit(index).cast(index_type),
F.struct(
F.lit(column_prefix_constant + str(index) + "_" + str(idx)).alias("identifier"),
self.spark.column.alias("col_value"),
),
).alias(column_prefix_constant + str(index) + "_" + str(idx))
Contributor

@itholic itholic Feb 3, 2022


I think maybe we can use F.last with ignorenulls=True instead of F.max ??
(https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.last.html)

e.g.

        cond = [
            F.last(F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column), ignorenulls=True)
            for index in where
        ]

It returns the last non-null value from the given column, and it seems like you want to do the same thing in your fix.

If this works, I think we simply need to fix only this part.
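If it helps, here is a standalone sketch of why F.last(..., ignorenulls=True) behaves like asof (the data and column names are illustrative, not the PR's actual frame): rows past the requested index are turned into nulls by F.when, and F.last then returns the last remaining non-null value.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(
    [(10, 2.0), (20, 1.0), (30, None), (40, 4.0)], ["idx", "value"]
)

# "asof 25": rows with idx > 25 become null via F.when; the last non-null
# value among the remaining rows (1.0 at idx 20) wins. Row order here is only
# illustrative; the real code relies on the frame's existing ordering.
sdf.select(
    F.last(F.when(F.col("idx") <= 25, F.col("value")), ignorenulls=True)
).show()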

Contributor Author

@pralabhkumar pralabhkumar Feb 4, 2022


Yes @itholic, this is working (since index_level_0 is sorted). However, the test case with psser.asof([25, 25]) is failing due to ambiguous duplicate columns in psdf = ps.DataFrame(sdf). Therefore, in order to pass the above test case, below is the change.

cond = [
            F.last(
                F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column),
                ignorenulls=True,
            ).alias(column_prefix_constant + str(index) + "_" + str(idx))
            for idx, index in enumerate(where)
        ]

Then

with ps.option_context("compute.default_index_type", "distributed", "compute.max_rows", 1):
            psdf = ps.DataFrame(sdf)  # type: DataFrame
            df = pd.DataFrame(psdf.transpose().values, columns=[self.name], index=where)
            return df[df.columns[0]]

Please let me know if it's okay, and I'll update the PR.

Contributor

@itholic itholic Feb 7, 2022


I think we might want to return the pandas-on-Spark Series rather than a pandas Series, and should leverage the pandas DataFrame directly only when where has duplicate items.

So, how about this ??

        cond = [
            F.last(
                F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column),
                ignorenulls=True,
            )
            for index in where
        ]

Then

        # The data is expected to be small so it's fine to transpose/use default index.
        with ps.option_context("compute.default_index_type", "distributed", "compute.max_rows", 1):
            if len(where) == len(set(where)):
                psdf: DataFrame = DataFrame(sdf)
                psdf.columns = pd.Index(where)
                return first_series(psdf.transpose()).rename(self.name)
            else:
                # If `where` has duplicate items, leverage the pandas directly
                # since pandas API on Spark doesn't support the duplicate column name.
                pdf: pd.DataFrame = sdf.limit(1).toPandas()
                pdf.columns = pd.Index(where)
                return first_series(DataFrame(pdf.transpose())).rename(self.name)

??

Contributor Author


@itholic Yes, I think this is a good suggestion. Please let me know if I should update this PR, or if you are planning to create a new PR with the suggested code changes.

Contributor


It's okay to just update here! :-)

Contributor

@itholic itholic left a comment


Otherwise, looks pretty good.


pser = pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
psser = ps.from_pandas(pser)
self.assert_eq(psser.asof([25, 25]), pser.asof([25, 25]))
Contributor


Can we also test the string & timestamp index ??

e.g.

>>> pser = pd.Series([2, 1, np.nan, 4], index=['a', 'b', 'c', 'd'])
>>> pser.asof(['a', 'd'])
a    2.0
d    4.0
dtype: float64
>>> pser = pd.Series([2, 1, np.nan, 4], index=[pd.Timestamp(2020, 1, 1), pd.Timestamp(2020, 2, 2), pd.Timestamp(2020, 3, 3), pd.Timestamp(2020, 4, 4)])
>>> pser.asof([pd.Timestamp(2020, 1, 1), pd.Timestamp(2020, 2, 4)])
2020-01-01    2.0
2020-02-04    1.0
dtype: float64

Contributor Author

@pralabhkumar pralabhkumar Feb 11, 2022


@itholic For the timestamp, it's throwing an exception: since the Timestamp name is tuple-like, pd.Index throws the assertion error assert is_name_like_tuple(column_label, check_type=True), column_label. However, the earlier (suggested) code is working fine. Currently working on resolving it.

Contributor Author


Had to revert to the earlier code because of this issue.


pser = pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
psser = ps.from_pandas(pser)
self.assert_eq(psser.asof([25, 25]), pser.asof([25, 25]))
Contributor


And can we also test np.nan as where?

>>> pser.asof([np.nan, np.nan])
NaN    3.0
NaN    3.0
dtype: float64

Seems like this case is only supported for a numeric type index.

@pralabhkumar
Contributor Author

@itholic, please review the PR.

Comment on lines 5257 to 5263
if len(original_where) > 0:
df = pd.DataFrame(
psdf.transpose().values, columns=[self.name], index=original_where
)
else:
df = pd.DataFrame(psdf.transpose().values, columns=[self.name], index=where)
return df[df.columns[0]]
Contributor


Seems like this returns the pandas DataFrame, but we should return the pandas-on-Spark DataFrame.

I think maybe we can just keep the previous fix and address the Timestamp case separately ??

e.g.

        with ps.option_context("compute.default_index_type", "distributed", "compute.max_rows", 1):
            if (len(where) == len(set(where))) and not isinstance(index_type, TimestampType):
                psdf: DataFrame = DataFrame(sdf)
                psdf.columns = pd.Index(where)
                return first_series(psdf.transpose()).rename(self.name)
            else:
                # If `where` has duplicate items, leverage the pandas directly
                # since pandas API on Spark doesn't support the duplicate column name.
                pdf: pd.DataFrame = sdf.limit(1).toPandas()
                pdf.columns = pd.Index(where)
                return first_series(DataFrame(pdf.transpose())).rename(self.name)

@pralabhkumar
Contributor Author

@itholic Please review the changes

Contributor

@itholic itholic left a comment


Otherwise, looks pretty good

Comment on lines 5233 to 5250
if np.nan in where:
max_index = self._internal.spark_frame.select(F.last(index_scol)).take(1)[0][0]
modified_where = [max_index if x is np.nan else x for x in where]
cond = [
F.last(
F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column),
ignorenulls=True,
)
for idx, index in enumerate(modified_where)
]
else:
cond = [
F.last(
F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column),
ignorenulls=True,
)
for idx, index in enumerate(where)
]
Contributor


nit: I think maybe we can unify the cond and leave a comment to improve readability a bit.

e.g.

        if np.nan in where:
            # When `where` is np.nan, pandas returns the last index value.
            max_index = self._internal.spark_frame.select(F.last(index_scol)).take(1)[0][0]
            modified_where = [max_index if x is np.nan else x for x in where]
        else:
            modified_where = where

        cond = [
            F.last(
                F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column),
                ignorenulls=True,
            )
            for idx, index in enumerate(modified_where)
        ]

@ueshin
Member

ueshin commented Mar 11, 2022

@pralabhkumar I think it's already fixed at 54abb85. Could you merge the latest master branch and push the commit?

@ueshin
Member

ueshin commented Mar 11, 2022

@itholic @Yikun @HyukjinKwon @xinrong-databricks Could you take another look? Thanks.

@HyukjinKwon
Member

Should be good to go if it looks fine to you, Takuya.

Member

@Yikun Yikun left a comment


LGTM

@pralabhkumar
Contributor Author

@ueshin
The build is passing. Please find some time to review it.

Member

@ueshin ueshin left a comment


LGTM.

@ueshin
Member

ueshin commented Mar 14, 2022

Thanks! Merging to master.

@ueshin ueshin closed this in f6c4634 Mar 14, 2022
@itholic
Contributor

itholic commented Mar 14, 2022

Thanks for your efforts :-)

@pralabhkumar
Contributor Author

Thanks for merging to master.

HyukjinKwon pushed a commit that referenced this pull request Apr 12, 2024
### What changes were proposed in this pull request?

Use the monotonically ID as a sorting condition for `max_by` instead of a literal string.

### Why are the changes needed?
#35191 had an error where the literal string `"__monotonically_increasing_id__"` was used as the tie-breaker in `max_by` instead of the actual ID.

### Does this PR introduce _any_ user-facing change?
Fixes nondeterminism in `asof`

### How was this patch tested?
In some circumstances `//python:pyspark.pandas.tests.connect.series.test_parity_as_of` is sufficient to reproduce

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46018 from markj-db/SPARK-47824.

Authored-by: Mark Jarvin <mark.jarvin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
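For context, a minimal hypothetical sketch of the difference described in this follow-up commit (the data and column names are illustrative, not the actual patch):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(4).withColumn(
    "__monotonically_increasing_id__", F.monotonically_increasing_id()
)

# Buggy: the ordering argument is a constant string literal, so every row
# ties and max_by may pick an arbitrary row (nondeterministic result).
buggy = sdf.select(F.max_by("id", F.lit("__monotonically_increasing_id__")))

# Fixed: order by the actual ID column so the last row wins deterministically.
fixed = sdf.select(F.max_by("id", F.col("__monotonically_increasing_id__")))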
HyukjinKwon pushed a commit that referenced this pull request Apr 12, 2024
### What changes were proposed in this pull request?

Use the monotonically ID as a sorting condition for `max_by` instead of a literal string.

### Why are the changes needed?
#35191 had an error where the literal string `"__monotonically_increasing_id__"` was used as the tie-breaker in `max_by` instead of the actual ID.

### Does this PR introduce _any_ user-facing change?
Fixes nondeterminism in `asof`

### How was this patch tested?
In some circumstances `//python:pyspark.pandas.tests.connect.series.test_parity_as_of` is sufficient to reproduce

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46018 from markj-db/SPARK-47824.

Authored-by: Mark Jarvin <mark.jarvin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit a0ccdf2)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Apr 12, 2024
### What changes were proposed in this pull request?

Use the monotonically ID as a sorting condition for `max_by` instead of a literal string.

### Why are the changes needed?
#35191 had an error where the literal string `"__monotonically_increasing_id__"` was used as the tie-breaker in `max_by` instead of the actual ID.

### Does this PR introduce _any_ user-facing change?
Fixes nondeterminism in `asof`

### How was this patch tested?
In some circumstances `//python:pyspark.pandas.tests.connect.series.test_parity_as_of` is sufficient to reproduce

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46018 from markj-db/SPARK-47824.

Authored-by: Mark Jarvin <mark.jarvin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit a0ccdf2)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024
### What changes were proposed in this pull request?

Use the monotonically ID as a sorting condition for `max_by` instead of a literal string.

### Why are the changes needed?
apache#35191 had an error where the literal string `"__monotonically_increasing_id__"` was used as the tie-breaker in `max_by` instead of the actual ID.

### Does this PR introduce _any_ user-facing change?
Fixes nondeterminism in `asof`

### How was this patch tested?
In some circumstances `//python:pyspark.pandas.tests.connect.series.test_parity_as_of` is sufficient to reproduce

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#46018 from markj-db/SPARK-47824.

Authored-by: Mark Jarvin <mark.jarvin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit a0ccdf2)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>