Skip to content
Permalink
Browse files

Adapt to emqx 4.0-beta.2

  • Loading branch information
terry-xiaoyu committed Oct 14, 2019
1 parent c8efac8 commit c7c17540c1248dcdd402b41323c23a211e8292fc
Showing with 75 additions and 220 deletions.
  1. +75 −187 src/emqx_coap_mqtt_adapter.erl
  2. +0 −1 src/emqx_coap_ps_resource.erl
  3. +0 −1 src/emqx_coap_resource.erl
  4. +0 −31 test/emqx_coap_SUITE.erl
@@ -24,13 +24,14 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").

-include_lib("emqx/include/emqx_mqtt.hrl").

-logger_header("[CoAP-Adpter]").

%% API.
-export([ subscribe/2
, unsubscribe/2
, publish/3
, keepalive/1
]).

-export([ client_pid/4
@@ -46,16 +47,9 @@
, code_change/3
]).

-record(state, {chann, peer, keepalive, sub_topics = [], enable_stats}).

-define(DEFAULT_KEEPALIVE_DURATION, 60 * 2).
-record(state, {client_info, peer, sub_topics = []}).

-define(CHANN_INIT(A, B, C, D, E), chann_init(A, B, C, D, E)).
-define(CHANN_SUBSCRIBE(X, Y), chann_subscribe(X, Y)).
-define(CHANN_UNSUBSCRIBE(X, Y), chann_unsubscribe(X, Y)).
-define(CHANN_PUBLISH(A1, A2, P), chann_publish(A1, A2, P)).
-define(CHANN_HANDLEOUT(A1, P), emqx_channel:handle_out(A1, P)).
-define(CHANN_DELIVER_ACK(A1, A2, P), chann_deliver_ack(A1, A2, P)).
-define(CHANN_TIMEOUT(A1, A2, P), chann_timeout(A1, A2, P)).
-define(CHANN_ENSURE_TIMER(A1, P), emqx_channel:ensure_timer(A1, P)).
-define(CHANN_SHUTDOWN(A, B), emqx_channel:terminate(A, B)).
@@ -96,54 +90,43 @@ unsubscribe(Pid, Topic) ->
publish(Pid, Topic, Payload) ->
gen_server:call(Pid, {publish, Topic, Payload}).

keepalive(Pid)->
gen_server:cast(Pid, keepalive).

%%--------------------------------------------------------------------
%% gen_server Callbacks
%%--------------------------------------------------------------------

init({ClientId, Username, Password, Channel}) ->
init({ClientId, Username, Password, {PeerHost, _Port}= Channel}) ->
?LOG(debug, "try to start adapter ClientId=~p, Username=~p, Password=~p, Channel=~p",
[ClientId, Username, Password, Channel]),

EnableStats = application:get_env(?APP, enable_stats, false),
Interval = application:get_env(?APP, keepalive, ?DEFAULT_KEEPALIVE_DURATION),

