Skip to content

Commit

Permalink
Move the emq_mod_retainer project to emq_retainer
Browse files Browse the repository at this point in the history
  • Loading branch information
Feng Lee committed Feb 15, 2017
1 parent df74932 commit 1e3063a
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 111 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Expand Up @@ -12,4 +12,4 @@ data/
*.swp
*.swo
.erlang.mk/
emq_mod_retainer.d
emq_retainer.d
8 changes: 4 additions & 4 deletions Makefile
@@ -1,6 +1,6 @@
PROJECT = emq_mod_retainer
PROJECT_DESCRIPTION = Retainer Module
PROJECT_VERSION = 2.0.7
PROJECT = emq_retainer
PROJECT_DESCRIPTION = EMQ Retainer
PROJECT_VERSION = 2.1

BUILD_DEPS = emqttd cuttlefish
dep_emqttd = git https://github.com/emqtt/emqttd master
Expand All @@ -11,5 +11,5 @@ ERLC_OPTS += +'{parse_transform, lager_transform}'
include erlang.mk

app.config::
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_mod_retainer.conf -i priv/emq_mod_retainer.schema -d data
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_retainer.conf -i priv/emq_retainer.schema -d data

23 changes: 12 additions & 11 deletions README.md
@@ -1,26 +1,27 @@
emq_mod_retainer
================

Retainer Module
EMQ Retainer
============

Configuration
-------------

etc/emq_mod_retainer.conf:
etc/emq_retainer.conf:

```
## disc: disc_copies, ram: ram_copies
## Notice: retainer's storage_type on each node in a cluster must be the same!
module.retainer.storage_type = disc
retainer.storage_type = disc
## Max number of retained messages
module.retainer.max_message_num = 100000
retainer.max_message_num = 1000000
## Max Payload Size of retained message
module.retainer.max_payload_size = 64KB
retainer.max_payload_size = 64KB
## Expired after seconds, never expired if 0
module.retainer.expired_after = 0
## Expiry interval. Never expired if 0
## h - hour
## m - minute
## s - second
retainer.expiry_interval = 0
```

License
Expand All @@ -31,5 +32,5 @@ Apache License Version 2.0
Author
------

feng at emqtt.io
Feng at emqtt.io

17 changes: 0 additions & 17 deletions etc/emq_mod_retainer.conf

This file was deleted.

23 changes: 0 additions & 23 deletions priv/emq_mod_retainer.schema

This file was deleted.

32 changes: 0 additions & 32 deletions src/emq_mod_retainer_app.erl

This file was deleted.

43 changes: 22 additions & 21 deletions src/emq_mod_retainer.erl → src/emq_retainer.erl
Expand Up @@ -14,7 +14,9 @@
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emq_mod_retainer).
-module(emq_retainer).

-author("Feng Lee <feng@emqtt.io>").

-behaviour(gen_server).

Expand All @@ -37,9 +39,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(mqtt_retained, {topic, msg}).
-record(mqtt_retained, {topic, msg, ts}).

-record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
-record(state, {stats_fun, expiry_interval, stats_timer, expire_timer}).

%%--------------------------------------------------------------------
%% Load/Unload
Expand All @@ -65,16 +67,16 @@ on_message_publish(Msg = #mqtt_message{retain = true, topic = Topic, payload = <
mnesia:dirty_delete(mqtt_retained, Topic),
{stop, Msg};

on_message_publish(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}, Env) ->
on_message_publish(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload, timestamp = Ts}, Env) ->
case {is_table_full(Env), is_too_big(size(Payload), Env)} of
{false, false} ->
mnesia:dirty_write(#mqtt_retained{topic = Topic, msg = Msg}),
mnesia:dirty_write(#mqtt_retained{topic = Topic, msg = Msg, ts = emqttd_time:now_ms(Ts)}),
emqttd_metrics:set('messages/retained', retained_count());
{true, _}->
{true, _}->
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
{_, true}->
lager:error("Cannot retain message(topic=~s, payload_size=~p)"
" for payload is too big!", [Topic, size(Payload)])
{_, true}->
lager:error("Cannot retain message(topic=~s, payload_size=~p) "
"for payload is too big!", [Topic, byte_size(Payload)])
end,
{ok, Msg#mqtt_message{retain = false}}.

Expand Down Expand Up @@ -123,15 +125,15 @@ init([Env]) ->
StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'),
{ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
State = #state{stats_fun = StatsFun, stats_timer = StatsTimer},
{ok, init_expire_timer(proplists:get_value(expired_after, Env, 0), State)}.
{ok, start_expire_timer(proplists:get_value(expiry_interval, Env, 0), State)}.

init_expire_timer(0, State) ->
start_expire_timer(0, State) ->
State;
init_expire_timer(undefined, State) ->
start_expire_timer(undefined, State) ->
State;
init_expire_timer(Secs, State) ->
{ok, Timer} = timer:send_interval(timer:seconds(Secs), expire),
State#state{expired_after = Secs, expire_timer = Timer}.
start_expire_timer(Ms, State) ->
{ok, Timer} = timer:send_interval(Ms, expire),
State#state{expiry_interval = Ms, expire_timer = Timer}.

handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
Expand All @@ -143,20 +145,19 @@ handle_info(stats, State = #state{stats_fun = StatsFun}) ->
StatsFun(retained_count()),
{noreply, State, hibernate};

handle_info(expire, State = #state{expired_after = Never})
handle_info(expire, State = #state{expiry_interval = Never})
when Never =:= 0 orelse Never =:= undefined ->
{noreply, State, hibernate};

handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
expire_messages(emqttd_time:now_to_secs() - ExpiredAfter),
handle_info(expire, State = #state{expiry_interval = Interval}) ->
expire_messages(emqttd_time:now_ms() - Interval),
{noreply, State, hibernate};

handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).

terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) ->
timer:cancel(TRef1),
timer:cancel(TRef2).
timer:cancel(TRef1), timer:cancel(TRef2).

code_change(_OldVsn, State, _Extra) ->
{ok, State}.
Expand Down Expand Up @@ -185,7 +186,7 @@ expire_messages(Time) when is_integer(Time) ->
mnesia:transaction(
fun() ->
Match = ets:fun2ms(
fun(#mqtt_retained{topic = Topic, msg = #mqtt_message{timestamp = Ts}})
fun(#mqtt_retained{topic = Topic, ts = Ts})
when Time > Ts -> Topic
end),
Topics = mnesia:select(mqtt_retained, Match, write),
Expand Down
6 changes: 4 additions & 2 deletions src/emq_mod_retainer_sup.erl → src/emq_retainer_sup.erl
Expand Up @@ -14,15 +14,17 @@
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emq_mod_retainer_sup).
-module(emq_retainer_sup).

-author("Feng Lee <feng@emqtt.io>").

-behaviour(supervisor).

-export([start_link/1]).

-export([init/1]).

-define(M, emq_mod_retainer).
-define(M, emq_retainer).

start_link(Env) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]).
Expand Down

0 comments on commit 1e3063a

Please sign in to comment.