From f9c06d629525873f2130caac81992a0cb42ab01f Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Wed, 30 Oct 2013 17:40:20 -0400 Subject: [PATCH] Suggest moves from all donor nodes in parallel Previously the generator would suggest all moves from the first node before moving on to the second one. In the case where the quantum of jobs is much smaller than the number of moves per node, this results in the other donors being neglected for long periods. BugzID: 24612 --- src/mem3_rebalance.erl | 87 ++++++++++++++++++++++++------------------ 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl index 5589e45..f00201a 100644 --- a/src/mem3_rebalance.erl +++ b/src/mem3_rebalance.erl @@ -36,7 +36,7 @@ -include("mem3.hrl"). -record (gacc, { - node, + donors, targets, moves, limit, @@ -136,36 +136,28 @@ global_expand(TargetNodes0, LocalOps, Limit) -> end, shard_count_by_node(LocalOps)), TotalCount = lists:foldl(fun({_, C}, Sum) -> Sum + C end, 0, CountByNode), TargetLevel = TotalCount div length(TargetNodes), - FoldFun = fun - (_, Acc) when length(Acc) >= Limit -> - % We've already accumulated the max number of shard ops. - Acc; - ({_Node, Count}, Acc) when Count =< TargetLevel -> - % This node is not a donor. - Acc; - ({Node0, Count}, Acc) -> - Node = list_to_existing_atom(binary_to_list(Node0)), - InternalAcc0 = #gacc{ - node = Node, - targets = TargetNodes0, - moves = Acc, - limit = erlang:min(Count - TargetLevel, Limit - length(Acc)), - target_level = TargetLevel - }, - try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of - #gacc{moves = Moves} -> - Moves - catch - {complete, Moves} -> - Moves - end - end, - lists:foldl(FoldFun, LocalOps, CountByNode). 
+ Donors = [{list_to_existing_atom(binary_to_list(N)), C - TargetLevel} || + {N, C} <- CountByNode, C > TargetLevel], + InternalAcc0 = #gacc{ + donors = orddict:from_list(Donors), + targets = TargetNodes0, + moves = LocalOps, + limit = Limit - length(LocalOps), + target_level = TargetLevel + }, + try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of + #gacc{moves = Moves} -> + Moves + catch + {complete, Moves} -> + Moves + end. donate_fold(_Shard, #gacc{limit = 0, moves = Moves}) -> throw({complete, Moves}); -donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) -> +donate_fold(#shard{node = Node} = Shard, Acc0) -> #gacc{ + donors = Donors, targets = Nodes, moves = Moves, limit = DC, @@ -202,21 +194,40 @@ donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) -> false end end, - case {lists:member(Shard, Shards), lists:dropwhile(Fun, SortedByCount)} of - {false, _} -> - Acc0; - {true, []} -> - Acc0; - {true, [{Target, _} | _]} -> - print({move, Shard, Target}), - Acc0#gacc{ - moves = [{move, Shard, Target} | Moves], - limit = DC - 1 - } + case {lists:member(Shard, Shards), lists:keymember(Node, 1, Donors)} of + {true, true} -> + case lists:dropwhile(Fun, SortedByCount) of + [{Target, _} | _] -> + NewMoves = [{move, Shard, Target} | Moves], + print({move, Shard, Target}), + Acc0#gacc{ + moves = NewMoves, + limit = DC - 1, + donors = update_donors(Node, Donors, NewMoves) + }; + [] -> + Acc0 + end; + _ -> + Acc0 end; donate_fold(_Shard, Acc) -> Acc. +update_donors(Node, Donors, Moves) -> + NewDonors = case orddict:fetch(Node, Donors) of + 1 -> + orddict:erase(Node, Donors); + X -> + orddict:store(Node, X-1, Donors) + end, + case orddict:size(NewDonors) of + 0 -> + throw({complete, Moves}); + _ -> + NewDonors + end. + get_shard_count(AtomKey, ShardsByNode) when is_atom(AtomKey) -> length(couch_util:get_value(AtomKey, ShardsByNode, [])).