
[WIP][SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas UDFs #24965

Closed
wants to merge 11 commits

Conversation

@d80tb7 d80tb7 commented Jun 25, 2019

This is a rough first cut of a Pandas UDF cogroup implementation. Currently implemented are:

  • JVM serialisation for interleaved dataframes.
  • Python deserialisation for interleaved dataframes.
  • A skeleton cogroup implementation.

The code is still pretty rough with the main caveats being:

  • The data passing is pretty minimal (e.g. it only supports exactly two dataframes, there's no ability to distinguish between key and value columns on the Python side, etc.)
  • The cogroup implementation doesn't work properly in the case of grouping by a string, as attribute resolution fails.

At this point I think I'd like to focus on:

  • Does the data-passing mechanism (i.e. the deviation from Arrow streaming) make sense?
  • If we are going to introduce such a data-passing mechanism, how complex should it be?
  • Does the high-level implementation of the cogroup here make sense?
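
For context, a rough sketch of the user-facing shape being explored here. The call shape, and the use of a plain apply, are illustrative assumptions rather than the final API, which was still under discussion at this point:

```
import pandas as pd

# Hypothetical user-facing UDF: receives the left and right sides of one cogroup
def merge_pandas(left, right):
    return pd.merge(left, right, how='outer', on=['k', 'id'])

# Hypothetical call shape; note that grouping by the string 'id' hits the
# attribute-resolution caveat mentioned above
result = l.groupBy('id').cogroup(r.groupBy('id')).apply(merge_pandas)
```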

import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}


abstract class BaseArrowPythonRunner[T](
d80tb7 (Author) commented:

This is just some common stuff that I needed for both the new data-passing mechanism and the existing Arrow streaming mechanism. I've broken it out here mainly because it made it easier for me to track what new functionality I'd actually added. I don't think a proper solution would really have this class hierarchy.

import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer}


class InterleavedArrowWriter(leftRoot: VectorSchemaRoot,
d80tb7 (Author) commented:

This is analogous to org.apache.arrow.vector.ipc.ArrowWriter but allows interleaved dataframes to be sent. I suspect it could all be more memory efficient if we had a different interface that allowed the left batch to be sent before the right batch is loaded.
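
For illustration, a minimal Python sketch of what that interleaved format looks like from the consuming side, assuming two schemas up front followed by record batches alternating left/right. The names and the end-of-stream handling are assumptions, not this PR's exact code:

```
import pyarrow as pa

def read_interleaved(stream):
    # Two schemas are sent up front (cf. the __init__ snippet below), then
    # record batches alternate: left, right, left, right, ...
    left_schema = pa.read_schema(stream)
    right_schema = pa.read_schema(stream)
    reader = pa.ipc.MessageReader.open_stream(stream)
    while True:
        try:
            left_msg = reader.read_next_message()
            right_msg = reader.read_next_message()
        except StopIteration:  # assumed end-of-stream signal
            break
        yield (pa.ipc.read_record_batch(left_msg, left_schema),
               pa.ipc.read_record_batch(right_msg, right_schema))
```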


def __init__(self, stream):
    import pyarrow as pa
    self._schema1 = pa.read_schema(stream)
d80tb7 (Author) commented:

I wanted to read these using the message reader as well, but for some reason pa.read_schema(self._reader.read_next_message()) didn't work.

def merge_pandas(left, right):
    return pd.merge(left, right, how='outer', on=['k', 'id'])

# TODO: Grouping by a string fails to resolve here as analyzer cannot determine side
d80tb7 (Author) commented:

As the comment says, l.groupBy('id').cogroup(r.groupBy('id')) will fail because the resolver can't work out whether each 'id' column should be resolved from l or r. I'm a bit unsure of the best way to solve this.
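
To make the failure concrete, a hypothetical illustration; the qualified-column form at the end is one possible workaround, not something this PR implements:

```
# Both sides carry an 'id' column, so the string 'id' is ambiguous once
# the two plans are combined under the cogroup node
l = spark.createDataFrame([(1, 1.0)], ['id', 'v1'])
r = spark.createDataFrame([(1, 2.0)], ['id', 'v2'])

l.groupBy('id').cogroup(r.groupBy('id'))        # fails: which side's 'id'?
l.groupBy(l['id']).cogroup(r.groupBy(r['id']))  # possible workaround: bind the columns explicitly
```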

…7463-poc

# Conflicts:
#	core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
#	python/pyspark/rdd.py
#	python/pyspark/worker.py
@HyukjinKwon (Member) commented:

ok to test


SparkQA commented Jun 25, 2019

Test build #106892 has finished for PR 24965 at commit c86b2bf.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


def wrapped(left, right):
A contributor commented:

Is left a list of pd.Series here? Probably name them left_series and value_series to be more readable?

d80tb7 (Author) replied:

Yes, they are: they're the value series for the left and right sides of the cogroup respectively. Agreed that the names aren't the best; I'll improve them when I do a tidy-up.

@@ -47,8 +47,8 @@ import org.apache.spark.sql.types.{NumericType, StructType}
*/
@Stable
class RelationalGroupedDataset protected[sql](
df: DataFrame,
groupingExprs: Seq[Expression],
private val df: DataFrame,
A contributor commented:

Are these changes needed?

d80tb7 (Author) replied:

I think they are; I need the accessors in flatMapCoGroupsInPandas.

@icexelloss (Contributor) commented:

@d80tb7 Thanks for working on this! I think at the high level this makes sense.

I am a slight +1 for introducing a new way of serializing data other than using the Arrow stream format. The code doesn't seem too complicated to me. I think doing this in other ways would be more complicated.

@BryanCutler @HyukjinKwon WDYT?

@BryanCutler (Member) commented:

Thanks for working on this @d80tb7, I have been busy this week but will try to take a look soon. I would really prefer to stick with the Arrow stream format if at all possible. Could the Scala side send 2 complete Arrow streams to Python sequentially for each group? Then the worker would convert each stream into a Pandas DataFrame to evaluate the cogroup UDF. It would add some overhead since it will be sending more streams, but I think it will be minimal. WDYT?
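
For illustration, a minimal sketch of what the worker side of this proposal might look like, assuming pyarrow's stream reader; has_next_group is a hypothetical end-of-data check, and none of this is the PR's actual implementation:

```
import pyarrow as pa

def read_cogroups(stream):
    # Each group arrives as two complete, self-describing Arrow streams:
    # one for the left side, one for the right side
    while has_next_group(stream):  # hypothetical end-of-data check
        left = pa.ipc.open_stream(stream).read_pandas()
        right = pa.ipc.open_stream(stream).read_pandas()
        yield left, right
```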


d80tb7 commented Jun 26, 2019

Hi @BryanCutler

Yes, that seems like a valid approach. Let me see if I can produce another prototype based on that approach so that we can compare them. I think this solution is probably more flexible in the long run, but there would obviously be a cost to defining and maintaining our own custom streaming format (even if it is largely the same as the Arrow format). If we have code examples for both, it'll be easier to see.


icexelloss commented Jun 26, 2019

@BryanCutler I think the main issue with the approach you suggested is that the Python worker needs to hold much more data. For example, assuming each Arrow stream has 10 batches, in order to process the first cogroup the worker will need to read all 10 batches from the left table and the first batch from the right table, so 11 batches in total instead of 2. I think that could mean significantly more memory usage in the Python worker.

If we want to send two Arrow streams, I think we would need to do it with two separate connections between Python and Java so the Python worker can alternate between the two streams. I think that could be more complicated, but I'm not entirely sure. Is this what you prefer?


d80tb7 commented Jun 26, 2019

@icexelloss - my understanding of @BryanCutler's idea is that he wants a completely separate Arrow stream for every group. In this case we would only have to hold 2 batches in memory at any one time, albeit at the cost of paying the stream overhead (schema etc.) for every group.

Assuming that the stream overhead isn't significant (and I think it's reasonable to assume it won't be), this should work. I think the implementation might be a bit more tricky (you'd have to send some sort of marker to indicate that all the Arrow streams have finished), but hopefully a PoC could help with this.
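
For illustration, a sketch of the marker idea; the framing below (a flag before each group) is purely hypothetical:

```
import struct

def read_all_cogroups(stream):
    # Hypothetical framing: a 4-byte big-endian flag precedes each group;
    # 1 means another group follows, 0 means all groups have been sent
    while struct.unpack('>i', stream.read(4))[0] == 1:
        # e.g. read the two sequential Arrow streams sketched earlier
        yield read_cogroup(stream)  # hypothetical helper
```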

@icexelloss (Contributor) commented:

Yeah I am happy as long as we don't hold more than 2 batches in memory at the same time :)

@BryanCutler (Member) commented:

> a completely separate Arrow stream for every group. In this case we would only have to hold 2 batches in memory at any one time, albeit at the cost of paying the stream overhead (schema etc.) for every group.

Yes, this is what I was getting at. Each complete stream is one group, so there wouldn't be more than the 2 groups required in memory at a time. Btw, the reason I prefer to stick to the stream protocol is that Arrow already tests this thoroughly, and while sending messages piecemeal will probably work fine, it's just not actively tested between Java and C++/Python.


d80tb7 commented Jun 27, 2019

Ok I've put another PR (#24981) up to show an implementation that uses the Arrow Stream format. For what it's worth I marginally prefer it to this implementation, but would be happy with either. Let me know what you think.

.merge(l.toPandas(), r.toPandas(), how='outer', on=['k', 'id'])

assert_frame_equal(expected, result, check_column_type=_check_column_type)

@hjoo commented:

Hi @d80tb7, I work with Li and am also interested in cogroup.

Can I ask how you were able to get your test to run? I wasn't able to run it without the following snippet:

if __name__ == "__main__":
    from pyspark.sql.tests.test_pandas_udf_grouped_map import *

    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)

taken from the other similar tests like test_pandas_udf_grouped_map.

d80tb7 (Author) replied:

Hi @hjoo

So far I've just been running it via PyCharm's unit test runner under Python 3. I suspect the problem you had was that the iterator I added wasn't compatible with Python 2 (sorry!). I've fixed the iterator and added a similar snippet to the one you provided above. Now I can run it using python/run-tests --testnames pyspark.sql.tests.test_pandas_udf_cogrouped_map

If you still have problems let me know the error you get and I'll take a look.


SparkQA commented Jul 2, 2019

Test build #107109 has finished for PR 24965 at commit 86d1385.

  • This patch fails RAT tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


SparkQA commented Jul 2, 2019

Test build #107111 has finished for PR 24965 at commit d15dabb.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.


@HyukjinKwon (Member) commented:

Yea .. I prefer to stick to the Arrow format as well .. it will make it easier to understand, too.

@HyukjinKwon (Member) commented:

Shall we leave this closed and go ahead with #24981?

@AmplabJenkins commented:

Can one of the admins verify this patch?


d80tb7 commented Aug 19, 2019

Yes, I'll close this. Essentially this has been superseded by #24981.

@HyukjinKwon (Member) commented:

I opened a PR to address the refactoring stuff at #25989

HyukjinKwon added a commit that referenced this pull request Oct 3, 2019
…oup arrow runner and its plan

### What changes were proposed in this pull request?

This PR proposes to avoid the abstract classes introduced at #24965 and instead uses a trait and an object.

- `abstract class BaseArrowPythonRunner` -> `trait PythonArrowOutput` to allow mix-in

    **Before:**

    ```
    BasePythonRunner
    ├── BaseArrowPythonRunner
    │   ├── ArrowPythonRunner
    │   └── CoGroupedArrowPythonRunner
    ├── PythonRunner
    └── PythonUDFRunner
    ```

    **After:**

    ```
    └── BasePythonRunner
        ├── ArrowPythonRunner
        ├── CoGroupedArrowPythonRunner
        ├── PythonRunner
        └── PythonUDFRunner
    ```
- `abstract class BasePandasGroupExec` -> `object PandasGroupUtils` to decouple

    **Before:**

    ```
    └── BasePandasGroupExec
        ├── FlatMapGroupsInPandasExec
        └── FlatMapCoGroupsInPandasExec
    ```

    **After:**

    ```
    ├── FlatMapGroupsInPandasExec
    └── FlatMapCoGroupsInPandasExec
    ```

### Why are the changes needed?

The problem is that the R code path is being matched with the Python side:

**Python:**

```
└── BasePythonRunner
    ├── ArrowPythonRunner
    ├── CoGroupedArrowPythonRunner
    ├── PythonRunner
    └── PythonUDFRunner
```

**R:**

```
└── BaseRRunner
    ├── ArrowRRunner
    └── RRunner
```

I would like to match the hierarchies and decouple the other stuff for now if possible. Ideally we should deduplicate both code paths; the internal implementations are intentionally similar.

The `BasePandasGroupExec` case is similar as well. R (with Arrow optimization, in particular) has some code duplicated with Pandas UDFs:

`FlatMapGroupsInRWithArrowExec` <> `FlatMapGroupsInPandasExec`
`MapPartitionsInRWithArrowExec` <> `ArrowEvalPythonExec`

In order to prepare for deduplication here as well, it might be better to avoid changing the hierarchy on the Python side alone.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Locally tested existing tests. Jenkins tests should verify this too.

Closes #25989 from HyukjinKwon/SPARK-29317.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>