Skip to content

Commit

Permalink
Backing off within _update_task to avoid ThrottlingException (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
nmerket committed Sep 27, 2021
1 parent fcc6cf3 commit 69897be
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions dask_cloudprovider/aws/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,22 @@ async def _is_long_arn_format_enabled(self):

async def _update_task(self):
async with self._client("ecs") as ecs:
[self.task] = (
await ecs.describe_tasks(
cluster=self.cluster_arn, tasks=[self.task_arn]
)
)["tasks"]
wait_duration = 1
while True:
try:
[self.task] = (
await ecs.describe_tasks(
cluster=self.cluster_arn, tasks=[self.task_arn]
)
)["tasks"]
except ClientError as e:
if e.response["Error"]["Code"] == "ThrottlingException":
wait_duration = min(wait_duration * 2, 20)
else:
raise
else:
break
await asyncio.sleep(wait_duration)

async def _set_address_from_logs(self):
timeout = Timeout(
Expand Down Expand Up @@ -275,17 +286,8 @@ async def start(self):
await asyncio.sleep(1)

self.task_arn = self.task["taskArn"]
wait_duration = 1
while self.task["lastStatus"] in ["PENDING", "PROVISIONING"]:
try:
await self._update_task()
wait_duration = 1
except ClientError as e:
if e.response["Error"]["Code"] == "ThrottlingException":
wait_duration = wait_duration * 2
else:
raise
await asyncio.sleep(min(wait_duration, 20))
await self._update_task()
if not await self._task_is_running():
raise RuntimeError("%s failed to start" % type(self).__name__)
[eni] = [
Expand Down

0 comments on commit 69897be

Please sign in to comment.