case ?CHANN_INIT(ClientId, Username, Password, Channel, EnableStats) of
{ok, CState} ->
?LOG(debug, "Keepalive at the interval of ~p", [Interval]),
AliveTimer = emqx_coap_timer:start_timer(Interval, {keepalive, check}),
{ok, #state{chann = CState, peer = Channel, keepalive = AliveTimer, enable_stats = EnableStats}};
{stop, auth_failure} ->
{stop, auth_failure};
Other ->
{stop, Other}
case authenticate(ClientId, Username, Password, PeerHost) of
ok ->
ClientInfo = #{clientid => ClientId, username => Username, peerhost => PeerHost},
{ok, #state{client_info = ClientInfo, peer = Channel}};
{error, Reason} ->
?LOG(debug, "authentication faild: ~p", [Reason]),
{stop, {shutdown, Reason}}
end.

handle_call({subscribe, Topic, CoapPid}, _From, State=#state{chann = CState, sub_topics = TopicList}) ->
handle_call({subscribe, Topic, CoapPid}, _From, State=#state{client_info = ClientInfo, sub_topics = TopicList}) ->
NewTopics = proplists:delete(Topic, TopicList),
IsWild = emqx_topic:wildcard(Topic),
NCState = ?CHANN_SUBSCRIBE(Topic, CState),
{reply, ok, State#state{chann = NCState, sub_topics = [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate};
chann_subscribe(Topic, ClientInfo),
{reply, ok, State#state{sub_topics = [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate};

handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{chann = CState, sub_topics = TopicList}) ->
handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{client_info = ClientInfo, sub_topics = TopicList}) ->
NewTopics = proplists:delete(Topic, TopicList),
NCState = ?CHANN_UNSUBSCRIBE(Topic, CState),
{reply, ok, State#state{chann = NCState, sub_topics = NewTopics}, hibernate};
chann_unsubscribe(Topic, ClientInfo),
{reply, ok, State#state{sub_topics = NewTopics}, hibernate};

handle_call({publish, Topic, Payload}, _From, State=#state{chann = CState}) ->
NCState = ?CHANN_PUBLISH(Topic, Payload, CState),
{reply, ok, State#state{chann = NCState}};
handle_call({publish, Topic, Payload}, _From, State=#state{client_info = ClientInfo}) ->
chann_publish(Topic, Payload, ClientInfo),
{reply, ok, State};

handle_call(info, _From, State = #state{chann = CStateState, peer = Channel}) ->
CStateInfo = emqx_channel:info(CStateState),
ClientInfo = [{peername, Channel}],
Stats = stats(State),
{reply, lists:append([ClientInfo, CStateInfo, Stats]), State};
handle_call(info, _From, State = #state{peer = PeerHost}) ->
ClientInfo = [{peerhost, PeerHost}],
{reply, ClientInfo, State};

handle_call(stats, _From, State) ->
{reply, stats(State), State, hibernate};
{reply, [], State, hibernate};

handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State};
@@ -156,48 +139,17 @@ handle_call(get_rate_limit, _From, State) ->
?LOG(error, "get_rate_limit is not support", []),
{reply, ok, State};

handle_call(session, _From, State = #state{chann = CStateState}) ->
{reply, emqx_channel:info(session, CStateState), State};

handle_call(Request, _From, State) ->
?LOG(error, "adapter unexpected call ~p", [Request]),
{reply, ignored, State, hibernate}.

handle_cast(keepalive, State=#state{keepalive = undefined}) ->
{noreply, State, hibernate};

handle_cast(keepalive, State=#state{keepalive = Keepalive}) ->
NewKeepalive = emqx_coap_timer:kick_timer(Keepalive),
{noreply, State#state{keepalive = NewKeepalive}, hibernate};

handle_cast(Msg, State) ->
?LOG(error, "broker_api unexpected cast ~p", [Msg]),
{noreply, State, hibernate}.

handle_info(Deliver = {deliver, _Topic, _Msg}, State = #state{chann = CState, sub_topics = Subscribers}) ->
Delivers = emqx_misc:drain_deliver([Deliver]),
case ?CHANN_HANDLEOUT({deliver, Delivers}, CState) of
{ok, CState1} -> {noreply, State#state{chann = CState1}};
{ok, PubPkts, CState1} ->
{ok, CState2} = deliver(PubPkts, CState1, Subscribers),
{noreply, State#state{chann = CState2}};
{stop, Reason, CState1} ->
{stop, Reason, State#state{chann = CState1}}
end;

handle_info({keepalive, check}, State = #state{keepalive = Timer}) ->
case emqx_coap_timer:is_timeout(Timer) of
false ->
?LOG(debug, "Keepalive checked ok", []),
NTimer = emqx_coap_timer:restart_timer(Timer),
{noreply, State#state{keepalive = NTimer}};
true ->
?LOG(info, "Keepalive timeout", []),
{stop, normal, State}
end;

handle_info({timeout, TRef, emit_stats}, State) ->
{noreply, ok, handle_timeout(TRef, {emit_stats, stats(State)}, State)};
handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}}, State = #state{sub_topics = Subscribers}) ->
deliver([{Topic, Payload}], Subscribers),
{noreply, State, hibernate};

handle_info(timeout, State) ->
{stop, {shutdown, idle_timeout}, State};
@@ -213,101 +165,73 @@ handle_info(Info, State) ->
?LOG(error, "adapter unexpected info ~p", [Info]),
{noreply, State, hibernate}.

terminate(Reason, #state{chann = CState, keepalive = Timer}) ->
emqx_coap_timer:cancel_timer(Timer),
case {CState, Reason} of
{undefined, _} ->
ok;
{_, {shutdown, Error}} ->
?CHANN_SHUTDOWN(Error, CState);
{_, Reason} ->
?CHANN_SHUTDOWN(Reason, CState)
end.
terminate(Reason, #state{client_info = ClientInfo, sub_topics = SubTopics}) ->
?LOG(debug, "unsubscribe ~p while exiting for ~p", [SubTopics, Reason]),
[chann_unsubscribe(Topic, ClientInfo) || {Topic, _} <- SubTopics],
emqx_hooks:run('client.disconnected', [ClientInfo, Reason, #{}]).

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%--------------------------------------------------------------------
%% Channel adapter functions

chann_init(ClientId, Username, Password, Channel, EnableStats) ->
Options = [{zone, external}],
ConnInfo = #{peername => Channel,
protocol => coap,
peercert => nossl},
CState = set_enable_stats(EnableStats, emqx_channel:init(ConnInfo, Options)),
ConnPkt = #mqtt_packet_connect{clientid = ClientId,
username = Username,
password = Password,
clean_start = true,
keepalive = 0 %% Not set keepalive timer with channel functions
},
case emqx_channel:handle_in(?CONNECT_PACKET(ConnPkt), CState) of
{ok, _Connack, CState1} -> {ok, CState1};
{stop, {shutdown, auth_failure}, _Connack, _CState1} -> {stop, auth_failure};
Other -> error(Other)
authenticate(ClientId, Username, Password, PeerHost) ->
Credentials = credentials(PeerHost, ClientId, Username, Password),
case emqx_access_control:authenticate(Credentials) of
{ok, AuthResult} ->
Credentials1 = maps:merge(Credentials, AuthResult),
emqx_hooks:run('client.connected',
[Credentials1, ?RC_SUCCESS,
#{clean_start => true,
expiry_interval => 0,
proto_name => coap,
peerhost => PeerHost,
connected_at => os:timestamp(),
keepalive => 0,
peercert => nossl,
proto_ver => <<"1.0">>}]),
ok;
{error, Error} ->
emqx_hooks:run('client.connected', [Credentials, ?RC_NOT_AUTHORIZED, #{}]),
{error, Error}
end.

chann_subscribe(Topic, CState) ->
chann_subscribe(Topic, ClientInfo) ->
?LOG(debug, "subscribe Topic=~p", [Topic]),
SubOpts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_1},
throw_if(
emqx_channel:handle_in(?SUBSCRIBE_PACKET(1, [{Topic, SubOpts}]), CState)).
Opts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0},
emqx_broker:subscribe(Topic, Opts),
emqx_hooks:run('session.subscribed', [ClientInfo, Topic, Opts]).

chann_unsubscribe(Topic, CState) ->
chann_unsubscribe(Topic, ClientInfo) ->
?LOG(debug, "unsubscribe Topic=~p", [Topic]),
throw_if(
emqx_channel:handle_in(?UNSUBSCRIBE_PACKET(1, [Topic]), CState)).
Opts = #{rh => 0, rap => 0, nl => 0, qos => 0},
emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, Opts]).

chann_publish(Topic, Payload, CState) ->
chann_publish(Topic, Payload, #{clientid := ClientId}) ->
?LOG(debug, "publish Topic=~p, Payload=~p", [Topic, Payload]),
Publish = #mqtt_packet{
header = #mqtt_packet_header{type = ?PUBLISH, qos = ?QOS_0},
variable = #mqtt_packet_publish{topic_name = Topic, packet_id = 1},
payload = Payload
},
throw_if(
emqx_channel:handle_in(Publish, CState)).

chann_deliver_ack(?QOS_0, _, CState) ->
CState;
chann_deliver_ack(?QOS_1, PktId, CState) ->
throw_if(
emqx_channel:handle_in(?PUBACK_PACKET(PktId), CState));

chann_deliver_ack(?QOS_2, PktId, CState) ->
CState1 = throw_if(emqx_channel:handle_in(?PUBREC_PACKET(PktId), CState)),
throw_if(
emqx_channel:handle_in(?PUBCOMP_PACKET(PktId), CState1)).

chann_timeout(TRef, Msg, CState) ->
case emqx_channel:handle_timeout(TRef, Msg, CState) of
{ok, NCState} -> NCState;
{ok, _Pkts, NCState} ->
%% TODO: pkts???
NCState;
Other -> error(Other)
end.
emqx_broker:publish(
emqx_message:set_flag(retain, false,
emqx_message:make(ClientId, ?QOS_0, Topic, Payload))).

%%--------------------------------------------------------------------
%% Deliver

deliver([], CState, _) -> {ok, CState};
deliver([Pub | More], CState, Subscribers) ->
{ok, CState1} = deliver(Pub, CState, Subscribers),
deliver(More, CState1, Subscribers);
deliver([], _) -> ok;
deliver([Pub | More], Subscribers) ->
ok = do_deliver(Pub, Subscribers),
deliver(More, Subscribers).

deliver(?PUBLISH_PACKET(Qos, Topic, PktId, Payload), CState, Subscribers) ->
do_deliver({Topic, Payload}, Subscribers) ->
%% handle PUBLISH packet from broker
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),

NCState = ?CHANN_DELIVER_ACK(Qos, PktId, CState),
deliver_to_coap(Topic, Payload, Subscribers),
{ok, NCState};
ok;

deliver(Pkt, CState, _Subscribers) ->
do_deliver(Pkt, _Subscribers) ->
?LOG(warning, "unknown packet type to deliver, pkt=~p,", [Pkt]),
{ok, CState}.
ok.

deliver_to_coap(_TopicName, _Payload, []) ->
ok;
@@ -320,44 +244,8 @@ deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) ->
Matched andalso (CoapPid ! {dispatch, TopicName, Payload}),
deliver_to_coap(TopicName, Payload, T).

%%--------------------------------------------------------------------
%% Misc funcs

set_enable_stats(EnableStats, CState) ->
StatsTimer = if
EnableStats -> undefined;
true -> disabled
end,
Timers = element(9, CState),
setelement(9, CState, Timers#{stats_timer => StatsTimer}).

handle_timeout(TRef, Msg = {emit_stats, _}, State = #state{chann = CState}) ->
State#state{chann = ?CHANN_TIMEOUT(TRef, Msg, CState)}.

throw_if({ok, CState}) ->
CState;
throw_if({ok, _Pkts, CState}) ->
CState;
throw_if({stop, Reason, _DisconnPkt, _CState}) ->
error(Reason);
throw_if(OtherRet) ->
error(OtherRet).


stats(#state{chann = ChanState}) ->
SockStats = socket_stats(undefined, ?SOCK_STATS),
ChanStats = [{Name, emqx_pd:get_counter(Name)} || Name <- ?CHAN_STATS],
SessStats = emqx_session:stats(emqx_channel:info(session, ChanState)),
lists:append([SockStats, ChanStats, SessStats, emqx_misc:proc_stats()]).

%% here we keep the original socket_stats implementation, which will be put into use when we can get socket fd in emqx_coap_mqtt_adapter process
%socket_stats(Sock, Stats) when is_port(Sock), is_list(Stats)->
%inet:getstat(Sock, Stats).

%%this socket_stats is a fake funtion
socket_stats(undefined, Stats) when is_list(Stats)->
FakeSockOpt = [0, 0, 0, 0, 0],
List = lists:zip(Stats, FakeSockOpt),
?LOG(debug, "The List=~p", [List]),
List.

credentials(PeerHost, ClientId, Username, Password) ->
#{peerhost => PeerHost,
clientid => ClientId,
username => Username,
password => Password}.
@@ -56,7 +56,6 @@ coap_get(ChId, ?PS_PREFIX, TopicPath, Query, Content=#coap_content{format = Form
case emqx_coap_mqtt_adapter:client_pid(Clientid, Usr, Passwd, ChId) of
{ok, Pid} ->
put(mqtt_client_pid, Pid),
emqx_coap_mqtt_adapter:keepalive(Pid),
case Format of
<<"application/link-format">> ->
Content;
@@ -53,7 +53,6 @@ coap_get(ChId, ?MQTT_PREFIX, Path, Query, _Content) ->
case emqx_coap_mqtt_adapter:client_pid(Clientid, Usr, Passwd, ChId) of
{ok, Pid} ->
put(mqtt_client_pid, Pid),
emqx_coap_mqtt_adapter:keepalive(Pid),
#coap_content{};
{error, auth_failure} ->
put(mqtt_client_pid, undefined),

0 comments on commit c7c1754

Please sign in to comment.
You can’t perform that action at this time.