Skip to content

Commit

Permalink
feat: cluster link prototype WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeTupchiy committed Apr 19, 2024
1 parent 2fea651 commit 23b44f7
Show file tree
Hide file tree
Showing 18 changed files with 1,379 additions and 35 deletions.
74 changes: 48 additions & 26 deletions apps/emqx/src/emqx_broker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ publish(Msg) when is_record(Msg, message) ->
[];
Msg1 = #message{topic = Topic} ->
PersistRes = persist_publish(Msg1),
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes)
route(aggre(emqx_router:match_routes(Topic), Msg1), delivery(Msg1), PersistRes)
end.

persist_publish(Msg) ->
Expand Down Expand Up @@ -311,25 +311,39 @@ do_route({To, Node}, Delivery) when Node =:= node() ->
{Node, To, dispatch(To, Delivery)};
do_route({To, Node}, Delivery) when is_atom(Node) ->
{Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))};
do_route({To, {external, _} = ExtDest}, Delivery) ->
{ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)};
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.

aggre([]) ->
aggre([], _Msg) ->
[];
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
aggre([#route{topic = To, dest = Node}], _Msg) when is_atom(Node) ->
[{To, Node}];
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
aggre([#route{topic = To, dest = {external, _} = ExtDest}], Msg) ->
case emqx_external_broker:is_relevant_route(ExtDest, Msg) of
true -> [{To, ExtDest}];
false -> []
end;
aggre([#route{topic = To, dest = {Group, _Node}}], _Msg) ->
[{To, Group}];
aggre(Routes) ->
aggre(Routes, false, []).

aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) ->
aggre(Rest, Dedup, [{To, Node} | Acc]);
aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) ->
aggre(Rest, true, [{To, Group} | Acc]);
aggre([], false, Acc) ->
aggre(Routes, Msg) ->
aggre(Routes, Msg, false, []).

aggre([#route{topic = To, dest = Node} | Rest], Msg, Dedup, Acc) when is_atom(Node) ->
aggre(Rest, Msg, Dedup, [{To, Node} | Acc]);
aggre([#route{topic = To, dest = {external, _} = ExtDest} | Rest], Msg, Dedup, Acc) ->
Acc1 =
case emqx_external_broker:is_relevant_route(ExtDest, Msg) of
true -> [{To, ExtDest} | Acc];
false -> Acc
end,
aggre(Rest, Msg, Dedup, Acc1);
aggre([#route{topic = To, dest = {Group, _Node}} | Rest], Msg, _Dedup, Acc) ->
aggre(Rest, Msg, true, [{To, Group} | Acc]);
aggre([], _Msg, false, Acc) ->
Acc;
aggre([], true, Acc) ->
aggre([], _Msg, true, Acc) ->
lists:usort(Acc).

%% @doc Forward message to another node.
Expand Down Expand Up @@ -643,19 +657,27 @@ maybe_delete_route(Topic) ->

sync_route(Action, Topic, ReplyTo) ->
EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]),
case EnabledOn of
all ->
push_sync_route(Action, Topic, ReplyTo);
none ->
regular_sync_route(Action, Topic);
Role ->
case Role =:= mria_config:whoami() of
true ->
push_sync_route(Action, Topic, ReplyTo);
false ->
regular_sync_route(Action, Topic)
end
end.
Res =
case EnabledOn of
all ->
push_sync_route(Action, Topic, ReplyTo);
none ->
regular_sync_route(Action, Topic);
Role ->
case Role =:= mria_config:whoami() of
true ->
push_sync_route(Action, Topic, ReplyTo);
false ->
regular_sync_route(Action, Topic)
end
end,
_ = external_sync_route(Action, Topic),
Res.

external_sync_route(add, Topic) ->
emqx_external_broker:maybe_add_route(Topic);
external_sync_route(delete, Topic) ->
emqx_external_broker:maybe_delete_route(Topic).

push_sync_route(Action, Topic, Opts) ->
emqx_router_syncer:push(Action, Topic, node(), Opts).
Expand Down
117 changes: 117 additions & 0 deletions apps/emqx/src/emqx_external_broker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_external_broker).

-callback forward(emqx_router:external_dest(), emqx_types:delivery()) ->
emqx_types:deliver_result().

-callback is_relevant_route(emqx_router:external_dest(), emqx_types:message()) -> boolean().

-callback maybe_add_route(emqx_types:topic()) -> ok.
-callback maybe_delete_route(emqx_types:topic()) -> ok.

-export([
provider/0,
register_provider/1,
unregister_provider/1,
forward/2,
is_relevant_route/2,
maybe_add_route/1,
maybe_delete_route/1
]).

-include("logger.hrl").

-define(PROVIDER, {?MODULE, external_broker}).

-define(safe_with_provider(IfRegistered, IfNotRegistered),
case persistent_term:get(?PROVIDER, undefined) of
undefined ->
IfNotRegistered;
Provider ->
try
Provider:IfRegistered
catch
Err:Reason:St ->
?SLOG(error, #{
msg => "external_broker_crashed",
provider => Provider,
callback => ?FUNCTION_NAME,
stacktrace => St,
error => Err,
reason => Reason
}),
{error, Reason}
end
end
).

%% TODO: provider API copied from emqx_external_traces,
%% but it can be moved to a common module.

%%--------------------------------------------------------------------
%% Provider API
%%--------------------------------------------------------------------

-spec register_provider(module()) -> ok | {error, term()}.
register_provider(Module) when is_atom(Module) ->
case is_valid_provider(Module) of
true ->
persistent_term:put(?PROVIDER, Module);
false ->
{error, invalid_provider}
end.

-spec unregister_provider(module()) -> ok | {error, term()}.
unregister_provider(Module) ->
case persistent_term:get(?PROVIDER, undefined) of
Module ->
persistent_term:erase(?PROVIDER),
ok;
_ ->
{error, not_registered}
end.

-spec provider() -> module() | undefined.
provider() ->
persistent_term:get(?PROVIDER, undefined).

%%--------------------------------------------------------------------
%% Broker API
%%--------------------------------------------------------------------

forward(ExternalDest, Delivery) ->
?safe_with_provider(?FUNCTION_NAME(ExternalDest, Delivery), {error, unknown_dest}).

is_relevant_route(ExternalDest, Message) ->
?safe_with_provider(?FUNCTION_NAME(ExternalDest, Message), false).

maybe_add_route(Topic) ->
?safe_with_provider(?FUNCTION_NAME(Topic), ok).

maybe_delete_route(Topic) ->
?safe_with_provider(?FUNCTION_NAME(Topic), ok).

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

is_valid_provider(Module) ->
lists:all(
fun({F, A}) -> erlang:function_exported(Module, F, A) end,
?MODULE:behaviour_info(callbacks)
).
5 changes: 3 additions & 2 deletions apps/emqx/src/emqx_router.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@
deinit_schema/0
]).

-export_type([dest/0]).
-export_type([dest/0, external_dest/0]).
-export_type([schemavsn/0]).

-type group() :: binary().
-type dest() :: node() | {group(), node()}.
-type external_dest() :: {external, term()}.
-type dest() :: node() | {group(), node()} | external_dest().
-type schemavsn() :: v1 | v2.

%% Operation :: {add, ...} | {delete, ...}.
Expand Down
44 changes: 43 additions & 1 deletion apps/emqx/src/emqx_topic.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
feed_var/3,
systop/1,
parse/1,
parse/2
parse/2,
intersection/2
]).

-export([
Expand All @@ -52,6 +53,8 @@
((C =:= '#' orelse C =:= <<"#">>) andalso REST =/= [])
).

-define(IS_WILDCARD(W), W =:= '+' orelse W =:= '#').

%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -98,6 +101,45 @@ match(_, ['#']) ->
match(_, _) ->
false.

%% @doc Finds an intersection between two topics, two filters or a topic and a filter.
%% The function is commutative: reversing parameters doesn't affect the returned value.
%% Two topics intersect only when they are equal.
%% The intersection of a topic and a filter is always either the topic itself or false (no intersection).
%% The intersection of two filters is either false or a new topic filter that would match only those topics,
%% that can be matched by both input filters.
%% For example, the intersection of "t/global/#" and "t/+/1/+" is "t/global/1/+".
-spec intersection(TopicOrFilter, TopicOrFilter) -> TopicOrFilter | false when
TopicOrFilter :: emqx_types:topic().
intersection(Topic1, Topic2) when is_binary(Topic1), is_binary(Topic2) ->
case intersection(words(Topic1), words(Topic2), []) of
[] -> false;
Intersection -> join(lists:reverse(Intersection))
end.

intersection(Words1, ['#'], Acc) ->
lists:reverse(Words1, Acc);
intersection(['#'], Words2, Acc) ->
lists:reverse(Words2, Acc);
intersection([W1], ['+'], Acc) ->
[W1 | Acc];
intersection(['+'], [W2], Acc) ->
[W2 | Acc];
intersection([W1 | T1], [W2 | T2], Acc) when ?IS_WILDCARD(W1), ?IS_WILDCARD(W2) ->
intersection(T1, T2, [wildcard_intersection(W1, W2) | Acc]);
intersection([W | T1], [W | T2], Acc) ->
intersection(T1, T2, [W | Acc]);
intersection([W1 | T1], [W2 | T2], Acc) when ?IS_WILDCARD(W1) ->
intersection(T1, T2, [W2 | Acc]);
intersection([W1 | T1], [W2 | T2], Acc) when ?IS_WILDCARD(W2) ->
intersection(T1, T2, [W1 | Acc]);
intersection([], [], Acc) ->
Acc;
intersection(_, _, _) ->
[].

wildcard_intersection(W, W) -> W;
wildcard_intersection(_, _) -> '+'.

-spec match_share(Name, Filter) -> boolean() when
Name :: share(),
Filter :: topic() | share().
Expand Down
7 changes: 7 additions & 0 deletions apps/emqx/src/emqx_topic_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
-export([delete/3]).
-export([match/2]).
-export([matches/3]).
-export([matches_filter/3]).

-export([make_key/2]).

Expand Down Expand Up @@ -72,6 +73,12 @@ match(Topic, Tab) ->
matches(Topic, Tab, Opts) ->
emqx_trie_search:matches(Topic, make_nextf(Tab), Opts).

%% @doc Match given topic filter against the index and return _all_ matches.
%% If `unique` option is given, return only unique matches by record ID.
-spec matches_filter(emqx_types:topic(), ets:table(), emqx_trie_search:opts()) -> [match(_ID)].
matches_filter(TopicFilter, Tab, Opts) ->
emqx_trie_search:matches_filter(TopicFilter, make_nextf(Tab), Opts).

%% @doc Extract record ID from the match.
-spec get_id(match(ID)) -> ID.
get_id(Key) ->
Expand Down
30 changes: 27 additions & 3 deletions apps/emqx/src/emqx_trie_search.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
-module(emqx_trie_search).

-export([make_key/2, make_pat/2, filter/1]).
-export([match/2, matches/3, get_id/1, get_topic/1]).
-export([match/2, matches/3, get_id/1, get_topic/1, matches_filter/3]).
-export_type([key/1, word/0, words/0, nextf/0, opts/0]).

-define(END, '$end_of_table').
Expand Down Expand Up @@ -183,9 +183,20 @@ match(Topic, NextF) ->
matches(Topic, NextF, Opts) ->
search(Topic, NextF, Opts).

%% @doc Match given topic filter against the index and return _all_ matches.
-spec matches_filter(emqx_types:topic(), nextf(), opts()) -> [key(_)].
matches_filter(TopicFilter, NextF, Opts) ->
search(TopicFilter, NextF, [topic_filter | Opts]).

%% @doc Entrypoint of the search for a given topic.
search(Topic, NextF, Opts) ->
Words = topic_words(Topic),
%% A private opt
IsFilter = proplists:get_bool(topic_filter, Opts),
Words =
case IsFilter of
true -> filter_words(Topic);
false -> topic_words(Topic)
end,
Base = base_init(Words),
ORetFirst = proplists:get_bool(return_first, Opts),
OUnique = proplists:get_bool(unique, Opts),
Expand All @@ -200,8 +211,10 @@ search(Topic, NextF, Opts) ->
end,
Matches =
case search_new(Words, Base, NextF, Acc0) of
{Cursor, Acc} ->
{Cursor, Acc} when not IsFilter ->
match_topics(Topic, Cursor, NextF, Acc);
{_Cursor, Acc} ->
Acc;
Acc ->
Acc
end,
Expand Down Expand Up @@ -275,6 +288,17 @@ compare(['#'], _Words, _) ->
% Closest possible next entries that we must not miss:
% * a/+/+/d/# (same topic but a different ID)
match_full;
%% Filter search %%
compare(_Filter, ['#'], _) ->
match_full;
compare([_ | TF], ['+' | TW], Pos) ->
case compare(TF, TW, Pos + 1) of
lower ->
lower;
Other ->
Other
end;
%% Filter search end %%
compare(['+' | TF], [HW | TW], Pos) ->
case compare(TF, TW, Pos + 1) of
lower ->
Expand Down

0 comments on commit 23b44f7

Please sign in to comment.