Permalink
Browse files

MB-5751 Improve stopping of the updater/cleaner processes

Monitor the updater/cleaner processes when stopping them and log an
explicit message if the monitor ('DOWN') message is received. This
makes it easier to debug what happened when we attempt to stop one
of these processes after it has already finished and its exit
message has already been processed.

Change-Id: Ied04f8bab0b534c60a15b8d2530e07e57bfc3768
Reviewed-on: http://review.couchbase.org/17788
Reviewed-by: Volker Mische <volker.mische@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information...
1 parent df44d6e commit f6a4ebf6bbdd609565ad2d220869eca1d8e63cf6 @fdmanana fdmanana committed with fdmanana Jul 1, 2012
Showing with 105 additions and 82 deletions.
  1. +105 −82 src/couch_set_view/src/couch_set_view_group.erl
@@ -2070,35 +2070,45 @@ maybe_start_cleaner(#state{group = Group} = State) ->
-spec stop_cleaner(#state{}) -> #state{}.
stop_cleaner(#state{cleaner_pid = nil} = State) ->
State;
-stop_cleaner(#state{cleaner_pid = Pid, group = OldGroup} = State) when is_pid(Pid) ->
+stop_cleaner(#state{cleaner_pid = Pid} = State) when is_pid(Pid) ->
+ Pid ! stop,
+ MRef = erlang:monitor(process, Pid),
?LOG_INFO("Stopping cleanup process for set view `~s`, group `~s`",
[?set_name(State), ?group_id(State)]),
- Pid ! stop,
- receive
- {'EXIT', Pid, {clean_group, NewGroup, Count, Time}} ->
- ?LOG_INFO("Stopped cleanup process for set view `~s`, ~s group `~s`.~n"
- "Removed ~p values from the index in ~.3f seconds~n"
- "New set of partitions to cleanup: ~w~n"
- "Old set of partitions to cleanup: ~w~n",
- [?set_name(State), ?type(State), ?group_id(State), Count, Time,
- couch_set_view_util:decode_bitmask(?set_cbitmask(NewGroup)),
- couch_set_view_util:decode_bitmask(?set_cbitmask(OldGroup))]),
- case ?set_cbitmask(NewGroup) of
- 0 ->
- inc_cleanups(State#state.group, Time, Count, false);
- _ ->
- ?inc_cleanup_stops(State#state.group)
- end,
- State#state{
- group = NewGroup,
- cleaner_pid = nil
- };
+ NewState = receive
{'EXIT', Pid, Reason} ->
- ?LOG_ERROR("Cleanup process ~p for set view `~s`, ~s group `~s`, died "
- "with reason: ~p",
- [Pid, ?set_name(State), ?type(State), ?group_id(State), Reason]),
- State#state{cleaner_pid = nil}
- end.
+ after_cleaner_stopped(State, Reason);
+ {'DOWN', MRef, process, Pid, Reason} ->
+ after_cleaner_stopped(State, Reason)
+ end,
+ erlang:demonitor(MRef, [flush]),
+ NewState.
+
+
+after_cleaner_stopped(State, {clean_group, NewGroup, Count, Time}) ->
+ #state{group = OldGroup} = State,
+ ?LOG_INFO("Stopped cleanup process for set view `~s`, ~s group `~s`.~n"
+ "Removed ~p values from the index in ~.3f seconds~n"
+ "New set of partitions to cleanup: ~w~n"
+ "Old set of partitions to cleanup: ~w~n",
+ [?set_name(State), ?type(State), ?group_id(State), Count, Time,
+ couch_set_view_util:decode_bitmask(?set_cbitmask(NewGroup)),
+ couch_set_view_util:decode_bitmask(?set_cbitmask(OldGroup))]),
+ case ?set_cbitmask(NewGroup) of
+ 0 ->
+ inc_cleanups(State#state.group, Time, Count, false);
+ _ ->
+ ?inc_cleanup_stops(State#state.group)
+ end,
+ State#state{
+ group = NewGroup,
+ cleaner_pid = nil
+ };
+after_cleaner_stopped(#state{cleaner_pid = Pid} = State, Reason) ->
+ ?LOG_ERROR("Cleanup process ~p for set view `~s`, ~s group `~s`, died "
+ "with reason: ~p",
+ [Pid, ?set_name(State), ?type(State), ?group_id(State), Reason]),
+ State#state{cleaner_pid = nil}.
-spec cleaner(#state{}) -> {'clean_group', #set_view_group{}, non_neg_integer(), float()}.
@@ -2195,65 +2205,78 @@ stop_updater(#state{updater_pid = nil} = State) ->
State;
stop_updater(#state{updater_pid = Pid} = State) when is_pid(Pid) ->
Pid ! stop_immediately,
+ MRef = erlang:monitor(process, Pid),
?LOG_INFO("Stopping updater for set view `~s`, ~s group `~s`",
[?set_name(State), ?type(State), ?group_id(State)]),
- receive
- {'EXIT', Pid, {updater_finished, Result}} ->
- #set_view_updater_result{
- group = NewGroup,
- state = UpdaterFinishState,
- indexing_time = IndexingTime,
- blocked_time = BlockedTime,
- inserted_ids = InsertedIds,
- deleted_ids = DeletedIds,
- inserted_kvs = InsertedKVs,
- deleted_kvs = DeletedKVs,
- cleanup_kv_count = CleanupKVCount
- } = Result,
- ?LOG_INFO("Set view `~s`, ~s group `~s`, updater stopped~n"
- "Indexing time: ~.3f seconds~n"
- "Blocked time: ~.3f seconds~n"
- "Inserted IDs: ~p~n"
- "Deleted IDs: ~p~n"
- "Inserted KVs: ~p~n"
- "Deleted KVs: ~p~n"
- "Cleaned KVs: ~p~n",
- [?set_name(State), ?type(State), ?group_id(State), IndexingTime, BlockedTime,
- InsertedIds, DeletedIds, InsertedKVs, DeletedKVs, CleanupKVCount]),
- State2 = process_partial_update(State, NewGroup),
- case UpdaterFinishState of
- updating_active ->
- inc_updates(State2#state.group, Result, true, true),
- WaitingList2 = State2#state.waiting_list;
- updating_passive ->
- PartialUpdate = (?set_pbitmask(NewGroup) =/= 0),
- inc_updates(State2#state.group, Result, PartialUpdate, false),
- reply_with_group(
- NewGroup, State2#state.replica_partitions, State2#state.waiting_list),
- WaitingList2 = []
- end,
- State2#state{
- updater_pid = nil,
- updater_state = not_running,
- waiting_list = WaitingList2
- };
+ NewState = receive
{'EXIT', Pid, Reason} ->
- Reply = case Reason of
- {updater_error, _} ->
- {error, element(2, Reason)};
- _ ->
- {error, Reason}
- end,
- ?LOG_ERROR("Updater, set view `~s`, ~s group `~s`, died with "
- "unexpected reason: ~p",
- [?set_name(State), ?type(State), ?group_id(State), Reason]),
- NewState = State#state{
- updater_pid = nil,
- updater_state = not_running
- },
- ?inc_updater_errors(NewState#state.group),
- reply_all(NewState, Reply)
- end.
+ after_updater_stopped(State, Reason);
+ {'DOWN', MRef, process, Pid, noproc} ->
+ ?LOG_ERROR("Updater, set view `~s`, ~s group `~s`, was not alive",
+ [?set_name(State), ?type(State), ?group_id(State)]),
+ State#state{updater_pid = nil, updater_state = not_running};
+ {'DOWN', MRef, process, Pid, Reason} ->
+ after_updater_stopped(State, Reason)
+ end,
+ erlang:demonitor(MRef, [flush]),
+ NewState.
+
+
+after_updater_stopped(State, {updater_finished, Result}) ->
+ #set_view_updater_result{
+ group = NewGroup,
+ state = UpdaterFinishState,
+ indexing_time = IndexingTime,
+ blocked_time = BlockedTime,
+ inserted_ids = InsertedIds,
+ deleted_ids = DeletedIds,
+ inserted_kvs = InsertedKVs,
+ deleted_kvs = DeletedKVs,
+ cleanup_kv_count = CleanupKVCount
+ } = Result,
+ ?LOG_INFO("Set view `~s`, ~s group `~s`, updater stopped~n"
+ "Indexing time: ~.3f seconds~n"
+ "Blocked time: ~.3f seconds~n"
+ "Inserted IDs: ~p~n"
+ "Deleted IDs: ~p~n"
+ "Inserted KVs: ~p~n"
+ "Deleted KVs: ~p~n"
+ "Cleaned KVs: ~p~n",
+ [?set_name(State), ?type(State), ?group_id(State), IndexingTime, BlockedTime,
+ InsertedIds, DeletedIds, InsertedKVs, DeletedKVs, CleanupKVCount]),
+ State2 = process_partial_update(State, NewGroup),
+ case UpdaterFinishState of
+ updating_active ->
+ inc_updates(State2#state.group, Result, true, true),
+ WaitingList2 = State2#state.waiting_list;
+ updating_passive ->
+ PartialUpdate = (?set_pbitmask(NewGroup) =/= 0),
+ inc_updates(State2#state.group, Result, PartialUpdate, false),
+ reply_with_group(
+ NewGroup, State2#state.replica_partitions, State2#state.waiting_list),
+ WaitingList2 = []
+ end,
+ State2#state{
+ updater_pid = nil,
+ updater_state = not_running,
+ waiting_list = WaitingList2
+ };
+after_updater_stopped(State, Reason) ->
+ Reply = case Reason of
+ {updater_error, _} ->
+ {error, element(2, Reason)};
+ _ ->
+ {error, Reason}
+ end,
+ ?LOG_ERROR("Updater, set view `~s`, ~s group `~s`, died with "
+ "unexpected reason: ~p",
+ [?set_name(State), ?type(State), ?group_id(State), Reason]),
+ State2 = State#state{
+ updater_pid = nil,
+ updater_state = not_running
+ },
+ ?inc_updater_errors(State2#state.group),
+ reply_all(State2, Reply).
-spec start_updater(#state{}) -> #state{}.

0 comments on commit f6a4ebf

Please sign in to comment.