
Engine: Do not let DuplicateSubscriberError except a Process #5715

Merged: 2 commits into main from fix/4598/duplicate-subscriber-identifier on Oct 27, 2022

Conversation

@sphuber (Contributor) commented Oct 22, 2022

Fixes #4598
Fixes #3973

When a daemon worker tries to continue a process, a kiwipy.DuplicateSubscriberError can be raised, which means that another system process has already subscribed as the runner of that process. This can occur for at least two reasons:

  • The user manually recreated the process task, mistakenly thinking it had been lost.
  • The daemon worker running the original task failed to respond to RabbitMQ's heartbeats in time, so RabbitMQ assumed the worker was dead and requeued the task, sending it to another worker.

In both cases, the task is actually still being run and we should not let this exception except the entire process. Unfortunately, these duplicate tasks still occur quite often when the daemon workers are under heavy load, and we don't want a bunch of processes to be excepted because of this.

In most cases, simply ignoring the new task will be the best solution; in the end, the original task is likely to complete. If it turns out that the task actually got lost and the process is stuck, the user can find this error message in the logs and manually recreate the task, for example using verdi devel revive.
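Conceptually, the change boils down to catching the duplicate-subscriber exception around the coroutine that continues a process and logging a warning instead of letting it propagate, so that the task gets acknowledged. The following is a minimal sketch of that idea, not the literal diff; the wrapper function name and logger are illustrative assumptions:

```python
"""Minimal sketch of the idea behind this change (illustrative, not the literal diff)."""
import logging

import kiwipy

LOGGER = logging.getLogger(__name__)


async def continue_ignoring_duplicate(continuation, pid):
    """Await ``continuation``, the coroutine that continues process ``pid``.

    If this worker already has a subscriber for ``pid``, the task is assumed to
    be a duplicate: a warning is logged and the function returns normally, so
    the caller can acknowledge the task instead of excepting the process.
    """
    try:
        return await continuation
    except kiwipy.DuplicateSubscriberIdentifier:
        LOGGER.warning(
            'A subscriber for process<%s> already exists: assuming the task is a duplicate and '
            'acknowledging it without running the process. If the process turns out to be stuck, '
            'the task may really have been lost and can be recreated with `verdi devel revive`.', pid
        )
```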

@sphuber (Contributor, Author) commented Oct 22, 2022

@ltalirz @muhrin @giovannipizzi I think this is an important fix to ship with the next release. Although it doesn't fully address the real problem of process tasks sometimes getting lost, it does solve the most important symptom: processes getting randomly excepted when the daemon workers are under heavy load. This should hopefully contribute a lot to the stability of AiiDA.

@ltalirz (Member) commented Oct 22, 2022

Thanks @sphuber for looking into these stability issues; in my view these are the most important problems to solve in AiiDA at the moment.

I understand that your fix addresses the problem where the user manually interferes and resubmits a task although the original was still running [1].

However, when workers are missing heartbeats although they are "still working properly in some sense", I guess the problem we need to address is those missing heartbeats. Do we have an idea what might be causing this?

If I understand correctly, with your current fix, RabbitMQ would continue to requeue those tasks whenever the next heartbeat is missed, and the new tasks would then continue to be dismissed.
If the original worker is indeed still capable of finishing the task (how do we know?), then we should try to get RabbitMQ to understand this.

[1] By the way, why would they do that? Is it that the daemon worker that took the task is overloaded and never actually gets to do the task, which can then leave the process stuck, e.g. in the CREATED state in the process list?

@sphuber (Contributor, Author) commented Oct 22, 2022

I understand that your fix addresses the problem where the user manually interferes and resubmits a task although the original was still running.

Not just that, it fixes a process excepting whenever a second worker tries to run it, for whatever reason. A user manually interfering is just one known pathway to this situation.

By the way, why would they do that? Is it that the daemon worker that took the task is overloaded and never actually gets to do the task, which can then leave the process stuck, e.g. in the CREATED state in the process list?

The problem is that there is some pathway through which process tasks can get lost. I have seen people report this happening after restarting the machine or the daemon, or after some other unusual situation, but what the exact cause is remains unclear. If the task gets lost, the process will never be resolved. This can happen at any point in the lifetime of the process and is not exclusive to newly created processes.

The workaround is to recreate the task, which is done with the get_manager().get_process_controller().continue_process() trick (now exposed through verdi devel revive), but if it is used while the task is actually still present, it creates a duplicate, which causes the DuplicateSubscriberIdentifier error to be raised in the second worker, and the process excepts. The problem is that it is currently difficult to make sure whether a task still exists or not. I have seen multiple users resort to "reviving" a process, thinking the task was lost when it actually wasn't, leading to the exception described and losing their process entirely.
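For reference, the manual "revive" trick mentioned above looks roughly like the following when run in a verdi shell; the pk 1234 is a placeholder and the exact keyword arguments of continue_process may differ between versions:

```python
# Recreate the process task for a process whose task is believed to be lost.
# Run in e.g. `verdi shell`; replace 1234 with the pk of the stuck process.
from aiida.manage import get_manager

controller = get_manager().get_process_controller()
controller.continue_process(1234)  # publishes a new continuation task for the process
```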

However, when workers are missing heartbeats although they are "still working properly in some sense", I guess the problem we need to address is those missing heartbeats. Do we have an idea what might be causing this?

At some point, it was easy for a daemon worker to miss responding to heartbeats, because if it was running, for example, an IO task (retrieving large files of a completed CalcJob), it could do nothing else, not even respond to a ping from RabbitMQ. We improved this at some point by putting the communicator of the runner on a separate thread, allowing it to be more responsive even when the runner is doing heavy blocking work on the main thread. I have tried to find exactly when we introduced this change, but cannot find it anymore. It is very well possible that this is no longer a problem in v2.0. Note that the issue that originally reported this was with v1.5.

I think the current issue then is maybe not so much the heartbeat, but this other "unknown" pathway through which process tasks can be lost: perhaps a hard shutdown of the daemon, of RabbitMQ, or of the machine running either of them. It is simply not known.

Long story short, we have stayed inactive for far too long, not submitting a fix because we couldn't find the real cause of the problem. However, I think we now have a practical workaround that will at least prevent users from losing their processes to random exceptions.

I am working on additional functionality that will make it easier to detect whether a task actually still exists. Once we have this, we can reliably detect missing tasks and advise users that they can safely use verdi devel revive to recreate the task and fix the problem.

If I understand correctly, with your current fix, RabbitMQ would continue to requeue those tasks whenever the next heartbeat is missed, and the new tasks would then continue to be dismissed. If the original worker is indeed still capable of finishing the task (how do we know?), then we should try to get RabbitMQ to understand this.

No, with this change, as soon as the exception is hit, the task is acknowledged and discarded. We therefore indeed assume that the original runner will still eventually complete the process. That might not be true, but then we simply have the situation where the process task is actually lost and we can revive it. As said before, I am working on additional tools to make this analysis more robust, so we can hopefully give bulletproof advice to the user on when to revive a process or not.

@sphuber force-pushed the fix/4598/duplicate-subscriber-identifier branch 6 times, most recently from b3c35d1 to 22f136b on October 22, 2022 18:36
@codecov bot commented Oct 22, 2022

Codecov Report

Base: 79.68% // Head: 79.69% // Increases project coverage by +0.02% 🎉

Coverage data is based on head (61f257a) compared to base (dc1c0a5).
Patch coverage: 100.00% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5715      +/-   ##
==========================================
+ Coverage   79.68%   79.69%   +0.02%     
==========================================
  Files         491      491              
  Lines       36668    36671       +3     
==========================================
+ Hits        29215    29222       +7     
+ Misses       7453     7449       -4     
Impacted Files Coverage Δ
aiida/manage/external/rmq.py 76.29% <100.00%> (+8.21%) ⬆️
aiida/manage/tests/pytest_fixtures.py 83.46% <100.00%> (-1.43%) ⬇️
aiida/orm/utils/log.py 81.49% <0.00%> (-14.81%) ⬇️
aiida/engine/processes/process.py 91.75% <0.00%> (-0.45%) ⬇️
aiida/orm/nodes/process/process.py 89.43% <0.00%> (+0.97%) ⬆️
aiida/engine/processes/calcjobs/tasks.py 66.89% <0.00%> (+1.01%) ⬆️


@sphuber force-pushed the fix/4598/duplicate-subscriber-identifier branch 6 times, most recently from 9e267bf to e4ddca7 on October 23, 2022 13:16
@sphuber (Contributor, Author) commented Oct 23, 2022

@ltalirz After a lot of debugging, I finally found the source of the failure of the test I added. I opened a separate PR for it, #5717, so as not to drown this one out. In essence, various unrelated tests messed with the global variables of the config and didn't properly reset them, which broke the daemon fixtures and caused the test I added in this PR to fail. The test tried starting the daemon (which actually did start), but DaemonClient.is_daemon_running relies on checking the PID file, and it was looking in the wrong place.

@sphuber force-pushed the fix/4598/duplicate-subscriber-identifier branch 2 times, most recently from e083e22 to 4dcd3c7 on October 24, 2022 08:19
@ltalirz (Member) left a comment

Thanks @sphuber!

After our discussion, I think I understand the thought process.

Whether caused by automatic requeuing or by user intervention, excepting a process because of a DuplicateSubscriberIdentifier is not user-friendly, and warning about it in the logs instead is a better way forward (until the root cause of the missing tasks / progress on tasks is fixed).

Let's first merge the PR that fixes the test fixture and then this one.

Also, as we discussed, before we merge this, could you please check:

  • what is RabbitMQ's mechanism for clearing the subscriber list of a channel?
  • is it conceivable that rabbitmq requeues tasks because of a lost heartbeat before it had time to update the subscriber list of the corresponding channel?
  • do we have a test of this "subscriber clearing" mechanism somewhere (not necessarily in AiiDA)?

Review comment on aiida/manage/external/rmq.py (outdated, resolved)
@sphuber force-pushed the fix/4598/duplicate-subscriber-identifier branch from 23858c6 to 7e6c2af on October 24, 2022 18:38
@sphuber requested a review from ltalirz on October 24, 2022 18:38
ltalirz previously approved these changes on Oct 24, 2022

@ltalirz (Member) left a comment

Thanks @sphuber, the PR now looks good to merge to me.

Just waiting for the input on the RabbitMQ side.

@sphuber (Contributor, Author) commented Oct 25, 2022

I did some further digging to be able to answer your questions and found out that the situation is not quite as I originally thought. It seems that the tracking of subscribers is not done by RabbitMQ but by aiormq and kiwipy. It is these libraries that track which processes have already subscribed to the broadcast and RPC queues, and they raise if the same process subscribes again. This means that the DuplicateSubscriberIdentifier is only ever raised if the same worker tries to subscribe to a process again. This is because the "cache" of registered subscribers is kept in memory by these libraries and so is not shared across daemon workers.
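To illustrate why the check is client side: each worker effectively keeps its own in-memory record of subscriber identifiers and raises when the same identifier is registered twice. The following toy model only illustrates that idea; it is not the actual kiwipy/aiormq code:

```python
"""Toy model of the client-side subscriber bookkeeping (not the actual kiwipy/aiormq code)."""
import kiwipy


class ToySubscriberRegistry:
    """Per-worker, in-memory record of subscriber identifiers."""

    def __init__(self):
        self._identifiers = set()  # lives only in this worker's memory

    def add_rpc_subscriber(self, identifier):
        """Register a subscriber, raising if this worker already registered the identifier."""
        if identifier in self._identifiers:
            # Only the *same* worker can hit this; another worker has its own
            # empty registry and so would not notice the duplicate.
            raise kiwipy.DuplicateSubscriberIdentifier(identifier)
        self._identifiers.add(identifier)


registry = ToySubscriberRegistry()
registry.add_rpc_subscriber('process|1234')
registry.add_rpc_subscriber('process|1234')  # raises DuplicateSubscriberIdentifier
```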

This means that although this PR will prevent a single daemon worker from working on the same process twice, it appears to be possible for another daemon worker to start working on a process that is already being worked on, without incurring the exception. If this were to happen, I would expect an exception at our ORM level, because one of the two workers would perform an action, and then the second would come along and do the same, eventually violating some check, for example because the process has already been terminated.

I have to say that I am really surprised that most reports so far have been of DuplicateSubscriberIdentifier exceptions, and not so much of other "random" exceptions suggesting a process has been run twice. This could be explained by most people only running with a single worker, or by RabbitMQ by chance always sending the duplicate task to the same worker, but that seems unlikely.

I have discussed all of this with @muhrin and he seems to agree with the analysis. I have updated the PR description and the commit messages accordingly. I suggest we still merge this, since it should still provide some protection against these problems. Another consideration is that the reports we are basing most of this on are from two years ago and older. It would be good to run new stress tests with v2.1 (once the upgraded communications stack, with updated plumpy/kiwipy/aio-pika/aiormq, has been merged, which should also provide more stability against connection loss) and see whether and which problems persist.

@sphuber requested a review from ltalirz on October 25, 2022 14:25
@ltalirz (Member) left a comment

This means that the DuplicateSubscriberIdentifier is only ever raised if the same worker tries to subscribe to a process again. This is because the "cache" of registered subscribers is kept in memory by these libraries and so is not shared across daemon workers.

I see. So the DuplicateSubscriberIdentifier is a rather weak guard anyway.
In effect, this PR ends up treating the case where one worker is assigned the same process twice the same way as the case where two different workers are assigned the same process (except for the log message).
This makes sense to me.

This means that although this PR will prevent a single daemon worker from working on the same process twice, it appears to be possible for another daemon worker to start working on a process that is already being worked on, without incurring the exception. If this were to happen, I would expect an exception at our ORM level, because one of the two workers would perform an action, and then the second would come along and do the same, eventually violating some check, for example because the process has already been terminated.

It might make sense to check this once. But I assume you are right.

@sphuber (Contributor, Author) commented Oct 27, 2022

It might make sense to check this once. But I assume you are right.

I have thought about how to do this, but it is not trivial, since we cannot directly control to which subscriber RabbitMQ sends a message. So if we create a duplicate, it may very well end up with the original worker. Maybe we can connect a worker with a single slot (so it will only ever take one task) and then launch two workers. I think that might work.
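A rough sketch of that test idea, assuming a daemon with two workers is already running and each worker is limited to a single slot (e.g. via verdi daemon start 2 with the daemon.worker_process_slots config option set to 1); the helper function and the polling loop are illustrative, not an existing fixture:

```python
"""Illustrative sketch of the duplicate-task test idea (assumes a running daemon with two single-slot workers)."""
import time

from aiida.engine import submit
from aiida.manage import get_manager
from aiida.orm import Int, load_node
from aiida.plugins import CalculationFactory

ArithmeticAdd = CalculationFactory('core.arithmetic.add')


def run_duplicate_task_scenario(code):
    """Submit a process and deliberately recreate its task to produce a duplicate.

    With single-slot workers, the duplicate task is likely to be delivered to the
    worker that is not already running the process, which is the scenario to test.
    """
    node = submit(ArithmeticAdd, x=Int(1), y=Int(2), code=code)

    # Recreate the task while the original is presumably still with the first worker.
    get_manager().get_process_controller().continue_process(node.pk)

    # Poll until the process terminates; the expectation is that it finishes
    # normally instead of excepting because of the duplicate.
    while not load_node(node.pk).is_terminated:
        time.sleep(1)

    return node
```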

It is possible that when a daemon worker tries to continue a process,
that a ``kiwipy.DuplicateSubscriberError`` is raised. This happens when
the current worker has already subscribed itself with this process
identifier. The call to ``_continue`` will call ``Process.init`` which
will add RPC and broadcast subscribers. ``kiwipy`` and ``aiormq`` further
down keep track of processes that are already subscribed and if
subscribed again, a ``DuplicateSubscriberIdentifier`` is raised.

Possible reasons for the worker receiving a process task that it already
has include:

  1. The user mistakenly recreates the corresponding task, thinking the
     original task was lost.
  2. RabbitMQ requeues the task because the daemon worker lost its
     connection or did not respond to the heartbeat in time, and the
     task is sent to the same worker once it regains connection.

Here we assume that the existence of another subscriber indicates that
the process is still being run by this worker. We thus ignore the request
to have the worker take it on again and acknowledge the current task.

If our assumption was wrong and the original task was no longer being
worked on, the user can resubmit the task once the list of subscribers of
the process has been cleared. Note: In the second case we are deleting
the *original* task, and once the worker finishes running the process
there won't be a task in RabbitMQ to acknowledge anymore.
This, however, is silently ignored.

Note: the exception is raised by ``kiwipy`` based on an internal cache it
and ``aiormq`` keep of the current subscribers. This means that this will
only occur when the task is resent to the *same* daemon worker. If
another worker were to receive it, no exception would be raised as the
check is client and not server based.
This is more expensive than the original idea of having the daemon
fixture session scoped, since the daemon is now stopped after each
function usage; however, not stopping the daemon can cause tests to
fail if previous tests leave it in an inconsistent state.
@sphuber force-pushed the fix/4598/duplicate-subscriber-identifier branch from 4bf486e to 61f257a on October 27, 2022 09:10
@sphuber merged commit e50c7f5 into main on Oct 27, 2022
@sphuber deleted the fix/4598/duplicate-subscriber-identifier branch on October 27, 2022 09:34