New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Raise when reaching max chidren in Task.Supervisor.async_stream #8239
Conversation
lib/elixir/lib/task/supervised.ex
Outdated
@@ -411,6 +411,19 @@ defmodule Task.Supervised do | |||
send(pid, {self(), {monitor_ref, spawned}}) | |||
Map.put(waiting, spawned, {pid, :running}) | |||
|
|||
{:max_children, ^monitor_ref} -> | |||
stream_cleanup_inbox(monitor_pid, monitor_ref) | |||
Process.demonitor(monitor_ref, [:flush]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the thing I'm the least sure about. Basically, if the monitor process can't spawn a task because :max_children
, it will kill all other running tasks and send {:max_children, monitor_ref}
to the parent, then it will exit(:normal)
. The only other cases it exit(:normal)
s is when the parent process tells it to stop, but here we can't use stream_close
(and tell the monitor process to stop) because we need to stop all processes and the monitor process right away. Demonitoring here works but I'm not sure it's what we want since we don't do it anywhere else. Definitely open to better solutions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is no guarantee the parent process is dead by the time we call stream_cleanup_inbox
. Therefore, if someone rescues this exception, we will have leaked messages to the inbox, since we are trapping exits. I would prefer if we could trigger stream_close
from here and go through the usual shutdown procedure.
In a nutshell, the monitor will deliver max_children
and start a new loop. Then we invoke stream_close
here, which will deliver the stop
message to terminate. And then we raise the error below. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds like a better solution yes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@josevalim pushed a commit that fixes this. I think I did what you suggested. There's a bit of code repetition but it should work. Thoughts?
lib/elixir/lib/task/supervised.ex
Outdated
reached the maximum number of tasks for this task supervisor. The maximum number \ | ||
of tasks that are allowed to run at the same time under this supervisor can be \ | ||
configured with the :max_children option passed to Task.Supervisor.start_link/1. When \ | ||
using async_stream, make sure to configure :max_concurrency to be lower or equal to \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor nitpick: should async_stream_nolink
be mentioned here?
|
||
{:error, :max_children} -> | ||
send(parent_pid, {:max_children, monitor_ref}) | ||
stream_waiting_for_stop_loop(running_tasks, config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove the duplication by adding a catch all
to this receive, something like:
...
other ->
handle_stop_or_exit(other, running_tasks, config)
stream_monitor_loop(running_tasks, config)
And then you just need to create a stream_stop_loop
that reuses the same logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. I am not a huge fan of this solution because 1. it complicates the loop by adding indirection since now you don't see all possible messages and 2. it implies that handle_stop_or_exit
exits the monitor process. But it works. I pushed a commit, if you like it we can get that in in the meantime and I can try and think of a cleaner solution.
lib/elixir/lib/task/supervised.ex
Outdated
end | ||
end | ||
|
||
defp stream_waiting_for_stop_loop(running_tasks, config) do | ||
%{ | ||
parent_ref: parent_ref, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these are no longer used
} = config | ||
|
||
receive do | ||
message -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to double check: this was matching before bbef098 on:
{:stop, ^monitor_ref}
{:DOWN, ^parent_ref, _, _, reason}
_other
(match all)
However, I think it now only matches on:
{:stop, ^monitor_ref}
(the first clause ofhandle_stop_or_parent_down
){:DOWN, ^parent_ref, _, _, reason}
(the second clause ofhandle_stop_or_parent_down
)
The previous match-all is gone. Could that produce undesired FunctionClauseError
s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will lead to undefined FunctionClauseError. We need to ignore the message in case it is not the one we are looking for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with this is that before the stream_monitor_loop
didn't accept random messages as the catch-all clause was {:EXIT, ...}
. Not sure how to approach this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can make handle_stop_or_parent_down
return true (if handled) or false (if unknown) and raise accordingly. handle_stop... || raise "unexpected message #{inspect other}"
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that makes sense. I'm now raising in stream_monitor_loop
but not in stream_waiting_for_stop_loop
. Should work :)
lib/elixir/lib/task/supervised.ex
Outdated
|
||
defp handle_stop_or_parent_down({:DOWN, parent_ref, _, _, reason}, running_tasks, %{ | ||
parent_ref: parent_ref | ||
}) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We rarely use this pattern in Elixir, can we please keep all arguments in a single line or one per line? Thanks. Same for the clauses above and then tests.
lib/elixir/lib/task/supervised.ex
Outdated
|
||
other -> | ||
handle_stop_or_parent_down(other, running_tasks, config) || | ||
raise "unexpected message: #{inspect(other)}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this may not be a good idea. Could observer or other tools for example try to send it system message? 🤔 /cc @fishcakez
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well we weren't handling those before either, so might be something for another PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before they would sit in the process inbox, now they will cause a crash.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is best if we revert the last commit and simply discard messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Argh, that's correct. Reverted.
This reverts commit c31d0aa.
Closes #7786.