More efficient index partition state transitions

If a request to mark one or more partitions as active/passive arrives while
those partitions are still under cleanup, don't block the caller until the
cleanup finishes. Instead, reply to the caller immediately and apply the
requested state transition later. This new approach also offers more chances
of avoiding unnecessary transitions/IO when the caller later asks to clean up
a partition it had previously asked to mark as active/passive, before that
earlier transition was applied.
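
For illustration, a minimal Erlang sketch (hypothetical module and record
names, not the actual couch_set_view_group code) of how a later set_state
request can be folded into an already-pending transition; the three input
lists are assumed to be disjoint ordsets, as the real code also requires:

-module(pending_transition_sketch).
-export([merge/4]).

-record(transition, {active = [], passive = [], cleanup = []}).

%% Fold a new {Active, Passive, Cleanup} request into a pending transition.
%% Each partition keeps only its most recently requested state.
merge(#transition{active = A, passive = P, cleanup = C}, Active, Passive, Cleanup) ->
    #transition{
        active  = ordsets:subtract(ordsets:union(A, Active),  ordsets:union(Passive, Cleanup)),
        passive = ordsets:subtract(ordsets:union(P, Passive), ordsets:union(Active, Cleanup)),
        cleanup = ordsets:subtract(ordsets:union(C, Cleanup), ordsets:union(Active, Passive))
    }.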

This new behaviour, like the old one, guarantees that if the caller gets a
positive reply, no state information is lost if the server crashes after the
reply is sent. The pending state transition is written to the index header
and fsync'ed before replying to the caller.
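
A minimal sketch of that fsync-before-reply ordering, using only the standard
'file' module rather than the group's actual commit_header/2 path; the module
name, the Path argument and the term_to_binary encoding are illustrative
assumptions:

-module(durable_reply_sketch).
-export([persist_pending_transition/2]).

%% Persist the pending transition durably before the gen_server replies to
%% the set_state caller: a crash after the reply cannot lose the transition,
%% because file:sync/1 has already completed.
persist_pending_transition(Path, Transition) ->
    Bin = term_to_binary(Transition),
    {ok, Fd} = file:open(Path, [write, binary, raw]),
    ok = file:write(Fd, Bin),
    ok = file:sync(Fd),
    ok = file:close(Fd),
    ok.  %% only after this does handle_call return {reply, ok, ...}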

Change-Id: I0704d66a33856e540e50f2f4f14f623e881acbe7
Reviewed-on: http://review.couchbase.org/14765
Tested-by: buildbot <build@couchbase.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
Reviewed-by: Damien Katz <damien@couchbase.com>
commit ad151b0509043cf7e9c462b4a1e24b18e86721d3 (parent 7ef409d)
Authored by fdmanana, committed by Damienkatz
src/couch_set_view/include/couch_set_view.hrl (10 lines changed)
@@ -82,7 +82,15 @@
id_btree_state = nil,
view_states = nil,
has_replica = false,
- replicas_on_transfer = []
+ replicas_on_transfer = [],
+ % Pending partition states transition.
+ pending_transition = nil % 'nil' | #set_view_transition{}
+}).
+
+-record(set_view_transition, {
+ active,
+ passive,
+ cleanup
}).
-record(set_view_debug_info, {
src/couch_set_view/src/couch_set_view_group.erl (323 lines changed)
@@ -44,6 +44,9 @@
is_integer(((State#state.group)#set_view_group.index_header)#set_view_index_header.num_partitions)).
-define(replicas_on_transfer(State),
((State#state.group)#set_view_group.index_header)#set_view_index_header.replicas_on_transfer).
+-define(get_pending_transition(State),
+ ((State#state.group)#set_view_group.index_header)#set_view_index_header.pending_transition).
+-define(have_pending_transition(State), ((?get_pending_transition(State)) /= nil)).
-define(MAX_HIST_SIZE, 20).
@@ -60,18 +63,10 @@
commit_ref = nil,
waiting_list = [],
cleaner_pid = nil,
- cleanup_waiters = [],
shutdown = false,
replica_partitions = []
}).
--record(cleanup_waiter, {
- from,
- active_list,
- passive_list,
- cleanup_list
-}).
-
-define(inc_stat(Group, S),
ets:update_counter(
?SET_VIEW_STATS_ETS,
@@ -307,7 +302,7 @@ do_init({_, SetName, _} = InitArgs) ->
true = ets:insert(
?SET_VIEW_STATS_ETS,
#set_view_group_stats{ets_key = ?set_view_group_stats_key(Group)}),
- {ok, InitState};
+ {ok, maybe_apply_pending_transition(InitState)};
Error ->
throw(Error)
end.
@@ -376,8 +371,7 @@ handle_call(is_view_defined, _From, #state{group = Group} = State) ->
handle_call({partition_deleted, master}, _From, State) ->
Error = {error, {db_deleted, ?master_dbname((?set_name(State)))}},
State2 = reply_all(State, Error),
- State3 = reply_all_cleanup_waiters(State2, Error),
- {stop, shutdown, shutdown, State3};
+ {stop, shutdown, shutdown, State2};
handle_call({partition_deleted, PartId}, _From, #state{group = Group} = State) ->
Mask = 1 bsl PartId,
case ((?set_abitmask(Group) band Mask) =/= 0) orelse
@@ -385,8 +379,7 @@ handle_call({partition_deleted, PartId}, _From, #state{group = Group} = State) -
true ->
Error = {error, {db_deleted, ?dbname((?set_name(State)), PartId)}},
State2 = reply_all(State, Error),
- State3 = reply_all_cleanup_waiters(State2, Error),
- {stop, shutdown, shutdown, State3};
+ {stop, shutdown, shutdown, State2};
false ->
{reply, ignore, State, ?TIMEOUT}
end;
@@ -394,11 +387,11 @@ handle_call({partition_deleted, PartId}, _From, #state{group = Group} = State) -
handle_call(_Msg, _From, State) when not ?is_defined(State) ->
{reply, view_undefined, State};
-handle_call({set_state, ActiveList, PassiveList, CleanupList}, From, State) ->
+handle_call({set_state, ActiveList, PassiveList, CleanupList}, _From, State) ->
try
- NewState = update_partition_states(
- ActiveList, PassiveList, CleanupList, From, State),
- {noreply, NewState, ?TIMEOUT}
+ NewState = maybe_update_partition_states(
+ ActiveList, PassiveList, CleanupList, State),
+ {reply, ok, NewState, ?TIMEOUT}
catch
throw:Error ->
{reply, Error, State}
@@ -430,9 +423,6 @@ handle_call({add_replicas, BitMask}, _From, #state{replica_group = ReplicaPid} =
BitMask2 bxor Common2
end,
Parts = ordsets:from_list(couch_set_view_util:decode_bitmask(BitMask3)),
- % TODO: Improve this. Technically the set_state on the replica group
- % can block us until its cleanup is finished. This happens when we
- % request it to add a group of partitions that are still marked for cleanup.
ok = set_state(ReplicaPid, [], Parts, []),
NewReplicaParts = ordsets:union(ReplicaParts, Parts),
?LOG_INFO("Set view `~s`, ~s group `~s`, defined new replica partitions: ~w~n"
@@ -615,9 +605,8 @@ handle_call({compact_done, Result}, {Pid, _}, #state{compactor_pid = Pid} = Stat
end,
group = NewGroup2
},
- State3 = notify_cleanup_waiters(State2),
- inc_compactions(State3#state.group, Result),
- {reply, ok, State3, ?TIMEOUT};
+ inc_compactions(State2#state.group, Result),
+ {reply, ok, State2, ?TIMEOUT};
false ->
?LOG_INFO("Set view `~s`, ~s group `~s`, compaction still behind, retrying",
[?set_name(State), ?type(State), ?group_id(State)]),
@@ -685,21 +674,23 @@ handle_info(timeout, State) ->
end;
handle_info({updater_info, Pid, {state, UpdaterState}}, #state{updater_pid = Pid} = State) ->
- #state{waiting_list = WaitList, cleanup_waiters = CleanupWaiters} = State,
+ #state{
+ group = Group,
+ waiting_list = WaitList,
+ replica_partitions = RepParts
+ } = State,
State2 = State#state{updater_state = UpdaterState},
case UpdaterState of
- updating_passive when WaitList =/= [] andalso CleanupWaiters =/= [] ->
- State3 = stop_updater(State2),
+ updating_passive ->
+ reply_with_group(Group, RepParts, WaitList),
case State#state.shutdown of
true ->
+ State3 = stop_updater(State2),
{stop, normal, State3};
false ->
- {noreply, start_updater(State3)}
+ State3 = maybe_apply_pending_transition(State2),
+ {noreply, State3#state{waiting_list = []}}
end;
- updating_passive when WaitList =/= [] ->
- reply_with_group(
- State2#state.group, State2#state.replica_partitions, WaitList),
- {noreply, State2#state{waiting_list = []}};
_ ->
{noreply, State2}
end;
@@ -749,7 +740,7 @@ handle_info({'EXIT', Pid, {clean_group, NewGroup, Count, Time}}, #state{cleaner_
group = NewGroup
},
inc_cleanups(State2#state.group, Time, Count),
- {noreply, notify_cleanup_waiters(State2)};
+ {noreply, maybe_apply_pending_transition(State2)};
handle_info({'EXIT', Pid, Reason}, #state{cleaner_pid = Pid} = State) ->
{stop, {cleaner_died, Reason}, State#state{cleaner_pid = nil}};
@@ -801,7 +792,7 @@ handle_info({'EXIT', Pid, {updater_finished, Result}}, #state{updater_pid = Pid}
waiting_list = [],
group = NewGroup
},
- State3 = notify_cleanup_waiters(State2),
+ State3 = maybe_apply_pending_transition(State2),
State4 = maybe_start_cleaner(State3),
{noreply, State4, ?TIMEOUT}
end;
@@ -861,7 +852,6 @@ terminate(Reason, State) ->
[?set_name(State), ?type(State), ?group_id(State), Reason]),
State2 = stop_cleaner(State),
State3 = reply_all(State2, Reason),
- reply_all_cleanup_waiters(State3, {shutdown, Reason}),
catch couch_db_set:close(?db_set(State3)),
couch_util:shutdown_sync(State3#state.updater_pid),
couch_util:shutdown_sync(State3#state.compactor_pid),
@@ -904,17 +894,6 @@ reply_all(#state{waiting_list = WaitList} = State, Reply) ->
State#state{waiting_list = []}.
-reply_all_cleanup_waiters(#state{cleanup_waiters = []} = State, _Reply) ->
- State;
-reply_all_cleanup_waiters(#state{cleanup_waiters = Waiters} = State, Reply) ->
- ?LOG_INFO("Set view `~s`, ~s group `~s`, replying to all cleanup waiters with: ~p",
- [?set_name(State), ?type(State), ?group_id(State), Reply]),
- lists:foreach(
- fun(#cleanup_waiter{from = F}) -> catch gen_server:reply(F, Reply) end,
- Waiters),
- State#state{cleanup_waiters = []}.
-
-
prepare_group({RootDir, SetName, #set_view_group{sig = Sig, type = Type} = Group}, ForceReset)->
case open_index_file(RootDir, SetName, Type, Sig) of
{ok, Fd} ->
@@ -954,9 +933,18 @@ prepare_group({RootDir, SetName, #set_view_group{sig = Sig, type = Type} = Group
Error
end.
-get_index_header_data(#set_view_group{id_btree = IdBtree, views = Views, index_header = Header}) ->
+get_index_header_data(Group) ->
+ #set_view_group{
+ id_btree = IdBtree,
+ views = Views,
+ index_header = Header
+ } = Group,
ViewStates = [
- {couch_btree:get_state(V#set_view.btree), V#set_view.update_seqs, V#set_view.purge_seqs} || V <- Views
+ {
+ couch_btree:get_state(V#set_view.btree),
+ V#set_view.update_seqs,
+ V#set_view.purge_seqs
+ } || V <- Views
],
Header#set_view_index_header{
id_btree_state = couch_btree:get_state(IdBtree),
@@ -1054,6 +1042,7 @@ get_group_info(State) ->
def_lang = Lang,
views = Views
} = Group,
+ PendingTrans = ?get_pending_transition(State),
[Stats] = ets:lookup(?SET_VIEW_STATS_ETS, ?set_view_group_stats_key(Group)),
JsonStats = {[
{full_updates, Stats#set_view_group_stats.full_updates},
@@ -1064,7 +1053,6 @@ get_group_info(State) ->
{cleanups, Stats#set_view_group_stats.cleanups},
{waiting_clients, length(WaitersList)},
{cleanup_interruptions, Stats#set_view_group_stats.cleanup_stops},
- {cleanup_blocked_processes, length(State#state.cleanup_waiters)},
{update_history, Stats#set_view_group_stats.update_history},
{compaction_history, Stats#set_view_group_stats.compaction_history},
{cleanup_history, Stats#set_view_group_stats.cleanup_history}
@@ -1087,7 +1075,18 @@ get_group_info(State) ->
{active_partitions, couch_set_view_util:decode_bitmask(?set_abitmask(Group))},
{passive_partitions, couch_set_view_util:decode_bitmask(?set_pbitmask(Group))},
{cleanup_partitions, couch_set_view_util:decode_bitmask(?set_cbitmask(Group))},
- {stats, JsonStats}
+ {stats, JsonStats},
+ {pending_transition, case PendingTrans of
+ nil ->
+ null;
+ #set_view_transition{} ->
+ {[
+ {active, PendingTrans#set_view_transition.active},
+ {passive, PendingTrans#set_view_transition.passive},
+ {cleanup, PendingTrans#set_view_transition.cleanup}
+ ]}
+ end
+ }
] ++
case (?type(State) =:= main) andalso is_pid(ReplicaPid) of
true ->
@@ -1226,7 +1225,7 @@ compare_seqs([{PartId, SeqA} | RestA], [{PartId, SeqB} | RestB]) ->
end.
-update_partition_states(ActiveList, PassiveList, CleanupList, From, State) ->
+maybe_update_partition_states(ActiveList, PassiveList, CleanupList, State) ->
#state{group = Group} = State,
ActiveMask = couch_set_view_util:build_bitmask(ActiveList),
case ActiveMask >= (1 bsl ?set_num_partitions(Group)) of
@@ -1253,56 +1252,133 @@ update_partition_states(ActiveList, PassiveList, CleanupList, From, State) ->
(PassiveMask bor ?set_pbitmask(Group)) =:= ?set_pbitmask(Group) andalso
(CleanupMask bor ?set_cbitmask(Group)) =:= ?set_cbitmask(Group) of
true ->
- gen_server:reply(From, ok),
State;
false ->
- do_update_partition_states(ActiveList, PassiveList, CleanupList, From, State)
+ update_partition_states(ActiveList, PassiveList, CleanupList, State)
end.
-do_update_partition_states(ActiveList, PassiveList, CleanupList, From, State) ->
- #state{cleanup_waiters = CleanupWaiters} = State2 = stop_cleaner(State),
- UpdaterRunning = is_pid(State2#state.updater_pid),
+update_partition_states(ActiveList, PassiveList, CleanupList, State) ->
+ case ?have_pending_transition(State) of
+ true ->
+ merge_into_pending_transition(ActiveList, PassiveList, CleanupList, State);
+ false ->
+ do_update_partition_states(ActiveList, PassiveList, CleanupList, State)
+ end.
+
+
+merge_into_pending_transition(ActiveList, PassiveList, CleanupList, State) ->
+ % Note: checking if there's an intersection between active, passive and
+ % cleanup lists must have been done already.
+ Pending = ?get_pending_transition(State),
+ Pending2 = merge_pending_active(Pending, ActiveList),
+ Pending3 = merge_pending_passive(Pending2, PassiveList),
+ Pending4 = merge_pending_cleanup(Pending3, CleanupList),
+ #set_view_transition{
+ active = ActivePending4,
+ passive = PassivePending4,
+ cleanup = CleanupPending4
+ } = Pending4,
+ State2 = set_pending_transition(State, Pending4),
+ ok = commit_header(State2#state.group, true),
+ ?LOG_INFO("Set view `~s`, ~s group `~s`, updated pending partition "
+ "states transition to:~n"
+ " Active partitions: ~w~n"
+ " Passive partitions: ~w~n"
+ " Cleanup partitions: ~w~n",
+ [?set_name(State), ?type(State), ?group_id(State),
+ ActivePending4, PassivePending4, CleanupPending4]),
+ maybe_apply_pending_transition(State2).
+
+
+merge_pending_active(Pending, ActiveList) ->
+ #set_view_transition{
+ active = ActivePending,
+ passive = PassivePending,
+ cleanup = CleanupPending
+ } = Pending,
+ Pending#set_view_transition{
+ active = ordsets:union(ActivePending, ActiveList),
+ passive = ordsets:subtract(PassivePending, ActiveList),
+ cleanup = ordsets:subtract(CleanupPending, ActiveList)
+ }.
+
+
+merge_pending_passive(Pending, PassiveList) ->
+ #set_view_transition{
+ active = ActivePending,
+ passive = PassivePending,
+ cleanup = CleanupPending
+ } = Pending,
+ #set_view_transition{
+ active = ordsets:subtract(ActivePending, PassiveList),
+ passive = ordsets:union(PassivePending, PassiveList),
+ cleanup = ordsets:subtract(CleanupPending, PassiveList)
+ }.
+
+
+merge_pending_cleanup(Pending, CleanupList) ->
+ #set_view_transition{
+ active = ActivePending,
+ passive = PassivePending,
+ cleanup = CleanupPending
+ } = Pending,
+ #set_view_transition{
+ active = ordsets:subtract(ActivePending, CleanupList),
+ passive = ordsets:subtract(PassivePending, CleanupList),
+ cleanup = ordsets:union(CleanupPending, CleanupList)
+ }.
+
+
+do_update_partition_states(ActiveList, PassiveList, CleanupList, State) ->
+ UpdaterRunning = is_pid(State#state.updater_pid),
+ State2 = stop_cleaner(State),
#state{group = Group3} = State3 = stop_updater(State2, immediately),
- {InCleanup, _NotInCleanup} =
- partitions_still_in_cleanup(ActiveList ++ PassiveList, Group3),
+ InCleanup = partitions_still_in_cleanup(ActiveList ++ PassiveList, Group3),
case InCleanup of
[] ->
- State4 = persist_partition_states(State3, ActiveList, PassiveList, CleanupList),
- gen_server:reply(From, ok);
+ State4 = persist_partition_states(State3, ActiveList, PassiveList, CleanupList);
_ ->
- ?LOG_INFO("Set view `~s`, ~s group `~s`, blocking client ~p, "
- "requesting partition state change because the following "
- "partitions are still in cleanup: ~w",
- [?set_name(State), ?type(State), ?group_id(State), element(1, From), InCleanup]),
- Waiter = #cleanup_waiter{
- from = From,
- active_list = ActiveList,
- passive_list = PassiveList,
- cleanup_list = CleanupList
+ ?LOG_INFO("Set view `~s`, ~s group `~s`, created pending partition "
+ "states transition, because the following partitions are still "
+ "in cleanup:~w~n~n"
+ "Pending partition states transition details:~n"
+ " Active partitions: ~w~n"
+ " Passive partitions: ~w~n"
+ " Cleanup partitions: ~w~n",
+ [?set_name(State), ?type(State), ?group_id(State),
+ InCleanup, ActiveList, PassiveList, CleanupList]),
+ Pending = #set_view_transition{
+ active = ActiveList,
+ passive = PassiveList,
+ cleanup = CleanupList
},
- State4 = State3#state{cleanup_waiters = CleanupWaiters ++ [Waiter]}
+ State4 = set_pending_transition(State3, Pending)
end,
+ after_partition_states_updated(State4, UpdaterRunning).
+
+
+after_partition_states_updated(State, UpdaterWasRunning) ->
case ?type(State) of
main ->
- State5 = case UpdaterRunning of
+ State2 = case UpdaterWasRunning of
true ->
% Updater was running, we stopped it, updated the group we received
% from the updater, updated that group's bitmasks and update/purge
% seqs, and now restart the updater with this modified group.
- start_updater(State4);
+ start_updater(State);
false ->
- State4
+ State
end,
- State6 = restart_compactor(State5, "partition states were updated"),
- maybe_start_cleaner(State6);
+ State3 = restart_compactor(State2, "partition states were updated"),
+ maybe_start_cleaner(State3);
replica ->
- State5 = restart_compactor(State4, "partition states were updated"),
- case is_pid(State5#state.compactor_pid) of
+ State2 = restart_compactor(State, "partition states were updated"),
+ case is_pid(State2#state.compactor_pid) of
true ->
- State5;
+ State2;
false ->
- maybe_update_replica_index(State5)
+ maybe_update_replica_index(State2)
end
end.
@@ -1381,6 +1457,37 @@ persist_partition_states(State, ActiveList, PassiveList, CleanupList) ->
State2.
+maybe_apply_pending_transition(State) when not ?have_pending_transition(State) ->
+ State;
+maybe_apply_pending_transition(State) ->
+ #set_view_transition{
+ active = ActivePending,
+ passive = PassivePending,
+ cleanup = CleanupPending
+ } = ?get_pending_transition(State),
+ InCleanup = partitions_still_in_cleanup(
+ ActivePending ++ PassivePending, State#state.group),
+ case InCleanup of
+ [] ->
+ ?LOG_INFO("Set view `~s`, ~s group `~s`, applying pending partition "
+ "states transition:~n"
+ " Active partitions: ~w~n"
+ " Passive partitions: ~w~n"
+ " Cleanup partitions: ~w~n",
+ [?set_name(State), ?type(State), ?group_id(State),
+ ActivePending, PassivePending, CleanupPending]),
+ UpdaterRunning = is_pid(State#state.updater_pid),
+ State2 = stop_cleaner(State),
+ State3 = stop_updater(State2, immediately),
+ State4 = set_pending_transition(State3, nil),
+ State5 = persist_partition_states(
+ State4, ActivePending, PassivePending, CleanupPending),
+ after_partition_states_updated(State5, UpdaterRunning);
+ _ ->
+ State
+ end.
+
+
set_passive_partitions([], Abitmask, Pbitmask, Seqs, PurgeSeqs) ->
{ok, Abitmask, Pbitmask, Seqs, PurgeSeqs};
@@ -1562,12 +1669,11 @@ stop_cleaner(#state{cleaner_pid = Pid, group = OldGroup} = State) when is_pid(Pi
_ ->
?inc_cleanup_stops(State#state.group)
end,
- State2 = State#state{
+ State#state{
group = NewGroup,
cleaner_pid = nil,
commit_ref = schedule_commit(State)
- },
- notify_cleanup_waiters(State2);
+ };
{'EXIT', Pid, Reason} ->
exit({cleanup_process_died, Reason})
end.
@@ -1743,12 +1849,11 @@ stop_updater(#state{updater_pid = Pid} = State, When) ->
NewGroup, State2#state.replica_partitions, State2#state.waiting_list),
WaitingList2 = []
end,
- NewState = State2#state{
+ State2#state{
updater_pid = nil,
updater_state = not_running,
waiting_list = WaitingList2
- },
- notify_cleanup_waiters(NewState);
+ };
{'EXIT', Pid, Reason} ->
Reply = case Reason of
{updater_error, _} ->
@@ -1805,46 +1910,17 @@ do_start_updater(State) ->
partitions_still_in_cleanup(Parts, Group) ->
- partitions_still_in_cleanup(Parts, Group, [], []).
+ partitions_still_in_cleanup(Parts, Group, []).
-partitions_still_in_cleanup([], _Group, AccStill, AccNot) ->
- {lists:reverse(AccStill), lists:reverse(AccNot)};
-partitions_still_in_cleanup([PartId | Rest], Group, AccStill, AccNot) ->
+partitions_still_in_cleanup([], _Group, Acc) ->
+ lists:reverse(Acc);
+partitions_still_in_cleanup([PartId | Rest], Group, Acc) ->
Mask = 1 bsl PartId,
case Mask band ?set_cbitmask(Group) of
Mask ->
- partitions_still_in_cleanup(Rest, Group, [PartId | AccStill], AccNot);
+ partitions_still_in_cleanup(Rest, Group, [PartId | Acc]);
0 ->
- partitions_still_in_cleanup(Rest, Group, AccStill, [PartId | AccNot])
- end.
-
-
-% TODO: instead of applying a group of state updates one by one and unblocking cleanup
-% waiters one by one, these state updates should be collapsed as soon they arrive and
-% applied all at once. This would also avoids the need to block clients when they ask
-% to mark partitions as active/passive when they're still in cleanup.
-notify_cleanup_waiters(#state{cleanup_waiters = []} = State) ->
- State;
-notify_cleanup_waiters(State) ->
- #state{group = Group, cleanup_waiters = [Waiter | RestWaiters]} = State,
- #cleanup_waiter{
- from = From,
- active_list = Active,
- passive_list = Passive,
- cleanup_list = Cleanup
- } = Waiter,
- {InCleanup, _NotInCleanup} =
- partitions_still_in_cleanup(Active ++ Passive, Group),
- case InCleanup of
- [] ->
- State2 = persist_partition_states(State, Active, Passive, Cleanup),
- % TODO: track how much time a cleanup waiter is blocked and log it
- ?LOG_INFO("Set view `~s`, ~s group `~s`, unblocking cleanup waiter ~p",
- [?set_name(State2), ?type(State), ?group_id(State2), element(1, From)]),
- gen_server:reply(From, ok),
- notify_cleanup_waiters(State2#state{cleanup_waiters = RestWaiters});
- _ ->
- State
+ partitions_still_in_cleanup(Rest, Group, Acc)
end.
@@ -2062,3 +2138,12 @@ inc_view_group_access_stats(#set_view_group_req{update_stats = true}, Group) ->
?inc_accesses(Group);
inc_view_group_access_stats(_Req, _Group) ->
ok.
+
+
+set_pending_transition(#state{group = Group} = State, Transition) ->
+ #set_view_group{index_header = IndexHeader} = Group,
+ IndexHeader2 = IndexHeader#set_view_index_header{
+ pending_transition = Transition
+ },
+ Group2 = Group#set_view_group{index_header = IndexHeader2},
+ State#state{group = Group2}.
test/python/set_view/cleanup.py (24 lines changed)
@@ -306,6 +306,30 @@ def do_test_change_partition_states_while_cleanup_running(self):
# print "Marking partitions 1 and 2 as active while cleanup is ongoing"
common.set_partition_states(self._params, active = [0, 1])
+ info = common.get_set_view_info(self._params)
+ self.assertEqual(type(info["pending_transition"]), dict, "pending_transition is an object")
+ self.assertEqual(sorted(info["pending_transition"]["active"]),
+ [0, 1],
+ "pending_transition active list is [0, 1]")
+ self.assertEqual(info["pending_transition"]["passive"],
+ [],
+ "pending_transition passive list is []")
+ self.assertEqual(info["pending_transition"]["cleanup"],
+ [],
+ "pending_transition cleanup list is []")
+
+ # print "Waiting for pending transition to be applied"
+ iterations = 0
+ while True:
+ if iterations > 600:
+ raise(Exception("timeout waiting for pending transition to be applied"))
+ info = common.get_set_view_info(self._params)
+ if info["pending_transition"] is None:
+ break
+ else:
+ time.sleep(1)
+ iterations += 1
+
# print "Querying view"
(resp, view_result) = common.query(self._params, "mapview1")
doc_count = common.set_doc_count(self._params, [0, 1, 2, 3])
test/python/set_view/replica_index.py (16 lines changed)
@@ -570,6 +570,7 @@ def test_replica_index(self):
# print "defining partitions [4, 5, 7] as replicas again"
common.add_replica_partitions(self._params, [4, 5, 7])
+ self.wait_for_pending_transition_applied()
info = common.get_set_view_info(self._params)
self.assertEqual(info["replica_partitions"], [4, 5, 6, 7],
@@ -1096,7 +1097,6 @@ def test_replica_index(self):
self.assertTrue(key in all_keys, "Key from partition 7 in view result")
-
def restart_compare_info(self, info_before, info_after):
# print "Restarting server"
time.sleep(1)
@@ -1108,3 +1108,17 @@ def restart_compare_info(self, info_before, info_after):
del info_after["replica_group_info"]["stats"]
self.assertEqual(info_after, info_before, "same index state after server restart")
+
+ def wait_for_pending_transition_applied(self):
+ # print "Waiting for pending transition to be applied"
+ iterations = 0
+ while True:
+ if iterations > 600:
+ raise(Exception("timeout waiting for pending transition to be applied"))
+ info = common.get_set_view_info(self._params)
+ if (info["pending_transition"] is None) and \
+ (info["replica_group_info"]["pending_transition"] is None):
+ break
+ else:
+ time.sleep(1)
+ iterations += 1