Skip to content

Commit

Permalink
Merge pull request #1926 from emqx/rewrite-mqueue
Browse files Browse the repository at this point in the history
Rewrite emqx_mqueue.erl
  • Loading branch information
spring2maz committed Nov 3, 2018
2 parents 41b79e4 + 28c8f2d commit 7ad3a63
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 194 deletions.
15 changes: 7 additions & 8 deletions Makefile
Expand Up @@ -74,8 +74,10 @@ etc/gen.emqx.conf: bbmustache etc/emqx.conf
ok = file:write_file('etc/gen.emqx.conf', Targ), \
halt(0)."

app.config: cuttlefish etc/gen.emqx.conf
$(verbose) ./cuttlefish -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/
CUTTLEFISH_SCRIPT = _build/default/lib/cuttlefish/cuttlefish

app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf
$(verbose) $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/

ct: app.config

Expand All @@ -86,19 +88,16 @@ coveralls:
@rebar3 coveralls send


cuttlefish: rebar-deps
@if [ ! -f cuttlefish ]; then \
make -C _build/default/lib/cuttlefish; \
mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \
fi
$(CUTTLEFISH_SCRIPT): rebar-deps
@if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi

rebar-xref:
@rebar3 xref

rebar-deps:
@rebar3 get-deps

rebar-eunit: cuttlefish
rebar-eunit: $(CUTTLEFISH_SCRIPT)
@rebar3 eunit

rebar-compile:
Expand Down
34 changes: 15 additions & 19 deletions etc/emqx.conf
Expand Up @@ -506,17 +506,6 @@ mqtt.wildcard_subscription = true
## Value: boolean
mqtt.shared_subscription = true

## Message queue type.
##
## Value: simple | priority
mqtt.mqueue_type = simple

## Topic priorities. Default is 0.
##
## Priority: Number [0-255]
##
## mqtt.mqueue_priorities = topic/1=10,topic/2=8

##--------------------------------------------------------------------
## Zones
##--------------------------------------------------------------------
Expand Down Expand Up @@ -649,22 +638,29 @@ zone.external.await_rel_timeout = 300s
## Default: 2h, 2 hours
zone.external.session_expiry_interval = 2h

## Message queue type.
##
## Value: simple | priority
zone.external.mqueue_type = simple

## Maximum queue length. Enqueued messages when persistent client disconnected,
## or inflight window is full. 0 means no limit.
##
## Value: Number >= 0
zone.external.max_mqueue_len = 1000

## Topic priorities. Default is 0.
## Topic priorities.
## 'none' to indicate no priority table (by default), hence all messages
## are treated equal
##
## Priority: Number [0-255]
## Priority number [1-255]
## Example: topic/1=10,topic/2=8
## NOTE: comma and equal signs are not allowed for priority topic names
## NOTE: messages for topics not in the priority table are treated as
## either highest or lowest priority depending on the configured
## value for mqueue_default_priority
##
zone.external.mqueue_priorities = none

## Default to highest priority for topics not matching priority table
##
## zone.external.mqueue_priorities = topic/1=10,topic/2=8
## Value: highest | lowest
zone.external.mqueue_default_priority = highest

## Whether to enqueue Qos0 messages.
##
Expand Down
6 changes: 6 additions & 0 deletions include/emqx.hrl
Expand Up @@ -27,6 +27,12 @@

-define(ERTS_MINIMUM_REQUIRED, "10.0").

%%--------------------------------------------------------------------
%% Configs
%%--------------------------------------------------------------------

-define(NO_PRIORITY_TABLE, none).

%%--------------------------------------------------------------------
%% Topics' prefix: $SYS | $queue | $share
%%--------------------------------------------------------------------
Expand Down
44 changes: 25 additions & 19 deletions priv/emqx.schema
Expand Up @@ -651,18 +651,6 @@ end}.
{datatype, {enum, [true, false]}}
]}.

%% @doc Type: simple | priority
{mapping, "mqtt.mqueue_type", "emqx.mqueue_type", [
{default, simple},
{datatype, {enum, [simple, priority]}}
]}.

%% @doc Topic Priorities: 0~255, Default is 0
{mapping, "mqtt.mqueue_priorities", "emqx.mqueue_priorities", [
{default, ""},
{datatype, string}
]}.

%%--------------------------------------------------------------------
%% Zones
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -804,24 +792,30 @@ end}.
{datatype, {duration, s}}
]}.

%% @doc Type: simple | priority
{mapping, "zone.$name.mqueue_type", "emqx.zones", [
{default, simple},
{datatype, {enum, [simple, priority]}}
]}.

%% @doc Max queue length. Enqueued messages when persistent client
%% disconnected, or inflight window is full. 0 means no limit.
{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [
{default, 1000},
{datatype, integer}
]}.

%% @doc Topic Priorities: 0~255, Default is 0
%% @doc Topic Priorities, comma separated topic=priority pairs,
%% where priority should be integer in range 1-255 (inclusive)
%% 1 being the lowest and 255 being the highest.
%% default value `none` to indicate no priority table, hence all
%% messages are treated equal, which means either highest ('infinity'),
%% or lowest (0) depending on mqueue_default_priority config.
{mapping, "zone.$name.mqueue_priorities", "emqx.zones", [
{default, "none"},
{datatype, string}
]}.

%% @doc Default priority for topics not in priority table.
{mapping, "zone.$name.mqueue_default_priority", "emqx.zones", [
{default, lowest},
{datatype, {enum, [highest, lowest]}}
]}.

%% @doc Queue Qos0 messages?
{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [
{default, true},
Expand Down Expand Up @@ -886,6 +880,18 @@ end}.
max_heap_size => Siz1}
end,
{force_shutdown_policy, ShutdownPolicy};
("mqueue_priorities", Val) ->
case Val of
"none" -> none; % NO_PRIORITY_TABLE
_ ->
lists:foldl(fun(T, Acc) ->
%% NOTE: space in "= " is intended
[{Topic, Prio}] = string:tokens(T, "= "),
P = list_to_integer(Prio),
(P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}),
maps:put(iolist_to_binary(Topic), P, Acc)
end, string:tokens(Val, ","))
end;
(Opt, Val) ->
{list_to_atom(Opt), Val}
end,
Expand Down
3 changes: 1 addition & 2 deletions rebar.config
Expand Up @@ -29,6 +29,5 @@
{cover_opts, [verbose]}.
{cover_export_enabled, true}.

%% rebar3_neotoma_plugin is needed to compile the .peg file for cuttlefish
{plugins, [coveralls, rebar3_neotoma_plugin]}.
{plugins, [coveralls]}.

6 changes: 3 additions & 3 deletions src/emqx_local_bridge.erl
Expand Up @@ -63,8 +63,7 @@ init([Pool, Id, Node, Topic, Options]) ->
Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}),
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
MQueue = emqx_mqueue:init(#{type => simple,
max_len => State#state.max_queue_len,
MQueue = emqx_mqueue:init(#{max_len => State#state.max_queue_len,
store_qos0 => true}),
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
false ->
Expand Down Expand Up @@ -96,7 +95,8 @@ handle_cast(Msg, State) ->

handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) ->
%% TODO: how to drop???
{noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}};
{_Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
{noreply, State#state{mqueue = NewQ}};

handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]),
Expand Down

0 comments on commit 7ad3a63

Please sign in to comment.