
Keep the taskBinary in Stage to avoid deserializing it multiple times, and send binary to executor instead of task. #43621

Closed
hui730 wants to merge 6 commits into apache:master from hui730:master

Conversation


@hui730 hui730 commented Nov 1, 2023

What changes were proposed in this pull request?

This is a performance optimization: keep the taskBinary in the Stage to avoid deserializing it multiple times, and send the binary to the executor instead of embedding it in each task. The specific modifications are:
a. Add the stageId to TaskDescription.
b. Move the taskBinary from ShuffleMapTask and ResultTask into the TaskSet.
c. Add a binary getter/setter to Task. The getter returns a new copy of the binary, so tasks do not share the executor's binary.
d. Store the binary broadcast when submitTasks is called.
e. Add a new Netty event message, LaunchBinary: before an executor runs a task, the binary is launched to that executor first.
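The per-stage, per-executor cache described in (b), (c) and (e) could be sketched roughly as below. `StageBinaryCache` and its method names are hypothetical illustrations, not actual Spark classes; the defensive copy in `binaryFor` is the behavior item (c) proposes.

```scala
// Hypothetical sketch: the driver sends each stage's serialized task closure
// once (a LaunchBinary-style message), and tasks look it up by stageId
// instead of each carrying its own copy.
import scala.collection.concurrent.TrieMap

class StageBinaryCache {
  private val binaries = TrieMap.empty[Int, Array[Byte]]

  // Called when the (hypothetical) LaunchBinary message arrives.
  def putBinary(stageId: Int, binary: Array[Byte]): Unit =
    binaries.putIfAbsent(stageId, binary)

  // Each task gets its own defensive copy, so one task cannot
  // mutate the bytes another task will deserialize.
  def binaryFor(stageId: Int): Option[Array[Byte]] =
    binaries.get(stageId).map(_.clone())
}

object StageBinaryCacheDemo extends App {
  val cache = new StageBinaryCache
  cache.putBinary(7, Array[Byte](1, 2, 3))
  val copy = cache.binaryFor(7).get
  copy(0) = 42 // mutating the copy must not affect the cached original
  assert(cache.binaryFor(7).get.sameElements(Array[Byte](1, 2, 3)))
  println("cache demo ok")
}
```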

Why are the changes needed?

The taskBinary is identical for every task in the same stage. Currently, each task brings its own reference to the taskBinary's broadcast variable to the executor; with this modification, the taskBinary is sent once to each executor that executes the corresponding stage, reducing duplicated broadcast reads and avoiding deserializing the binary multiple times.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Because this modification touches Spark Core and also changes some unit-test parameters, I ran the full unit-test suite for all modules against the master branch.

Was this patch authored or co-authored using generative AI tooling?

No

… for keeping the taskBinary in Stage instead of Task
@github-actions github-actions bot added the CORE label Nov 1, 2023
@HyukjinKwon
Member

Mind filing a JIRA please?

@hui730
Author

hui730 commented Nov 2, 2023

> Mind filing a JIRA please?

Sure, I'll complete it today.

@mridulm
Contributor

mridulm commented Nov 2, 2023

Each task has a reference to a broadcast'ed task binary Array[Byte] - not the object.
This Array[Byte] is pulled to each executor once - and for each subsequent task, this Array[Byte] is reused to deserialize and create a new task closure. So essentially we are trying to save on a jvm local deserialization cost.

Explicit use of broadcast variables is documented to be read only - but with task closures, I would be very reluctant to assume this behavior. If the instance is reused, task closures can start having side effects on other tasks in the same executor.
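The per-task deserialization behavior described above can be illustrated with plain `java.io` serialization standing in for Spark's serializer (`Closure` and the demo object are hypothetical; the point is that each deserialization yields a fresh closure, so mutable state cannot leak between tasks on the same executor):

```scala
import java.io._

// A task closure with mutable state, standing in for a user-provided closure.
case class Closure(var counter: Int) extends Serializable

object PerTaskDeserDemo extends App {
  // Serialize once on the "driver" side.
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(Closure(0))
  oos.close()
  val taskBinary: Array[Byte] = bos.toByteArray // fetched once per executor

  // Each task deserializes the same bytes into its own fresh object.
  def deserialize(): Closure = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(taskBinary))
    ois.readObject().asInstanceOf[Closure]
  }

  val task1 = deserialize()
  task1.counter += 100      // task 1 mutates its own copy...
  val task2 = deserialize() // ...but task 2 still starts from a clean state
  assert(task2.counter == 0)
  println("no cross-task side effects")
}
```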

@hui730
Author

hui730 commented Nov 2, 2023

> Each task has a reference to a broadcast'ed task binary Array[Byte] - not the object. This Array[Byte] is pulled to each executor once - and for each subsequent task, this Array[Byte] is reused to deserialize and create a new task closure. So essentially we are trying to save on a jvm local deserialization cost.
>
> Explicit use of broadcast variables is documented to be read only - but with task closures, I would be very reluctant to assume this behavior. If the instance is reused, task closures can start having side effects on other tasks in the same executor.

Okay, I see what you mean, especially with Hadoop.
My plan is to create a new binary from the executor's binary using System.arraycopy().
How does that sound, sir? :)
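The proposed defensive copy can be sketched like this (a hypothetical demo, not Spark code; it only shows that a `System.arraycopy` copy isolates the task's bytes from the executor's original):

```scala
object ArrayCopyDemo extends App {
  val executorBinary = Array[Byte](10, 20, 30)

  // Copy the executor-held bytes into a fresh array for this task.
  val taskBinary = new Array[Byte](executorBinary.length)
  System.arraycopy(executorBinary, 0, taskBinary, 0, executorBinary.length)

  taskBinary(0) = 99                // mutate the task's copy...
  assert(executorBinary(0) == 10)   // ...the executor's original is untouched
  println("arraycopy copy isolated")
}
```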

@mridulm
Contributor

mridulm commented Nov 2, 2023

> My plan is to create a new binary from the executor's binary using System.arraycopy().

This is what is effectively happening currently, right?
The underlying serialized task array is immutable - and is repeatedly read to deserialize into the task closure.

I want to make sure I understand the proposal, and how it is different from what Spark is currently doing.

@hui730
Author

hui730 commented Nov 2, 2023

> > My plan is to create a new binary from the executor's binary using System.arraycopy().
>
> This is what is effectively happening currently, right? The underlying serialized task array is immutable - and is repeatedly read to deserialize into the task closure.
>
> I want to make sure I understand the proposal, and how it is different from what Spark is currently doing.

Assume there are currently n tasks (from the same stage) running simultaneously in the executor.
Currently, the executor reads the remote broadcast, deserializes it, and then starts running these n tasks; this step is serial.
In my modification, reading the remote broadcast and deserializing it into an Array[Byte] is asynchronous and runs in parallel with launchTask. This can save time.
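The claimed overlap could be sketched with Scala Futures (all names here are illustrative, not Spark's actual executor code; the sleep stands in for the fetch-and-deserialize cost):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncBinaryDemo extends App {
  // Pretend this is the one-time fetch + deserialization of the stage binary,
  // kicked off asynchronously when the stage's tasks arrive.
  val binaryFuture: Future[Array[Byte]] = Future {
    Thread.sleep(50) // simulated fetch/deserialize cost
    Array[Byte](1, 2, 3)
  }

  // Task-launch bookkeeping proceeds while the fetch is in flight...
  val launched = (1 to 4).map(i => s"task-$i registered")

  // ...and a task blocks only at the point it actually needs the bytes.
  val binary = Await.result(binaryFuture, 5.seconds)
  assert(launched.size == 4 && binary.length == 3)
  println("prefetch overlapped with launch")
}
```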

@mridulm
Contributor

mridulm commented Nov 2, 2023

The broadcast variable is read once - not n times - and deserializing n times prevents side effects between tasks; moving away from that would be a behavior change.

On the performance aspect, as currently formulated, I would expect a negligible (if any) difference - though I would be happy to see numbers to the contrary to evaluate effectiveness!
Additionally, given the possibility of side effects, the benefits would need to be very compelling to justify going down this path.

@hui730 hui730 closed this Nov 3, 2023