
[SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas UDFs #24981

Closed · wants to merge 41 commits

Conversation


@d80tb7 d80tb7 commented Jun 27, 2019

What changes were proposed in this pull request?

Adds a new cogroup Pandas UDF. This allows two grouped DataFrames to be cogrouped together and a (pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame UDF to be applied to each cogroup.

Example usage

import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

@pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+

How was this patch tested?

Added unit test test_pandas_udf_cogrouped_map
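
For reference, a minimal sketch of the kind of assertion such a test might make; the names here (including the `spark` fixture and `test_simple_cogroup`) are illustrative and not the actual test code:

```
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType


def test_simple_cogroup(spark):
    left = spark.createDataFrame([(1, 1.0), (2, 2.0)], ("id", "v1"))
    right = spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))

    @pandas_udf("id long, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
    def merge(l, r):
        # join the two cogrouped pandas DataFrames on the grouping column
        return pd.merge(l, r, on="id")

    result = (left.groupby("id").cogroup(right.groupby("id"))
                  .apply(merge).sort("id").toPandas())
    expected = pd.merge(left.toPandas(), right.toPandas(), on="id")
    pd.testing.assert_frame_equal(result.reset_index(drop=True), expected)
```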


protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
  while (inputIterator.hasNext) {
    dataOut.writeInt(SpecialLengths.START_ARROW_STREAM)
Author

I need this so that the Python side knows whether it should expect another group. I'm using SpecialLengths.START_ARROW_STREAM to signal that there will be another group coming, and SpecialLengths.END_OF_DATA_SECTION to indicate that we've finished sending all the Arrow data.

Member

These SpecialLengths have slightly different meanings elsewhere and I think it gets confusing trying to sort out how they are used. I think it would be clearer if we just say we are sending an integer before the data which will indicate how many groups to read, with a value of 0 to represent the end-of-stream. So we would send 2 before writing the left and right groups, and if we end up sending more groups in the future, then it wouldn't change.
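
For illustration only, here is a minimal sketch of how the Python worker side might consume data under that proposal; `stream` and `read_int` stand in for the worker's actual socket and framing helpers, which are assumptions on my part:

```
import pyarrow as pa


def read_cogroups(stream, read_int):
    """Yield one tuple of pandas DataFrames per cogroup sent by the JVM."""
    while True:
        num_dataframes = read_int(stream)   # e.g. 2 for a (left, right) pair
        if num_dataframes == 0:             # 0 signals end-of-stream
            return
        dataframes = []
        for _ in range(num_dataframes):
            # each dataframe in the cogroup arrives as its own complete Arrow stream
            reader = pa.ipc.open_stream(stream)
            dataframes.append(reader.read_all().to_pandas())
        yield tuple(dataframes)
```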

Author

Yes, that makes sense. Changed.

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Jun 27, 2019

Test build #106961 has finished for PR 24981 at commit d4cf6d0.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 27, 2019

Test build #106970 has finished for PR 24981 at commit 87aeb92.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 2, 2019

Test build #107125 has finished for PR 24981 at commit d2da787.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

Chris Martin added 3 commits July 2, 2019 21:29
…7463-poc-arrow-stream

# Conflicts:
#	core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
#	python/pyspark/rdd.py
#	python/pyspark/worker.py
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@SparkQA

SparkQA commented Jul 2, 2019

Test build #107129 has finished for PR 24981 at commit b444ff7.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class ExecutorDiskUtils
  • case class SparkListenerSpeculativeTaskSubmitted(
  • case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String)
  • case class BlockLocationsAndStatus(
  • class KafkaTable extends Table with SupportsRead with SupportsWrite
  • class RankingEvaluator (override val uid: String)
  • class RankingEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol,
  • public final class ColumnarBatch implements AutoCloseable
  • case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode
  • abstract class QuaternaryExpression extends Expression
  • case class CheckOverflow(
  • case class Overlay(input: Expression, replace: Expression, pos: Expression, len: Expression)
  • case class MapPartitionsInPandas(
  • class ColumnarRule
  • case class ColumnarToRowExec(child: SparkPlan)
  • case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode
  • case class ApplyColumnarRulesAndInsertTransitions(conf: SQLConf, columnarRules: Seq[ColumnarRule])
  • case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean)
  • case class MapPartitionsInPandasExec(

while (inputIterator.hasNext) {
  dataOut.writeInt(SpecialLengths.START_ARROW_STREAM)
  val (nextLeft, nextRight) = inputIterator.next()
  writeGroup(nextLeft, leftSchema, dataOut)
Contributor
@icexelloss icexelloss Jul 2, 2019

I see. In this implementation we are writing out a complete Arrow stream for each group. I am ok with this one, but am a little concerned about performance. I think I'd like to understand the performance difference between the two POCs. Is it possible to do a microbenchmark of maybe 100M of data with very small to very large groups?

Contributor
@icexelloss icexelloss Jul 2, 2019

e.g. comparing the performance with:

  • Each group has 1 row
  • Each group has 10 rows
  • Each group has 100 rows
    ...
  • The dataframe contains a single group.

@BryanCutler @HyukjinKwon WDYT?
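
If it helps, here is a rough sketch of the kind of microbenchmark setup being suggested; everything in it (the trivial merge UDF, the column names, the timing loop) is illustrative and not part of the PR:

```
import time

import pandas as pd
from pyspark.sql.functions import col, pandas_udf, rand, PandasUDFType


@pandas_udf("id long, v1 double, v2 double", PandasUDFType.COGROUPED_MAP)
def join_groups(l, r):
    return pd.merge(l, r, on="id")


def run_benchmark(spark, n_rows, rows_per_group):
    # left side: n_rows rows, rows_per_group of them per id
    left = spark.range(n_rows).select(
        (col("id") / rows_per_group).cast("long").alias("id"),
        rand().alias("v1"))
    # right side: one row per id, so the merge doesn't blow up the output size
    right = spark.range(n_rows // rows_per_group).select(
        col("id"), rand().alias("v2"))
    start = time.time()
    left.groupby("id").cogroup(right.groupby("id")).apply(join_groups).count()
    return time.time() - start


# assumes an existing SparkSession named `spark`; fix the total size and sweep group size
for rows_per_group in (1, 10, 100, 10000, 1000000):
    print(rows_per_group, run_benchmark(spark, 1000000, rows_per_group))
```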

Member

Yea, we should at least have a rough estimate of the performance.

Member

If it's easy to run some numbers between the two POCs then that would be nice to see, but I think there would have to be a significant difference to break from the Arrow stream protocol. I would rather stick with this PR for now and leave performance improvements for followups.

Author

I don't think it should be too bad to test; I'll try and get one done in the next few days. That said, I'll try and tidy up this code first as Bryan suggests.

@HyukjinKwon
Member

@d80tb7, sorry, there have been some changes in Pandas UDFs that caused some conflicts, as well as design discussion about Pandas UDFs in general. But I think we can go ahead with cogroup separately for now.

I am still positive about cogroup in general. Mind resolving conflicts and going ahead?

Member

@BryanCutler BryanCutler left a comment


Great work @d80tb7 ! Apologies for taking so long to review. I think it is going to be a bit of a moving target with some of the recent proposals about pandas_udfs types, but we should be able to get all the plumbing sorted out first. If the proposed changes get merged first, I don't think there will need to be much updating here, probably just changes in the pandas_udf declaration. WDYT?

Overall, this looks pretty good to me. I think we can clean up some things with the serializer in python, and I'll have to take a closer look at the Scala later.

@@ -401,6 +427,22 @@ def __repr__(self):
return "ArrowStreamPandasUDFSerializer"


class InterleavedArrowStreamPandasSerializer(ArrowStreamPandasUDFSerializer):
Member

Could we call this PandasCogroupSerializer or something like that where it is obvious what it is being used for?



protected def executePython[T]
(data: Iterator[T], runner: BasePythonRunner[T, ColumnarBatch]): Iterator[InternalRow] = {
Member

I think this file has some formatting issues also



>>> df2 = spark.createDataFrame(
... [(20000101, 1, "x"), (20000101, 2, "y")],
... ("time", "id", "v2"))
>>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
Member

We should skip this particular test but still run the doctests for this module:

  1. add this to dev/sparktestsupport/modules.py at pyspark_sql

  2. add:

    def main():
        doctest.testmod(...)
        ...
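
Something along these lines, following the pattern used by the other pyspark.sql modules (the exact module name and option flags here are just a sketch):

```
def _test():
    import doctest
    import sys
    from pyspark.sql import SparkSession
    import pyspark.sql.cogroup

    globs = pyspark.sql.cogroup.__dict__.copy()
    spark = SparkSession.builder \
        .master("local[4]") \
        .appName("sql.cogroup tests") \
        .getOrCreate()
    globs['spark'] = spark
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.cogroup, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()
```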

Member

We are currently skipping all doctests for Pandas UDFs right? We could add the module but then need to skip each test individually, which might be more consistent with the rest of PySpark.
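
For reference, a sketch of what skipping an individual pandas UDF doctest looks like with the standard doctest directive; the example body just reuses the one from this PR's docstring:

```
>>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)  # doctest: +SKIP
... def asof_join(l, r):
...     return pd.merge_asof(l, r, on="time", by="id")
```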

... ("time", "id", "v1"))
>>> df2 = spark.createDataFrame(
... [(20000101, 1, "x"), (20000101, 2, "y")],
... ("time", "id", "v2"))
Member

indentation nit


"""
Tests below use pd.DataFrame.assign that will infer mixed types (unicode/str) for column names
from kwargs w/ Python 2, so need to set check_column_type=False and avoid this check
Member

I think we should leave this as a comment, not a string. Since it's not the top of the module currently, it's not docstring either.

from kwargs w/ Python 2, so need to set check_column_type=False and avoid this check
"""
if sys.version < '3':
_check_column_type = False
Member

nit

_check_column_type = sys.version >= '3'

>>> df2 = spark.createDataFrame(
... [(20000101, 1, "x"), (20000101, 2, "y")],
... ("time", "id", "v2"))
>>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
Member

Can we document when arguments are three? (when it includes the grouping key)
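
For illustration, the three-argument form could look something like this; I'm assuming the key arrives as a tuple of the grouping values, and the body just reuses the asof example above:

```
>>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
... def asof_join(key, l, r):
...     # key holds the grouping key values for this cogroup, e.g. (1,)
...     return pd.merge_asof(l, r, on="time", by="id")
>>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
```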

@HyukjinKwon
Member

HyukjinKwon commented Sep 22, 2019

While it looks good in general, let me leave some comments from my review, just for the record.

#24981 (comment) this comment at least seems critical - let me make a quick followup. If tests cannot pass, technically we might have to revert this PR.

For clarification, I have no objection to the fact that it was merged.

@HyukjinKwon
Member

HyukjinKwon commented Sep 22, 2019

Let me review further tomorrow in KST.

HyukjinKwon added a commit that referenced this pull request Sep 22, 2019
### What changes were proposed in this pull request?
This is a followup for #24981
Seems we mistakenly didn't add `test_pandas_udf_cogrouped_map` to `modules.py`, so we don't have official test results for that PR.

```
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf_grouped_agg
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf_grouped_map
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf_scalar
...
Starting test(python3.6): pyspark.sql.tests.test_pandas_udf_window
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf (21s)
...
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf_grouped_map (49s)
...
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf_window (58s)
...
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf_scalar (82s)
...
Finished test(python3.6): pyspark.sql.tests.test_pandas_udf_grouped_agg (105s)
...
```

If tests fail, we should revert that PR.

### Why are the changes needed?

Relevant tests should be run.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Jenkins tests.

Closes #25890 from HyukjinKwon/SPARK-28840.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@@ -0,0 +1,98 @@
#
Member

Seems like we don't generate documentation for this:

[Screenshot: Screen Shot 2019-09-22 at 9 41 48 PM - the entry cannot be clicked.]

It should either be documented at python/docs/pyspark.sql.rst or imported in pyspark/sql/__init__.py and included in __all__.

Member

+1 for adding it to pyspark/sql/__init__.py and including it in __all__, since this is what group.py does
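
For example, something like this in python/pyspark/sql/__init__.py (the surrounding __all__ entries are abridged and illustrative):

```
from pyspark.sql.group import GroupedData
from pyspark.sql.cogroup import CoGroupedData  # newly exported so the docs pick it up

__all__ = [
    "SparkSession", "DataFrame", "GroupedData", "CoGroupedData",  # (abridged)
]
```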

/**
 * Common functionality for a udf runner that exchanges data with Python worker via Arrow stream.
 */
abstract class BaseArrowPythonRunner[T](
Member

Why did we do such refactoring in this PR? It would be better to do it separately; otherwise, it's hard to follow the changes.

Member

One thing we should consider is that the R vectorized code path matches the Python side. We should think about that before generalizing it - my goal was to deduplicate both code paths.

Member

I think the changes were pretty straightforward to support the feature, mainly to be able to read back results the same way as group map udfs. I didn't consider this to be major refactoring, so making a complete copy of the python runner seemed a little excessive. Otherwise I would agree to keep things separate.

We should think about that before generalizing it - my goal was to deduplicate both code paths.

This sounds like a good idea; it shouldn't really matter whether we are writing to Python or R, and it would be good to deduplicate.

@@ -47,8 +47,8 @@ import org.apache.spark.sql.types.{NumericType, StructType}
*/
@Stable
class RelationalGroupedDataset protected[sql](
df: DataFrame,
groupingExprs: Seq[Expression],
val df: DataFrame,
Member

This is non-trivial since it exposes another API. It should be private[spark] to hide it; otherwise, we will have to maintain compatibility.

/**
 * Base functionality for plans which execute grouped python udfs.
 */
abstract class BasePandasGroupExec(
Member

Hey, I think I am pretty against this refactoring. There is a lot of duplicated code in the R vectorization too (which I added), and I intentionally haven't refactored it yet. Plus, I don't think it's a good idea to have both refactoring and feature implementation in one PR.

@d80tb7
Author

d80tb7 commented Sep 23, 2019

Thanks for the review @HyukjinKwon. I'm happy to prepare a PR with the changes you requested; please just let me know how to proceed. Specifically:

  1. Do you want to back out this change and then I can put in a new PR? Or do you want me to simply put in a new PR for the changes?
  2. Regarding the refactorings of BasePandasGroupExec and BaseArrowPythonRunner, do you want me to remove the base classes and instead duplicate the code in the group/cogroup code paths? FWIW I don't think this is the correct thing to do, as it would lead to a high ratio of duplicated code to unique functionality, but I understand your point about leaving the refactoring until later, so if you think we should duplicate then I am fine with that.

many thanks,

Chris

@HyukjinKwon
Member

HyukjinKwon commented Sep 23, 2019

@d80tb7, can you address other comments except refactoring BasePandasGroupExec and BaseArrowPythonRunner?

Let me take a closer look and see if we can avoid introducing a class hierarchy, if you don't mind. From a cursory look, I think we can just put those methods in companion objects and reuse them. I see why you did it like this. Let me take a closer look before I ask anything.

BTW, I am doing reserve forces training for the next 3 days (yes, in Korea we need to do it for a few days as it's mandatory :D), so my progress might be a bit slow.

@d80tb7
Author

d80tb7 commented Sep 26, 2019

Hi @HyukjinKwon

I've raised #25939 to address the comments here (excluding those around the class hierarchy). Let me know what you think.

Chris

HyukjinKwon pushed a commit that referenced this pull request Sep 30, 2019
… cleanup of cogroup pandas UDF

Follow up from #24981 incorporating some comments from HyukjinKwon.

Specifically:

- Adding `CoGroupedData` to `pyspark/sql/__init__.py __all__` so that documentation is generated.
- Added pydoc, including an example, for the use case whereby the user supplies a cogrouping function including a key.
- Added the boilerplate for doctests to cogroup.py. Note that cogroup.py only contains the apply() function, which has doctests disabled as per the other Pandas UDFs.
- Restricted the newly exposed RelationalGroupedDataset constructor parameters to access only by the sql package.
- Some minor formatting tweaks.

This was tested by running the appropriate unit tests. I'm unsure how to check that my change will cause the documentation to be generated correctly, but if someone can describe how I can do this I'd be happy to check.

Closes #25939 from d80tb7/SPARK-27463-fixes.

Authored-by: Chris Martin <chris@cmartinit.co.uk>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Oct 31, 2019
This PR adds some extra documentation for the new cogrouped map Pandas UDFs. Specifically:

- Updated the usage guide for the new `COGROUPED_MAP` Pandas UDFs added in #24981
- Updated the docstring for pandas_udf to include the COGROUPED_MAP type as suggested by HyukjinKwon in #25939

Closes #26110 from d80tb7/SPARK-29126-cogroup-udf-usage-guide.

Authored-by: Chris Martin <chris@cmartinit.co.uk>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>