Add GPU support (#24)
* Fix private networking for EC2 mode

* Change public IP and fargate kwargs

For now we will assume that Fargate tasks need a public IP and that EC2
tasks only use a private IP. I've also separated the fargate kwarg into
fargate_scheduler and fargate_workers to allow the scheduler to have a
public IP on Fargate while the workers stay private on EC2 (as sketched
below).

* Reinstate --fargate flag

* Undo public property

* Improve public ip method comments

* Update dask_cloudprovider/providers/aws/ecs.py

Co-Authored-By: Matthew Rocklin <mrocklin@gmail.com>

* Update documentation
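
A minimal sketch of how the separated keyword arguments could be combined
once this lands; this snippet is illustrative rather than part of the commit,
and the cluster ARN is a placeholder:

    from dask_cloudprovider import ECSCluster

    # Scheduler on Fargate (so it can get a public IP for image pulls),
    # workers on an existing EC2-backed ECS cluster with private IPs only.
    # The ARN below is a placeholder, not a real cluster.
    cluster = ECSCluster(
        fargate_scheduler=True,
        fargate_workers=False,
        cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<clustername>",
    )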
jacobtomlinson committed Aug 20, 2019
1 parent 6cdd707 commit 3fdb831
Showing 3 changed files with 129 additions and 28 deletions.
15 changes: 14 additions & 1 deletion dask_cloudprovider/cli/ecs.py
@@ -14,6 +14,16 @@

@click.command()
@click.option("--fargate", is_flag=True, help="Turn on fargate mode (default off)")
@click.option(
"--fargate-scheduler",
is_flag=True,
help="Turn on fargate mode for scheduler (default off)",
)
@click.option(
"--fargate-workers",
is_flag=True,
help="Turn on fargate mode for workers (default off)",
)
@click.option(
"--image",
type=str,
@@ -131,6 +141,8 @@
@click.version_option()
def main(
fargate,
fargate_scheduler,
fargate_workers,
image,
scheduler_cpu,
scheduler_mem,
@@ -163,7 +175,8 @@ def main(
logger.info("Starting ECS cluster")
try:
cluster = ECSCluster(
fargate=fargate,
fargate_scheduler=fargate_scheduler or fargate,
fargate_workers=fargate_workers or fargate,
image=image,
scheduler_cpu=scheduler_cpu,
scheduler_mem=scheduler_mem,
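For reference, a small sketch of how the blanket --fargate flag is folded into
the two specific keyword arguments inside main above; the helper name is made
up for illustration and is not part of the CLI module:

    def resolve_fargate_flags(fargate, fargate_scheduler, fargate_workers):
        # Mirrors the ``or`` logic used when constructing ECSCluster above:
        # the old --fargate flag still switches both components to Fargate.
        return fargate_scheduler or fargate, fargate_workers or fargate

    assert resolve_fargate_flags(True, False, False) == (True, True)    # --fargate
    assert resolve_fargate_flags(False, True, False) == (True, False)   # --fargate-scheduler only
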
95 changes: 77 additions & 18 deletions dask_cloudprovider/providers/aws/ecs.py
@@ -124,7 +124,14 @@ async def _():

@property
def _use_public_ip(self):
return True
# Fargate tasks need a public IP to pull their image, while EC2 tasks cannot
# be given one, so for now assume a public IP in Fargate mode and a private
# IP in EC2 mode.

# TODO Fargate can also pull the image through a NAT, so this could be
# allowed to be False in Fargate mode provided a NAT is available.

return self.fargate

async def _is_long_arn_format_enabled(self):
[response] = (
@@ -227,7 +234,8 @@ async def start(self):
NetworkInterfaceIds=[network_interface_id]
)
[interface] = eni["NetworkInterfaces"]
self.public_ip = interface["Association"]["PublicIp"]
if self._use_public_ip:
self.public_ip = interface["Association"]["PublicIp"]
self.private_ip = interface["PrivateIpAddresses"][0]["PrivateIpAddress"]
self.address = await self._get_address_from_logs()
self.status = "running"
@@ -340,14 +348,18 @@ class ECSCluster(SpecCluster):
Parameters
----------
fargate: bool (optional)
Select whether or not to use fargate.
fargate_scheduler: bool (optional)
Select whether or not to use fargate for the scheduler.
Defaults to ``False``. You must provide an existing cluster.
fargate_workers: bool (optional)
Select whether or not to use fargate for the workers.
Defaults to ``False``. You must provide an existing cluster.
image: str (optional)
The docker image to use for the scheduler and worker tasks.
Defaults to ``daskdev/dask:latest``.
Defaults to ``daskdev/dask:latest`` or ``rapidsai/rapidsai:latest`` if ``worker_gpu`` is set.
scheduler_cpu: int (optional)
The amount of CPU to request for the scheduler in milli-cpu (1/1024).
@@ -368,6 +380,14 @@ class ECSCluster(SpecCluster):
The amount of memory to request for worker tasks in MB.
Defaults to ``16384`` (16GB).
worker_gpu: int (optional)
The number of GPUs to expose to the worker.
To provide GPUs to workers you need to use a GPU ready docker image
that has ``dask-cuda`` installed and GPU nodes available in your ECS
cluster. Fargate is not supported at this time.
Defaults to ``None`` (no GPUs).
n_workers: int (optional)
Number of workers to start on cluster creation.
@@ -471,13 +491,15 @@ class ECSCluster(SpecCluster):

def __init__(
self,
fargate=False,
fargate_scheduler=False,
fargate_workers=False,
image=None,
scheduler_cpu=None,
scheduler_mem=None,
scheduler_timeout=None,
worker_cpu=None,
worker_mem=None,
worker_gpu=None,
n_workers=None,
cluster_arn=None,
cluster_name_template=None,
@@ -496,13 +518,15 @@ def __init__(
**kwargs
):
self._clients = None
self._fargate = fargate
self._fargate_scheduler = fargate_scheduler
self._fargate_workers = fargate_workers
self.image = image
self._scheduler_cpu = scheduler_cpu
self._scheduler_mem = scheduler_mem
self._scheduler_timeout = scheduler_timeout
self._worker_cpu = worker_cpu
self._worker_mem = worker_mem
self._worker_gpu = worker_gpu
self._n_workers = n_workers
self.cluster_arn = cluster_arn
self._cluster_name_template = cluster_name_template
@@ -541,14 +565,29 @@ async def _start(self,):
await _cleanup_stale_resources()

self._clients = await self._get_clients()
self._fargate = (
self.config.get("fargate", False)
if self._fargate is None
else self._fargate
self._fargate_scheduler = (
self.config.get("fargate_scheduler", False)
if self._fargate_scheduler is None
else self._fargate_scheduler
)
self._fargate_workers = (
self.config.get("fargate_workers", False)
if self._fargate_workers is None
else self._fargate_workers
)
self._tags = self.config.get("tags", {}) if self._tags is None else self._tags
self._worker_gpu = (
self.config.get("worker_gpu")
if self._worker_gpu is None
else self._worker_gpu
) # TODO Detect whether cluster is GPU capable
self.image = (
self.config.get("image", "daskdev/dask:latest")
self.config.get(
"image",
"rapidsai/rapidsai:latest"
if self._worker_gpu
else "daskdev/dask:latest",
)
if self.image is None
else self.image
)
@@ -669,16 +708,17 @@ async def _start(self,):
"security_groups": self._security_groups,
"log_group": self.cloudwatch_logs_group,
"log_stream_prefix": self._cloudwatch_logs_stream_prefix,
"fargate": self._fargate,
"environment": self._environment,
"tags": self.tags,
}
scheduler_options = {
"task_definition_arn": self.scheduler_task_definition_arn,
"fargate": self._fargate_scheduler,
**options,
}
worker_options = {
"task_definition_arn": self.worker_task_definition_arn,
"fargate": self._fargate_workers,
"cpu": self._worker_cpu,
"mem": self._worker_mem,
**options,
@@ -715,8 +755,13 @@ async def _close_clients(self):
await client.close()

async def _create_cluster(self):
if not self._fargate:
if not self._fargate_scheduler or not self._fargate_workers:
raise RuntimeError("You must specify a cluster when not using Fargate.")
if self._worker_gpu:
raise RuntimeError(
"It is not possible to use GPUs with Fargate. "
"Please provide an existing cluster with GPU instances available. "
)
self.cluster_name = dask.config.expand_environment_variables(
self._cluster_name_template
)
@@ -922,7 +967,7 @@ async def _create_scheduler_task_definition_arn(self):
}
],
volumes=[],
requiresCompatibilities=["FARGATE"] if self._fargate else [],
requiresCompatibilities=["FARGATE"] if self._fargate_scheduler else [],
cpu=str(self._scheduler_cpu),
memory=str(self._scheduler_mem),
tags=dict_to_aws(self.tags),
@@ -936,6 +981,11 @@ async def _delete_scheduler_task_definition_arn(self):
)

async def _create_worker_task_definition_arn(self):
resource_requirements = []
if self._worker_gpu:
resource_requirements.append(
{"type": "GPU", "value": str(self._worker_gpu)}
)
response = await self._clients["ecs"].register_task_definition(
family="{}-{}".format(self.cluster_name, "worker"),
taskRoleArn=self._task_role_arn,
@@ -948,8 +998,17 @@ async def _create_worker_task_definition_arn(self):
"cpu": self._worker_cpu,
"memory": self._worker_mem,
"memoryReservation": self._worker_mem,
"resourceRequirements": resource_requirements,
"essential": True,
"command": ["dask-worker"],
"command": [
"dask-cuda-worker" if self._worker_gpu else "dask-worker",
"--nthreads",
"{}".format(int(self._worker_cpu / 1024)),
"--memory-limit",
"{}MB".format(int(self._worker_mem)),
"--death-timeout",
"60",
],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
@@ -962,7 +1021,7 @@
}
],
volumes=[],
requiresCompatibilities=["FARGATE"] if self._fargate else [],
requiresCompatibilities=["FARGATE"] if self._fargate_workers else [],
cpu=str(self._worker_cpu),
memory=str(self._worker_mem),
tags=dict_to_aws(self.tags),
@@ -1009,7 +1068,7 @@ class FargateCluster(ECSCluster):
"""

def __init__(self, **kwargs):
super().__init__(fargate=True, **kwargs)
super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)


async def _cleanup_stale_resources():
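The worker task definition above is the core of the GPU change. As a
simplified, standalone sketch (the helper below is illustrative and not part
of the module): dask-cuda-worker is selected when GPUs are requested, the
thread count is derived from the CPU reservation (ECS expresses CPU in 1/1024
vCPU units), and the memory limit mirrors the task's memory reservation.

    def build_worker_command(worker_cpu, worker_mem, worker_gpu=None):
        # Illustrative mirror of the container command built in the worker
        # task definition above.
        return [
            "dask-cuda-worker" if worker_gpu else "dask-worker",
            "--nthreads",
            "{}".format(int(worker_cpu / 1024)),  # 1024 CPU units == 1 vCPU
            "--memory-limit",
            "{}MB".format(int(worker_mem)),
            "--death-timeout",
            "60",
        ]

    # A 4 vCPU / 16 GB worker with one GPU:
    print(build_worker_command(4096, 16384, worker_gpu=1))
    # ['dask-cuda-worker', '--nthreads', '4', '--memory-limit', '16384MB', '--death-timeout', '60']
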
47 changes: 38 additions & 9 deletions doc/source/index.rst
@@ -1,5 +1,5 @@
Welcome to dask-cloudprovider's documentation!
======================================
==============================================


Native Cloud integration for Dask. This library intends to allow people to
@@ -23,21 +23,50 @@ and region. The simplest way is to use the aws command line tool.
$ pip install awscli
$ aws configure
ECS
^^^
Fargate/ECS
^^^^^^^^^^^

The ``ECSCluster`` will create a new Fargate ECS cluster by default along with
all the IAM roles, security groups, etc that it needs to function.
The ``FargateCluster`` will create a new Fargate ECS cluster by default along
with all the IAM roles, security groups, etc that it needs to function.

.. code-block:: python
from dask_cloudprovider import ECSCluster
cluster = ECSCluster()
from dask_cloudprovider import FargateCluster
cluster = FargateCluster()
..
⚠ All AWS resources created by ``ECSCluster`` should be removed on garbage
collection. If the process is killed harshly this will not happen.
⚠ All AWS resources created by ``FargateCluster`` should be removed on
garbage collection. If the process is killed harshly this will not happen.

You can also create Dask clusters using EC2 based ECS clusters using
``ECSCluster``.

Creating the ECS cluster is out of scope for this library but you can pass in
the ARN of an existing one like this:

.. code-block:: python
from dask_cloudprovider import ECSCluster
cluster = ECSCluster(cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<clustername>")
All the other required resources such as roles, task definitions, tasks, etc
will be created automatically like in ``FargateCluster``.

GPU Support
~~~~~~~~~~~

There is also support in ``ECSCluster`` for GPU-aware Dask clusters. To do
this you need to create an ECS cluster with GPU capable instances (from the
``p3`` or ``p3dn`` families) and specify the number of GPUs each worker task
should have.

.. code-block:: python
from dask_cloudprovider import ECSCluster
cluster = ECSCluster(
cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>",
worker_gpu=1)
.. toctree::
:maxdepth: 3
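Whichever variant is created, the resulting cluster object plugs into the
normal Dask workflow. A brief usage sketch (standard distributed API, nothing
specific to this commit; n_workers is an existing keyword shown in the diff
above):

    from dask.distributed import Client
    from dask_cloudprovider import FargateCluster

    import dask.array as da

    cluster = FargateCluster(n_workers=2)
    client = Client(cluster)

    x = da.random.random((10000, 10000), chunks=(1000, 1000))
    print(x.mean().compute())  # runs on the remote ECS tasks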
