Skip to content

Commit

Permalink
Add support for AWS Fargate Spot clusters (#312)
Browse files Browse the repository at this point in the history
  • Loading branch information
anovv committed Oct 6, 2021
1 parent 3a62e90 commit e1e6a0f
Showing 1 changed file with 35 additions and 2 deletions.
37 changes: 35 additions & 2 deletions dask_cloudprovider/aws/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class Task:
log group when launching this task.
fargate: bool
Whether or not to launch with the Fargate launch type.
Whether or not to launch on Fargate.
environment: dict
Environment variables to set when launching the task.
Expand All @@ -97,6 +97,12 @@ class Task:
Whether to use a private IP (if True) or public IP (if False) with Fargate.
Defaults to False, i.e. public IP.
fargate_capacity_provider: str (optional)
If the cluster is launched on Fargate with `fargate_spot=True`, use this capacity provider
(should be either `FARGATE` or `FARGATE_SPOT`).
If not set, `launchType=FARGATE` will be used.
Defaults to None.
task_kwargs: dict (optional)
Additional keyword arguments for the ECS task.
Expand Down Expand Up @@ -125,6 +131,7 @@ def __init__(
name=None,
platform_version=None,
fargate_use_private_ip=False,
fargate_capacity_provider=None,
task_kwargs=None,
**kwargs
):
Expand Down Expand Up @@ -152,6 +159,7 @@ def __init__(
self._find_address_timeout = find_address_timeout
self.platform_version = platform_version
self._fargate_use_private_ip = fargate_use_private_ip
self._fargate_capacity_provider = fargate_capacity_provider
self.kwargs = kwargs
self.task_kwargs = task_kwargs
self.status = Status.created
Expand Down Expand Up @@ -271,7 +279,13 @@ async def start(self):
# so that the default capacity provider of the ECS cluster or an alternate
# capacity provider can be specified. (dask/dask-cloudprovider#261)
if self.fargate:
kwargs["launchType"] = "FARGATE"
# Use launchType only if capacity provider is not specified
if not self._fargate_capacity_provider:
kwargs["launchType"] = "FARGATE"
else:
kwargs["capacityProviderStrategy"] = [
{"capacityProvider": self._fargate_capacity_provider}
]

async with self._client("ecs") as ecs:
response = await ecs.run_task(**kwargs)
Expand Down Expand Up @@ -452,6 +466,13 @@ class ECSCluster(SpecCluster):
fargate_workers: bool (optional)
Select whether or not to use fargate for the workers.
Defaults to ``False``. You must provide an existing cluster.
fargate_spot: bool (optional)
Select whether or not to run the cluster on Fargate Spot, with workers running on spot capacity.
If `fargate_scheduler=True` and `fargate_workers=True`, worker tasks will use
`fargate_capacity_provider=FARGATE_SPOT` and the scheduler task will use
`fargate_capacity_provider=FARGATE`.
Defaults to ``False``. You must provide an existing cluster.
image: str (optional)
The docker image to use for the scheduler and worker tasks.
Expand Down Expand Up @@ -655,6 +676,7 @@ def __init__(
self,
fargate_scheduler=False,
fargate_workers=False,
fargate_spot=False,
image=None,
scheduler_cpu=None,
scheduler_mem=None,
Expand Down Expand Up @@ -694,6 +716,7 @@ def __init__(
):
self._fargate_scheduler = fargate_scheduler
self._fargate_workers = fargate_workers
self._fargate_spot = fargate_spot
self.image = image
self._scheduler_cpu = scheduler_cpu
self._scheduler_mem = scheduler_mem
Expand Down Expand Up @@ -764,6 +787,8 @@ async def _start(
self._fargate_scheduler = self.config.get("fargate_scheduler")
if self._fargate_workers is None:
self._fargate_workers = self.config.get("fargate_workers")
if self._fargate_spot is None:
self._fargate_spot = self.config.get("fargate_spot")

if self._tags is None:
self._tags = self.config.get("tags")
Expand Down Expand Up @@ -905,12 +930,14 @@ async def _start(
scheduler_options = {
"task_definition_arn": self.scheduler_task_definition_arn,
"fargate": self._fargate_scheduler,
"fargate_capacity_provider": "FARGATE" if self._fargate_spot else None,
"task_kwargs": self._scheduler_task_kwargs,
**options,
}
worker_options = {
"task_definition_arn": self.worker_task_definition_arn,
"fargate": self._fargate_workers,
"fargate_capacity_provider": "FARGATE_SPOT" if self._fargate_spot else None,
"cpu": self._worker_cpu,
"mem": self._worker_mem,
"gpu": self._worker_gpu,
Expand Down Expand Up @@ -1266,6 +1293,12 @@ class FargateCluster(ECSCluster):
>>> from dask_cloudprovider.aws import FargateCluster
>>> cluster = FargateCluster(image="<hub-user>/<repo-name>[:<tag>]")
To run a cluster with workers on Fargate Spot
(<https://aws.amazon.com/blogs/aws/aws-fargate-spot-now-generally-available/>), set ``fargate_spot=True``:
>>> from dask_cloudprovider.aws import FargateCluster
>>> cluster = FargateCluster(fargate_spot=True)
One strategy to ensure that package versions match between your custom environment and the Docker container is to
create your environment from an ``environment.yml`` file, export the exact package list for that environment using
``conda list --export > package-list.txt``, and then use the pinned package versions contained in
Expand Down

0 comments on commit e1e6a0f

Please sign in to comment.