Permalink
Browse files

Merge pull request #265 from basho/jdb-ordered-rpc

  • Loading branch information...
jtuple committed Jan 3, 2013
2 parents 96cb174 + 36a5a15 commit 427a2cf7c7284a45b9f5ca4c7f117fa2295add95
Showing with 82 additions and 4 deletions.
  1. +82 −4 src/riak_core_util.erl
View
@@ -40,6 +40,13 @@
orddict_delta/2,
rpc_every_member/4,
rpc_every_member_ann/4,
+ pmap/2,
+ multi_rpc/4,
+ multi_rpc/5,
+ multi_rpc_ann/4,
+ multi_rpc_ann/5,
+ multicall_ann/4,
+ multicall_ann/5,
is_arch/1]).
-ifdef(TEST).
@@ -236,6 +243,22 @@ ensure_started(App) ->
ok
end.
+%% @doc Invoke function `F' over each element of list `L' in parallel,
+%% returning the results in the same order as the input list.
+-spec pmap(function(), [node()]) -> [any()].
+pmap(F, L) ->
+ Parent = self(),
+ lists:foldl(
+ fun(X, N) ->
+ spawn(fun() ->
+ Parent ! {pmap, N, F(X)}
+ end),
+ N+1
+ end, 0, L),
+ L2 = [receive {pmap, N, R} -> {N,R} end || _ <- L],
+ {_, L3} = lists:unzip(lists:keysort(1, L2)),
+ L3.
+
%% @spec rpc_every_member(atom(), atom(), [term()], integer()|infinity)
%% -> {Results::[term()], BadNodes::[node()]}
%% @doc Make an RPC call to the given module and function on each
@@ -253,10 +276,65 @@ rpc_every_member(Module, Function, Args, Timeout) ->
rpc_every_member_ann(Module, Function, Args, Timeout) ->
{ok, MyRing} = riak_core_ring_manager:get_my_ring(),
Nodes = riak_core_ring:all_members(MyRing),
- {Results, Down} = rpc:multicall(Nodes, Module, Function, Args, Timeout),
- Up = Nodes -- Down,
- TaggedResults = lists:zip(Up, Results),
- {TaggedResults, Down}.
+ {Results, Down} = multicall_ann(Nodes, Module, Function, Args, Timeout),
+ {Results, Down}.
+
+%% @doc Perform an RPC call to a list of nodes in parallel, returning the
+%% results in the same order as the input list.
+-spec multi_rpc([node()], module(), function(), [any()]) -> [any()].
+multi_rpc(Nodes, Mod, Fun, Args) ->
+ multi_rpc(Nodes, Mod, Fun, Args, infinity).
+
+%% @doc Perform an RPC call to a list of nodes in parallel, returning the
+%% results in the same order as the input list.
+-spec multi_rpc([node()], module(), function(), [any()], timeout()) -> [any()].
+multi_rpc(Nodes, Mod, Fun, Args, Timeout) ->
+ pmap(fun(Node) ->
+ rpc:call(Node, Mod, Fun, Args, Timeout)
+ end, Nodes).
+
+%% @doc Perform an RPC call to a list of nodes in parallel, returning the
+%% results in the same order as the input list. Each result is tagged
+%% with the corresponding node name.
+-spec multi_rpc_ann([node()], module(), function(), [any()])
+ -> [{node(), any()}].
+multi_rpc_ann(Nodes, Mod, Fun, Args) ->
+ multi_rpc_ann(Nodes, Mod, Fun, Args, infinity).
+
+%% @doc Perform an RPC call to a list of nodes in parallel, returning the
+%% results in the same order as the input list. Each result is tagged
+%% with the corresponding node name.
+-spec multi_rpc_ann([node()], module(), function(), [any()], timeout())
+ -> [{node(), any()}].
+multi_rpc_ann(Nodes, Mod, Fun, Args, Timeout) ->
+ Results = multi_rpc(Nodes, Mod, Fun, Args, Timeout),
+ lists:zip(Nodes, Results).
+
+%% @doc Similar to {@link rpc:multicall/4}. Performs an RPC call to a list
+%% of nodes in parallel, returning a list of results as well as a list
+%% of nodes that are down/unreachable. The results will be returned in
+%% the same order as the input list, and each result is tagged with the
+%% corresponding node name.
+-spec multicall_ann([node()], module(), function(), [any()])
+ -> {Results :: [{node(), any()}], Down :: [node()]}.
+multicall_ann(Nodes, Mod, Fun, Args) ->
+ multicall_ann(Nodes, Mod, Fun, Args, infinity).
+
+%% @doc Similar to {@link rpc:multicall/6}. Performs an RPC call to a list
+%% of nodes in parallel, returning a list of results as well as a list
+%% of nodes that are down/unreachable. The results will be returned in
+%% the same order as the input list, and each result is tagged with the
+%% corresponding node name.
+-spec multicall_ann([node()], module(), function(), [any()], timeout())
+ -> {Results :: [{node(), any()}], Down :: [node()]}.
+multicall_ann(Nodes, Mod, Fun, Args, Timeout) ->
+ L = multi_rpc_ann(Nodes, Mod, Fun, Args, Timeout),
+ {Results, DownAnn} =
+ lists:partition(fun({_, Result}) ->
+ Result /= {badrpc, nodedown}
+ end, L),
+ {Down, _} = lists:unzip(DownAnn),
+ {Results, Down}.
%% @doc Convert a list of elements into an N-ary tree. This conversion
%% works by treating the list as an array-based tree where, for

0 comments on commit 427a2cf

Please sign in to comment.