Conversation

@icexelloss
Contributor

What changes were proposed in this pull request?

Currently, pandas_udf supports the "grouped aggregate" type, which can be used with bounded and unbounded windows. There is another set of use cases that can benefit from a "grouped transform" type of pandas_udf.

Grouped transform is defined as an N -> N mapping over a group. For example, "compute the zscore for values in the group using the group mean and group stdev", or "rank the values in the group".
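For concreteness, here is what such an N -> N grouped zscore looks like in plain pandas (a sketch for illustration only; the column names and sample data are made up, not taken from this PR):

```python
import pandas as pd

df = pd.DataFrame({'id': [1, 1, 2, 2, 2], 'v': [1., 2., 3., 5., 10.]})

# N -> N: each input row yields exactly one output row, computed from
# group-level statistics (the group mean and group standard deviation)
zscore = df.groupby('id')['v'].transform(lambda v: (v - v.mean()) / v.std())
```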

Currently, in order to do this, the user needs to use "grouped apply", for example:

@pandas_udf(schema, GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf['v']
    pdf['v'] = v - v.mean()
    return pdf

df.groupby('id').apply(subtract_mean)
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

This approach has a few downsides:

  • Specifying the full return schema is complicated for the user, even though the function only changes one column.
  • Hard-coding the column name 'v' as part of the udf makes the udf less reusable.
  • The entire dataframe is serialized and passed to Python, although only one column is needed.

Here we propose a new type of pandas_udf to work with these types of use cases:

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf('double', GROUPED_XFORM)
def subtract_mean(v):
    return v - v.mean()

w = Window.partitionBy('id')

df = df.withColumn('v', subtract_mean(df['v']).over(w))
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

This addresses the above downsides:

  • The user only needs to specify the output type of a single column.
  • The column being transformed is decoupled from the udf implementation.
  • Only one column needs to be sent to the Python worker, and the result is concatenated with the original dataframe (this is what grouped aggregate already does).

This is similar to groupby transform in pandas, hence the name "grouped transform":

>>> df = pd.DataFrame({'id': [1, 1, 2, 2, 2], 'value': [1., 2., 3., 5., 10.]})
>>> df
   id  value
0   1    1.0
1   1    2.0
2   2    3.0
3   2    5.0
4   2   10.0

>>> df['value_demean'] = df.groupby('id')['value'].transform(lambda x: x - x.mean())
>>> df
   id  value  value_demean
0   1    1.0          -0.5
1   1    2.0           0.5
2   2    3.0          -3.0
3   2    5.0          -1.0
4   2   10.0           4.0
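The "rank the values in the group" use case mentioned earlier maps onto the same pandas pattern (a sketch; `value_rank` is a hypothetical column name, not part of this PR):

```python
import pandas as pd

df = pd.DataFrame({'id': [1, 1, 2, 2, 2], 'value': [1., 2., 3., 5., 10.]})

# rank is also an N -> N mapping computed independently within each group
df['value_rank'] = df.groupby('id')['value'].rank()
```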

How was this patch tested?

Added new tests in test_pandas_udf_window.

@icexelloss icexelloss changed the title [SPARK-28006] User-defined grouped transform pandas_udf for window operations [WIP][SPARK-28006] User-defined grouped transform pandas_udf for window operations Jun 17, 2019
@icexelloss
Contributor Author

Still WIP, but the implementation looks pretty straightforward. Need to make WindowInPandasExec handle multiple eval types...

@SparkQA

SparkQA commented Jun 18, 2019

Test build #106597 has finished for PR 24896 at commit fe80821.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

e.asInstanceOf[PythonUDF].evalType == PythonEvalType.SQL_GROUPED_XFORM_PANDAS_UDF
}

// This is currently same as GroupedAggPandasUDF, but we might support new types in the future,
Member

Change the comment together?

@SparkQA

SparkQA commented Aug 23, 2019

Test build #109638 has finished for PR 24896 at commit fe80821.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@icexelloss icexelloss force-pushed the SPARK-28006-window-xform-udf branch from fe80821 to 9bb37ba Compare October 7, 2019 21:13
@icexelloss icexelloss force-pushed the SPARK-28006-window-xform-udf branch from 9bb37ba to 43eba25 Compare October 7, 2019 21:13
@SparkQA

SparkQA commented Oct 7, 2019

Test build #111855 has finished for PR 24896 at commit 43eba25.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@icexelloss
Contributor Author

icexelloss commented Oct 8, 2019

Just a quick status update here. I still think this is a useful addition to the current Pandas UDF and a fairly easy one to implement. Internally we have quite a few use cases that could use grouped transform.

Currently I am not pushing for this change because of SPARK-28264. @HyukjinKwon @BryanCutler what are your thoughts on getting this into 3.0 as well? I am happy to do the work here.

@BryanCutler
Member

@icexelloss I'm a little confused about a couple of things:

  1. Is this specific to operations over a window only?
  2. Can the same functionality be achieved by the GROUPED_MAP pandas_udf, only it is slower and awkward because it requires the whole dataframe even though only 1 column is used?

@HyukjinKwon
Member

HyukjinKwon commented Oct 23, 2019

@BryanCutler, I think yes to both questions. BTW, I guess the name should be something similar to GROUPED_MAP.

@HyukjinKwon
Member

I don't feel strongly about this. I will defer to other people here.

@BryanCutler
Member

If the answer is yes to question (2) above, then I think we should hold off on this until SPARK-28264 is sorted out, and it will be good to keep this use case in mind too.

@icexelloss
Contributor Author

@BryanCutler @HyukjinKwon Thanks both for the feedback. I am hoping we could reach some agreement about the functionality here. The spelling of course will depend on SPARK-28264.

Do we think that, because grouped_map already exists, this functionality is not so useful?

@HyukjinKwon
Member

I roughly think like that for now, so I don't feel strongly about it. I or somebody else will probably give SPARK-28264 a try soon.

@github-actions

github-actions bot commented Feb 7, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Feb 7, 2020
@github-actions github-actions bot closed this Feb 8, 2020
