Skip to content

Commit

Permalink
MB-7030 Check mail box for new group snapshot
Browse files Browse the repository at this point in the history
When stopping the updater, check if there's currently
any new group snapshot sent by him to us. If so, process
it.
Before this change it was ignored, meaning that the next
time the updater was restarted, it would repeat some work,
wasting CPU and IO.

Change-Id: I840bc797567a1d7c81f9970aaf4e1b9d20271b1e
Reviewed-on: http://review.couchbase.org/22008
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
Reviewed-by: Damien Katz <damien@couchbase.com>
Tested-by: Damien Katz <damien@couchbase.com>
  • Loading branch information
fdmanana authored and steveyen committed Oct 26, 2012
1 parent a2bef2f commit c2dd289
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 21 deletions.
54 changes: 35 additions & 19 deletions src/couch_set_view/src/couch_set_view_group.erl
Expand Up @@ -795,23 +795,6 @@ handle_cast(_Msg, State) when not ?is_defined(State) ->
handle_cast({log_eof, LogEof}, State) ->
{noreply, State#state{log_eof = LogEof}, ?TIMEOUT};

handle_cast({partial_update, Pid, NewGroup}, #state{updater_pid = Pid} = State) ->
case ?have_pending_transition(State) andalso
(?set_cbitmask(NewGroup) =:= 0) andalso
(?set_cbitmask(State#state.group) =/= 0) andalso
(State#state.waiting_list =:= []) of
true ->
State2 = process_partial_update(State, NewGroup),
State3 = stop_updater(State2),
NewState = maybe_apply_pending_transition(State3);
false ->
NewState = process_partial_update(State, NewGroup)
end,
{noreply, NewState};
handle_cast({partial_update, _, _}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State, ?TIMEOUT};

handle_cast({ddoc_updated, NewSig, Aliases}, State) ->
#state{
waiting_list = Waiters,
Expand Down Expand Up @@ -941,6 +924,23 @@ handle_info(timeout, #state{group = Group} = State) ->
{noreply, start_updater(State)}
end;

handle_info({partial_update, Pid, NewGroup}, #state{updater_pid = Pid} = State) ->
case ?have_pending_transition(State) andalso
(?set_cbitmask(NewGroup) =:= 0) andalso
(?set_cbitmask(State#state.group) =/= 0) andalso
(State#state.waiting_list =:= []) of
true ->
State2 = process_partial_update(State, NewGroup),
State3 = stop_updater(State2),
NewState = maybe_apply_pending_transition(State3);
false ->
NewState = process_partial_update(State, NewGroup)
end,
{noreply, NewState};
handle_info({partial_update, _, _}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State, ?TIMEOUT};

handle_info({updater_info, Pid, {state, UpdaterState}}, #state{updater_pid = Pid} = State) ->
#state{
group = Group,
Expand Down Expand Up @@ -2473,12 +2473,13 @@ stop_updater(#state{updater_pid = Pid} = State) when is_pid(Pid) ->
unlink(Pid),
?LOG_INFO("Stopping updater for set view `~s`, ~s group `~s`",
[?set_name(State), ?type(State), ?group_id(State)]),
State2 = process_last_updater_group(State, nil),
NewState = receive
{'EXIT', Pid, Reason} ->
after_updater_stopped(State, Reason);
after_updater_stopped(State2, Reason);
{'DOWN', MRef, process, Pid, Reason} ->
receive {'EXIT', Pid, _} -> ok after 0 -> ok end,
after_updater_stopped(State, Reason)
after_updater_stopped(State2, Reason)
end,
erlang:demonitor(MRef, [flush]),
NewState.
Expand Down Expand Up @@ -2533,6 +2534,21 @@ after_updater_stopped(State, Reason) ->
}.


-spec process_last_updater_group(#state{}, 'nil' | #set_view_group{}) -> #state{}.
process_last_updater_group(#state{updater_pid = Pid} = State, Group) ->
receive
{partial_update, Pid, NewGroup} ->
process_last_updater_group(State, NewGroup)
after 0 ->
case Group of
nil ->
State;
_ ->
process_partial_update(State, Group)
end
end.


-spec start_updater(#state{}) -> #state{}.
start_updater(#state{updater_pid = Pid} = State) when is_pid(Pid) ->
State;
Expand Down
4 changes: 2 additions & 2 deletions src/couch_set_view/src/couch_set_view_updater.erl
Expand Up @@ -1005,7 +1005,7 @@ maybe_checkpoint(WriterAcc) ->
put(last_header_commit_ts, Now);
false ->
#writer_acc{owner = Owner, parent = Parent, group = Group} = WriterAcc,
ok = gen_server:cast(Owner, {partial_update, Parent, Group})
Owner ! {partial_update, Parent, Group}
end.


Expand All @@ -1018,7 +1018,7 @@ checkpoint(#writer_acc{owner = Owner, parent = Parent, group = Group}, DoFsync)
?LOG_INFO("Updater checkpointing set view `~s` update for ~s group `~s`",
[SetName, Type, DDocId]),
write_header(Group, DoFsync),
ok = gen_server:cast(Owner, {partial_update, Parent, Group}).
Owner ! {partial_update, Parent, Group}.


write_header(#set_view_group{fd = Fd} = Group, DoFsync) ->
Expand Down

0 comments on commit c2dd289

Please sign in to comment.