Skip to content

Commit

Permalink
Prevent chttpd multipart zombie processes
Browse files Browse the repository at this point in the history
Occasionally it's possible to lose track of our RPC workers in the main
multipart parsing code. This change monitors each worker process and
then exits if all workers have exited before the parser considers itself
finished.

Fixes part of #745
  • Loading branch information
janl authored and davisp committed Feb 23, 2018
1 parent 6d959a7 commit 0832393
Showing 1 changed file with 65 additions and 12 deletions.
77 changes: 65 additions & 12 deletions src/couch/src/couch_httpd_multipart.erl
Expand Up @@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
abort_parsing ->
ok;
{get_bytes, Ref, From} ->
C2 = orddict:update_counter(From, 1, Counters),
NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}),
mp_parse_atts(eof, NewAcc);
C2 = update_writer(From, Counters),
case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of
abort_parsing ->
ok;
NewAcc ->
mp_parse_atts(eof, NewAcc)
end;
{'DOWN', ParentRef, _, _, _} ->
exit(mp_reader_coordinator_died)
after 3600000 ->
exit(mp_reader_coordinator_died);
{'DOWN', WriterRef, _, WriterPid, _} ->
case remove_writer(WriterPid, WriterRef, Counters) of
abort_parsing ->
ok;
C2 ->
NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]},
mp_parse_atts(eof, NewAcc)
end
after 300000 ->
ok
end
end.
Expand All @@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) ->

maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
receive {get_bytes, Ref, From} ->
NewCounters = orddict:update_counter(From, 1, Counters),
NewCounters = update_writer(From, Counters),
maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]})
after 0 ->
% reply to as many writers as possible
NewWaiting = lists:filter(fun(Writer) ->
WhichChunk = orddict:fetch(Writer, Counters),
{_, WhichChunk} = orddict:fetch(Writer, Counters),
ListIndex = WhichChunk - Offset,
if ListIndex =< length(Chunks) ->
Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
Expand All @@ -132,11 +144,11 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
end, Waiting),

% check if we can drop a chunk from the head of the list
case Counters of
SmallestIndex = case Counters of
[] ->
SmallestIndex = 0;
0;
_ ->
SmallestIndex = lists:min(element(2, lists:unzip(Counters)))
lists:min([C || {_WPid, {_WRef, C}} <- Counters])
end,
Size = length(Counters),
N = num_mp_writers(),
Expand All @@ -149,7 +161,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
end,

% we should wait for a writer if no one has written the last chunk
LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
LargestIndex = lists:max([0] ++ [C || {_WPid, {_WRef, C}} <- Counters]),
if LargestIndex >= (Offset + length(Chunks)) ->
% someone has written all possible chunks, keep moving
{Ref, NewChunks, NewOffset, Counters, NewWaiting};
Expand All @@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
abort_parsing;
{'DOWN', ParentRef, _, _, _} ->
exit(mp_reader_coordinator_died);
{'DOWN', WriterRef, _, WriterPid, _} ->
case remove_writer(WriterPid, WriterRef, Counters) of
abort_parsing ->
abort_parsing;
C2 ->
RestWaiting = NewWaiting -- [WriterPid],
NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
maybe_send_data(NewAcc)
end;
{get_bytes, Ref, X} ->
C2 = orddict:update_counter(X, 1, Counters),
C2 = update_writer(X, Counters),
maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]})
after 300000 ->
abort_parsing
end
end
end.


update_writer(WriterPid, Counters) ->
UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end,
InitialValue = case orddict:find(WriterPid, Counters) of
{ok, IV} ->
IV;
error ->
WriterRef = erlang:monitor(process, WriterPid),
{WriterRef, 1}
end,
orddict:update(WriterPid, UpdateFun, InitialValue, Counters).


remove_writer(WriterPid, WriterRef, Counters) ->
case orddict:find(WriterPid, Counters) of
{ok, {WriterRef, _}} ->
case num_mp_writers() of
N when N > 1 ->
num_mp_writers(N - 1);
_ ->
abort_parsing
end;
{ok, _} ->
% We got a different ref fired for a known worker
abort_parsing;
error ->
% Unknown worker pid?
abort_parsing
end.


num_mp_writers(N) ->
erlang:put(mp_att_writers, N).

Expand Down

0 comments on commit 0832393

Please sign in to comment.