Skip to content

Commit

Permalink
feat: cluster link prototype WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeTupchiy committed Apr 24, 2024
1 parent 712008b commit 6bb70c1
Show file tree
Hide file tree
Showing 18 changed files with 1,686 additions and 36 deletions.
87 changes: 60 additions & 27 deletions apps/emqx/src/emqx_broker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,22 @@ publish(Msg) when is_record(Msg, message) ->
topic => Topic
}),
[];
Msg1 = #message{topic = Topic} ->
PersistRes = persist_publish(Msg1),
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes)
Msg1 = #message{} ->
do_publish(Msg1);
Msgs when is_list(Msgs) -> do_publish_many(Msgs)
end.

do_publish_many([]) ->
[];
do_publish_many([Msg | T]) ->
do_publish(Msg) ++ do_publish_many(T).

do_publish(#message{topic = Topic} = Msg) ->
PersistRes = persist_publish(Msg),
{Routes, ExtRoutes} = aggre(emqx_router:match_routes(Topic)),
Routes1 = maybe_add_ext_routes(ExtRoutes, Routes, Msg),
route(Routes1, delivery(Msg), PersistRes).

persist_publish(Msg) ->
case emqx_persistent_message:persist(Msg) of
ok ->
Expand Down Expand Up @@ -311,26 +322,40 @@ do_route({To, Node}, Delivery) when Node =:= node() ->
{Node, To, dispatch(To, Delivery)};
do_route({To, Node}, Delivery) when is_atom(Node) ->
{Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))};
do_route({To, {external, _} = ExtDest}, Delivery) ->
{ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)};
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.

aggre([]) ->
[];
{[], []};
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
[{To, Node}];
{[{To, Node}], []};
aggre([#route{topic = To, dest = {external, _} = ExtDest}]) ->
{[], [{To, ExtDest}]};
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
[{To, Group}];
{[{To, Group}], []};
aggre(Routes) ->
aggre(Routes, false, []).

aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) ->
aggre(Rest, Dedup, [{To, Node} | Acc]);
aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) ->
aggre(Rest, true, [{To, Group} | Acc]);
aggre(Routes, false, {[], []}).

aggre([#route{topic = To, dest = Node} | Rest], Dedup, {Acc, ExtAcc}) when is_atom(Node) ->
aggre(Rest, Dedup, {[{To, Node} | Acc], ExtAcc});
aggre([#route{topic = To, dest = {external, _} = ExtDest} | Rest], Dedup, {Acc, ExtAcc}) ->
aggre(Rest, Dedup, {Acc, [{To, ExtDest} | ExtAcc]});
aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, {Acc, ExtAcc}) ->
aggre(Rest, true, {[{To, Group} | Acc], ExtAcc});
aggre([], false, Acc) ->
Acc;
aggre([], true, Acc) ->
lists:usort(Acc).
aggre([], true, {Acc, ExtAcc}) ->
{lists:usort(Acc), lists:usort(ExtAcc)}.

maybe_add_ext_routes([] = _ExtRoutes, Routes, _Msg) ->
Routes;
maybe_add_ext_routes(ExtRoutes, Routes, Msg) ->
case emqx_external_broker:should_route_to_external_dests(Msg) of
true -> Routes ++ ExtRoutes;
false -> Routes
end.

%% @doc Forward message to another node.
-spec forward(
Expand Down Expand Up @@ -643,19 +668,27 @@ maybe_delete_route(Topic) ->

sync_route(Action, Topic, ReplyTo) ->
EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]),
case EnabledOn of
all ->
push_sync_route(Action, Topic, ReplyTo);
none ->
regular_sync_route(Action, Topic);
Role ->
case Role =:= mria_config:whoami() of
true ->
push_sync_route(Action, Topic, ReplyTo);
false ->
regular_sync_route(Action, Topic)
end
end.
Res =
case EnabledOn of
all ->
push_sync_route(Action, Topic, ReplyTo);
none ->
regular_sync_route(Action, Topic);
Role ->
case Role =:= mria_config:whoami() of
true ->
push_sync_route(Action, Topic, ReplyTo);
false ->
regular_sync_route(Action, Topic)
end
end,
_ = external_sync_route(Action, Topic),
Res.

external_sync_route(add, Topic) ->
emqx_external_broker:maybe_add_route(Topic);
external_sync_route(delete, Topic) ->
emqx_external_broker:maybe_delete_route(Topic).

push_sync_route(Action, Topic, Opts) ->
emqx_router_syncer:push(Action, Topic, node(), Opts).
Expand Down
117 changes: 117 additions & 0 deletions apps/emqx/src/emqx_external_broker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_external_broker).

-callback forward(emqx_router:external_dest(), emqx_types:delivery()) ->
emqx_types:deliver_result().

-callback should_route_to_external_dests(emqx_types:message()) -> boolean().

-callback maybe_add_route(emqx_types:topic()) -> ok.
-callback maybe_delete_route(emqx_types:topic()) -> ok.

-export([
provider/0,
register_provider/1,
unregister_provider/1,
forward/2,
should_route_to_external_dests/1,
maybe_add_route/1,
maybe_delete_route/1
]).

-include("logger.hrl").

-define(PROVIDER, {?MODULE, external_broker}).

-define(safe_with_provider(IfRegistered, IfNotRegistered),
case persistent_term:get(?PROVIDER, undefined) of
undefined ->
IfNotRegistered;
Provider ->
try
Provider:IfRegistered
catch
Err:Reason:St ->
?SLOG(error, #{
msg => "external_broker_crashed",
provider => Provider,
callback => ?FUNCTION_NAME,
stacktrace => St,
error => Err,
reason => Reason
}),
{error, Reason}
end
end
).

%% TODO: provider API copied from emqx_external_traces,
%% but it can be moved to a common module.

%%--------------------------------------------------------------------
%% Provider API
%%--------------------------------------------------------------------

-spec register_provider(module()) -> ok | {error, term()}.
register_provider(Module) when is_atom(Module) ->
case is_valid_provider(Module) of
true ->
persistent_term:put(?PROVIDER, Module);
false ->
{error, invalid_provider}
end.

-spec unregister_provider(module()) -> ok | {error, term()}.
unregister_provider(Module) ->
case persistent_term:get(?PROVIDER, undefined) of
Module ->
persistent_term:erase(?PROVIDER),
ok;
_ ->
{error, not_registered}
end.

-spec provider() -> module() | undefined.
provider() ->
persistent_term:get(?PROVIDER, undefined).

%%--------------------------------------------------------------------
%% Broker API
%%--------------------------------------------------------------------

forward(ExternalDest, Delivery) ->
?safe_with_provider(?FUNCTION_NAME(ExternalDest, Delivery), {error, unknown_dest}).

should_route_to_external_dests(Message) ->
?safe_with_provider(?FUNCTION_NAME(Message), false).

maybe_add_route(Topic) ->
?safe_with_provider(?FUNCTION_NAME(Topic), ok).

maybe_delete_route(Topic) ->
?safe_with_provider(?FUNCTION_NAME(Topic), ok).

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

is_valid_provider(Module) ->
lists:all(
fun({F, A}) -> erlang:function_exported(Module, F, A) end,
?MODULE:behaviour_info(callbacks)
).
5 changes: 3 additions & 2 deletions apps/emqx/src/emqx_router.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@
deinit_schema/0
]).

-export_type([dest/0]).
-export_type([dest/0, external_dest/0]).
-export_type([schemavsn/0]).

-type group() :: binary().
-type dest() :: node() | {group(), node()}.
-type external_dest() :: {external, term()}.
-type dest() :: node() | {group(), node()} | external_dest().
-type schemavsn() :: v1 | v2.

%% Operation :: {add, ...} | {delete, ...}.
Expand Down
44 changes: 43 additions & 1 deletion apps/emqx/src/emqx_topic.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
feed_var/3,
systop/1,
parse/1,
parse/2
parse/2,
intersection/2
]).

