Skip to content

Commit

Permalink
Add APIs for subscription add / delete
Browse files Browse the repository at this point in the history
  • Loading branch information
spring2maz committed Feb 17, 2019
1 parent 3790182 commit 2ae7d9f
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 36 deletions.
8 changes: 7 additions & 1 deletion src/emqx_portal_connect.erl
Expand Up @@ -18,14 +18,16 @@

-export_type([config/0, connection/0]).

-optional_callbacks([]).
-optional_callbacks([ensure_subscribed/3, ensure_unsubscribed/2]).

%% map fields depend on implementation
-type config() :: map().
-type connection() :: term().
-type conn_ref() :: term().
-type batch() :: emqx_protal:batch().
-type ack_ref() :: emqx_portal:ack_ref().
-type topic() :: emqx_topic:topic().
-type qos() :: emqx_mqtt_types:qos().

-include("logger.hrl").

Expand All @@ -42,6 +44,10 @@
%% called when owner is shutting down.
-callback stop(conn_ref(), connection()) -> ok.

-callback ensure_subscribed(connection(), topic(), qos()) -> ok.

-callback ensure_unsubscribed(connection(), topic()) -> ok.

start(Module, Config) ->
case Module:start(Config) of
{ok, Ref, Conn} ->
Expand Down
108 changes: 79 additions & 29 deletions src/portal/emqx_portal.erl
Expand Up @@ -74,7 +74,7 @@

%% management APIs
-export([get_forwards/1, ensure_forward_present/2, ensure_forward_absent/2]).
-export([get_subscriptions/1]). %, add_subscription/3, del_subscription/2]).
-export([get_subscriptions/1, ensure_subscription_present/3, ensure_subscription_absent/2]).

-export_type([config/0,
batch/0,
Expand Down Expand Up @@ -142,17 +142,32 @@ handle_ack(Pid, Ref) when node() =:= node(Pid) ->
-spec get_forwards(id()) -> [topic()].
get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)).

%% @doc Return all subscriptions (subscription over mqtt connection to remote broker).
-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions).

%% @doc Add a new forward (local topic subscription).
-spec ensure_forward_present(id(), topic()) -> ok.
ensure_forward_present(Id, Topic) ->
gen_statem:call(id(Id), {ensure_forward_present, topic(Topic)}).
gen_statem:call(id(Id), {ensure_present, forwards, topic(Topic)}).

%% @doc Ensure a forward topic is deleted.
-spec ensure_forward_absent(id(), topic()) -> ok.
ensure_forward_absent(Id, Topic) ->
gen_statem:call(id(Id), {ensure_forward_absent, topic(Topic)}).
gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}).

-spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}].
get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions).
%% @doc Ensure subscribed to remote topic.
%% NOTE: only applicable when connection module is emqx_portal_mqtt
%% return `{error, no_remote_subscription_support}' otherwise.
-spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}.
ensure_subscription_present(Id, Topic, QoS) ->
gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}).

%% @doc Ensure unsubscribed from remote topic.
%% NOTE: only applicable when connection module is emqx_portal_mqtt
-spec ensure_subscription_absent(id(), topic()) -> ok.
ensure_subscription_absent(Id, Topic) ->
gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}).

callback_mode() -> [state_functions, state_enter].

Expand Down Expand Up @@ -187,7 +202,7 @@ init(Config) ->
mountpoint,
forwards
], Config#{subscriptions => Subs}),
ConnectFun = fun() -> emqx_portal_connect:start(ConnectModule, ConnectConfig) end,
ConnectFun = fun(SubsX) -> emqx_portal_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end,
{ok, connecting,
#{connect_module => ConnectModule,
connect_fun => ConnectFun,
Expand Down Expand Up @@ -217,8 +232,10 @@ connecting(enter, connected, #{reconnect_delay_ms := Timeout}) ->
Action = {state_timeout, Timeout, reconnect},
{keep_state_and_data, Action};
connecting(enter, connecting, #{reconnect_delay_ms := Timeout,
connect_fun := ConnectFun} = State) ->
case ConnectFun() of
connect_fun := ConnectFun,
subscriptions := Subs
} = State) ->
case ConnectFun(Subs) of
{ok, ConnRef, Conn} ->
Action = {state_timeout, 0, connected},
{keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
Expand Down Expand Up @@ -277,36 +294,22 @@ connected(info, {batch_ack, Ref}, State) ->
%% try re-connect then re-send
{next_state, connecting, disconnect(State)};
{ok, NewState} ->
{keep_state, NewState}
{keep_state, NewState, ?maybe_send}
end;
connected(Type, Content, State) ->
common(connected, Type, Content, State).

%% Common handlers
common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) ->
{keep_state_and_data, [{reply, From, Forwards}]};
common(_StateName, {call, From}, {ensure_forward_present, Topic},
#{forwards := Forwards} = State) ->
case lists:member(Topic, Forwards) of
true ->
{keep_state_and_data, [{reply, From, ok}]};
false ->
ok = subscribe_local_topic(Topic),
{keep_state, State#{forwards := lists:usort([Topic | Forwards])},
[{reply, From, ok}]}
end;
common(_StateName, {call, From}, {ensure_forward_absent, Topic},
#{forwards := Forwards} = State) ->
case lists:member(Topic, Forwards) of
true ->
emqx_broker:unsubscribe(Topic),
{keep_state, State#{forwards := lists:delete(Topic, Forwards)},
[{reply, From, ok}]};
false ->
{keep_state_and_data, [{reply, From, ok}]}
end;
common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) ->
{keep_state_and_data, [{reply, From, Subs}]};
common(_StateName, {call, From}, {ensure_present, What, Topic}, State) ->
{Result, NewState} = ensure_present(What, Topic, State),
{keep_state, NewState, [{reply, From, Result}]};
common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) ->
{Result, NewState} = ensure_absent(What, Topic, State),
{keep_state, NewState, [{reply, From, Result}]};
common(_StateName, info, {dispatch, _, Msg},
#{replayq := Q} = State) ->
NewQ = replayq:append(Q, collect([Msg])),
Expand All @@ -316,6 +319,53 @@ common(StateName, Type, Content, State) ->
[name(), Type, StateName, Content]),
{keep_state, State}.

ensure_present(Key, Topic, State) ->
Topics = maps:get(Key, State),
case is_topic_present(Topic, Topics) of
true ->
{ok, State};
false ->
R = do_ensure_present(Key, Topic, State),
{R, State#{Key := lists:usort([Topic | Topics])}}
end.

ensure_absent(Key, Topic, State) ->
Topics = maps:get(Key, State),
case is_topic_present(Topic, Topics) of
true ->
R = do_ensure_absent(Key, Topic, State),
{R, State#{Key := ensure_topic_absent(Topic, Topics)}};
false ->
{ok, State}
end.

ensure_topic_absent(_Topic, []) -> [];
ensure_topic_absent(Topic, [{_, _} | _] = L) -> lists:keydelete(Topic, 1, L);
ensure_topic_absent(Topic, L) -> lists:delete(Topic, L).

is_topic_present({Topic, _QoS}, Topics) ->
is_topic_present(Topic, Topics);
is_topic_present(Topic, Topics) ->
lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics).

do_ensure_present(forwards, Topic, _) ->
ok = subscribe_local_topic(Topic);
do_ensure_present(subscriptions, {Topic, QoS},
#{connect_module := ConnectModule, connection := Conn}) ->
case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of
true -> ConnectModule:ensure_subscribed(Conn, Topic, QoS);
false -> {error, no_remote_subscription_support}
end.

do_ensure_absent(forwards, Topic, _) ->
ok = emqx_broker:unsubscribe(Topic);
do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule,
connection := Conn}) ->
case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of
true -> ConnectModule:ensure_unsubscribed(Conn, Topic);
false -> {error, no_remote_subscription_support}
end.

collect(Acc) ->
receive
{dispatch, _, Msg} ->
Expand Down
17 changes: 17 additions & 0 deletions src/portal/emqx_portal_mqtt.erl
Expand Up @@ -23,6 +23,11 @@
stop/2
]).

%% optional behaviour callbacks
-export([ensure_subscribed/3,
ensure_unsubscribed/2
]).

-include("emqx_mqtt.hrl").

-define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}).
Expand Down Expand Up @@ -66,6 +71,18 @@ stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) ->
safe_stop(Pid, fun() -> emqx_client:stop(Pid) end, 1000),
ok.

ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
emqx_client:subscribe(Pid, Topic, QoS);
ensure_subscribed(_Conn, _Topic, _QoS) ->
%% return ok for now, next re-connect should should call start with new topic added to config
ok.

ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
emqx_client:unsubscribe(Pid, Topic);
ensure_unsubscribed(_, _) ->
%% return ok for now, next re-connect should should call start with this topic deleted from config
ok.

safe_stop(Pid, StopF, Timeout) ->
MRef = monitor(process, Pid),
unlink(Pid),
Expand Down
34 changes: 28 additions & 6 deletions test/emqx_portal_SUITE.erl
Expand Up @@ -17,7 +17,7 @@
-export([all/0, init_per_suite/1, end_per_suite/1]).
-export([t_rpc/1,
t_mqtt/1,
t_forwards_mngr/1
t_mngr/1
]).

-include_lib("eunit/include/eunit.hrl").
Expand All @@ -29,7 +29,7 @@

all() -> [t_rpc,
t_mqtt,
t_forwards_mngr
t_mngr
].

init_per_suite(Config) ->
Expand All @@ -44,7 +44,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().

t_forwards_mngr(Config) when is_list(Config) ->
t_mngr(Config) when is_list(Config) ->
Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
Cfg = #{address => node(),
forwards => [<<"mngr">>],
Expand All @@ -62,6 +62,10 @@ t_forwards_mngr(Config) when is_list(Config) ->
?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr2")),
?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr3")),
?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Pid)),
?assertEqual({error, no_remote_subscription_support},
emqx_portal:ensure_subscription_present(Pid, <<"t">>, 0)),
?assertEqual({error, no_remote_subscription_support},
emqx_portal:ensure_subscription_absent(Pid, <<"t">>)),
?assertEqual(Subs, emqx_portal:get_subscriptions(Pid))
after
ok = emqx_portal:stop(Pid)
Expand Down Expand Up @@ -93,10 +97,16 @@ t_rpc(Config) when is_list(Config) ->
ok = emqx_portal:stop(Pid)
end.

%% Full data loopback flow explained:
%% test-pid ---> mock-cleint ----> local-broker ---(local-subscription)--->
%% portal(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) -->
%% portal(import) --(mecked message sending)--> test-pid
t_mqtt(Config) when is_list(Config) ->
SendToTopic = <<"t_mqtt/one">>,
SendToTopic2 = <<"t_mqtt/two">>,
Mountpoint = <<"forwarded/${node}/">>,
ForwardedTopic = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic]),
ForwardedTopic2 = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic2]),
Cfg = #{address => "127.0.0.1:1883",
forwards => [SendToTopic],
connect_module => emqx_portal_mqtt,
Expand All @@ -118,7 +128,7 @@ t_mqtt(Config) when is_list(Config) ->
start_type => manual,
%% Consume back to forwarded message for verification
%% NOTE: this is a indefenite loopback without mocking emqx_portal:import_batch/2
subscriptions => [{ForwardedTopic, 1}]
subscriptions => [{ForwardedTopic, _QoS = 1}]
},
Tester = self(),
Ref = make_ref(),
Expand All @@ -131,23 +141,35 @@ t_mqtt(Config) when is_list(Config) ->
{ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"client-1">>,
try
?assertEqual([{ForwardedTopic, 1}], emqx_portal:get_subscriptions(Pid)),
emqx_portal:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1),
?assertEqual([{ForwardedTopic, 1},
{ForwardedTopic2, 1}], emqx_portal:get_subscriptions(Pid)),
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
{ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
%% message from a different client, to avoid getting terminated by no-local
Msgs = lists:seq(1, 10),
Max = 100,
Msgs = lists:seq(1, Max),
lists:foreach(fun(I) ->
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
end, Msgs),
ok = receive_and_match_messages(Ref, Msgs),
ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2),
Msgs2 = lists:seq(Max + 1, Max * 2),
lists:foreach(fun(I) ->
Msg = emqx_message:make(<<"client-2">>, ?QOS_1, SendToTopic2, integer_to_binary(I)),
emqx_session:publish(SPid, I, Msg)
end, Msgs2),
ok = receive_and_match_messages(Ref, Msgs2),
emqx_mock_client:close_session(ConnPid)
after
ok = emqx_portal:stop(Pid),
meck:unload(emqx_portal)
end.

receive_and_match_messages(Ref, Msgs) ->
TRef = erlang:send_after(timer:seconds(4), self(), {Ref, timeout}),
TRef = erlang:send_after(timer:seconds(5), self(), {Ref, timeout}),
try
do_receive_and_match_messages(Ref, Msgs)
after
Expand Down

0 comments on commit 2ae7d9f

Please sign in to comment.