New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: cluster linking WIP #12879
base: master
Are you sure you want to change the base?
feat: cluster linking WIP #12879
Conversation
NOTE: still not even a draft quality, but it's constantly updated.. |
003a017
to
2b77999
Compare
2b77999
to
23b44f7
Compare
%% we don't expect the message to be forwarded from an older EMQX release, | ||
%% that doesn't set extra = #{} by default. | ||
with_sender_name(#message{extra = Extra} = Msg, ClusterName) when is_map(Extra) -> | ||
Msg#message{extra = Extra#{sender_link => ClusterName}}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Msg#message{extra = Extra#{sender_link => ClusterName}}. | |
Msg#message{extra = Extra#{link_origin => ClusterName}}. |
apps/emqx/src/emqx_broker.erl
Outdated
@@ -246,7 +246,7 @@ publish(Msg) when is_record(Msg, message) -> | |||
[]; | |||
Msg1 = #message{topic = Topic} -> | |||
PersistRes = persist_publish(Msg1), | |||
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes) | |||
route(aggre(emqx_router:match_routes(Topic), Msg1), delivery(Msg1), PersistRes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe return two lists, one for regular routes, another for external routes.
then drop external routes if Msg1 is originated from a link delivery.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed.
_ = | ||
case emqx_cluster_link_mqtt:decode_forwarded_msg(Payload) of | ||
#message{} = ForwardedMsg -> | ||
emqx_broker:publish(with_sender_name(ForwardedMsg, ClusterName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should return {stop, DecodedMessage}
from here ? otherwise the original message will be received by other hooks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, thanks. This also avoids extraemqx_broker:publish/1
call. 🙌
ok. | ||
|
||
put_hook() -> | ||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need to add this hook with priority HP_SYS_MSGS
(higher than message validation), to stop other hooks from intercepting ?LINK messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
%% Not implemented yet | ||
ok | ||
end, | ||
ok; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return {stop, []}
here to terminate the hook chain.
NOTE: []
does not seem to be acceptable by the current hook caller, so we'll need to add a new case clause to support []
and [Message | Batch]
.
(batching is not supported for now, but we could use this interface for future extension. For now, []
is to terminate publish, and #message{...}
when forward is accepted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
6bb70c1
to
8154ab4
Compare
8154ab4
to
df2c352
Compare
1aa1271
to
df2c352
Compare
Release version: v/e5.7.0
Summary
PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md
filesChecklist for CI (.github/workflows) changes
changes/
dir for user-facing artifacts update