Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions runner/internal/shim/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
Expand All @@ -24,6 +25,7 @@ import (
docker "github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/go-connections/nat"
"github.com/docker/go-units"
"github.com/dstackai/dstack/runner/consts"
"github.com/dstackai/dstack/runner/internal/shim/backends"
"github.com/icza/backscanner"
Expand Down Expand Up @@ -498,6 +500,7 @@ func createContainer(ctx context.Context, client docker.APIClient, runnerDir str
Tmpfs: tmpfs,
}
configureGpuIfAvailable(hostConfig)
configureHpcNetworkingIfAvailable(hostConfig)

log.Printf("Creating container %s:\nconfig: %v\nhostConfig:%v", taskConfig.ContainerName, containerConfig, hostConfig)
resp, err := client.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, taskConfig.ContainerName)
Expand Down Expand Up @@ -632,6 +635,31 @@ func configureGpuIfAvailable(hostConfig *container.HostConfig) {
}
}

func configureHpcNetworkingIfAvailable(hostConfig *container.HostConfig) {
// Although AWS EFA is not InfiniBand, EFA adapters are exposed as /dev/infiniband/uverbsN (N=0,1,...)
if _, err := os.Stat("/dev/infiniband"); !errors.Is(err, os.ErrNotExist) {
hostConfig.Resources.Devices = append(
hostConfig.Resources.Devices,
container.DeviceMapping{
PathOnHost: "/dev/infiniband",
PathInContainer: "/dev/infiniband",
CgroupPermissions: "rwm",
},
)
// Set max locked memory (ulimit -l) to unlimited. Fixes "Libfabric error: (-12) Cannot allocate memory".
// See: https://github.com/ofiwg/libfabric/issues/6437
// See: https://aws.amazon.com/blogs/compute/leveraging-efa-to-run-hpc-and-ml-workloads-on-aws-batch/
hostConfig.Ulimits = append(
hostConfig.Ulimits,
&units.Ulimit{
Name: "memlock",
Soft: -1,
Hard: -1,
},
)
}
}

func getVolumeMounts(mountPoints []MountPoint) ([]mount.Mount, error) {
mounts := []mount.Mount{}
for _, mountPoint := range mountPoints {
Expand Down
22 changes: 22 additions & 0 deletions src/dstack/_internal/core/backends/aws/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ def create_instance(
{"Key": "dstack_user", "Value": instance_config.user},
]
disk_size = round(instance_offer.instance.resources.disk.size_mib / 1024)
max_efa_interfaces = get_maximum_efa_interfaces(
ec2_client=ec2_client, instance_type=instance_offer.instance.name
)
enable_efa = max_efa_interfaces > 0
try:
vpc_id, subnet_ids = get_vpc_id_subnet_id_or_error(
ec2_client=ec2_client,
Expand Down Expand Up @@ -183,6 +187,7 @@ def create_instance(
subnet_id=subnet_id,
allocate_public_ip=allocate_public_ip,
placement_group_name=instance_config.placement_group_name,
enable_efa=enable_efa,
)
)
instance = response[0]
Expand Down Expand Up @@ -579,6 +584,23 @@ def detach_volume(self, volume: Volume, instance_id: str):
logger.debug("Detached EBS volume %s from instance %s", volume.volume_id, instance_id)


def get_maximum_efa_interfaces(ec2_client: botocore.client.BaseClient, instance_type: str) -> int:
try:
response = ec2_client.describe_instance_types(
InstanceTypes=[instance_type],
Filters=[{"Name": "network-info.efa-supported", "Values": ["true"]}],
)
except botocore.exceptions.ClientError as e:
if e.response.get("Error", {}).get("Code") == "InvalidInstanceType":
# "The following supplied instance types do not exist: [<instance_type>]"
return 0
raise
instance_types = response["InstanceTypes"]
if not instance_types:
return 0
return instance_types[0]["NetworkInfo"]["EfaInfo"]["MaximumEfaInterfaces"]


def get_vpc_id_subnet_id_or_error(
ec2_client: botocore.client.BaseClient,
config: AWSConfig,
Expand Down
25 changes: 24 additions & 1 deletion src/dstack/_internal/core/backends/aws/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import botocore.exceptions

import dstack.version as version
from dstack._internal.core.errors import ComputeResourceNotFoundError
from dstack._internal.core.errors import ComputeError, ComputeResourceNotFoundError


def get_image_id(ec2_client: botocore.client.BaseClient, cuda: bool) -> str:
Expand Down Expand Up @@ -91,6 +91,15 @@ def create_security_group(
security_group_id=security_group_id,
rule={"IpProtocol": "-1"},
)
_add_egress_security_group_rule_if_missing(
ec2_client=ec2_client,
security_group=security_group,
security_group_id=security_group_id,
rule={
"IpProtocol": "-1",
"UserIdGroupPairs": [{"GroupId": security_group_id}],
},
)
return security_group_id


Expand All @@ -106,6 +115,7 @@ def create_instances_struct(
subnet_id: Optional[str] = None,
allocate_public_ip: bool = True,
placement_group_name: Optional[str] = None,
enable_efa: bool = False,
) -> Dict[str, Any]:
struct: Dict[str, Any] = dict(
BlockDeviceMappings=[
Expand Down Expand Up @@ -139,15 +149,28 @@ def create_instances_struct(
"InstanceInterruptionBehavior": "terminate",
},
}
if enable_efa and not subnet_id:
raise ComputeError("EFA requires subnet")
# AWS allows specifying either NetworkInterfaces for specific subnet_id
# or instance-level SecurityGroupIds in case of no specific subnet_id, not both.
if subnet_id is not None:
# Even if the instance type supports multiple cards, we always request only one interface
# due to the limitation: "AssociatePublicIpAddress [...] You cannot specify more than one
# network interface in the request".
# Error message: "(InvalidParameterCombination) when calling the RunInstances operation:
# The associatePublicIPAddress parameter cannot be specified when launching with
# multiple network interfaces".
# See: https://stackoverflow.com/questions/49882121
# If we need more than one card, we should either use Elastic IP (AWS-recommended way) or
# create the instance with one interface and add the rest later (the latter is not tested
# and may or may not work).
struct["NetworkInterfaces"] = [
{
"AssociatePublicIpAddress": allocate_public_ip,
"DeviceIndex": 0,
"SubnetId": subnet_id,
"Groups": [security_group_id],
"InterfaceType": "efa" if enable_efa else "interface",
},
]
else:
Expand Down