7 changes: 6 additions & 1 deletion docs/docs/concepts/tasks.md
@@ -136,7 +136,7 @@ resources:
</div>

Nodes can communicate using their private IP addresses.
Use `DSTACK_MASTER_NODE_IP`, `$DSTACK_NODE_RANK`, and other
Use `DSTACK_MASTER_NODE_IP`, `DSTACK_NODES_IPS`, `DSTACK_NODE_RANK`, and other
[System environment variables](#system-environment-variables)
to discover IP addresses and other details.
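
    As a minimal sketch (assuming `DSTACK_NODES_IPS` lists one IP per line, as its use as an MPI hostfile in the NCCL Tests example later in this PR suggests), each node can discover its peers like this:

    ```shell
    echo "Master: ${DSTACK_MASTER_NODE_IP}, my rank: ${DSTACK_NODE_RANK}"
    # One private IP per node, e.g. usable as an MPI hostfile
    echo "${DSTACK_NODES_IPS}" > /tmp/hostfile
    ```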

@@ -159,6 +159,11 @@ to discover IP addresses and other details.
# ... The rest of the commands
```

??? info "SSH"
You can log in to any node from any other node via SSH on port 10022 using the `~/.ssh/dstack_job` private key.
For convenience, `~/.ssh/config` is preconfigured with these options, so a simple `ssh <node_ip>` is enough.
For the list of node IPs, check the `DSTACK_NODES_IPS` environment variable.
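
    For example, here is a hedged sketch of hopping from one node to another (the IP selection assumes `DSTACK_NODES_IPS` is newline-separated; picking the second node is an arbitrary choice):

    ```shell
    # Pick the second node's private IP from the list
    PEER_IP=$(echo "${DSTACK_NODES_IPS}" | sed -n '2p')
    # ~/.ssh/config already routes Host entries to port 10022 with ~/.ssh/dstack_job
    ssh "${PEER_IP}" hostname
    ```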

!!! info "Fleets"
Distributed tasks can only run on fleets with
[cluster placement](fleets.md#cloud-placement).
10 changes: 10 additions & 0 deletions docs/examples.md
@@ -155,4 +155,14 @@ hide:
Use Docker and Docker Compose inside runs
</p>
</a>
<a href="/examples/misc/nccl-tests"
class="feature-cell sky">
<h3>
NCCL Tests
</h3>

<p>
Run multi-node NCCL Tests with MPI
</p>
</a>
</div>
Empty file.
42 changes: 42 additions & 0 deletions examples/misc/nccl-tests/.dstack.yml
@@ -0,0 +1,42 @@
type: task
name: nccl-tests

image: un1def/aws-efa-test
nodes: 2

env:
- NCCL_DEBUG=INFO

commands:
- |
# We use a FIFO for inter-node communication
FIFO=/tmp/dstack_job
if [ ${DSTACK_NODE_RANK} -eq 0 ]; then
cd /root/nccl-tests/build
echo "${DSTACK_NODES_IPS}" > hostfile
MPIRUN='mpirun --allow-run-as-root --hostfile hostfile'
# Wait for other nodes
while true; do
if ${MPIRUN} -n ${DSTACK_NODES_NUM} -N 1 true >/dev/null 2>&1; then
break
fi
echo 'Waiting for nodes...'
sleep 5
done
# Run NCCL Tests
${MPIRUN} \
-n $((DSTACK_NODES_NUM * DSTACK_GPUS_PER_NODE)) -N ${DSTACK_GPUS_PER_NODE} \
--mca btl_tcp_if_exclude lo,docker0 \
--bind-to none \
./all_reduce_perf -b 8 -e 8G -f 2 -g 1
# Notify nodes the job is done
${MPIRUN} -n ${DSTACK_NODES_NUM} -N 1 sh -c "echo done > ${FIFO}"
else
mkfifo ${FIFO}
# Wait for a message from the first node
cat ${FIFO}
fi

resources:
gpu: nvidia:4:16GB
shm_size: 16GB
79 changes: 79 additions & 0 deletions examples/misc/nccl-tests/README.md
@@ -0,0 +1,79 @@
# NCCL Tests

This example shows how to run distributed [NCCL Tests :material-arrow-top-right-thin:{ .external }](https://github.com/NVIDIA/nccl-tests){:target="_blank"} with MPI using `dstack`.

??? info "AWS EFA"
The image used is optimized for AWS [EFA :material-arrow-top-right-thin:{ .external }](https://aws.amazon.com/hpc/efa/){:target="_blank"} but also works with regular TCP/IP network adapters.

## Configuration

This configuration runs the AllReduce test on 2 nodes with 4 GPUs each (8 processes in total). You can adjust both `nodes` and `resources.gpu` without modifying the script; see the scaling sketch after the configuration.

<div editor-title="examples/misc/nccl-tests/.dstack.yml">

```yaml
type: task
name: nccl-tests

image: un1def/aws-efa-test
nodes: 2

env:
- NCCL_DEBUG=INFO

commands:
- |
# We use a FIFO for inter-node communication
FIFO=/tmp/dstack_job
if [ ${DSTACK_NODE_RANK} -eq 0 ]; then
cd /root/nccl-tests/build
echo "${DSTACK_NODES_IPS}" > hostfile
MPIRUN='mpirun --allow-run-as-root --hostfile hostfile'
# Wait for other nodes
while true; do
if ${MPIRUN} -n ${DSTACK_NODES_NUM} -N 1 true >/dev/null 2>&1; then
break
fi
echo 'Waiting for nodes...'
sleep 5
done
# Run NCCL Tests
${MPIRUN} \
-n $((DSTACK_NODES_NUM * DSTACK_GPUS_PER_NODE)) -N ${DSTACK_GPUS_PER_NODE} \
Contributor (inline review comment): @un-def Shouldn't we use DSTACK_GPUS_NUM instead of (DSTACK_NODES_NUM * DSTACK_GPUS_PER_NODE)?

Collaborator (author): Sure, I'll update the docs in a separate PR. Thanks!

--mca btl_tcp_if_exclude lo,docker0 \
--bind-to none \
./all_reduce_perf -b 8 -e 8G -f 2 -g 1
# Notify nodes the job is done
${MPIRUN} -n ${DSTACK_NODES_NUM} -N 1 sh -c "echo done > ${FIFO}"
else
mkfifo ${FIFO}
# Wait for a message from the first node
cat ${FIFO}
fi

resources:
gpu: nvidia:4:16GB
shm_size: 16GB

```

</div>
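As a hedged illustration of how the process counts scale with the cluster shape (the 4-node figures below are hypothetical, not part of this example), `mpirun` derives its rank counts from `DSTACK_NODES_NUM` and `DSTACK_GPUS_PER_NODE`; per the inline review comment above, the product should equal `DSTACK_GPUS_NUM`:

```shell
# nodes: 2, gpu: nvidia:4:16GB  ->  -n $((2 * 4)) = 8 ranks in total, -N 4 per node
# nodes: 4, gpu: nvidia:8:16GB  ->  -n $((4 * 8)) = 32 ranks in total, -N 8 per node
echo "total ranks: $((DSTACK_NODES_NUM * DSTACK_GPUS_PER_NODE)), per node: ${DSTACK_GPUS_PER_NODE}"
```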

### Running a configuration

To run a configuration, use the [`dstack apply`](https://dstack.ai/docs/reference/cli/dstack/apply/) command.

<div class="termy">

```shell
$ dstack apply -f examples/misc/nccl-tests/.dstack.yml

# BACKEND REGION INSTANCE RESOURCES SPOT PRICE
1 aws us-east-1 g4dn.12xlarge 48xCPU, 192GB, 4xT4 (16GB), 100.0GB (disk) no $3.912
2 aws us-west-2 g4dn.12xlarge 48xCPU, 192GB, 4xT4 (16GB), 100.0GB (disk) no $3.912
3 aws us-east-2 g4dn.12xlarge 48xCPU, 192GB, 4xT4 (16GB), 100.0GB (disk) no $3.912

Submit the run nccl-tests? [y/n]: y
```

</div>
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -273,6 +273,7 @@ nav:
- Llama 3.2: examples/llms/llama32/index.md
- Misc:
- Docker Compose: examples/misc/docker-compose/index.md
- NCCL Tests: examples/misc/nccl-tests/index.md
# - Community: community.md
- Partners: partners.md
- Blog:
73 changes: 70 additions & 3 deletions runner/internal/executor/executor.go
@@ -1,6 +1,7 @@
package executor

import (
"bytes"
"context"
"errors"
"fmt"
@@ -30,6 +31,7 @@ type RunExecutor struct {
tempDir string
homeDir string
workingDir string
sshPort int
uid uint32

run schemas.RunSpec
@@ -74,6 +76,7 @@ func NewRunExecutor(tempDir string, homeDir string, workingDir string, sshPort i
tempDir: tempDir,
homeDir: homeDir,
workingDir: workingDir,
sshPort: sshPort,
uid: uid,

mu: mu,
@@ -322,15 +325,18 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
log.Warning(ctx, "failed to write SSH environment", "path", ex.homeDir, "err", err)
}
}
userSSHDir := ""
uid := -1
gid := -1
if user != nil && *user.Uid != 0 {
// non-root user
uid := int(*user.Uid)
gid := int(*user.Gid)
uid = int(*user.Uid)
gid = int(*user.Gid)
homeDir, isHomeDirAccessible := prepareHomeDir(ctx, uid, gid, user.HomeDir)
envMap["HOME"] = homeDir
if isHomeDirAccessible {
log.Trace(ctx, "provisioning homeDir", "path", homeDir)
userSSHDir, err := prepareSSHDir(uid, gid, homeDir)
userSSHDir, err = prepareSSHDir(uid, gid, homeDir)
if err != nil {
log.Warning(ctx, "failed to prepare ssh dir", "home", homeDir, "err", err)
} else {
@@ -354,6 +360,17 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
} else {
// root user
envMap["HOME"] = ex.homeDir
userSSHDir = filepath.Join(ex.homeDir, ".ssh")
}

if ex.jobSpec.SSHKey != nil && userSSHDir != "" {
err := configureSSH(
ex.jobSpec.SSHKey.Private, ex.jobSpec.SSHKey.Public, ex.clusterInfo.JobIPs, ex.sshPort,
uid, gid, userSSHDir,
)
if err != nil {
log.Warning(ctx, "failed to configure SSH", "err", err)
}
}

cmd.Env = envMap.Render()
@@ -712,6 +729,56 @@ func writeSSHEnvironment(env map[string]string, uid int, gid int, envPath string
return nil
}

func configureSSH(private string, public string, ips []string, port int, uid int, gid int, sshDir string) error {
privatePath := filepath.Join(sshDir, "dstack_job")
privateFile, err := os.OpenFile(privatePath, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0o600)
if err != nil {
return err
}
defer privateFile.Close()
if err := os.Chown(privatePath, uid, gid); err != nil {
return err
}
if _, err := privateFile.WriteString(private); err != nil {
return err
}

akPath := filepath.Join(sshDir, "authorized_keys")
akFile, err := os.OpenFile(akPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o600)
if err != nil {
return err
}
defer akFile.Close()
if err := os.Chown(akPath, uid, gid); err != nil {
return err
}
if _, err := akFile.WriteString(public); err != nil {
return err
}

configPath := filepath.Join(sshDir, "config")
configFile, err := os.OpenFile(configPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o600)
if err != nil {
return err
}
defer configFile.Close()
if err := os.Chown(configPath, uid, gid); err != nil {
return err
}
var configBuffer bytes.Buffer
for _, ip := range ips {
configBuffer.WriteString(fmt.Sprintf("\nHost %s\n", ip))
configBuffer.WriteString(fmt.Sprintf(" Port %d\n", port))
configBuffer.WriteString(" StrictHostKeyChecking no\n")
configBuffer.WriteString(" UserKnownHostsFile /dev/null\n")
configBuffer.WriteString(fmt.Sprintf(" IdentityFile %s\n", privatePath))
}
if _, err := configFile.Write(configBuffer.Bytes()); err != nil {
return err
}
return nil
}
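
For illustration only, a sketch of the per-peer entries this function appends to the user's SSH config. The IP and home directory are hypothetical placeholders; the port comes from the runner's `sshPort` (documented as 10022 in the tasks docs change above):

```shell
$ cat ~/.ssh/config

Host 10.0.0.12
  Port 10022
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  IdentityFile /root/.ssh/dstack_job
```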

// A makeshift solution to deliver authorized_keys to a non-root user
// without modifying the existing API/bootstrap process
// TODO: implement key delivery properly, i.e. submit keys to and write by the runner,
6 changes: 6 additions & 0 deletions runner/internal/schemas/schemas.go
@@ -54,6 +54,7 @@ type JobSpec struct {
Env map[string]string `json:"env"`
SingleBranch bool `json:"single_branch"`
MaxDuration int `json:"max_duration"`
SSHKey *SSHKey `json:"ssh_key"`
WorkingDir *string `json:"working_dir"`
}

@@ -63,6 +64,11 @@ type ClusterInfo struct {
GPUSPerJob int `json:"gpus_per_job"`
}

type SSHKey struct {
Private string `json:"private"`
Public string `json:"public"`
}

type RepoCredentials struct {
CloneURL string `json:"clone_url"`
PrivateKey *string `json:"private_key"`
6 changes: 6 additions & 0 deletions src/dstack/_internal/core/models/runs.py
@@ -178,6 +178,11 @@ class Gateway(CoreModel):
options: dict = {}


class JobSSHKey(CoreModel):
private: str
public: str


class JobSpec(CoreModel):
replica_num: int = 0 # default value for backward compatibility
job_num: int
@@ -198,6 +203,7 @@ class JobSpec(CoreModel):
requirements: Requirements
retry: Optional[Retry]
volumes: Optional[List[MountPoint]] = None
ssh_key: Optional[JobSSHKey] = None
# For backward compatibility with 0.18.x when retry_policy was required.
# TODO: remove in 0.19
retry_policy: ProfileRetryPolicy = ProfileRetryPolicy(retry=False)
@@ -127,7 +127,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
run_model = res.unique().scalar_one()
repo_model = run_model.repo
project = run_model.project
run = run_model_to_run(run_model)
run = run_model_to_run(run_model, include_sensitive=True)
job_submission = job_model_to_job_submission(job_model)
job_provisioning_data = job_submission.job_provisioning_data
if job_provisioning_data is None:
1 change: 1 addition & 0 deletions src/dstack/_internal/server/schemas/runner.py
@@ -64,6 +64,7 @@ class SubmitBody(CoreModel):
"gateway",
"single_branch",
"max_duration",
"ssh_key",
"working_dir",
}
),
16 changes: 16 additions & 0 deletions src/dstack/_internal/server/services/jobs/configurators/base.py
@@ -21,6 +21,7 @@
from dstack._internal.core.models.runs import (
AppSpec,
JobSpec,
JobSSHKey,
Requirements,
Retry,
RunSpec,
@@ -30,6 +31,7 @@
from dstack._internal.core.services.profiles import get_retry
from dstack._internal.core.services.ssh.ports import filter_reserved_ports
from dstack._internal.server.services.docker import ImageConfig, get_image_config
from dstack._internal.utils import crypto
from dstack._internal.utils.common import run_async
from dstack._internal.utils.interpolator import InterpolatorError, VariablesInterpolator

@@ -57,6 +59,8 @@ class JobConfigurator(ABC):
TYPE: RunConfigurationType

_image_config: Optional[ImageConfig] = None
# JobSSHKey should be shared by all jobs in a replica for inter-node communication.
_job_ssh_key: Optional[JobSSHKey] = None

def __init__(self, run_spec: RunSpec):
self.run_spec = run_spec
@@ -123,6 +127,7 @@ async def _get_job_spec(
retry=self._retry(),
working_dir=self._working_dir(),
volumes=self._volumes(job_num),
ssh_key=self._ssh_key(jobs_per_replica),
)
return job_spec

@@ -238,6 +243,17 @@ def _python(self) -> str:
def _volumes(self, job_num: int) -> List[MountPoint]:
return interpolate_job_volumes(self.run_spec.configuration.volumes, job_num)

def _ssh_key(self, jobs_per_replica: int) -> Optional[JobSSHKey]:
if jobs_per_replica < 2:
return None
if self._job_ssh_key is None:
private, public = crypto.generate_rsa_key_pair_bytes(comment="dstack_job")
self._job_ssh_key = JobSSHKey(
private=private.decode(),
public=public.decode(),
)
return self._job_ssh_key


def interpolate_job_volumes(
run_volumes: List[Union[MountPoint, str]],