Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-42669][CONNECT] Short circuit local relation RPCs #40782

Closed
wants to merge 9 commits into from

Conversation

beliefer
Copy link
Contributor

What changes were proposed in this pull request?

Operations on LocalRelation can mostly be done locally (without sending RPCs).
We should leverage this.

Why are the changes needed?

Avoid sending RPCs for LocalRelation.

Does this PR introduce any user-facing change?

'No'.
New feature.

How was this patch tested?

Exists test cases.

@beliefer
Copy link
Contributor Author

beliefer commented Apr 14, 2023

The failure GA is unrelated to this PR.
ping @hvanhovell @zhengruifeng cc @dongjoon-hyun @amaliujia

@zhengruifeng
Copy link
Contributor

sorry, test_parity_torch_distributor became flaky, I am going to disable related cases for now

@amaliujia
Copy link
Contributor

So this happens when

val df = createDataFrame(...)
df.collect()

?

// Short circuit local relation RPCs
val localRelation = plan.getRoot.getLocalRelation
new SparkResult(
Seq.empty[proto.ExecutePlanResponse].iterator.asJava,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if you can populate the data from the LocalRelation into an instance of proto.ExecutePlanResponse thus we do not need to branch the code in SparkResult?

Copy link
Contributor Author

@beliefer beliefer Apr 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

@beliefer
Copy link
Contributor Author

So this happens when

It happens when the root plan is LocalRelation.

Copy link
Contributor

@amaliujia amaliujia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

The code changes are reduced much less now.

reader.loadNextBatch()
val batch = proto.ExecutePlanResponse.ArrowBatch
.newBuilder()
.setRowCount(reader.getVectorSchemaRoot.getRowCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to set row count? You can just modify SparkResult to make the assert a bit more lenient.

val localRelation = plan.getRoot.getLocalRelation
val response = proto.ExecutePlanResponse.newBuilder()
val reader = new ArrowStreamReader(localRelation.getData.newInput(), allocator)
reader.loadNextBatch()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to close the reader, otherwise the vector root is never cleaned up (memory leak).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. I forgot it.

@beliefer
Copy link
Contributor Author

@hvanhovell The failed GA is unrelated to this PR.

.newBuilder()
.setData(localRelation.getData)
.build()
response.setArrowBatch(batch).setIsLocalBuilt(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this? I don't think we should change the protocol for this.

assert(root.getRowCount == response.getArrowBatch.getRowCount) // HUH!
assert(
response.getIsLocalBuilt ||
root.getRowCount == response.getArrowBatch.getRowCount
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just make the row count optional. It is kinda weird it is in the protocol anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You means change the protocol that make the row count optional ?

Copy link
Contributor Author

@beliefer beliefer Apr 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error: Field "1" on message "ArrowBatch" moved from outside to inside a oneof.
[8](https://github.com/beliefer/spark/actions/runs/4761585011/jobs/8462987474#step:5:9)
Error: buf found 1 breaking changes.

It seems 1 breaking change. @hvanhovell

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess adding the row count as optional to LocalRelation message is more useful.
The server can check if the expected number of rows are provided, and we can reuse it here to set the row count.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell What do you think about my idea if making the row count optional causes the breaking change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell Could you take a close look ?

Copy link
Member

@ueshin ueshin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to implement this in Python client, but I'm not sure whether this is useful or not.
We need at least one roundtrip to retrieve some configs to provide a right result, e.g., spark.sql.session.timeZone.

The test below won't pass without retrieving the config:
https://github.com/apache/spark/blob/d7712776bf28d7c9c0d523624bec70a4bc2d97a0/python/pyspark/sql/tests/test_arrow.py#L335-L358

I'm not sure it's the same in Scala client, though.
Just FYI.

After some more investigation, I realized we need to retrieve some configs in some cases anyway.
I think we should go ahead with this. @beliefer

assert(root.getRowCount == response.getArrowBatch.getRowCount) // HUH!
assert(
response.getIsLocalBuilt ||
root.getRowCount == response.getArrowBatch.getRowCount
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess adding the row count as optional to LocalRelation message is more useful.
The server can check if the expected number of rows are provided, and we can reuse it here to set the row count.

@beliefer
Copy link
Contributor Author

Because there are difference suggestion from @hvanhovell and @ueshin, I don't know how to continue this job.

@beliefer
Copy link
Contributor Author

@ueshin @hvanhovell Recently, #41064 added the rowCount statistics to LocalRelation. In this PR, @ueshin also suggested to add the row count as optional to LocalRelation message.
So I think this is a chance to add optional row count to LocalRelation message.

@beliefer
Copy link
Contributor Author

cc @HyukjinKwon

@beliefer
Copy link
Contributor Author

ping @hvanhovell cc @ueshin

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 23, 2023
@github-actions github-actions bot closed this Sep 24, 2023
@beliefer beliefer removed the Stale label Oct 14, 2023
@beliefer beliefer reopened this Oct 14, 2023
@github-actions github-actions bot removed the CORE label Oct 14, 2023
@beliefer
Copy link
Contributor Author

@hvanhovell Do we need this change?

Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 23, 2024
@beliefer beliefer removed the Stale label Jan 23, 2024
Copy link

github-actions bot commented May 3, 2024

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label May 3, 2024
@github-actions github-actions bot closed this May 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants