Skip to content

[SPARK-39840][SQL][PYTHON] Factor PythonArrowInput out as a symmetry to PythonArrowOutput#37253

Closed
HyukjinKwon wants to merge 3 commits intoapache:masterfrom
HyukjinKwon:pyarrow-output-trait
Closed

[SPARK-39840][SQL][PYTHON] Factor PythonArrowInput out as a symmetry to PythonArrowOutput#37253
HyukjinKwon wants to merge 3 commits intoapache:masterfrom
HyukjinKwon:pyarrow-output-trait

Conversation

@HyukjinKwon
Copy link
Member

What changes were proposed in this pull request?

This PR factors the Arrow input code path out as PythonArrowInput as symmetry to PythonArrowOutput. The current hierarchy is not affected:

    └── BasePythonRunner
        ├── ArrowPythonRunner with PythonArrowOutput with PythonArrowInput
        ├── CoGroupedArrowPythonRunner with PythonArrowOutput
        ├── PythonRunner
        └── PythonUDFRunner

In addition, this PR also factors out handleMetadataAfterExec and handleMetadataBeforeExec which contains the logic to send and receive the metadata such as runtime configurations specific to Arrow in/out.

Why are the changes needed?

40485f4 factored PythonArrowOutput out. It's better to factor PythonArrowInput out too to be consistent

Does this PR introduce any user-facing change?

No, this is refactoring.

How was this patch tested?

Existing test cases should cover.

@HyukjinKwon
Copy link
Member Author

BTW, this is a base work for the support of arbitrary stateful processing in Structured Streaming with Python (Dataset.groupByKey().flatMapGroupsWithState).

cc @HeartSaVioR @ueshin @viirya any review would be appreciated.

@HyukjinKwon HyukjinKwon force-pushed the pyarrow-output-trait branch 2 times, most recently from c6ea53c to a6c59df Compare July 22, 2022 09:58
Comment on lines +44 to +51
protected def handleMetadataBeforeExec(stream: DataOutputStream): Unit = {
// Write config for the worker as a number of key -> value pairs of strings
stream.writeInt(workerConf.size)
for ((k, v) <- workerConf) {
PythonRDD.writeUTF(k, stream)
PythonRDD.writeUTF(v, stream)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the Dataset.groupByKey().flatMapGroupsWithState, you are planning to override this method?
In that case, I guess we should implement this in ArrowPythonRunner and leave this empty the same as PythonArrowOutput.handleMetadataAfterExec?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe yes .. but my thought is that this configuration passing applies to all Arrow specific executions so we can share by calling super.. Here's the draft version I am working on: master...HeartSaVioR:spark:WIP-flatmapgroupswithstate-pyspark (see ArrowPythonRunnerWithState).


protected val timeZoneId: String

protected def handleMetadataBeforeExec(stream: DataOutputStream): Unit = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw I'm open to other suggestions about naming ..

Comment on lines +40 to +41
protected def handleMetadataAfterExec(stream: DataInputStream): Unit = { }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default we don't use it. I don't see it is used in other place too. Do you have any plan for it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon
Copy link
Member Author

Will merge this in few days if there are no more comments .. I believe this refactoring is pretty much consistent with the current code base, structure and hierarchy (also given the symmetry).

@HyukjinKwon
Copy link
Member Author

Thank you @viirya !!!!

@HyukjinKwon
Copy link
Member Author

Merged to master.

@HyukjinKwon HyukjinKwon deleted the pyarrow-output-trait branch January 15, 2024 00:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants