Skip to content

Commit

Permalink
Extend GPU support to also support a Fargate scheduler (#25)
Browse files Browse the repository at this point in the history
* Fix private networking for EC2 mode

* Change public IP and fargate kwargs

For now we will assume that Fargate tasks need a public IP and EC2
tasks only use private IP. I've also separated out the fargate kwarg
into fargate_scheduler and fargate_workers to allow the scheduler
to have a Public IP on fargate and the workers to be private on EC2.

* Reinstate --fargate flag

* Undo public property

* Improve public ip method comments

* Add external address support

* Update documentation

* Bump minimum distributed version
  • Loading branch information
jacobtomlinson committed Aug 23, 2019
1 parent a554351 commit e16a730
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
13 changes: 9 additions & 4 deletions dask_cloudprovider/providers/aws/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def __init__(
):
self.lock = asyncio.Lock()
self.name = name
self.address = None
self.external_address = None
self._clients = clients
self.cluster_arn = cluster_arn
self.task_definition_arn = task_definition_arn
Expand Down Expand Up @@ -147,7 +149,7 @@ async def _update_task(self):
)
)["tasks"]

async def _get_address_from_logs(self):
async def _set_address_from_logs(self):
timeout = Timeout(
30, "Failed to find %s ip address after 30 seconds." % self.task_type
)
Expand All @@ -157,9 +159,12 @@ async def _get_address_from_logs(self):
if query_string in line:
address = line.split(query_string)[1].strip()
if self._use_public_ip:
address = address.replace(self.private_ip, self.public_ip)
self.external_address = address.replace(
self.private_ip, self.public_ip
)
logger.debug("%s", line)
return address
self.address = address
return
else:
if not await self._task_is_running():
raise RuntimeError("%s exited unexpectedly!" % type(self).__name__)
Expand Down Expand Up @@ -236,7 +241,7 @@ async def start(self):
if self._use_public_ip:
self.public_ip = interface["Association"]["PublicIp"]
self.private_ip = interface["PrivateIpAddresses"][0]["PrivateIpAddress"]
self.address = await self._get_address_from_logs()
await self._set_address_from_logs()
self.status = "running"

async def close(self, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
aiobotocore>=0.10.2
dask>=2.2.0
distributed>=2.2.0
distributed>=2.3.1

0 comments on commit e16a730

Please sign in to comment.