Skip to content

Commit

Permalink
feat: cluster link prototype WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeTupchiy committed Apr 12, 2024
1 parent 2fea651 commit 003a017
Show file tree
Hide file tree
Showing 15 changed files with 908 additions and 31 deletions.
74 changes: 48 additions & 26 deletions apps/emqx/src/emqx_broker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ publish(Msg) when is_record(Msg, message) ->
[];
Msg1 = #message{topic = Topic} ->
PersistRes = persist_publish(Msg1),
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes)
route(aggre(emqx_router:match_routes(Topic), Msg1), delivery(Msg1), PersistRes)
end.

persist_publish(Msg) ->
Expand Down Expand Up @@ -311,25 +311,39 @@ do_route({To, Node}, Delivery) when Node =:= node() ->
{Node, To, dispatch(To, Delivery)};
do_route({To, Node}, Delivery) when is_atom(Node) ->
{Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))};
do_route({To, {external, _} = ExtDest}, Delivery) ->
{ExtDest, To, emqx_external_broker:forward(ExtDest, Delivery)};
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.

aggre([]) ->
aggre([], _Msg) ->
[];
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
aggre([#route{topic = To, dest = Node}], _Msg) when is_atom(Node) ->
[{To, Node}];
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
aggre([#route{topic = To, dest = {external, _} = ExtDest}], Msg) ->
case emqx_external_broker:is_relevant_route(ExtDest, Msg) of
true -> [{To, ExtDest}];
false -> []
end;
aggre([#route{topic = To, dest = {Group, _Node}}], _Msg) ->
[{To, Group}];
aggre(Routes) ->
aggre(Routes, false, []).

aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) ->
aggre(Rest, Dedup, [{To, Node} | Acc]);
aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) ->
aggre(Rest, true, [{To, Group} | Acc]);
aggre([], false, Acc) ->
aggre(Routes, Msg) ->
aggre(Routes, Msg, false, []).

aggre([#route{topic = To, dest = Node} | Rest], Msg, Dedup, Acc) when is_atom(Node) ->
aggre(Rest, Msg, Dedup, [{To, Node} | Acc]);
aggre([#route{topic = To, dest = {external, _} = ExtDest} | Rest], Msg, Dedup, Acc) ->
Acc1 =
case emqx_external_broker:is_relevant_route(ExtDest, Msg) of
true -> [{To, ExtDest} | Acc];
false -> Acc
end,
aggre(Rest, Msg, Dedup, Acc1);
aggre([#route{topic = To, dest = {Group, _Node}} | Rest], Msg, _Dedup, Acc) ->
aggre(Rest, Msg, true, [{To, Group} | Acc]);
aggre([], _Msg, false, Acc) ->
Acc;
aggre([], true, Acc) ->
aggre([], _Msg, true, Acc) ->
lists:usort(Acc).

%% @doc Forward message to another node.
Expand Down Expand Up @@ -643,19 +657,27 @@ maybe_delete_route(Topic) ->

sync_route(Action, Topic, ReplyTo) ->
EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]),
case EnabledOn of
all ->
push_sync_route(Action, Topic, ReplyTo);
none ->
regular_sync_route(Action, Topic);
Role ->
case Role =:= mria_config:whoami() of
true ->
push_sync_route(Action, Topic, ReplyTo);
false ->
regular_sync_route(Action, Topic)
end
end.
Res =
case EnabledOn of
all ->
push_sync_route(Action, Topic, ReplyTo);
none ->
regular_sync_route(Action, Topic);
Role ->
case Role =:= mria_config:whoami() of
true ->
push_sync_route(Action, Topic, ReplyTo);
false ->
regular_sync_route(Action, Topic)
end
end,
_ = external_sync_route(Action, Topic),
Res.

external_sync_route(add, Topic) ->
emqx_external_broker:maybe_add_route(Topic);
external_sync_route(delete, Topic) ->
emqx_external_broker:maybe_delete_route(Topic).

push_sync_route(Action, Topic, Opts) ->
emqx_router_syncer:push(Action, Topic, node(), Opts).
Expand Down
102 changes: 102 additions & 0 deletions apps/emqx/src/emqx_external_broker.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_external_broker).

-callback forward(emqx_router:external_dest(), emqx_types:delivery()) ->
emqx_types:deliver_result().

-callback is_relevant_route(emqx_router:external_dest(), emqx_types:message()) -> boolean().

-callback maybe_add_route(emqx_types:topic()) -> ok.
-callback maybe_delete_route(emqx_types:topic()) -> ok.

-export([
provider/0,
register_provider/1,
unregister_provider/1,
forward/2,
is_relevant_route/2,
maybe_add_route/1,
maybe_delete_route/1
]).

-define(PROVIDER, {?MODULE, external_broker}).

-define(with_provider(IfRegistered, IfNotRegistered),
case persistent_term:get(?PROVIDER, undefined) of
undefined ->
IfNotRegistered;
Provider ->
Provider:IfRegistered
end
).

%% TODO: provider API copied from emqx_external_traces,
%% but it can be moved to a common module.

%%--------------------------------------------------------------------
%% Provider API
%%--------------------------------------------------------------------

-spec register_provider(module()) -> ok | {error, term()}.
register_provider(Module) when is_atom(Module) ->
case is_valid_provider(Module) of
true ->
persistent_term:put(?PROVIDER, Module);
false ->
{error, invalid_provider}
end.

-spec unregister_provider(module()) -> ok | {error, term()}.
unregister_provider(Module) ->
case persistent_term:get(?PROVIDER, undefined) of
Module ->
persistent_term:erase(?PROVIDER),
ok;
_ ->
{error, not_registered}
end.

-spec provider() -> module() | undefined.
provider() ->
persistent_term:get(?PROVIDER, undefined).

%%--------------------------------------------------------------------
%% Broker API
%%--------------------------------------------------------------------

forward(ExternalDest, Delivery) ->
?with_provider(?FUNCTION_NAME(ExternalDest, Delivery), {error, unknown_dest}).

is_relevant_route(ExternalDest, Message) ->
?with_provider(?FUNCTION_NAME(ExternalDest, Message), false).

maybe_add_route(Topic) ->
?with_provider(?FUNCTION_NAME(Topic), ok).

maybe_delete_route(Topic) ->
?with_provider(?FUNCTION_NAME(Topic), ok).

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

is_valid_provider(Module) ->
lists:all(
fun({F, A}) -> erlang:function_exported(Module, F, A) end,
?MODULE:behaviour_info(callbacks)
).
5 changes: 3 additions & 2 deletions apps/emqx/src/emqx_router.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@
deinit_schema/0
]).

-export_type([dest/0]).
-export_type([dest/0, external_dest/0]).
-export_type([schemavsn/0]).

-type group() :: binary().
-type dest() :: node() | {group(), node()}.
-type external_dest() :: {external, term()}.
-type dest() :: node() | {group(), node()} | external_dest().
-type schemavsn() :: v1 | v2.

%% Operation :: {add, ...} | {delete, ...}.
Expand Down
94 changes: 94 additions & 0 deletions apps/emqx_cluster_link/BSL.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
Business Source License 1.1

Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2023
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2028-01-26
Change License: Apache License, Version 2.0

For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact

Notice

The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.

License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.

-----------------------------------------------------------------------------

Business Source License 1.1

Terms

The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.

Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.

If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.

All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.

You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.

Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.

This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).

TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.

MariaDB hereby grants you permission to use this License’s text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.

Covenants of Licensor

In consideration of the right to use this License’s text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:

1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.

2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.

3. To specify a Change Date.

4. Not to modify this License in any other way.
10 changes: 10 additions & 0 deletions apps/emqx_cluster_link/include/emqx_cluster_link.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

-define(TOPIC_PREFIX, "$LINK/cluster/").
-define(ROUTE_TOPIC_PREFIX, ?TOPIC_PREFIX "route/").
-define(MSG_TOPIC_PREFIX, ?TOPIC_PREFIX "msg/").

-define(ROUTE_POOL_PREFIX, "emqx_cluster_link_mqtt:route:").
-define(MSG_POOL_PREFIX, "emqx_cluster_link_mqtt:msg:").
19 changes: 19 additions & 0 deletions apps/emqx_cluster_link/src/emqx_cluster_link.app.src
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
%% -*- mode: erlang -*-
{application, emqx_cluster_link, [
{description, "EMQX Cluster Linking"},
% strict semver, bump manually!
{vsn, "0.1.0"},
{modules, []},
{registered, []},
{applications, [
kernel, stdlib, emqtt, ecpool, emqx
]},
{mod, {emqx_cluster_link_app, []}},
{env, []},
{licenses, ["Business Source License 1.1"]},
{maintainers, ["EMQX Team <contact@emqx.io>"]},
{links, [
{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx"}
]}
]}.

0 comments on commit 003a017

Please sign in to comment.