Skip to content

Conversation

icexelloss
Copy link
Contributor

What changes were proposed in this pull request?

See https://issues.apache.org/jira/browse/SPARK-23011

How was this patch tested?

Add more tests in test_complex_groupby

TODO:

  • Document the usage in groupby apply

@icexelloss
Copy link
Contributor Author

@icexelloss icexelloss changed the title [SPARK-23011][PYTHON][SQL] Prepend missing grouping key in groupby apply [SPARK-23011][PYTHON][SQL] Prepend missing grouping columns in groupby apply Jan 9, 2018
@SparkQA
Copy link

SparkQA commented Jan 9, 2018

Test build #85877 has finished for PR 20211 at commit f2822b5.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

| 2| 1.1094003924504583|
+---+-------------------+
Notes on grouping column:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This explains the general idea. I plan to improve the doc if people think this change is good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks more reasonable to me to pass the grouping columns to UDF and let the UDF to decide if it wants to include the grouping columns or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for ^

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan That's what I thought too initially. Let's consider this use case,

import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
      X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[group_key] + [model.params[i] for i in   x_columns]], columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)

This is a simple pandas UDF that does a linear regression. The issue is, although the UDF (linear regression) has nothing to do with the grouping column, the user needs to deal with grouping column in the UDF. In other words, the UDF is coupled with the grouping column.

If we make it such that grouping columns are prepend to UDF result, then the user can write something like this:

import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(*x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    y = pdf[y_column]
    X = pdf[x_columns]
      X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[model.params[i] for i in   x_columns]], columns=x_columns)

beta = df.groupby(group_column).apply(ols)

Now the UDF is cleaner because it only deals with columns that are relevant to the regression. It also make the UDF more reusable, as the user can now do something like:

beta1 = df.groupby('a').apply(ols)
beta2 = df.groupby('a', 'b').apply(ols)

Because the UDF is now decoupled with the grouping column, the user can reuse the same udf with different grouping, which is not possible with the current API.

@cloud-fan @HyukjinKwon What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I saw this usecase as described in the JIRA and I got that the specific case can be simplified; however, I am not sure if it's straightforward to the end users.

For example, if I use pandas_udf I think I would simply expect the return schema is matched as described in returnType. I think pandas_udf already need some background and I think we should make it simpler as possible as we can.

It might be convenient to make the guarantee on grouping columns in some cases vs this might be a kind of magic inside.

I would prefer to let the UDF to specify the grouping columns to make this more straightforward more ..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon's proposal sounds good to me too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to confirm what the result schema will be like finally.
If users want to include the keys, the udf should include the keys in its output and the keys will not be prepended automatically?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, sounds good. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks all for the discussion. I will update the Jira and open a new PR.

@SparkQA
Copy link

SparkQA commented Jan 10, 2018

Test build #85879 has finished for PR 20211 at commit 46dc9e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

expected = expected.sort_values(['id', 'v']).reset_index(drop=True)
expected = expected.assign(norm=expected.norm.astype('float64'))
self.assertFramesEqual(expected, result)
return pd.DataFrame({'v': v + 1, 'v2': v - v.mean()})[:]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should we copy here by the way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just for simplifying the test - pandas has very complicated behavior when it comes to what's the index of the return value when using groupby apply

If interested, take a look at http://nbviewer.jupyter.org/gist/mbirdi/05f8a83d340476e5f03a

case (k, groupedRowIter) =>
val additionalGrouping = additionalGroupingProj(k)
queue.add(additionalGrouping)
(additionalGrouping, groupedRowIter.map(dropGrouping))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can return only groupedRowIter.map(dropGrouping).

val additionalGroupingAttributes = mutable.ArrayBuffer[Attribute]()

for (attribute <- groupingAttributes) {
if (!udfOutput.map(_.name).contains(attribute.name)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether we should decide the additional grouping attributes by only their names?

For example from tests:

result3 = df.groupby('id', 'v').apply(foo).sort('id', 'v').toPandas()

The column v in result3 is not the actual grouping value, which is overwritten by the returned value from the UDF because the returned column name contains the name. I'm not sure it is the desired behavior.

Copy link
Contributor Author

@icexelloss icexelloss Jan 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ueshin You brought up a very good point about an issue I struggle a bit with - conflicting column names in grouping column and UDF output.

When this happens, we have a few choices:

  1. Keep both columns and rename one of them
    The benefit of this approach is that it gives the user the most information, but might result in arbitrary column names such like v_. Also another downside is if the UDF just adds or replace columns, this will result duplicate columns.
  2. Keep both columns and don't rename
    This is consistent with groupby agg behavior, so probably better than (1), but still, will result in duplicate columns if the UDF only adds or replaces columns on input
  3. Drop conflict group columns
    This is the approach implemented in this PR. The reason I choose this is because I think it's a rare case that the user want to change the grouping column and at the same time, want the original grouping column. Therefore, I think it makes most sense to make the user do a bit extra work - explicitly create a another column rather than overriding the grouping column.
  4. Drop conflict UDF columns
    I don't think drop UDF output is reasonable behavior.

@ueshin which one do you prefer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this relates to the discussion above (#20211 (comment)).
Let's wait and see for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let's do that.

@viirya
Copy link
Member

viirya commented Jan 16, 2018

We should add a description to the PR, instead of just jira ticket link.

@HyukjinKwon
Copy link
Member

Yea, at least to me, PR description is helpful a lot when I actually review something or track the history :).

@icexelloss
Copy link
Contributor Author

Yeah my bad. I am going to close this PR but I will keep that in mind for future reference. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants