
[SPARK-27240][PYTHON] Use pandas DataFrame for struct type argument in Scalar Pandas UDF. #24177

Closed

Conversation

@ueshin (Member) commented Mar 22, 2019

What changes were proposed in this pull request?

We now support returning a pandas DataFrame for struct type in a Scalar Pandas UDF.

If we chain another Pandas UDF after a Scalar Pandas UDF that returns a pandas DataFrame, the argument of the chained UDF will be a pandas DataFrame, but currently we don't support a pandas DataFrame as an argument of a Scalar Pandas UDF. That means there is an inconsistency between the chained UDF and the single UDF.

To be consistent, we should support taking a pandas DataFrame for a struct type argument in a Scalar Pandas UDF.
Currently only pyarrow >= 0.11 is supported.

How was this patch tested?

Modified and added some tests.
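To illustrate the calling convention this change introduces, here is a minimal pandas-level sketch (the function and column names are made up, not taken from the PR): a struct-type argument arrives at the UDF as a pandas DataFrame whose columns are the struct fields, while an ordinary column arrives as a pandas Series.

```python
import pandas as pd

def scale_point(point, factor):
    # `point` stands in for a struct<x: double, y: double> argument,
    # delivered as a pd.DataFrame; `factor` is an ordinary pd.Series.
    return point.mul(factor, axis=0)

# Simulate the inputs a Scalar Pandas UDF would receive for one batch.
point = pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, 4.0]})
factor = pd.Series([10.0, 100.0])
result = scale_point(point, factor)
print(result["x"].tolist())  # [10.0, 200.0]
```

The same function body then works whether the DataFrame comes from Spark deserializing a struct column or from a preceding chained UDF that returned a DataFrame, which is the consistency the PR is after.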

# TODO: remove version check once minimum pyarrow version is 0.11.0
if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
    raise TypeError("Unsupported type in conversion from Arrow: " + str(at) +
                    "\nPlease install pyarrow >= 0.11.0 for StructType support.")
@ueshin (Member Author) commented:

Currently only pyarrow >= 0.11 is supported, since I couldn't find a way to reconstruct a pandas DataFrame from a pyarrow.lib.StructArray.

if self._df_for_struct and type(data_type) == StructType:
    import pandas as pd
    import pyarrow as pa
    column_arrays = zip(*[[chunk.field(i)
@ueshin (Member Author) commented:

pyarrow.lib.StructArray.field() is only available in pyarrow >=0.11.

@SparkQA commented Mar 22, 2019

Test build #103811 has finished for PR 24177 at commit d503aa2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin (Member Author) commented Mar 22, 2019

cc @BryanCutler @HyukjinKwon

@HyukjinKwon (Member) commented:

(Seems you forgot to file a JIRA.)

@ueshin ueshin changed the title Use pandas DataFrame for struct type argument in Scalar Pandas UDF. [SPARK-27240][PYTHON] Use pandas DataFrame for struct type argument in Scalar Pandas UDF. Mar 22, 2019
@ueshin (Member Author) commented Mar 22, 2019

@HyukjinKwon Thanks!
Actually I had filed one, but forgot to tag the JIRA ID and the category.

@@ -253,7 +253,9 @@ def read_udfs(pickleSer, infile, eval_type):
            "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true")\
            .lower() == "true"

        ser = ArrowStreamPandasUDFSerializer(timezone, safecheck, assign_cols_by_name)
        df_for_struct = eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF
A reviewer (Member) commented:

It seems hard to tell why df_for_struct should be true when eval_type is PythonEvalType.SQL_SCALAR_PANDAS_UDF. A well-explained comment here would be better.

@ueshin (Member Author) replied:

Sure, will add a comment.

                      for arrays, field in zip(column_arrays, data_type)]
    s = _check_dataframe_localize_timestamps(pd.concat(series, axis=1), self._timezone)
else:
    s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column, data_type)
A reviewer (Member) commented:

Will this create a new serializer each time arrow_to_pandas is called?

@ueshin (Member Author) replied:

No, this is just calling the superclass's method.

@SparkQA commented Mar 22, 2019

Test build #103820 has finished for PR 24177 at commit f8b3404.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member) left a comment:

Thanks for the PR @ueshin ! If I understand correctly, this change means that any non-nested StructType column from Spark will be converted to Pandas DataFrame for input to a pandas_udf? So if a pandas_udf had 2 arguments with one being a LongType and one being a StructType, then the user would see one Pandas Series and one Pandas DataFrame as the function input?

That behavior sounds reasonable to me, but I think it is a little different for grouped map UDFs, which merge all columns into a single pandas DataFrame, and then I'm not sure how this would handle a StructType column. I'm just wondering if this difference might end up being confusing to the user, WDYT?

import pyarrow as pa
column_arrays = zip(*[[chunk.field(i)
                       for i in range(chunk.type.num_children)]
                      for chunk in arrow_column.data.iterchunks()])
A reviewer (Member) commented:

It might be best to avoid dealing with array chunks and keep this high-level if possible. Would it be possible to build the pandas DataFrame by flattening the Arrow column, building a table from the flattened arrays, and then converting that to pandas? Something like this, I think:

pdf = pa.Table.from_arrays(arrow_column.flatten()).to_pandas()

I'm not sure if the column names in the pdf would end up as expected, though...

@ueshin (Member Author) replied:

arrow_column.flatten() is great! Then we can support pyarrow >= 0.10.

from pyspark.sql.types import StructType, \
    _arrow_column_to_pandas, _check_dataframe_localize_timestamps

if self._df_for_struct and type(data_type) == StructType:
A reviewer (Member) commented:

Does this need to check for a nested struct?

@ueshin (Member Author) replied:

I don't think so. We can't construct a pandas DataFrame containing a nested DataFrame.
Am I missing what you mean?

A reviewer (Member) replied:

I was wondering, if data_type has a nested struct, is an error raised before it gets here? That could be addressed as a follow-up. I'm not sure if there is a test for it, but I'll check.

@HyukjinKwon (Member) commented:

> I think it is a little different for grouped map udfs that merge all columns into a single Pandas DataFrame

Yes, you were virtually referring to wrap_grouped_map_pandas_udf in worker.py, IIUC, @BryanCutler? I think we had better match.

@ueshin (Member Author) commented Mar 25, 2019

@BryanCutler I'm sorry, but I couldn't figure out what you meant.
So, do you want to use multiple "flattened" arguments instead of a single DataFrame in Grouped Map Pandas UDFs?

@SparkQA commented Mar 25, 2019

Test build #103892 has finished for PR 24177 at commit 4309d46.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member) commented:

> @BryanCutler I'm sorry, but I couldn't figure out what you meant.
> So, do you want to use multiple "flattened" arguments instead of a single DataFrame in Grouped Map Pandas UDFs?

Sorry, I think I wrote that a little too hastily and it might not have made much sense. Yes, I was referring to wrap_grouped_map_pandas_udf but actually I think it's not an issue since the user doesn't select columns in the same way with a groupby().apply() operation.

@BryanCutler (Member) left a review comment:

LGTM

@BryanCutler (Member) commented:

Merged to master, thanks @ueshin!

@HyukjinKwon (Member) left a comment:

A late LGTM as well :D

dlisuk pushed a commit to dlisuk/spark that referenced this pull request Jun 10, 2019
…n Scalar Pandas UDF.

Now that we support returning pandas DataFrame for struct type in Scalar Pandas UDF.

If we chain another Pandas UDF after the Scalar Pandas UDF returning pandas DataFrame, the argument of the chained UDF will be pandas DataFrame, but currently we don't support pandas DataFrame as an argument of Scalar Pandas UDF. That means there is an inconsistency between the chained UDF and the single UDF.

We should support taking pandas DataFrame for struct type argument in Scalar Pandas UDF to be consistent.
Currently pyarrow >=0.11 is supported.

Modified and added some tests.

Closes apache#24177 from ueshin/issues/SPARK-27240/structtype_argument.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
simon-slowik pushed a commit to simon-slowik/spark that referenced this pull request Jun 26, 2019
…n Scalar Pandas UDF.

## What changes were proposed in this pull request?

Now that we support returning pandas DataFrame for struct type in Scalar Pandas UDF.

If we chain another Pandas UDF after the Scalar Pandas UDF returning pandas DataFrame, the argument of the chained UDF will be pandas DataFrame, but currently we don't support pandas DataFrame as an argument of Scalar Pandas UDF. That means there is an inconsistency between the chained UDF and the single UDF.

We should support taking pandas DataFrame for struct type argument in Scalar Pandas UDF to be consistent.
Currently pyarrow >=0.11 is supported.

## How was this patch tested?

Modified and added some tests.

Closes apache#24177 from ueshin/issues/SPARK-27240/structtype_argument.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>