Skip to content
Permalink
Browse files

Expire the coap_adapter after a period of time

  • Loading branch information
terry-xiaoyu committed Oct 14, 2019
1 parent c7c1754 commit 9b8ede093cfc3b7211663520e496c579c11611f6
Showing with 11 additions and 18 deletions.
  1. +0 −8 etc/emqx_coap.conf
  2. +0 −5 priv/emqx_coap.schema
  3. +11 −5 src/emqx_coap_mqtt_adapter.erl
@@ -7,14 +7,6 @@
## Value: Port
coap.port = 5683

## Interval for keepalive, specified in seconds.
##
## Value: Duration
## -s: seconds
## -m: minutes
## -h: hours
coap.keepalive = 120s

## Whether to enable statistics for CoAP clients.
##
## Value: on | off
@@ -5,11 +5,6 @@
{datatype, integer}
]}.

{mapping, "coap.keepalive", "emqx_coap.keepalive", [
{default, "120s"},
{datatype, {duration, s}}
]}.

{mapping, "coap.enable_stats", "emqx_coap.enable_stats", [
{datatype, flag}
]}.
@@ -57,6 +57,8 @@
-define(CHAN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).

-define(ALIVE_INTERVAL, 20000).

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@@ -100,6 +102,7 @@ init({ClientId, Username, Password, {PeerHost, _Port}= Channel}) ->
case authenticate(ClientId, Username, Password, PeerHost) of
ok ->
ClientInfo = #{clientid => ClientId, username => Username, peerhost => PeerHost},
erlang:send_after(?ALIVE_INTERVAL, self(), check_alive),
{ok, #state{client_info = ClientInfo, peer = Channel}};
{error, Reason} ->
?LOG(debug, "authentication faild: ~p", [Reason]),
@@ -151,8 +154,11 @@ handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}}, State
deliver([{Topic, Payload}], Subscribers),
{noreply, State, hibernate};

handle_info(timeout, State) ->
{stop, {shutdown, idle_timeout}, State};
handle_info(check_alive, State = #state{sub_topics = []}) ->
{stop, {shutdown, check_alive}, State};
handle_info(check_alive, State) ->
erlang:send_after(?ALIVE_INTERVAL, self(), check_alive),
{noreply, State, hibernate};

handle_info({shutdown, Error}, State) ->
{stop, {shutdown, Error}, State};
@@ -197,10 +203,10 @@ authenticate(ClientId, Username, Password, PeerHost) ->
{error, Error}
end.

chann_subscribe(Topic, ClientInfo) ->
chann_subscribe(Topic, ClientInfo = #{clientid := ClientId}) ->
?LOG(debug, "subscribe Topic=~p", [Topic]),
Opts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0},
emqx_broker:subscribe(Topic, Opts),
Opts = #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0, is_new => false},
emqx_broker:subscribe(Topic, ClientId, Opts),
emqx_hooks:run('session.subscribed', [ClientInfo, Topic, Opts]).

chann_unsubscribe(Topic, ClientInfo) ->

0 comments on commit 9b8ede0

Please sign in to comment.
You can’t perform that action at this time.