Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Suggest moves from all donor nodes in parallel
Previously the generator would suggest all moves from the first node
before moving onto the second one.  In the case where the quantum of
jobs is much smaller than the number of moves per node this results in
the other donors being neglected for long periods.

BugzID: 24612
  • Loading branch information
kocolosk authored and rnewson committed Jul 23, 2014
1 parent a6ca5c6 commit f9c06d629525873f2130caac81992a0cb42ab01f
Showing 1 changed file with 49 additions and 38 deletions.
@@ -36,7 +36,7 @@
-include("mem3.hrl").

-record (gacc, {
node,
donors,
targets,
moves,
limit,
@@ -136,36 +136,28 @@ global_expand(TargetNodes0, LocalOps, Limit) ->
end, shard_count_by_node(LocalOps)),
TotalCount = lists:foldl(fun({_, C}, Sum) -> Sum + C end, 0, CountByNode),
TargetLevel = TotalCount div length(TargetNodes),
FoldFun = fun
(_, Acc) when length(Acc) >= Limit ->
% We've already accumulated the max number of shard ops.
Acc;
({_Node, Count}, Acc) when Count =< TargetLevel ->
% This node is not a donor.
Acc;
({Node0, Count}, Acc) ->
Node = list_to_existing_atom(binary_to_list(Node0)),
InternalAcc0 = #gacc{
node = Node,
targets = TargetNodes0,
moves = Acc,
limit = erlang:min(Count - TargetLevel, Limit - length(Acc)),
target_level = TargetLevel
},
try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
#gacc{moves = Moves} ->
Moves
catch
{complete, Moves} ->
Moves
end
end,
lists:foldl(FoldFun, LocalOps, CountByNode).
Donors = [{list_to_existing_atom(binary_to_list(N)), C - TargetLevel} ||
{N, C} <- CountByNode, C > TargetLevel],
InternalAcc0 = #gacc{
donors = orddict:from_list(Donors),
targets = TargetNodes0,
moves = LocalOps,
limit = Limit - length(LocalOps),
target_level = TargetLevel
},
try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
#gacc{moves = Moves} ->
Moves
catch
{complete, Moves} ->
Moves
end.

donate_fold(_Shard, #gacc{limit = 0, moves = Moves}) ->
throw({complete, Moves});
donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
donate_fold(#shard{node = Node} = Shard, Acc0) ->
#gacc{
donors = Donors,
targets = Nodes,
moves = Moves,
limit = DC,
@@ -202,21 +194,40 @@ donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
false
end
end,
case {lists:member(Shard, Shards), lists:dropwhile(Fun, SortedByCount)} of
{false, _} ->
Acc0;
{true, []} ->
Acc0;
{true, [{Target, _} | _]} ->
print({move, Shard, Target}),
Acc0#gacc{
moves = [{move, Shard, Target} | Moves],
limit = DC - 1
}
case {lists:member(Shard, Shards), lists:keymember(Node, 1, Donors)} of
{true, true} ->
case lists:dropwhile(Fun, SortedByCount) of
[{Target, _} | _] ->
NewMoves = [{move, Shard, Target} | Moves],
print({move, Shard, Target}),
Acc0#gacc{
moves = NewMoves,
limit = DC - 1,
donors = update_donors(Node, Donors, NewMoves)
};
[] ->
Acc0
end;
_ ->
Acc0
end;
donate_fold(_Shard, Acc) ->
Acc.

update_donors(Node, Donors, Moves) ->
NewDonors = case orddict:fetch(Node, Donors) of
1 ->
orddict:erase(Node, Donors);
X ->
orddict:store(Node, X-1, Donors)
end,
case orddict:size(NewDonors) of
0 ->
throw({complete, Moves});
_ ->
NewDonors
end.

get_shard_count(AtomKey, ShardsByNode) when is_atom(AtomKey) ->
length(couch_util:get_value(AtomKey, ShardsByNode, [])).

0 comments on commit f9c06d6

Please sign in to comment.