Skip to content

Commit

Permalink
Merge pull request #3000 from emqx/ordered_rpc
Browse files Browse the repository at this point in the history
Ordered messaging via multiple gen_rpc clients
  • Loading branch information
tigercl committed Nov 1, 2019
2 parents eb2fec8 + 3405dba commit 7f3001e
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 16 deletions.
4 changes: 2 additions & 2 deletions src/emqx_broker.erl
Expand Up @@ -258,15 +258,15 @@ aggre(Routes) ->
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
-> emqx_types:deliver_result()).
forward(Node, To, Delivery, async) ->
case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of
case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
true -> ok;
{badrpc, Reason} ->
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
{error, badrpc}
end;

forward(Node, To, Delivery, sync) ->
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
{badrpc, Reason} ->
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
{error, badrpc};
Expand Down
19 changes: 17 additions & 2 deletions src/emqx_rpc.erl
Expand Up @@ -18,8 +18,11 @@
-module(emqx_rpc).

-export([ call/4
, call/5
, cast/4
, cast/5
, multicall/4
, multicall/5
]).

-compile({inline,
Expand All @@ -34,15 +37,27 @@
call(Node, Mod, Fun, Args) ->
filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).

call(Key, Node, Mod, Fun, Args) ->
filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)).

multicall(Nodes, Mod, Fun, Args) ->
filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).

multicall(Key, Nodes, Mod, Fun, Args) ->
filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)).

cast(Node, Mod, Fun, Args) ->
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).

rpc_node(Node) ->
cast(Key, Node, Mod, Fun, Args) ->
filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)).

rpc_node(Node) when is_atom(Node) ->
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
{Node, rand:uniform(ClientNum)};
rpc_node({Key, Node}) when is_atom(Node) ->
ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
{Node, rand:uniform(ClientNum)}.
{Node, erlang:phash2(Key, ClientNum) + 1}.

rpc_nodes(Nodes) ->
rpc_nodes(Nodes, []).
Expand Down
50 changes: 38 additions & 12 deletions test/emqx_rpc_SUITE.erl
Expand Up @@ -24,25 +24,16 @@

all() -> emqx_ct:all(?MODULE).

t_multicall(_) ->
error('TODO').

t_cast(_) ->
error('TODO').

t_call(_) ->
error('TODO').



t_prop_rpc(_) ->
ok = load(),
Opts = [{to_file, user}, {numtests, 10}],
{ok, _Apps} = application:ensure_all_started(gen_rpc),
ok = application:set_env(gen_rpc, call_receive_timeout, 1),
ok = emqx_logger:set_log_level(emergency),
?assert(proper:quickcheck(prop_node(), Opts)),
?assert(proper:quickcheck(prop_node_with_key(), Opts)),
?assert(proper:quickcheck(prop_nodes(), Opts)),
?assert(proper:quickcheck(prop_nodes_with_key(), Opts)),
ok = application:stop(gen_rpc),
ok = unload().

Expand All @@ -57,6 +48,17 @@ prop_node() ->
end
end).

prop_node_with_key() ->
?FORALL({Node, Key}, nodename_with_key(),
begin
?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])),
case emqx_rpc:call(Key, Node, erlang, system_time, []) of
{badrpc, _Reason} -> true;
Delivery when is_integer(Delivery) -> true;
_Other -> false
end
end).

prop_nodes() ->
?FORALL(Nodes, nodesname(),
begin
Expand All @@ -70,6 +72,19 @@ prop_nodes() ->
end
end).

prop_nodes_with_key() ->
?FORALL({Nodes, Key}, nodesname_with_key(),
begin
case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of
{badrpc, _Reason} -> true;
{RealResults, RealBadNodes}
when is_list(RealResults);
is_list(RealBadNodes) ->
true;
_Other -> false
end
end).

%%--------------------------------------------------------------------
%% helper
%%--------------------------------------------------------------------
Expand All @@ -96,8 +111,19 @@ nodename() ->
list_to_atom(Node)
end).

nodename_with_key() ->
?LET({NodePrefix, HostName, Key},
{node_prefix(), hostname(), choose(0, 10)},
begin
Node = NodePrefix ++ "@" ++ HostName,
{list_to_atom(Node), Key}
end).

nodesname() ->
oneof([list(nodename()), ["emqxct@127.0.0.1"]]).
oneof([list(nodename()), ['emqxct@127.0.0.1']]).

nodesname_with_key() ->
oneof([{list(nodename()), choose(0, 10)}, {['emqxct@127.0.0.1'], 1}]).

node_prefix() ->
oneof(["emqxct", text_like()]).
Expand Down

0 comments on commit 7f3001e

Please sign in to comment.