[SPARK-42994][ML][CONNECT] PyTorch Distributor support Local Mode #40695
Conversation
python/pyspark/sql/connect/client.py
Outdated
@@ -867,6 +878,8 @@ def _analyze(self, method: str, **kwargs: Any) -> AnalyzeResult:
            req.unpersist.blocking = cast(bool, kwargs.get("blocking"))
        elif method == "get_storage_level":
            req.get_storage_level.relation.CopyFrom(cast(pb2.Relation, kwargs.get("relation")))
        elif method == "resources":
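For context, the client-side accessor driving this new branch might look roughly like the following. This is only a sketch: the `method` name follows the diff above, but the `AnalyzeResult` field access is an assumption, mirroring how `spark.version` routes through `_analyze`.

```python
# Hypothetical accessor on SparkConnectClient; the .resources field
# on the analyze result is an assumption, not the confirmed API.
from typing import Dict

from pyspark.resource import ResourceInformation

def resources(self) -> Dict[str, ResourceInformation]:
    return self._analyze(method="resources").resources
```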
I'm not sure this is a good approach as it overloads the semantics of analyze with generic system metadata information.
My suggestion is to implement resource metadata using a command instead.
I feel sc.resources itself is not like a command; it is similar to spark.version, which also uses the analyze RPC.
It's the same issue. I'm not happy with the way it was implemented. The analyze RPC has no semantic relationship to either version or resources. Please change resources now, as it's too late to change version.
We also need to change the existing code: it executes the PyTorch code on the client side, but in the Spark Connect case we should execute it on the server side. We can reuse the _run_distributed_training code for this, but a local-mode Spark job does not support GPU scheduling; instead, we can broadcast the selected driver GPU list to all tasks and have each task select its GPU id by task rank, as sketched below.
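A minimal sketch of that idea, assuming a plain local-mode SparkSession (names like `driver_gpu_list` and `task_fn` are illustrative): the driver selects the visible GPUs up front, broadcasts the list, and each barrier task pins itself to one GPU by its partition rank.

```python
import os

from pyspark.sql import SparkSession
from pyspark.taskcontext import BarrierTaskContext

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

# Driver-side selection, e.g. from the driver's CUDA_VISIBLE_DEVICES;
# the "0,1" fallback is only for illustration.
driver_gpu_list = os.environ.get("CUDA_VISIBLE_DEVICES", "0,1").split(",")
gpus_bc = sc.broadcast(driver_gpu_list)

def task_fn(iterator):
    ctx = BarrierTaskContext.get()
    rank = ctx.partitionId()
    # Each task pins itself to one GPU chosen by its task rank.
    os.environ["CUDA_VISIBLE_DEVICES"] = gpus_bc.value[rank]
    yield (rank, os.environ["CUDA_VISIBLE_DEVICES"])

assignments = (
    sc.parallelize(range(2), numSlices=2)
    .barrier()
    .mapPartitions(task_fn)
    .collect()
)
print(assignments)  # e.g. [(0, "0"), (1, "1")]
```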
On second thought, I propose to make TorchDistributor._run_local_training support only Spark legacy mode, and fail it in Spark Connect mode.
Yes, I feel it is non-trivial to execute the PyTorch code on the server side, since we would need to launch a new Python process on the server and then communicate with it.
Agree.
Why not just fail it?
I think we need to support this, because for Spark ML algorithms implemented atop TorchDistributor, we want to support both Spark local mode and Spark cluster mode.
I think it does not require too much work; we can reuse most of the code of …
Summary for Spark Connect mode:
- If TorchDistributor.local_mode is True, raise an error saying it is not supported.
- If TorchDistributor.local_mode is False and the Spark server side runs in Spark local mode, we need to fix the issue #40695 (comment).
- If TorchDistributor.local_mode is False and the Spark server side runs in Spark cluster mode, the current master code works fine, with or without GPU config.
What is local mode, and why would you not support it on the client?
Let me clarify: TorchDistributor has a "local mode" config. If it is True, it just runs the torch program on the client side; if it is False, it launches a Spark job to run the torch program. So the TorchDistributor-side "local mode" has nothing to do with Spark master local mode.
I assume you mean running the torch program on the Spark Connect client machine. We can support this, but I think it is less useful: the client machine should usually run lightweight workloads, while torch programs are heavy workloads that often require a GPU, which the client machine can rarely provide.
I don't think this is a valid assumption. With Spark Connect you can actually build an environment in which you have a GPU locally but don't have a GPU on your cluster. In this case you still want to leverage the same execution flow. I've previously talked to users who were looking for an EC2 setup with a GPU attached and running the workloads from there against Spark using Spark Connect. This is very similar to running sklearn locally on the client side. It's not the only way, but it's a very valid way.
OK, makes sense, we can support it too @zhengruifeng. In this case, we can read the client-side CUDA_VISIBLE_DEVICES environment variable, as in the sketch below.
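A minimal sketch of that, assuming client-side local training under Spark Connect (the helper name `_get_client_gpus` is hypothetical): with no local SparkContext to query, the usable GPUs are taken from the client's own environment.

```python
import os

CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"

def _get_client_gpus() -> list:
    # With Spark Connect there is no client-side SparkContext, so the
    # GPU list comes straight from the client's environment variable.
    visible = os.environ.get(CUDA_VISIBLE_DEVICES)
    if visible is None:
        raise RuntimeError(
            f"{CUDA_VISIBLE_DEVICES} must be set on the client to run "
            "GPU local training against a Spark Connect session."
        )
    return visible.split(",")
```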
if CUDA_VISIBLE_DEVICES in os.environ:
    return
Shall we add this check?
I am not sure about this; here it follows the distributed mode, which respects the CUDA_VISIBLE_DEVICES env.
Which check do you mean? Fail it if CUDA_VISIBLE_DEVICES is already set?
LGTM except one last comment.
@grundprinzip would you mind taking another look at the changes in the protos?
else:

def set_gpus(context: "BarrierTaskContext") -> None:
    if CUDA_VISIBLE_DEVICES in os.environ:
Shall we add this check?
 env_vars = {"CUDA_VISIBLE_DEVICES": "3,4,5"}
 self.setup_env_vars(env_vars)
-self.assertEqual(get_gpus_owned(self.spark), ["3", "4", "5"])
+self.assertEqual(_get_gpus_owned(self.spark), ["3", "4", "5"])
 self.delete_env_vars(env_vars)
Let's add Spark Connect mode tests for local_training with GPU for the following cases: spark.master=local and spark.master=local-cluster.
TorchDistributorLocalUnitTests and TorchDistributorLocalUnitTestsOnConnect already test local-cluster; we just need to add TorchDistributorLocalUnitTestsII and TorchDistributorLocalUnitTestsIIOnConnect for local[4], along the lines of the sketch below.
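A rough sketch of one of the suggested classes (the session setup here is illustrative; the real suite configures GPU discovery and shared fixtures in its base classes):

```python
import unittest

from pyspark.sql import SparkSession

class TorchDistributorLocalUnitTestsII(unittest.TestCase):
    """Sketch: same coverage as TorchDistributorLocalUnitTests,
    but against spark.master=local[4] instead of local-cluster."""

    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder.master("local[4]").getOrCreate()

    @classmethod
    def tearDownClass(cls) -> None:
        cls.spark.stop()
```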
@WeichenXu123 mind taking another look?
Merged to master.
What changes were proposed in this pull request?
Support sc.resources in Spark Connect, so that the PyTorch distributor's local mode works with Connect.
Why are the changes needed?
For functionality parity. After this PR, all UTs in test_distributor are reused and enabled in Connect.
Does this PR introduce any user-facing change?
Yes, a new mode is supported in Connect; see the example below.
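For illustration, the newly supported path looks like this from the user's side (the training function body is a stand-in):

```python
from pyspark.ml.torch.distributor import TorchDistributor

def train_fn(learning_rate):
    # User-defined PyTorch training loop; returns whatever the
    # caller wants back from the training run.
    ...

# local_mode=True now also works when the active session is a
# Spark Connect session, not only with a classic SparkSession.
distributor = TorchDistributor(num_processes=2, local_mode=True, use_gpu=False)
distributor.run(train_fn, 1e-3)
```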
How was this patch tested?
Enabled UTs