"Rate exceeded" error on FargateCluster #56
Thanks for raising this @rsignell-usgs. Sounds like we aren't honouring the throttling backoffs well enough. We should be catching this exception and trying again with an exponential backoff. #44 applied this to log retrieval but we should also do something similar in other places.
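For context, the patches proposed in the comments below call a `get_sleep_duration` helper. A minimal sketch of what such a helper might compute, assuming exponential backoff with full jitter (the base and cap values here are illustrative, not the library's actual ones):

```python
import random


def get_sleep_duration(current_try, base=1.0, cap=20.0):
    """Exponential backoff with full jitter: base * 2**try, capped."""
    duration = min(cap, base * (2 ** current_try))
    # Full jitter spreads retries out so that many throttled clients
    # don't all wake up and hit the API at the same moment.
    return random.uniform(0, duration)
```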
I'm having the same issue.
I'm having a similar issue, except for me it's
Would something like this be helpful?

```diff
diff --git a/dask_cloudprovider/providers/aws/ecs.py b/dask_cloudprovider/providers/aws/ecs.py
index d178b3a..f115d08 100644
--- a/dask_cloudprovider/providers/aws/ecs.py
+++ b/dask_cloudprovider/providers/aws/ecs.py
@@ -28,6 +28,30 @@ DEFAULT_TAGS = {
 } # Package tags to apply to all resources
 
+MAX_THROTTLING_TRIES = 10 # arbitrary...
+
+
+async def retry_when_throttled(
+    func, *args, max_tries=MAX_THROTTLING_TRIES, **kwargs,
+):
+    current_try = 0
+
+    while current_try < max_tries:
+        try:
+            return await func(*args, **kwargs)
+        except ClientError as e:
+            if e.response["Error"]["Code"] == "ThrottlingException":
+                warnings.warn(
+                    "get_log_events rate limit exceeded, retrying after delay.",
+                    RuntimeWarning,
+                )
+                backoff_duration = get_sleep_duration(current_try)
+                await asyncio.sleep(backoff_duration)
+                current_try += 1
+            else:
+                raise
+
+
 class Task:
     """ A superclass for managing ECS Tasks
 
     Parameters
@@ -296,7 +320,6 @@ class Task:
     )
 
     async def logs(self, follow=False):
-        current_try = 0
         next_token = None
         read_from = 0
 
@@ -304,13 +327,15 @@
             try:
                 async with self._client("logs") as logs:
                     if next_token:
-                        l = await logs.get_log_events(
+                        l = await retry_when_throttled(
+                            logs.get_log_events,
                             logGroupName=self.log_group,
                             logStreamName=self._log_stream_name,
                             nextToken=next_token,
                         )
                     else:
-                        l = await logs.get_log_events(
+                        l = await retry_when_throttled(
+                            logs.get_log_events,
                             logGroupName=self.log_group,
                             logStreamName=self._log_stream_name,
                             startTime=read_from,
@@ -327,18 +352,6 @@ class Task:
                 for event in l["events"]:
                     read_from = event["timestamp"]
                     yield event["message"]
-            except ClientError as e:
-                if e.response["Error"]["Code"] == "ThrottlingException":
-                    warnings.warn(
-                        "get_log_events rate limit exceeded, retrying after delay.",
-                        RuntimeWarning,
-                    )
-                    backoff_duration = get_sleep_duration(current_try)
-                    await asyncio.sleep(backoff_duration)
-                    current_try += 1
-                else:
-                    raise
-
 
     def __repr__(self):
         return "<ECS Task %s: status=%s>" % (type(self).__name__, self.status)
```
This version is meant to honor `max_tries` (the first version's loop simply falls through and returns `None` once the tries are exhausted, instead of raising):

```diff
diff --git a/dask_cloudprovider/providers/aws/ecs.py b/dask_cloudprovider/providers/aws/ecs.py
index d178b3a..503fbcf 100644
--- a/dask_cloudprovider/providers/aws/ecs.py
+++ b/dask_cloudprovider/providers/aws/ecs.py
@@ -28,6 +28,32 @@ DEFAULT_TAGS = {
 } # Package tags to apply to all resources
 
+MAX_THROTTLING_TRIES = 10 # arbitrary...
+
+
+async def retry_when_throttled(
+    func, *args, max_tries=MAX_THROTTLING_TRIES, **kwargs,
+):
+    current_try = 0
+
+    while True:
+        try:
+            return await func(*args, **kwargs)
+        except ClientError as e:
+            if e.response["Error"]["Code"] == "ThrottlingException":
+                backoff_duration = get_sleep_duration(current_try)
+                current_try += 1
+                if current_try == max_tries:
+                    raise
+                warnings.warn(
+                    "get_log_events rate limit exceeded, retrying after delay.",
+                    RuntimeWarning,
+                )
+                await asyncio.sleep(backoff_duration)
+            else:
+                raise
+
+
 class Task:
     """ A superclass for managing ECS Tasks
 
     Parameters
@@ -296,7 +322,6 @@ class Task:
     )
 
     async def logs(self, follow=False):
-        current_try = 0
         next_token = None
         read_from = 0
 
@@ -304,13 +329,15 @@
             try:
                 async with self._client("logs") as logs:
                     if next_token:
-                        l = await logs.get_log_events(
+                        l = await retry_when_throttled(
+                            logs.get_log_events,
                             logGroupName=self.log_group,
                             logStreamName=self._log_stream_name,
                             nextToken=next_token,
                         )
                     else:
-                        l = await logs.get_log_events(
+                        l = await retry_when_throttled(
+                            logs.get_log_events,
                             logGroupName=self.log_group,
                             logStreamName=self._log_stream_name,
                             startTime=read_from,
@@ -327,18 +354,6 @@ class Task:
                 for event in l["events"]:
                     read_from = event["timestamp"]
                     yield event["message"]
-            except ClientError as e:
-                if e.response["Error"]["Code"] == "ThrottlingException":
-                    warnings.warn(
-                        "get_log_events rate limit exceeded, retrying after delay.",
-                        RuntimeWarning,
-                    )
-                    backoff_duration = get_sleep_duration(current_try)
-                    await asyncio.sleep(backoff_duration)
-                    current_try += 1
-                else:
-                    raise
-
 
     def __repr__(self):
         return "<ECS Task %s: status=%s>" % (type(self).__name__, self.status)
```
@lukeorland yes I think that kind of thing would be a great PR. I have a feeling the exception from AWS may contain information on how long to wait. It's pretty common for APIs that tell you to back off to suggest how long you should wait.
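One way to check whether a throttled response carries such a hint (a sketch; many AWS services do not set a `Retry-After` header, so the fallback is the common case, and the helper name is hypothetical):

```python
from botocore.exceptions import ClientError


def suggested_wait(error: ClientError, default: float = 1.0) -> float:
    """Return the server-suggested wait in seconds, if present, else a default."""
    headers = error.response.get("ResponseMetadata", {}).get("HTTPHeaders", {})
    retry_after = headers.get("retry-after")  # often absent for AWS throttling
    return float(retry_after) if retry_after else default
```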
I'm not sure how aiobotocore interacts with normal boto but you probably already have appropriate retry logic from boto and just need to tweak the parameters? See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html and perhaps try using adaptive mode with more retries.
These features are not available in botocore or aiobotocore, and the higher-level boto3 doesn't support aiobotocore. However we could definitely copy that behaviour here.
Hmm, the code appears to exist in botocore: https://github.com/boto/botocore/tree/e0fc11c3785437368435a59c41021c0bcb86275f/botocore/retries
Ah fantastic, thanks for pointing to that @zflamig! In that case yes we should go for it.
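For reference, the configuration that retries guide describes looks roughly like this (a sketch using plain boto3; "adaptive" mode needs a reasonably recent botocore, and whether it behaves identically through aiobotocore would need checking):

```python
import boto3
from botocore.config import Config

# "standard" mode retries with exponential backoff up to max_attempts;
# "adaptive" additionally rate-limits requests on the client side.
config = Config(retries={"max_attempts": 10, "mode": "adaptive"})

logs = boto3.client("logs", config=config)
```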
@jacobtomlinson I have a reproducible example:

```python
import pandas as pd
import numpy as np
import time

from dask.distributed import Client, progress
from dask import compute, delayed
from dask_cloudprovider import FargateCluster


def fun(fn):
    m = int(1e5)
    n = 10
    columns = [f"col_{i+1:02d}" for i in range(n)]
    df = pd.DataFrame(np.random.rand(m, n), columns=columns)
    df = df.astype(str)
    time.sleep(2)
    df.to_csv(fn, index=False)


my_vpc = "vpc-xxxxxxxx"  # your vpc
my_subnets = ["subnet-xxxxxxxx"]  # your subnets
bucket = "your-bucket"  # a bucket you have access to write
fldr = "tests"

# start FargateCluster
cpu = 0.5
ram = 1
cluster = FargateCluster(n_workers=1,
                         image='rpanai/feats-worker:2020-08-24',
                         vpc=my_vpc,
                         subnets=my_subnets,
                         worker_cpu=int(cpu * 1024),
                         worker_mem=int(ram * 1024),
                         cloudwatch_logs_group="my_log_group",
                         task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
                         scheduler_timeout='20 minutes'
                         # skip_cleanup=True
                         )
cluster.adapt(minimum=1, maximum=100)
client = Client(cluster)
client

fns = [f"s3://{bucket}/{fldr}/{i+1:04d}.csv" for i in range(2000)]
to_process = [delayed(fun)(fn) for fn in fns]
out = compute(to_process)
```

where the image

After a while I start to see these warnings:
Yes, so it looks like ECS is taking a long time to start your container and we are polling the status every second, which is eventually resulting in a throttling error.

It also seems that the boto libraries will automatically retry on their own four times before giving up and raising the exception. In this instance we probably want to retry forever, as we just need to wait until the container has started.

So it would probably make sense for us to also retry with an exponential backoff to a maximum of 20 seconds, but do so forever. @lukeorland seems to have proposed a suitable solution, however I don't think we need to worry about the max retries as that is just for boto to decide when to raise an exception.

We are currently checking the status until the container stops pending or provisioning. So it is likely we will continuously poll, eventually hit an exception, and increase our polling times until the request is successful. However, once the request is successful that doesn't mean the container is running; it may still be provisioning, in which case we will continue polling.

I'm going to raise a quick PR to add a backoff here, but would support follow-up PRs to make a more generic function to handle this.
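A minimal sketch of that "poll forever, but back off to a ceiling when throttled" behaviour (the names, the statuses, and the 20-second cap are illustrative, not the actual dask-cloudprovider code):

```python
import asyncio

from botocore.exceptions import ClientError


async def poll_until_running(describe_task, interval=1.0, max_backoff=20.0):
    """Poll an async describe call until the task leaves PENDING/PROVISIONING.

    Throttling never aborts the wait; it only stretches the delay, which
    resets to the base interval after the next successful call.
    """
    delay = interval
    while True:
        try:
            status = await describe_task()
            if status not in ("PENDING", "PROVISIONING"):
                return status
            delay = interval  # call succeeded: back to normal polling
        except ClientError as e:
            if e.response["Error"]["Code"] != "ThrottlingException":
                raise
            delay = min(max_backoff, delay * 2)  # capped exponential backoff
        await asyncio.sleep(delay)
```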
I tried the PR version and now the output is:
Thanks @rpanai. Does that happen consistently?
I tried with

```python
cluster.adapt(minimum=1,
              maximum=40)  # reducing from 100
client = Client(cluster)
client
```

and I didn't have any warning. In another experiment when I read and write to S3 I get the warning even when

In the example with

```python
client.close()
cluster.close()
```

takes forever and I can still see many workers (~80) running several minutes after the computation is done.
I tried with

```
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f8a513e04f0>>, <Task finished name='Task-3002' coro=<SpecCluster._correct_state_internal() done, defined at /home/ec2-user/SageMaker/kernels/features2/lib/python3.8/site-packages/distributed/deploy/spec.py:320> exception=RuntimeError({'tasks': [], 'failures': [{'reason': "You've reached the limit on the number of tasks you can run concurrently"}], 'ResponseMetadata': {'RequestId': '79d015b6-6cdc-451d-a6e4-69b081eda0cb', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '79d015b6-6cdc-451d-a6e4-69b081eda0cb', 'content-type': 'application/x-amz-json-1.1', 'content-length': '111', 'date': 'Tue, 25 Aug 2020 15:00:02 GMT'}, 'RetryAttempts': 0}})>)
Traceback (most recent call last):
  File "/home/ec2-user/SageMaker/kernels/features2/lib/python3.8/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/home/ec2-user/SageMaker/kernels/features2/lib/python3.8/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/home/ec2-user/SageMaker/kernels/features2/lib/python3.8/site-packages/distributed/deploy/spec.py", line 355, in _correct_state_internal
    await w  # for tornado gen.coroutine support
  File "/home/ec2-user/SageMaker/kernels/features2/lib/python3.8/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 144, in _
    await self.start()
  File "/home/ec2-user/SageMaker/kernels/features2/lib/python3.8/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 202, in start
    while timeout.run():
  File "/home/ec2-user/SageMaker/kernels/features2/lib/python3.8/site-packages/dask_cloudprovider/utils/timeout.py", line 74, in run
    raise self.exception
  File "/home/ec2-user/SageMaker/kernels/features2/lib/python3.8/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 241, in start
    raise RuntimeError(response)  # print entire response
RuntimeError: {'tasks': [], 'failures': [{'reason': "You've reached the limit on the number of tasks you can run concurrently"}], 'ResponseMetadata': {'RequestId': '79d015b6-6cdc-451d-a6e4-69b081eda0cb', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '79d015b6-6cdc-451d-a6e4-69b081eda0cb', 'content-type': 'application/x-amz-json-1.1', 'content-length': '111', 'date': 'Tue, 25 Aug 2020 15:00:02 GMT'}, 'RetryAttempts': 0}}
```
Yes that error is expected. In order to avoid that you will need to request AWS to increase your service limits.
Hi @jacobtomlinson, should this issue be resolved with #124? I'm running

Here's the traceback:

```
Task exception was never retrieved
future: <Task finished name='Task-492' coro=<_wrap_awaitable() done, defined at /usr/local/lib/python3.8/asyncio/tasks.py:688> exception=RuntimeError('Worker failed to start')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 163, in _
    await self.start()
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 290, in start
    raise RuntimeError("%s failed to start" % type(self).__name__)
RuntimeError: Worker failed to start
Task exception was never retrieved
future: <Task finished name='Task-494' coro=<_wrap_awaitable() done, defined at /usr/local/lib/python3.8/asyncio/tasks.py:688> exception=RuntimeError('Worker failed to start')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 163, in _
    await self.start()
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 290, in start
    raise RuntimeError("%s failed to start" % type(self).__name__)
RuntimeError: Worker failed to start
Task exception was never retrieved
future: <Task finished name='Task-457' coro=<_wrap_awaitable() done, defined at /usr/local/lib/python3.8/asyncio/tasks.py:688> exception=RuntimeError('Worker failed to start')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 163, in _
    await self.start()
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 290, in start
    raise RuntimeError("%s failed to start" % type(self).__name__)
RuntimeError: Worker failed to start
Task exception was never retrieved
future: <Task finished name='Task-451' coro=<_wrap_awaitable() done, defined at /usr/local/lib/python3.8/asyncio/tasks.py:688> exception=RuntimeError('Worker failed to start')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 163, in _
    await self.start()
  File "/usr/local/lib/python3.8/site-packages/dask_cloudprovider/aws/ecs.py", line 290, in start
    raise RuntimeError("%s failed to start" % type(self).__name__)
RuntimeError: Worker failed to start
```

The example code:

```python
from dask_cloudprovider.aws import FargateCluster

cluster = FargateCluster(
    n_workers=60,
    vpc="VPC-ID",
    security_groups=["SG-ID"],
    subnets=["SN-ID"],
    worker_cpu=512,
    worker_mem=1024
)
```

Thanks!
@valpesendorfer can you share any errors from ECS on why they failed to start?
Apologies, I should have checked that myself. Looks like I've run out of public IPs within my subnet - some workers didn't even spin up, with the error:

So it's an AWS issue, not a dask-cloudprovider one. Thanks!
Hi, I'm still having the same problem despite using the latest version of Dask and dask-cloudprovider. The warnings keep appearing
long after all the computation is done. As a consequence my script never ends and, as I'm using Airflow to manage my pipeline, I get stuck.
Hi - I'm getting the same error as a previous user in this thread.

To my knowledge, there isn't anything I can do to increase this from Service Quotas. This only happens when I send off a large Batch array job on Fargate.
@michaellee1 would you mind opening a new issue? This one is closed and hasn't seen any activity for over a year.
worked on my SageMaker instance on us-west-2, but failed on my us-east-1 instance with:

It looks like PR #44 was designed to address these problems, but it seems I'm still having them despite running v0.1.1, which includes that PR. I'm wondering whether others are still experiencing this?