diff --git a/docs/docs/concepts/tasks.md b/docs/docs/concepts/tasks.md
index c7d10d1f4..cd40565cf 100644
--- a/docs/docs/concepts/tasks.md
+++ b/docs/docs/concepts/tasks.md
@@ -136,7 +136,7 @@ resources:
Nodes can communicate using their private IP addresses.
-Use `DSTACK_MASTER_NODE_IP`, `$DSTACK_NODE_RANK`, and other
+Use `DSTACK_MASTER_NODE_IP`, `DSTACK_NODES_IPS`, `DSTACK_NODE_RANK`, and other
[System environment variables](#system-environment-variables)
to discover IP addresses and other details.
@@ -159,6 +159,11 @@ to discover IP addresses and other details.
# ... The rest of the commands
```
+??? info "SSH"
+ You can log in to any node from any node via SSH on port 10022 using the `~/.ssh/dstack_job` private key.
+ For convenience, `~/.ssh/config` is preconfigured with these options, so a simple `ssh ` is enough.
+ For a list of nodes IPs check the `DSTACK_NODES_IPS` environment variable.
+
!!! info "Fleets"
Distributed tasks can only run on fleets with
[cluster placement](fleets.md#cloud-placement).
diff --git a/docs/examples.md b/docs/examples.md
index c039fda4d..fcf8c0bcf 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -155,4 +155,14 @@ hide:
Use Docker and Docker Compose inside runs
+
+
+ NCCL Tests
+
+
+
+ Run multi-node NCCL Tests with MPI
+
+
diff --git a/docs/examples/misc/nccl-tests/index.md b/docs/examples/misc/nccl-tests/index.md
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/misc/nccl-tests/.dstack.yml b/examples/misc/nccl-tests/.dstack.yml
new file mode 100644
index 000000000..2e3376f5c
--- /dev/null
+++ b/examples/misc/nccl-tests/.dstack.yml
@@ -0,0 +1,42 @@
+type: task
+name: nccl-tests
+
+image: un1def/aws-efa-test
+nodes: 2
+
+env:
+ - NCCL_DEBUG=INFO
+
+commands:
+ - |
+ # We use FIFO for inter-node communication
+ FIFO=/tmp/dstack_job
+ if [ ${DSTACK_NODE_RANK} -eq 0 ]; then
+ cd /root/nccl-tests/build
+ echo "${DSTACK_NODES_IPS}" > hostfile
+ MPIRUN='mpirun --allow-run-as-root --hostfile hostfile'
+ # Wait for other nodes
+ while true; do
+ if ${MPIRUN} -n ${DSTACK_NODES_NUM} -N 1 true >/dev/null 2>&1; then
+ break
+ fi
+ echo 'Waiting for nodes...'
+ sleep 5
+ done
+ # Run NCCL Tests
+ ${MPIRUN} \
+ -n $((DSTACK_NODES_NUM * DSTACK_GPUS_PER_NODE)) -N ${DSTACK_GPUS_PER_NODE} \
+ --mca btl_tcp_if_exclude lo,docker0 \
+ --bind-to none \
+ ./all_reduce_perf -b 8 -e 8G -f 2 -g 1
+ # Notify nodes the job is done
+ ${MPIRUN} -n ${DSTACK_NODES_NUM} -N 1 sh -c "echo done > ${FIFO}"
+ else
+ mkfifo ${FIFO}
+ # Wait for a message from the first node
+ cat ${FIFO}
+ fi
+
+resources:
+ gpu: nvidia:4:16GB
+ shm_size: 16GB
diff --git a/examples/misc/nccl-tests/README.md b/examples/misc/nccl-tests/README.md
new file mode 100644
index 000000000..318312573
--- /dev/null
+++ b/examples/misc/nccl-tests/README.md
@@ -0,0 +1,79 @@
+# NCCL Tests
+
+This example shows how to run distributed [NCCL Tests :material-arrow-top-right-thin:{ .external }](https://github.com/NVIDIA/nccl-tests){:target="_blank"} with MPI using `dstack`.
+
+??? info "AWS EFA"
+ The used image is optimized for AWS [EFA :material-arrow-top-right-thin:{ .external }](https://aws.amazon.com/hpc/efa/){:target="_blank"} but works with regular TCP/IP network adapters as well.
+
+## Configuration
+
+This configuration runs AllReduce test on 2 nodes with 4 GPUs each (8 processes total), but you can adjust both `nodes` and `resources.gpu` without modifying the script.
+
+
+
+```yaml
+type: task
+name: nccl-tests
+
+image: un1def/aws-efa-test
+nodes: 2
+
+env:
+ - NCCL_DEBUG=INFO
+
+commands:
+ - |
+ # We use FIFO for inter-node communication
+ FIFO=/tmp/dstack_job
+ if [ ${DSTACK_NODE_RANK} -eq 0 ]; then
+ cd /root/nccl-tests/build
+ echo "${DSTACK_NODES_IPS}" > hostfile
+ MPIRUN='mpirun --allow-run-as-root --hostfile hostfile'
+ # Wait for other nodes
+ while true; do
+ if ${MPIRUN} -n ${DSTACK_NODES_NUM} -N 1 true >/dev/null 2>&1; then
+ break
+ fi
+ echo 'Waiting for nodes...'
+ sleep 5
+ done
+ # Run NCCL Tests
+ ${MPIRUN} \
+ -n $((DSTACK_NODES_NUM * DSTACK_GPUS_PER_NODE)) -N ${DSTACK_GPUS_PER_NODE} \
+ --mca btl_tcp_if_exclude lo,docker0 \
+ --bind-to none \
+ ./all_reduce_perf -b 8 -e 8G -f 2 -g 1
+ # Notify nodes the job is done
+ ${MPIRUN} -n ${DSTACK_NODES_NUM} -N 1 sh -c "echo done > ${FIFO}"
+ else
+ mkfifo ${FIFO}
+ # Wait for a message from the first node
+ cat ${FIFO}
+ fi
+
+resources:
+ gpu: nvidia:4:16GB
+ shm_size: 16GB
+
+```
+
+
+
+### Running a configuration
+
+To run a configuration, use the [`dstack apply`](https://dstack.ai/docs/reference/cli/dstack/apply/) command.
+
+
+
+```shell
+$ dstack apply -f examples/misc/nccl-tests/.dstack.yml
+
+ # BACKEND REGION INSTANCE RESOURCES SPOT PRICE
+ 1 aws us-east-1 g4dn.12xlarge 48xCPU, 192GB, 4xT4 (16GB), 100.0GB (disk) no $3.912
+ 2 aws us-west-2 g4dn.12xlarge 48xCPU, 192GB, 4xT4 (16GB), 100.0GB (disk) no $3.912
+ 3 aws us-east-2 g4dn.12xlarge 48xCPU, 192GB, 4xT4 (16GB), 100.0GB (disk) no $3.912
+
+Submit the run nccl-tests? [y/n]: y
+```
+
+
diff --git a/mkdocs.yml b/mkdocs.yml
index 961097f02..b146eec38 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -273,6 +273,7 @@ nav:
- Llama 3.2: examples/llms/llama32/index.md
- Misc:
- Docker Compose: examples/misc/docker-compose/index.md
+ - NCCL Tests: examples/misc/nccl-tests/index.md
# - Community: community.md
- Partners: partners.md
- Blog:
diff --git a/runner/internal/executor/executor.go b/runner/internal/executor/executor.go
index 6b4b6374d..3b6ee1cef 100644
--- a/runner/internal/executor/executor.go
+++ b/runner/internal/executor/executor.go
@@ -1,6 +1,7 @@
package executor
import (
+ "bytes"
"context"
"errors"
"fmt"
@@ -30,6 +31,7 @@ type RunExecutor struct {
tempDir string
homeDir string
workingDir string
+ sshPort int
uid uint32
run schemas.RunSpec
@@ -74,6 +76,7 @@ func NewRunExecutor(tempDir string, homeDir string, workingDir string, sshPort i
tempDir: tempDir,
homeDir: homeDir,
workingDir: workingDir,
+ sshPort: sshPort,
uid: uid,
mu: mu,
@@ -322,15 +325,18 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
log.Warning(ctx, "failed to write SSH environment", "path", ex.homeDir, "err", err)
}
}
+ userSSHDir := ""
+ uid := -1
+ gid := -1
if user != nil && *user.Uid != 0 {
// non-root user
- uid := int(*user.Uid)
- gid := int(*user.Gid)
+ uid = int(*user.Uid)
+ gid = int(*user.Gid)
homeDir, isHomeDirAccessible := prepareHomeDir(ctx, uid, gid, user.HomeDir)
envMap["HOME"] = homeDir
if isHomeDirAccessible {
log.Trace(ctx, "provisioning homeDir", "path", homeDir)
- userSSHDir, err := prepareSSHDir(uid, gid, homeDir)
+ userSSHDir, err = prepareSSHDir(uid, gid, homeDir)
if err != nil {
log.Warning(ctx, "failed to prepare ssh dir", "home", homeDir, "err", err)
} else {
@@ -354,6 +360,17 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
} else {
// root user
envMap["HOME"] = ex.homeDir
+ userSSHDir = filepath.Join(ex.homeDir, ".ssh")
+ }
+
+ if ex.jobSpec.SSHKey != nil && userSSHDir != "" {
+ err := configureSSH(
+ ex.jobSpec.SSHKey.Private, ex.jobSpec.SSHKey.Public, ex.clusterInfo.JobIPs, ex.sshPort,
+ uid, gid, userSSHDir,
+ )
+ if err != nil {
+ log.Warning(ctx, "failed to configure SSH", "err", err)
+ }
}
cmd.Env = envMap.Render()
@@ -712,6 +729,56 @@ func writeSSHEnvironment(env map[string]string, uid int, gid int, envPath string
return nil
}
+func configureSSH(private string, public string, ips []string, port int, uid int, gid int, sshDir string) error {
+ privatePath := filepath.Join(sshDir, "dstack_job")
+ privateFile, err := os.OpenFile(privatePath, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0o600)
+ if err != nil {
+ return err
+ }
+ defer privateFile.Close()
+ if err := os.Chown(privatePath, uid, gid); err != nil {
+ return err
+ }
+ if _, err := privateFile.WriteString(private); err != nil {
+ return err
+ }
+
+ akPath := filepath.Join(sshDir, "authorized_keys")
+ akFile, err := os.OpenFile(akPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o600)
+ if err != nil {
+ return err
+ }
+ defer akFile.Close()
+ if err := os.Chown(akPath, uid, gid); err != nil {
+ return err
+ }
+ if _, err := akFile.WriteString(public); err != nil {
+ return err
+ }
+
+ configPath := filepath.Join(sshDir, "config")
+ configFile, err := os.OpenFile(configPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o600)
+ if err != nil {
+ return err
+ }
+ defer configFile.Close()
+ if err := os.Chown(configPath, uid, gid); err != nil {
+ return err
+ }
+ var configBuffer bytes.Buffer
+ for _, ip := range ips {
+ configBuffer.WriteString(fmt.Sprintf("\nHost %s\n", ip))
+ configBuffer.WriteString(fmt.Sprintf(" Port %d\n", port))
+ configBuffer.WriteString(" StrictHostKeyChecking no\n")
+ configBuffer.WriteString(" UserKnownHostsFile /dev/null\n")
+ configBuffer.WriteString(fmt.Sprintf(" IdentityFile %s\n", privatePath))
+ }
+ if _, err := configFile.Write(configBuffer.Bytes()); err != nil {
+ return err
+ }
+ return nil
+}
+
// A makeshift solution to deliver authorized_keys to a non-root user
// without modifying the existing API/bootstrap process
// TODO: implement key delivery properly, i.e. sumbit keys to and write by the runner,
diff --git a/runner/internal/schemas/schemas.go b/runner/internal/schemas/schemas.go
index 8d370e253..86318b0ec 100644
--- a/runner/internal/schemas/schemas.go
+++ b/runner/internal/schemas/schemas.go
@@ -54,6 +54,7 @@ type JobSpec struct {
Env map[string]string `json:"env"`
SingleBranch bool `json:"single_branch"`
MaxDuration int `json:"max_duration"`
+ SSHKey *SSHKey `json:"ssh_key"`
WorkingDir *string `json:"working_dir"`
}
@@ -63,6 +64,11 @@ type ClusterInfo struct {
GPUSPerJob int `json:"gpus_per_job"`
}
+type SSHKey struct {
+ Private string `json:"private"`
+ Public string `json:"public"`
+}
+
type RepoCredentials struct {
CloneURL string `json:"clone_url"`
PrivateKey *string `json:"private_key"`
diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py
index c12c81918..13fef3fc7 100644
--- a/src/dstack/_internal/core/models/runs.py
+++ b/src/dstack/_internal/core/models/runs.py
@@ -178,6 +178,11 @@ class Gateway(CoreModel):
options: dict = {}
+class JobSSHKey(CoreModel):
+ private: str
+ public: str
+
+
class JobSpec(CoreModel):
replica_num: int = 0 # default value for backward compatibility
job_num: int
@@ -198,6 +203,7 @@ class JobSpec(CoreModel):
requirements: Requirements
retry: Optional[Retry]
volumes: Optional[List[MountPoint]] = None
+ ssh_key: Optional[JobSSHKey] = None
# For backward compatibility with 0.18.x when retry_policy was required.
# TODO: remove in 0.19
retry_policy: ProfileRetryPolicy = ProfileRetryPolicy(retry=False)
diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py
index 9b5052964..0ab4387b7 100644
--- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py
+++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py
@@ -127,7 +127,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
run_model = res.unique().scalar_one()
repo_model = run_model.repo
project = run_model.project
- run = run_model_to_run(run_model)
+ run = run_model_to_run(run_model, include_sensitive=True)
job_submission = job_model_to_job_submission(job_model)
job_provisioning_data = job_submission.job_provisioning_data
if job_provisioning_data is None:
diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py
index 847fa0624..e0c69c750 100644
--- a/src/dstack/_internal/server/schemas/runner.py
+++ b/src/dstack/_internal/server/schemas/runner.py
@@ -64,6 +64,7 @@ class SubmitBody(CoreModel):
"gateway",
"single_branch",
"max_duration",
+ "ssh_key",
"working_dir",
}
),
diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py
index d144063fa..4bb00b289 100644
--- a/src/dstack/_internal/server/services/jobs/configurators/base.py
+++ b/src/dstack/_internal/server/services/jobs/configurators/base.py
@@ -21,6 +21,7 @@
from dstack._internal.core.models.runs import (
AppSpec,
JobSpec,
+ JobSSHKey,
Requirements,
Retry,
RunSpec,
@@ -30,6 +31,7 @@
from dstack._internal.core.services.profiles import get_retry
from dstack._internal.core.services.ssh.ports import filter_reserved_ports
from dstack._internal.server.services.docker import ImageConfig, get_image_config
+from dstack._internal.utils import crypto
from dstack._internal.utils.common import run_async
from dstack._internal.utils.interpolator import InterpolatorError, VariablesInterpolator
@@ -57,6 +59,8 @@ class JobConfigurator(ABC):
TYPE: RunConfigurationType
_image_config: Optional[ImageConfig] = None
+ # JobSSHKey should be shared for all jobs in a replica for inter-node communitation.
+ _job_ssh_key: Optional[JobSSHKey] = None
def __init__(self, run_spec: RunSpec):
self.run_spec = run_spec
@@ -123,6 +127,7 @@ async def _get_job_spec(
retry=self._retry(),
working_dir=self._working_dir(),
volumes=self._volumes(job_num),
+ ssh_key=self._ssh_key(jobs_per_replica),
)
return job_spec
@@ -238,6 +243,17 @@ def _python(self) -> str:
def _volumes(self, job_num: int) -> List[MountPoint]:
return interpolate_job_volumes(self.run_spec.configuration.volumes, job_num)
+ def _ssh_key(self, jobs_per_replica: int) -> Optional[JobSSHKey]:
+ if jobs_per_replica < 2:
+ return None
+ if self._job_ssh_key is None:
+ private, public = crypto.generate_rsa_key_pair_bytes(comment="dstack_job")
+ self._job_ssh_key = JobSSHKey(
+ private=private.decode(),
+ public=public.decode(),
+ )
+ return self._job_ssh_key
+
def interpolate_job_volumes(
run_volumes: List[Union[MountPoint, str]],
diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py
index 44f5520a6..d4f3afce5 100644
--- a/src/dstack/_internal/server/services/runs.py
+++ b/src/dstack/_internal/server/services/runs.py
@@ -342,8 +342,11 @@ async def get_plan(
job_offers.extend(offer for _, offer in offers)
job_offers.sort(key=lambda offer: not offer.availability.is_available())
+ job_spec = job.job_spec
+ _remove_job_spec_sensitive_info(job_spec)
+
job_plan = JobPlan(
- job_spec=job.job_spec,
+ job_spec=job_spec,
offers=job_offers[:50],
total_offers=len(job_offers),
max_price=max((offer.price for offer in job_offers), default=None),
@@ -619,7 +622,10 @@ async def delete_runs(
def run_model_to_run(
- run_model: RunModel, include_job_submissions: bool = True, return_in_api: bool = False
+ run_model: RunModel,
+ include_job_submissions: bool = True,
+ return_in_api: bool = False,
+ include_sensitive: bool = False,
) -> Run:
jobs: List[Job] = []
run_jobs = sorted(run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num))
@@ -634,6 +640,8 @@ def run_model_to_run(
for job_model in job_submissions:
if job_spec is None:
job_spec = JobSpec.__response__.parse_raw(job_model.job_spec_data)
+ if not include_sensitive:
+ _remove_job_spec_sensitive_info(job_spec)
if include_job_submissions:
job_submission = job_model_to_job_submission(job_model)
if return_in_api:
@@ -1046,3 +1054,7 @@ async def retry_run_replica_jobs(
# dirty hack to avoid passing all job submissions
new_job_model.submission_num = job_model.submission_num + 1
session.add(new_job_model)
+
+
+def _remove_job_spec_sensitive_info(spec: JobSpec):
+ spec.ssh_key = None
diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py
index 5795eaddc..a8d142934 100644
--- a/src/tests/_internal/server/routers/test_runs.py
+++ b/src/tests/_internal/server/routers/test_runs.py
@@ -214,6 +214,7 @@ def get_dev_env_run_plan_dict(
},
"retry": None,
"volumes": volumes,
+ "ssh_key": None,
"retry_policy": {"retry": False, "duration": None},
"working_dir": ".",
},
@@ -374,6 +375,7 @@ def get_dev_env_run_dict(
},
"retry": None,
"volumes": [],
+ "ssh_key": None,
"retry_policy": {"retry": False, "duration": None},
"working_dir": ".",
},
diff --git a/src/tests/_internal/server/services/jobs/configurators/test_task.py b/src/tests/_internal/server/services/jobs/configurators/test_task.py
new file mode 100644
index 000000000..4515209e6
--- /dev/null
+++ b/src/tests/_internal/server/services/jobs/configurators/test_task.py
@@ -0,0 +1,35 @@
+from unittest.mock import patch
+
+import pytest
+
+from dstack._internal.core.models.configurations import TaskConfiguration
+from dstack._internal.core.models.runs import JobSSHKey
+from dstack._internal.server.services.jobs.configurators.task import TaskJobConfigurator
+from dstack._internal.server.testing.common import get_run_spec
+
+
+@pytest.mark.asyncio
+@pytest.mark.usefixtures("image_config_mock")
+class TestTaskJobConfigurator:
+ async def test_ssh_key_single_node(self):
+ configuration = TaskConfiguration(nodes=1, image="debian")
+ run_spec = get_run_spec(run_name="run", repo_id="id", configuration=configuration)
+ configurator = TaskJobConfigurator(run_spec)
+
+ job_specs = await configurator.get_job_specs(replica_num=0)
+
+ assert len(job_specs) == 1
+ assert job_specs[0].ssh_key is None
+
+ async def test_ssh_key_multi_node(self):
+ configuration = TaskConfiguration(nodes=2, image="debian")
+ run_spec = get_run_spec(run_name="run", repo_id="id", configuration=configuration)
+ configurator = TaskJobConfigurator(run_spec)
+
+ with patch("dstack._internal.utils.crypto.generate_rsa_key_pair_bytes") as gen_mock:
+ gen_mock.side_effect = [(b"private1", b"public1"), (b"private2", b"public2")]
+ job_specs = await configurator.get_job_specs(replica_num=0)
+
+ assert len(job_specs) == 2
+ assert job_specs[0].ssh_key == JobSSHKey(private="private1", public="public1")
+ assert job_specs[1].ssh_key == JobSSHKey(private="private1", public="public1")