[SPARK-41592][PYTHON][ML] Pytorch file Distributed Training #39267
Conversation
Force-pushed from ead876c to 1755625.

Can one of the admins verify this patch?

Force-pushed from 1755625 to 7706376.
Force-pushed from 7706376 to 732e350.
Force-pushed from 5fb8333 to f8c464f.
@@ -407,13 +418,6 @@ def _run_local_training(
try:
    if self.use_gpu:
        gpus_owned = get_gpus_owned(self.sc)
This is actually no longer needed, since if `num_processes > len(gpus_owned)` we set `num_processes = len(gpus_owned)`.
CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
# The idea of setting the random port to 0 doesn't seem to work?
What does this mean?
Something like the following seems to error:
import socket
sock = socket.socket()
sock.bind((master_address, 0))
port = sock.getsockname()[1]
So I just find a port using randomness.
What happens if two processes choose the same port?
I believe it will raise `RuntimeError: Address already in use`.
if use_gpu:
    set_gpus(context)
else:
    os.environ[CUDA_VISIBLE_DEVICES] = ""
Do we need to do this?
I think it should be added: if the user runs training with `TorchDistributor(use_gpu=False, **kwargs).run(train_fn)` but accidentally has some PyTorch Lightning code like `pl.Trainer(accelerator="gpu")` in their `train_fn`, an error should be raised saying no CUDA devices are available even though a GPU accelerator was specified. We already have a check in `get_num_tasks` for the case where `use_gpu=True` but no GPUs are available, and I think this code addresses the case where `use_gpu=False` but the internal code uses GPUs.
@WeichenXu123 @lu-wang-dl is my logic reasonable here or did I misunderstand anything?
I still don't understand. If the user runs something like `pl.Trainer(accelerator="gpu")` on a CPU cluster, what is the behavior from PyTorch Lightning?
PyTorch Lightning will raise `MisconfigurationException: No supported gpu backend found!`. This is what we expect to see if the user sets `use_gpu=False` and calls `pl.Trainer(accelerator="gpu")`. My understanding is that if a user runs this code on a local cluster with GPUs on each node without `os.environ[CUDA_VISIBLE_DEVICES] = ""`, then the task will be assigned a GPU even when `use_gpu=False`.
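For context, a minimal sketch of the guard being discussed, using the `CUDA_VISIBLE_DEVICES` constant from the diff above (the helper name here is hypothetical, not from the PR):

```python
import os

CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"

def _hide_gpus_for_cpu_training() -> None:
    # An empty CUDA_VISIBLE_DEVICES makes CUDA report zero visible devices,
    # so accidental GPU usage inside the user's train_fn (e.g.
    # pl.Trainer(accelerator="gpu")) fails fast with a clear "no GPU" error
    # instead of silently grabbing a device on a GPU-equipped node.
    os.environ[CUDA_VISIBLE_DEVICES] = ""
```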
) -> Optional[Any]:
    if not framework_wrapper_fn:
        raise RuntimeError("Unknown combination of parameters")
    spark_task_program = self._get_spark_task_program(framework_wrapper_fn, train_fn, *args)
Why not just define the function here?
I guess just for the sake of modularity. We could just define the function here.
Overall LGTM. Just some minor comments/questions.
import socket
import random

while True:
Shall we add a `sleep(0.1)` in the loop body?
I also recommend setting a maximum retry count (e.g. 100) for the `get_free_port` loop, to avoid an infinite loop in unexpected cases.
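Putting both suggestions together, a minimal sketch of what this could look like (the name `get_free_port` comes from the thread; the exact signature in the PR may differ):

```python
import random
import socket
import time

def get_free_port(host: str, max_retries: int = 100) -> int:
    # Try random ports and retry on collision, with a short sleep between
    # attempts and a retry cap so we never loop forever.
    for _ in range(max_retries):
        port = random.randint(1024, 65535)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((host, port))
            return port  # port was free at bind time
        except OSError:
            time.sleep(0.1)  # back off briefly before retrying
        finally:
            sock.close()
    raise RuntimeError(f"Could not find a free port on {host} after {max_retries} attempts")
```

Note that this is inherently racy: the socket is closed before the caller rebinds the port, which is why two processes can still collide and hit `Address already in use`.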
context = BarrierTaskContext.get()

if use_gpu:
    set_gpus(context)
We can simplify the `set_gpus` function: if the `CUDA_VISIBLE_DEVICES` env var already exists, do nothing (Spark has already set `CUDA_VISIBLE_DEVICES` properly); otherwise, generate `CUDA_VISIBLE_DEVICES` from `taskcontext.resources["gpu"].addresses`.
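A minimal sketch of that simplification, assuming PySpark's `BarrierTaskContext.resources()` API (the actual helper in the PR may differ):

```python
import os

from pyspark import BarrierTaskContext

CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"

def set_gpus(context: BarrierTaskContext) -> None:
    # If Spark already exported CUDA_VISIBLE_DEVICES for this task, trust it.
    if CUDA_VISIBLE_DEVICES in os.environ:
        return
    # Otherwise derive it from the GPU addresses assigned to this task.
    gpu_addresses = context.resources()["gpu"].addresses
    os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
```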
https://github.com/apache/spark/pull/39267/files#diff-76c395a6b98138662faaec37460ccda966f5cc0df0bccd224dfefcd81b2a7a79R459 <- Is this what you were suggesting?
Force-pushed from 7e51d28 to 80d82bc.
Force-pushed from 80d82bc to bfd6879.
LGTM, thanks!
Merged to master.
@rithwik-db Can I clarify that the …

If we are using CPUs for training, …
What changes were proposed in this pull request?
This is an addition to #39188, adding support for multi-node training using PyTorch files. Users would follow the second workflow in the design document to run training on the executors. I added some new utility functions and built on top of current ones. This is largely WIP, so testing will be added very soon.
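As a rough illustration of the file-based workflow this enables (the script path and CLI arguments below are placeholders; the constructor arguments mirror `TorchDistributor`'s existing parameters):

```python
from pyspark.ml.torch.distributor import TorchDistributor

# Run an existing PyTorch training script on the executors rather than a
# Python function; "/path/to/train.py" and its arguments are placeholders.
distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run("/path/to/train.py", "--learning-rate=1e-3")
```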
Why are the changes needed?
Look at the main ticket for more details.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Tested with a pseudo-integration test. Integration tests will be added in a future PR.