MB-7522 Optimize index state transitions (rebalance)
Optimize 2 types of common state transitions:

1) If a state transition consists only of moving partitions
between the passive and active states (or vice-versa), that
is, no new partitions are added nor existing partitions are
marked for cleanup, don't restart the updater (at the expense
of more complex logic to correct snapshots received from a
currently running updater).

2) If a state transition only adds new partitions to the
passive state (as ns_server's rebalance frequently calls),
don't restart the updater, and send the updater the new
list of passive partitions, so that it has a chance to
iterate over those partitions' content and index them in
the same run.

As long as the rebalancer in ns_server doesn't keep changing
its logic regarding state transitions all the time, this
provides a significant reduction of the indexing time. However
for at least some scenarios, there's still significant portion
of rebalance time not spent on indexing nor index compaction.

Change-Id: I2b7960fd73052ce476fbacee6c1aca4d01490d5a
Reviewed-by: Filipe David Borba Manana <>
Tested-by: Filipe David Borba Manana <>
fdmanana committed Jan 11, 2013
1 parent e64bba5 commit ac82c60
Showing 10 changed files with 514 additions and 80 deletions.
3 changes: 2 additions & 1 deletion src/couch_set_view/
Expand Up @@ -61,7 +61,8 @@ test_files = \
test/20-debug-params.t \
test/21-updater-cleanup.t \
test/22-compactor-cleanup.t \
test/23-replica-group-missing.t \

compiled_files = \
ebin/ \
143 changes: 115 additions & 28 deletions src/couch_set_view/src/couch_set_view_group.erl
Expand Up @@ -607,9 +607,9 @@ handle_call(replica_pid, _From, #state{replica_group = Pid} = State) ->
% To be used only by unit tests.
{reply, {ok, Pid}, State, ?TIMEOUT};

handle_call(start_updater, _From, State) ->
handle_call({start_updater, Options}, _From, State) ->
% To be used only by unit tests.
State2 = start_updater(State),
State2 = start_updater(State, Options),
{reply, {ok, State2#state.updater_pid}, State2, ?TIMEOUT};

handle_call(start_cleaner, _From, State) ->
Expand Down Expand Up @@ -721,7 +721,7 @@ handle_call({compact_done, Result}, {Pid, _}, #state{compactor_pid = Pid} = Stat
CurSeqs = indexable_partition_seqs(State),
[self(), NewGroup2, CurSeqs, false, updater_tmp_dir(State)]);
[self(), NewGroup2, CurSeqs, false, updater_tmp_dir(State), []]);
true ->
Expand Down Expand Up @@ -875,7 +875,7 @@ handle_cast({before_partition_delete, PartId}, #state{group = Group} = State) ->
false ->
State2 = State
State3 = update_partition_states([], [], [PartId], State2),
State3 = update_partition_states([], [], [PartId], State2, true),
{noreply, State3, ?TIMEOUT};
false ->
case lists:member(PartId, ReplicaParts) of
Expand Down Expand Up @@ -908,7 +908,7 @@ handle_cast({update, MinNumChanges}, #state{group = Group} = State) ->
MissingCount = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),
case (MissingCount >= MinNumChanges) andalso (MissingCount > 0) of
true ->
{noreply, do_start_updater(State, CurSeqs)};
{noreply, do_start_updater(State, CurSeqs, [])};
false ->
{noreply, State}
Expand Down Expand Up @@ -1818,17 +1818,39 @@ maybe_update_partition_states(ActiveList0, PassiveList0, CleanupList0, State) ->
true ->
false ->
update_partition_states(ActiveList, PassiveList, CleanupList, State)
RestartUpdater = updater_needs_restart(
Group, ActiveMask, PassiveMask, CleanupMask),
NewState = update_partition_states(
ActiveList, PassiveList, CleanupList, State, RestartUpdater),
#state{group = NewGroup, updater_pid = UpdaterPid} = NewState,
case RestartUpdater of
false when is_pid(UpdaterPid) ->
case missing_partitions(Group, NewGroup) of
[] ->
MissingPassive ->
UpdaterPid ! {new_passive_partitions, MissingPassive}
_ ->

-spec update_partition_states(ordsets:ordset(partition_id()),
#state{}) -> #state{}.
update_partition_states(ActiveList, PassiveList, CleanupList, State) ->
boolean()) -> #state{}.
update_partition_states(ActiveList, PassiveList, CleanupList, State, RestartUpdater) ->
State2 = stop_cleaner(State),
#state{group = Group3} = State3 = stop_updater(State2),
case RestartUpdater of
true ->
#state{group = Group3} = State3 = stop_updater(State2);
false ->
#state{group = Group3} = State3 = State2
UpdaterWasRunning = is_pid(State#state.updater_pid),
ActiveInCleanup = partitions_still_in_cleanup(ActiveList, Group3),
PassiveInCleanup = partitions_still_in_cleanup(PassiveList, Group3),
Expand Down Expand Up @@ -2590,12 +2612,15 @@ process_last_updater_group(#state{updater_pid = Pid} = State, Group) ->

-spec start_updater(#state{}) -> #state{}.
start_updater(#state{updater_pid = Pid} = State) when is_pid(Pid) ->
start_updater(State) ->
start_updater(State, []).

-spec start_updater(#state{}, [term()]) -> #state{}.
start_updater(#state{updater_pid = Pid} = State, _Options) when is_pid(Pid) ->
start_updater(#state{group = #set_view_group{views = []}} = State) ->
start_updater(#state{group = #set_view_group{views = []}} = State, _Options) ->
start_updater(#state{updater_pid = nil, updater_state = not_running} = State) ->
start_updater(#state{updater_pid = nil, updater_state = not_running} = State, Options) ->
group = Group,
replica_partitions = ReplicaParts,
Expand All @@ -2604,26 +2629,26 @@ start_updater(#state{updater_pid = nil, updater_state = not_running} = State) ->
CurSeqs = indexable_partition_seqs(State),
case CurSeqs > ?set_seqs(Group) of
true ->
do_start_updater(State, CurSeqs);
do_start_updater(State, CurSeqs, Options);
false ->
WaitList2 = reply_with_group(Group, ReplicaParts, WaitList),
State#state{waiting_list = WaitList2}

-spec do_start_updater(#state{}, partition_seqs()) -> #state{}.
do_start_updater(State, CurSeqs) ->
-spec do_start_updater(#state{}, partition_seqs(), [term()]) -> #state{}.
do_start_updater(State, CurSeqs, Options) ->
group = Group,
compactor_pid = CompactPid
} = State2 = stop_cleaner(State),
?LOG_INFO("Starting updater for set view `~s`, ~s group `~s`",
[?set_name(State), ?type(State), ?group_id(State)]),
[?set_name(State), ?type(State), ?group_id(State)]),
TmpDir = updater_tmp_dir(State),
CompactRunning = is_pid(CompactPid) andalso is_process_alive(CompactPid),
Pid = spawn_link(couch_set_view_updater, update,
[self(), Group, CurSeqs, CompactRunning, TmpDir]),
[self(), Group, CurSeqs, CompactRunning, TmpDir, Options]),
updater_pid = Pid,
initial_build = couch_set_view_util:is_group_empty(Group),
Expand Down Expand Up @@ -2723,30 +2748,34 @@ maybe_fix_replica_group(ReplicaPid, Group) ->

-spec process_partial_update(#state{}, #set_view_group{}) -> #state{}.
process_partial_update(State, NewGroup) ->
process_partial_update(State, NewGroup0) ->
group = Group,
group = #set_view_group{fd = Fd} = Group,
update_listeners = Listeners
} = State,
Listeners2 = notify_update_listeners(State, Listeners, NewGroup),
ReplicasTransferred = ordsets:subtract(
?set_replicas_on_transfer(Group), ?set_replicas_on_transfer(NewGroup)),
case ReplicasTransferred of
?set_replicas_on_transfer(Group), ?set_replicas_on_transfer(NewGroup0)),
NewState = case ReplicasTransferred of
[] ->
State#state{group = NewGroup, update_listeners = Listeners2};
NewGroup1 = fix_updater_group(NewGroup0, Group),
State#state{group = NewGroup1};
_ ->
?LOG_INFO("Set view `~s`, ~s group `~s`, completed transferral of replica partitions ~w~n"
"New group of replica partitions to transfer is ~w~n",
[?set_name(State), ?type(State), ?group_id(State),
ReplicasTransferred, ?set_replicas_on_transfer(NewGroup)]),
ReplicasTransferred, ?set_replicas_on_transfer(NewGroup0)]),
ok = set_state(State#state.replica_group, [], [], ReplicasTransferred),
group = NewGroup,
update_listeners = Listeners2,
group = NewGroup0,
replica_partitions = ordsets:subtract(State#state.replica_partitions, ReplicasTransferred)
HeaderBin = couch_set_view_util:group_to_header_bin(,
ok = couch_file:write_header_bin(Fd, HeaderBin),
Listeners2 = notify_update_listeners(NewState, Listeners,,
ok = couch_file:flush(Fd),
NewState#state{update_listeners = Listeners2}.

-spec notify_update_listeners(#state{}, dict(), #set_view_group{}) -> dict().
Expand Down Expand Up @@ -3358,3 +3387,61 @@ updater_indexing_time() ->
StartTs = get_updater_start_time(),
LastCpTs = get_last_updater_checkpoint_ts(),
timer:now_diff(LastCpTs, StartTs) / 1000000.

-spec updater_needs_restart(#set_view_group{}, bitmask(),
bitmask(), bitmask()) -> boolean().
updater_needs_restart(Group, _, _, _) when ?set_replicas_on_transfer(Group) /= [] ->
updater_needs_restart(Group, ActiveMask, PassiveMask, CleanupMask) ->
BeforeIndexable = ?set_abitmask(Group) bor ?set_pbitmask(Group),
AfterActive = (?set_abitmask(Group) bor ActiveMask) band (bnot CleanupMask),
case AfterActive == ?set_abitmask(Group) of
true ->
NewCleanup = CleanupMask band BeforeIndexable,
AfterCleanup = ?set_cbitmask(Group) bor NewCleanup,
% If this state transition only adds new passive partitions, don't restart
% the updater. Send the updater the new set of passive partitions, and
% hopefully it will still be on time to index them, otherwise it will see
% them the next time it's started.
AfterCleanup =/= ?set_cbitmask(Group);
false ->
AfterIndexable = (BeforeIndexable bor ActiveMask bor PassiveMask) band (bnot CleanupMask),
% Don't restart updater when a state change request only transitions
% partitions from passive to active state (or vice-versa). This speeds
% up rebalance with consistent views enabled.
AfterIndexable =/= BeforeIndexable

-spec missing_partitions(#set_view_group{}, #set_view_group{}) ->
missing_partitions(UpdaterGroup, OurGroup) ->
MissingMask = (?set_pbitmask(OurGroup) bor ?set_abitmask(OurGroup)) bxor
(?set_pbitmask(UpdaterGroup) bor ?set_abitmask(UpdaterGroup)),

-spec fix_updater_group(#set_view_group{}, #set_view_group{}) ->
fix_updater_group(UpdaterGroup, OurGroup) ->
% Confront with logic in ?MODULE:updater_needs_restart/4.
Missing = missing_partitions(UpdaterGroup, OurGroup),
UpdaterHeader = UpdaterGroup#set_view_group.index_header,
Seqs2 = lists:foldl(
fun(PartId, Acc) ->
case couch_set_view_util:has_part_seq(PartId, Acc) of
true ->
false ->
ordsets:add_element({PartId, 0}, Acc)
?set_seqs(UpdaterGroup), Missing),
index_header = UpdaterHeader#set_view_index_header{
abitmask = ?set_abitmask(OurGroup),
pbitmask = ?set_pbitmask(OurGroup),
seqs = Seqs2

