
[SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality #24095


BryanCutler
Member

What changes were proposed in this pull request?

This change is a cleanup and consolidation of 3 areas related to Pandas UDFs:

  1. ArrowStreamPandasSerializer now inherits from ArrowStreamSerializer and uses the base class dump_stream and load_stream to create the Arrow reader/writer and send Arrow record batches. ArrowStreamPandasSerializer handles the conversions to/from Pandas and produces the Arrow record batch iterators. This change removes the duplicated creation of Arrow readers/writers (see the sketch after this list).

  2. createDataFrame with Arrow now uses ArrowStreamPandasSerializer instead of doing its own conversions from Pandas to Arrow and sending record batches through ArrowStreamSerializer.

  3. Grouped Map UDFs now reuse existing logic in ArrowStreamPandasSerializer to send Pandas DataFrame results as a StructType instead of separating each column from the DataFrame. This makes the code a little more consistent with the Python worker, but does require that the returned StructType column is flattened out in FlatMapGroupsInPandasExec in Scala.
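As a rough illustration of (1), here is a minimal sketch of the intended layering. This is simplified and not the actual pyspark.serializers code: the pyarrow calls are real, but the method bodies, generated column names, and the _create_batch signature are illustrative only.

import pyarrow as pa

class ArrowStreamSerializer(object):
    """Sends and receives raw Arrow record batches over a stream (sketch)."""

    def dump_stream(self, iterator, stream):
        writer = None
        try:
            for batch in iterator:
                if writer is None:
                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
                writer.write_batch(batch)
        finally:
            if writer is not None:
                writer.close()

    def load_stream(self, stream):
        reader = pa.ipc.open_stream(stream)
        for batch in reader:
            yield batch

class ArrowStreamPandasSerializer(ArrowStreamSerializer):
    """Converts pandas <-> Arrow and reuses the base class for the actual Arrow I/O (sketch)."""

    def _create_batch(self, series_list):
        # series_list: a list of pandas.Series making up one record batch
        arrs = [pa.Array.from_pandas(s) for s in series_list]
        return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])

    def dump_stream(self, iterator, stream):
        # Convert each group of pandas.Series to an Arrow batch, then let the
        # base class create the writer and send the batches.
        batches = (self._create_batch(series_list) for series_list in iterator)
        super(ArrowStreamPandasSerializer, self).dump_stream(batches, stream)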

How was this patch tested?

Existing tests, and also ran the tests with pyarrow 0.12.0.

@BryanCutler
Member Author

I think (1) and (2) from above are pretty straightforward. With (3) I think it makes the Python worker a bit better now that StructType can be returned from a scalar udf, but it does require the struct to be flattened out in Scala, so I don't feel too strongly about pushing this in. Please take a look @HyukjinKwon @ueshin @icexelloss and let me know what you think, thanks!

// Grouped Map UDF returns a StructType column in ColumnarBatch, select the children here
// TODO: ColumnVector getChild is protected, so use ArrowColumnVector which is public
val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
val outputVectors = output.indices.map(structVector.getChild(_).asInstanceOf[ColumnVector])
Member Author

There might be a better way to do this, or at least make getChild public in ColumnVector to avoid the casting here. wdyt @ueshin ?

Member

I think the logic itself is fine. But doesn't this mean we cannot support nested structs in grouped map Pandas UDFs?

Member

Another concern though .. I think all of the Arrow implementations (including the SparkR ones) don't modify the batch's outputs but use the batch as is.

Member

Since ColumnVector is a very public interface compared to ArrowColumnVector now, I'm not sure we should make it public.

Member

I think it's fine to modify the batch's outputs here, but we need to think about the previous concern from @HyukjinKwon. WDYT about that @BryanCutler ?

Member Author

I think the logic itself is fine. But doesn't this mean we cannot support nested structs in grouped map Pandas UDFs?

Nested structs were never supported in grouped map UDFs (I verified with code prior to #23900). Part of the reason for this is there is no explicit logical type for a struct in a Pandas DataFrame. When creating a nested struct in pyarrow, then converting to pandas, the struct column gets converted to a column of dictionaries, which Spark could handle but brings some other complications. So this cleanup should keep the functionality the same.
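For reference, a small pyarrow-only snippet showing the behavior described above (illustrative; the column names are made up): a struct column comes back from pandas conversion only as plain Python dicts.

import pyarrow as pa

# An Arrow struct column...
struct_arr = pa.array([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}])
batch = pa.RecordBatch.from_arrays([struct_arr], ["s"])

# ...becomes a column of Python dicts after to_pandas(), so there is no
# explicit struct type left on the pandas side to map onto Spark's StructType.
pdf = batch.to_pandas()
print(type(pdf["s"][0]))  # <class 'dict'>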

Member Author

Another concern though .. I think all of the Arrow implementations (including the SparkR ones) don't modify the batch's outputs but use the batch as is.

Yeah, makes the Scala side a bit different but I think it is worth it to make things in worker.py more consistent. With this cleanup, all of the Pandas UDFs go through the same logic to be serialized.


# Assign result columns by schema name if user labeled with strings, else use position
if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns):
Member

Eh, @BryanCutler, sorry if I rushed to read, but where did this logic go?

Member

Oops, okie. The logic was actually duplicated with _create_batch.

Member Author

yup, that's correct
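For clarity, a standalone sketch of what that column-assignment logic does. It is simplified: the real logic lives in _create_batch in pyspark's serializers, the helper name here is made up, and the original check uses basestring for Python 2 compatibility where this sketch uses str.

import pandas as pd

def select_result_columns(result, field_names, assign_cols_by_name):
    """Pick the UDF result columns by schema field name when the user labeled
    them with strings, otherwise fall back to position (sketch)."""
    if assign_cols_by_name and any(isinstance(name, str) for name in result.columns):
        return [result[name] for name in field_names]
    return [result.iloc[:, i] for i in range(len(field_names))]

# Example: the UDF returned columns in a different order than the schema expects.
pdf = pd.DataFrame({"value": [1.0, 2.0], "id": [1, 2]})
cols = select_result_columns(pdf, ["id", "value"], assign_cols_by_name=True)
print([c.name for c in cols])  # ['id', 'value']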

if writer is not None:
    writer.close()
batches = (_create_batch(series, self._timezone, self._safecheck, self._assign_cols_by_name)
           for series in iterator)
Member

Hm ... @BryanCutler, it seems _init_dump_stream was added to handle the write_int(SpecialLengths.START_ARROW_STREAM, stream) case alone, TBH. Could we just pull write_int(SpecialLengths.START_ARROW_STREAM, stream) in here and remove _init_dump_stream, to keep this logic isolated here?

It looks tricky to do it since it's all generators. I thought we could at least do something like:

        batches = (_create_batch(series, self._timezone, self._safecheck, self._assign_cols_by_name)
                   for series in iterator)
        def arrow_start_written_batches():
            should_write_start_length = True
            for batch in batches:
                if should_write_start_length:
                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
                    should_write_start_length = False
                yield batch
        super(ArrowStreamPandasSerializer, self).dump_stream(arrow_start_written_batches(), stream)

Member

I agree with @HyukjinKwon to have write_int(SpecialLengths.START_ARROW_STREAM, stream) only in this class and I'd prefer not to have the extra parameter send_start_stream for ArrowStreamSerializer.

Member Author

I actually did it this way first, but was worried it was more confusing. Sounds fine to me if you guys are ok with it too.

Member Author

The only issue with this is that createDataFrame would need to handle SpecialLengths.START_ARROW_STREAM for regular and encrypted options and I'm a little hesitant to change that. I'll keep an option in ArrowStreamPandasSerializer to not write the start stream for these cases.


def load_stream(self, stream):
    """
    Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
    """
    batch_iter = super(ArrowStreamPandasSerializer, self).load_stream(stream)
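For context, a self-contained sketch of the per-batch conversion this load_stream performs (pyarrow only; the real method also applies timezone and type adjustments, and the helper name here is illustrative):

import pyarrow as pa

def arrow_batches_to_pandas(batch_iter):
    """Yield a list of pandas.Series, one per column, for each Arrow record batch (sketch)."""
    for batch in batch_iter:
        table = pa.Table.from_batches([batch])
        yield [col.to_pandas() for col in table.itercolumns()]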
Member

not a big deal but I would name it batches

@HyukjinKwon
Member

Looks pretty good to me otherwise.

@SparkQA

SparkQA commented Mar 15, 2019

Test build #103508 has finished for PR 24095 at commit 93bb831.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@ueshin left a comment

I think we can go ahead with (1) and (2), but we have some concerns about (3) so far.
It might be better to do a separate PR for (3).


@BryanCutler
Member Author

BryanCutler commented Mar 18, 2019

Thanks for reviewing @ueshin and @HyukjinKwon!

For (2), I used your suggestion to write START_ARROW_STREAM in a generator but to keep from changing things for createDataFrame, I have it as an option in the serializer.

For (3) I don't think it changes functionality because nested structs weren't supported before with any Pandas UDFs. Let me know what you guys think.

@SparkQA

SparkQA commented Mar 19, 2019

Test build #103635 has finished for PR 24095 at commit bc08d1b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


# Create the Spark schema from list of names passed in with Arrow types
if isinstance(schema, (list, tuple)):
    arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
Member Author

this is only available since pyarrow 0.12.0; I can look into a workaround, although it might be a good time to bump the minimum pyarrow version

val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
flattenedBatch.setNumRows(batch.numRows())
flattenedBatch.rowIterator.asScala
}.map(UnsafeProjection.create(output, output))
Member

Oh, also, let's make a separate variable for UnsafeProjection.create(output, output). I think we talked about this before in my PR 😉

Member Author

sure, will do

# Create the Spark schema from list of names passed in with Arrow types
if isinstance(schema, (list, tuple)):
    if LooseVersion(pa.__version__) < LooseVersion("0.12.0"):
        temp_batch = pa.RecordBatch.from_pandas(pdf[0:100], preserve_index=False)
Member Author

I'm not too thrilled with creating a record batch just to get the Arrow schema, but this was the most reliable way I could figure out to do it pre v0.12.0. I will propose bumping the pyarrow version soon, and then this could be removed.
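For illustration, the two ways of deriving an Arrow schema from a pandas DataFrame side by side (pure pyarrow; the DataFrame here is a made-up example, and the [0:100] slice mirrors the workaround above, only to limit inference cost):

import pandas as pd
import pyarrow as pa
from distutils.version import LooseVersion

pdf = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

if LooseVersion(pa.__version__) >= LooseVersion("0.12.0"):
    # pyarrow >= 0.12.0 can infer the schema directly from the DataFrame
    arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
else:
    # older pyarrow: build a small record batch just to read its schema off
    arrow_schema = pa.RecordBatch.from_pandas(pdf[0:100], preserve_index=False).schema

print(arrow_schema)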

Member

Okie

@SparkQA

SparkQA commented Mar 21, 2019

Test build #103743 has finished for PR 24095 at commit f6b0e30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):

@BryanCutler
Member Author

Apologies, I moved things around again for item (2) because I didn't really like having an option in ArrowStreamPandasSerializer to send the START_ARROW_STREAM either.

Now, I have _create_batch(...) as a method in ArrowStreamPandasSerializer (where it belongs, I think), and then a subclass used for Pandas UDFs that overrides dump_stream so it can send START_ARROW_STREAM.

I think it's clearer this way because it's easier to see what serializer is used where, and I also tried to improve the docs. Let me know what you think when you get the chance to take another look @HyukjinKwon @ueshin . Thanks!
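Roughly, the shape of that subclass (a sketch based on the suggestion earlier in this thread; it assumes ArrowStreamSerializer, ArrowStreamPandasSerializer, write_int, and SpecialLengths.START_ARROW_STREAM live in pyspark.serializers as they did at the time, and the inner generator name is illustrative):

from pyspark.serializers import (ArrowStreamSerializer, ArrowStreamPandasSerializer,
                                 SpecialLengths, write_int)

class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
    """Pandas UDF serializer that signals the start of the Arrow stream first (sketch)."""

    def dump_stream(self, iterator, stream):
        def init_stream_yield_batches():
            should_write_start_length = True
            for series in iterator:
                batch = self._create_batch(series)
                if should_write_start_length:
                    # Tell the JVM side that an Arrow stream is about to begin.
                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
                    should_write_start_length = False
                yield batch

        # Hand the already-converted batches straight to the raw Arrow serializer.
        return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)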

@HyukjinKwon
Member

Took a quick look and it looks okie in general to me. Will take a closer look later, so don't be blocked by me.

@HyukjinKwon
Member

Merged to master.

@BryanCutler
Member Author

Thanks @HyukjinKwon and @ueshin !

@BryanCutler deleted the arrow-refactor-cleanup-UDFs branch on March 22, 2019 at 04:57.