Skip to content
Permalink
Browse files

Merge pull request #2758 from emqx/merge_3.3

Fix unmount crash
  • Loading branch information
turtleDeng committed Aug 12, 2019
2 parents be2ce93 + 0e82170 commit 58ba22dfc79ce81ac74fffae60a624d2238585ca
Showing with 60 additions and 50 deletions.
  1. +20 −16 src/emqx_mountpoint.erl
  2. +1 −1 src/emqx_protocol.erl
  3. +39 −33 src/emqx_topic.erl
@@ -15,9 +15,7 @@
-module(emqx_mountpoint).

-include("emqx.hrl").
-include("logger.hrl").

-logger_header("[Mountpoint]").
-include("types.hrl").

-export([ mount/2
, unmount/2
@@ -32,30 +30,34 @@
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------

mount(undefined, Any) ->
Any;
mount(MountPoint, Topic) when is_binary(Topic) ->
prefix(MountPoint, Topic);
mount(MountPoint, Msg = #message{topic = Topic}) ->
Msg#message{topic = <<MountPoint/binary, Topic/binary>>};

Msg#message{topic = prefix(MountPoint, Topic)};
mount(MountPoint, TopicFilters) when is_list(TopicFilters) ->
[{<<MountPoint/binary, Topic/binary>>, SubOpts} || {Topic, SubOpts} <- TopicFilters].
[{prefix(MountPoint, Topic), SubOpts} || {Topic, SubOpts} <- TopicFilters].

unmount(undefined, Msg) ->
Msg;
unmount(undefined, Any) ->
Any;
unmount(MountPoint, Topic) when is_binary(Topic) ->
case string:prefix(Topic, MountPoint) of
nomatch -> Topic;
Topic1 -> Topic1
end;
unmount(MountPoint, Msg = #message{topic = Topic}) ->
try split_binary(Topic, byte_size(MountPoint)) of
{MountPoint, Topic1} -> Msg#message{topic = Topic1}
catch
_Error:Reason ->
?LOG(error, "Unmount error : ~p", [Reason]),
Msg
case string:prefix(Topic, MountPoint) of
nomatch -> Msg;
Topic1 -> Msg#message{topic = Topic1}
end.

-spec(replvar(maybe(mountpoint()), map()) -> maybe(mountpoint())).
replvar(undefined, _Vars) ->
undefined;
replvar(MountPoint, #{client_id := ClientId, username := Username}) ->
lists:foldl(fun feed_var/2, MountPoint, [{<<"%c">>, ClientId}, {<<"%u">>, Username}]).
lists:foldl(fun feed_var/2, MountPoint,
[{<<"%c">>, ClientId}, {<<"%u">>, Username}]).

feed_var({<<"%c">>, ClientId}, MountPoint) ->
emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
@@ -64,3 +66,5 @@ feed_var({<<"%u">>, undefined}, MountPoint) ->
feed_var({<<"%u">>, Username}, MountPoint) ->
emqx_topic:feed_var(<<"%u">>, Username, MountPoint).

prefix(MountPoint, Topic) ->
<<MountPoint/binary, Topic/binary>>.
@@ -1027,7 +1027,7 @@ raw_topic_filters(#pstate{zone = Zone, proto_ver = ProtoVer, is_bridge = IsBridg
end.

mountpoint(Credentials) ->
maps:get(mountpoint, Credentials, undefined).
emqx_mountpoint:replvar(maps:get(mountpoint, Credentials, undefined), Credentials).

do_check_banned(_EnableBan = true, Credentials) ->
case emqx_banned:check(Credentials) of
@@ -14,8 +14,6 @@

-module(emqx_topic).

-include("emqx_mqtt.hrl").

%% APIs
-export([ match/2
, validate/1
@@ -33,19 +31,23 @@
, parse/2
]).

-export_type([ group/0
, topic/0
, word/0
, triple/0
]).

-type(group() :: binary()).
-type(topic() :: binary()).
-type(word() :: '' | '+' | '#' | binary()).
-type(words() :: list(word())).
-opaque(triple() :: {root | binary(), word(), binary()}).

-export_type([group/0, topic/0, word/0, triple/0]).

-define(MAX_TOPIC_LEN, 4096).

%%------------------------------------------------------------------------------
%%--------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%%--------------------------------------------------------------------

%% @doc Is wildcard topic?
-spec(wildcard(topic() | words()) -> true | false).
@@ -60,15 +62,15 @@ wildcard(['+'|_]) ->
wildcard([_H|T]) ->
wildcard(T).

%% @doc Match Topic name with filter
%% @doc Match Topic name with filter.
-spec(match(Name, Filter) -> boolean() when
Name :: topic() | words(),
Filter :: topic() | words()).
match(<<$$, _/binary>>, <<$+, _/binary>>) ->
false;
match(<<$$, _/binary>>, <<$#, _/binary>>) ->
false;
match(Name, Filter) when is_binary(Name) and is_binary(Filter) ->
match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
match(words(Name), words(Filter));
match([], []) ->
true;
@@ -95,13 +97,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter ->
-spec(validate(name | filter, topic()) -> true).
validate(_, <<>>) ->
error(empty_topic);
validate(_, Topic) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) ->
error(topic_too_long);
validate(filter, Topic) when is_binary(Topic) ->
validate2(words(Topic));
validate(name, Topic) when is_binary(Topic) ->
Words = words(Topic),
validate2(Words) and (not wildcard(Words)).
validate2(Words)
andalso (not wildcard(Words))
orelse error(topic_name_error).

validate2([]) ->
true;
@@ -123,7 +127,7 @@ validate3(<<C/utf8, _Rest/binary>>) when C == $#; C == $+; C == 0 ->
validate3(<<_/utf8, Rest/binary>>) ->
validate3(Rest).

%% @doc Topic to triples
%% @doc Topic to triples.
-spec(triples(topic()) -> list(triple())).
triples(Topic) when is_binary(Topic) ->
triples(words(Topic), root, []).
@@ -206,27 +210,29 @@ join(Words) ->
end, {true, <<>>}, [bin(W) || W <- Words]),
Bin.

-spec(parse(topic()) -> {topic(), #{}}).
parse(Topic) when is_binary(Topic) ->
parse(Topic, #{}).

parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) ->
error({invalid_topic, Topic});
parse(Topic = <<?SHARE, "/", _/binary>>, #{share := _Group}) ->
error({invalid_topic, Topic});
parse(<<"$queue/", Topic1/binary>>, Options) ->
parse(Topic1, maps:put(share, <<"$queue">>, Options));
parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) ->
case binary:split(Topic1, <<"/">>) of
[<<>>] -> error({invalid_topic, Topic});
[_] -> error({invalid_topic, Topic});
[Group, Topic2] ->
case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of
nomatch -> {Topic2, maps:put(share, Group, Options)};
_ -> error({invalid_topic, Topic})
-spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}).
parse(TopicFilter) when is_binary(TopicFilter) ->
parse(TopicFilter, #{});
parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
parse(TopicFilter, Options).

-spec(parse(topic(), map()) -> {topic(), map()}).
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
parse(TopicFilter, Options#{share => <<"$queue">>});
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
case binary:split(Rest, <<"/">>) of
[_Any] -> error({invalid_topic_filter, TopicFilter});
[ShareName, Filter] ->
case binary:match(ShareName, [<<"+">>, <<"#">>]) of
nomatch -> parse(Filter, Options#{share => ShareName});
_ -> error({invalid_topic_filter, TopicFilter})
end
end;
parse(Topic, Options = #{qos := QoS}) ->
{Topic, Options#{rc => QoS}};
parse(Topic, Options) ->
{Topic, Options}.
parse(TopicFilter, Options = #{qos := QoS}) ->
{TopicFilter, Options#{rc => QoS}};
parse(TopicFilter, Options) ->
{TopicFilter, Options}.

0 comments on commit 58ba22d

Please sign in to comment.
You can’t perform that action at this time.