Skip to content

Commit

Permalink
Merge pull request #200 from mrocklin/status
Browse files Browse the repository at this point in the history
Use Status enum
  • Loading branch information
quasiben committed Nov 30, 2020
2 parents fef4279 + d9b8400 commit d4a650b
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 11 deletions.
13 changes: 7 additions & 6 deletions dask_cloudprovider/aws/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from distributed.deploy.spec import SpecCluster
from distributed.utils import warn_on_duration
from distributed.core import Status

try:
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -153,7 +154,7 @@ def __init__(
self._fargate_use_private_ip = fargate_use_private_ip
self.kwargs = kwargs
self.task_kwargs = task_kwargs
self.status = "created"
self.status = Status.created

def __await__(self):
async def _():
Expand Down Expand Up @@ -301,7 +302,7 @@ async def start(self):
self.public_ip = interface["Association"]["PublicIp"]
self.private_ip = interface["PrivateIpAddresses"][0]["PrivateIpAddress"]
await self._set_address_from_logs()
self.status = "running"
self.status = Status.running

async def close(self, **kwargs):
if self.task:
Expand All @@ -311,7 +312,7 @@ async def close(self, **kwargs):
while self.task["lastStatus"] in ["RUNNING"]:
await asyncio.sleep(1)
await self._update_task()
self.status = "closed"
self.status = Status.closed

@property
def task_id(self):
Expand Down Expand Up @@ -737,11 +738,11 @@ def _client(self, name: str):
async def _start(
self,
):
while self.status == "starting":
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == "running":
if self.status == Status.running:
return
if self.status == "closed":
if self.status == Status.closed:
raise ValueError("Cluster is closed")

self.config = dask.config.get("cloudprovider.ecs", {})
Expand Down
6 changes: 3 additions & 3 deletions dask_cloudprovider/azureml/azureml.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import dask
from contextlib import suppress
from distributed.deploy.cluster import Cluster
from distributed.core import rpc
from distributed.core import rpc, Status
from distributed.utils import LoopRunner, log_errors, format_bytes
from tornado.ioloop import PeriodicCallback

Expand Down Expand Up @@ -1053,7 +1053,7 @@ def scale_down(self, workers=1):

# close cluster
async def _close(self):
if self.status == "closed":
if self.status == Status.closed:
return

while self.workers_list:
Expand All @@ -1065,7 +1065,7 @@ async def _close(self):
self.run.complete()
self.run.cancel()

self.status = "closed"
self.status = Status.closed
self.__print_message("Scheduler and workers are disconnected.")

if self.portforward_proc is not None:
Expand Down
3 changes: 2 additions & 1 deletion dask_cloudprovider/cli/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import click
from distributed.cli.utils import check_python_3, install_signal_handlers
from distributed.core import Status
from tornado.ioloop import IOLoop, TimeoutError

from dask_cloudprovider.aws import ECSCluster
Expand Down Expand Up @@ -214,7 +215,7 @@ def main(

async def run():
logger.info("Ready")
while cluster.status != "closed":
while cluster.status != Status.closed:
await sleep(0.2)

def on_signal(signum):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
dask>=2.2.0
dask>=2.30
distributed>=2.30
jinja2

0 comments on commit d4a650b

Please sign in to comment.