-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixup cluster_info
sync handling
#5488
Conversation
Previously a network blip would cause this periodic callback to log an error condition to the user every second. This fixes that in the following way: - We now catch error conditions and only log a warning after a few consecutive errors (so a single network blip will go unnoticed). - In the case of an error, we backoff a bit before retrying.
Test failures are unrelated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch @jcrist -- I can confirm this fixed the issue in #5472. Interestingly this async task + while
-loop approach is similar to what @jacobtomlinson initially started with in #5033.
@jacobtomlinson out of curiosity, is this periodic syncing behavior actually being used anywhere? Would it be sufficient to have a single message sent during cluster startup?
cc @jacobtomlinson @fjetter who may have thoughts on the changes here
While I left some comments, I don't mean for them to be blocking. This PR could be merged as is and would be to have included in the release tomorrow (xref dask/community#197)
self._sync_cluster_info, self._sync_interval * 1000 | ||
) | ||
# Start a background task for syncing cluster info with the scheduler | ||
self._sync_cluster_info_task = asyncio.ensure_future(self._sync_cluster_info()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think asyncio.create_task
is preferred over asyncio.ensure_future
for Python 3.7+
self._sync_cluster_info_task = asyncio.ensure_future(self._sync_cluster_info()) | |
self._sync_cluster_info_task = asyncio.create_task(self._sync_cluster_info()) |
self.status = Status.closing | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice
|
||
async def error(*args, **kwargs): | ||
nonlocal error_called | ||
await asyncio.sleep(0.001) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this sleep needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great, thanks for fixing this up @jcrist.
@jrbourbeau I haven't opened a PR that makes use of this yet, but it's on my backlog. We need to periodically sync because state on the cluster object changes. If you call scale for instance the number of workers changes. When reconstructing this object we need to know about this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jcrist
Previously a network blip would cause this periodic callback to log an
error condition to the user every second. This fixes that in the
following way:
consecutive errors (so a single network blip will go unnoticed).
Fixes #5472.