-export([
Expand All @@ -52,6 +53,8 @@
((C =:= '#' orelse C =:= <<"#">>) andalso REST =/= [])
).

-define(IS_WILDCARD(W), W =:= '+' orelse W =:= '#').

%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -98,6 +101,45 @@ match(_, ['#']) ->
match(_, _) ->
false.

%% @doc Finds an intersection between two topics, two filters or a topic and a filter.
%% The function is commutative: reversing parameters doesn't affect the returned value.
%% Two topics intersect only when they are equal.
%% The intersection of a topic and a filter is always either the topic itself or false (no intersection).
%% The intersection of two filters is either false or a new topic filter that would match only those topics,
%% that can be matched by both input filters.
%% For example, the intersection of "t/global/#" and "t/+/1/+" is "t/global/1/+".
-spec intersection(TopicOrFilter, TopicOrFilter) -> TopicOrFilter | false when
TopicOrFilter :: emqx_types:topic().
intersection(Topic1, Topic2) when is_binary(Topic1), is_binary(Topic2) ->
case intersection(words(Topic1), words(Topic2), []) of
[] -> false;
Intersection -> join(lists:reverse(Intersection))
end.

intersection(Words1, ['#'], Acc) ->
lists:reverse(Words1, Acc);
intersection(['#'], Words2, Acc) ->
lists:reverse(Words2, Acc);
intersection([W1], ['+'], Acc) ->
[W1 | Acc];
intersection(['+'], [W2], Acc) ->
[W2 | Acc];
intersection([W1 | T1], [W2 | T2], Acc) when ?IS_WILDCARD(W1), ?IS_WILDCARD(W2) ->
intersection(T1, T2, [wildcard_intersection(W1, W2) | Acc]);
intersection([W | T1], [W | T2], Acc) ->
intersection(T1, T2, [W | Acc]);
intersection([W1 | T1], [W2 | T2], Acc) when ?IS_WILDCARD(W1) ->
intersection(T1, T2, [W2 | Acc]);
intersection([W1 | T1], [W2 | T2], Acc) when ?IS_WILDCARD(W2) ->
intersection(T1, T2, [W1 | Acc]);
intersection([], [], Acc) ->
Acc;
intersection(_, _, _) ->
[].

wildcard_intersection(W, W) -> W;
wildcard_intersection(_, _) -> '+'.

-spec match_share(Name, Filter) -> boolean() when
Name :: share(),
Filter :: topic() | share().
Expand Down
7 changes: 7 additions & 0 deletions apps/emqx/src/emqx_topic_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
-export([delete/3]).
-export([match/2]).
-export([matches/3]).
-export([matches_filter/3]).

-export([make_key/2]).

Expand Down Expand Up @@ -72,6 +73,12 @@ match(Topic, Tab) ->
matches(Topic, Tab, Opts) ->
emqx_trie_search:matches(Topic, make_nextf(Tab), Opts).

%% @doc Match given topic filter against the index and return _all_ matches.
%% If `unique` option is given, return only unique matches by record ID.
-spec matches_filter(emqx_types:topic(), ets:table(), emqx_trie_search:opts()) -> [match(_ID)].
matches_filter(TopicFilter, Tab, Opts) ->
emqx_trie_search:matches_filter(TopicFilter, make_nextf(Tab), Opts).

%% @doc Extract record ID from the match.
-spec get_id(match(ID)) -> ID.
get_id(Key) ->
Expand Down
30 changes: 27 additions & 3 deletions apps/emqx/src/emqx_trie_search.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
-module(emqx_trie_search).

-export([make_key/2, make_pat/2, filter/1]).
-export([match/2, matches/3, get_id/1, get_topic/1]).
-export([match/2, matches/3, get_id/1, get_topic/1, matches_filter/3]).
-export_type([key/1, word/0, words/0, nextf/0, opts/0]).

-define(END, '$end_of_table').
Expand Down Expand Up @@ -183,9 +183,20 @@ match(Topic, NextF) ->
matches(Topic, NextF, Opts) ->
search(Topic, NextF, Opts).

%% @doc Match given topic filter against the index and return _all_ matches.
-spec matches_filter(emqx_types:topic(), nextf(), opts()) -> [key(_)].
matches_filter(TopicFilter, NextF, Opts) ->
search(TopicFilter, NextF, [topic_filter | Opts]).

%% @doc Entrypoint of the search for a given topic.
search(Topic, NextF, Opts) ->
Words = topic_words(Topic),
%% A private opt
IsFilter = proplists:get_bool(topic_filter, Opts),
Words =
case IsFilter of
true -> filter_words(Topic);
false -> topic_words(Topic)
end,
Base = base_init(Words),
ORetFirst = proplists:get_bool(return_first, Opts),
OUnique = proplists:get_bool(unique, Opts),
Expand All @@ -200,8 +211,10 @@ search(Topic, NextF, Opts) ->
end,
Matches =
case search_new(Words, Base, NextF, Acc0) of
{Cursor, Acc} ->
{Cursor, Acc} when not IsFilter ->
match_topics(Topic, Cursor, NextF, Acc);
{_Cursor, Acc} ->
Acc;
Acc ->
Acc
end,
Expand Down Expand Up @@ -275,6 +288,17 @@ compare(['#'], _Words, _) ->
% Closest possible next entries that we must not miss:
% * a/+/+/d/# (same topic but a different ID)
match_full;
%% Filter search %%
compare(_Filter, ['#'], _) ->
match_full;
compare([_ | TF], ['+' | TW], Pos) ->
case compare(TF, TW, Pos + 1) of
lower ->
lower;
Other ->
Other
end;
%% Filter search end %%
compare(['+' | TF], [HW | TW], Pos) ->
case compare(TF, TW, Pos + 1) of
lower ->
Expand Down

0 comments on commit 6bb70c1

Please sign in to comment.