Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mqtt-bridge): disallow QoS 2 on ingress bridges #9952

Merged
merged 3 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 37 additions & 9 deletions apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl
Expand Up @@ -52,7 +52,7 @@
-define(INGRESS_CONF, #{
<<"remote">> => #{
<<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
<<"qos">> => 2
<<"qos">> => 1
},
<<"local">> => #{
<<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
Expand All @@ -77,7 +77,7 @@
-define(INGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{
<<"remote">> => #{
<<"topic">> => <<?INGRESS_REMOTE_TOPIC, "/#">>,
<<"qos">> => 2
<<"qos">> => 1
},
<<"local">> => #{
<<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
Expand Down Expand Up @@ -242,26 +242,54 @@ t_mqtt_conn_bridge_ingress(_) ->

ok.

t_mqtt_conn_bridge_ignores_clean_start(_) ->
t_mqtt_egress_bridge_ignores_clean_start(_) ->
BridgeName = atom_to_binary(?FUNCTION_NAME),
BridgeID = create_bridge(
?SERVER_CONF(<<"user1">>)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => BridgeName,
<<"ingress">> => ?INGRESS_CONF,
<<"egress">> => ?EGRESS_CONF,
<<"clean_start">> => false
}
),

{ok, 200, BridgeJSON} = request(get, uri(["bridges", BridgeID]), []),
Bridge = jsx:decode(BridgeJSON),
{ok, _, #{state := #{name := WorkerName}}} =
emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)),
?assertMatch(
#{clean_start := true},
maps:from_list(emqx_connector_mqtt_worker:info(WorkerName))
),

%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),

ok.

t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) ->
BridgeName = atom_to_binary(?FUNCTION_NAME),
BridgeID = create_bridge(
?SERVER_CONF(<<"user1">>)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => BridgeName,
<<"ingress">> => emqx_map_lib:deep_merge(
?INGRESS_CONF,
#{<<"remote">> => #{<<"qos">> => 2}}
)
}
),

RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
Payload = <<"whatqos">>,
emqx:subscribe(LocalTopic),
emqx:publish(emqx_message:make(undefined, _QoS = 2, RemoteTopic, Payload)),

%% verify that there's no `clean_start` in response
?assertEqual(#{}, maps:with([<<"clean_start">>], Bridge)),
%% we should receive a message on the local broker, with specified topic
Msg = assert_mqtt_msg_received(LocalTopic, Payload),
?assertMatch(#message{qos = 1}, Msg),

%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),

ok.

Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf
Expand Up @@ -159,8 +159,8 @@ broker MUST support this feature."""

clean_start {
desc {
en: "The clean-start or the clean-session of the MQTT protocol"
zh: "MQTT 清除会话"
en: "Whether or not to start a clean session when reconnecting a remote broker for ingress bridge"
zh: "与 ingress MQTT 桥的远程服务器重连时是否清除老的 MQTT 会话。"
}
label: {
en: "Clean Session"
Expand Down
6 changes: 2 additions & 4 deletions apps/emqx_connector/src/emqx_connector_mqtt.erl
Expand Up @@ -251,6 +251,7 @@ basic_config(
server := Server,
proto_ver := ProtoVer,
bridge_mode := BridgeMode,
clean_start := CleanStart,
keepalive := KeepAlive,
retry_interval := RetryIntv,
max_inflight := MaxInflight,
Expand All @@ -270,11 +271,8 @@ basic_config(
%% non-standard mqtt connection packets will be filtered out by LB.
%% So let's disable bridge_mode.
bridge_mode => BridgeMode,
%% NOTE
%% We are ignoring the user configuration here because there's currently no reliable way
%% to ensure proper session recovery according to the MQTT spec.
clean_start => true,
keepalive => ms_to_s(KeepAlive),
clean_start => CleanStart,
retry_interval => RetryIntv,
max_inflight => MaxInflight,
ssl => EnableSsl,
Expand Down
10 changes: 4 additions & 6 deletions apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
Expand Up @@ -18,6 +18,7 @@

-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").

-behaviour(hocon_schema).

Expand Down Expand Up @@ -111,9 +112,7 @@ fields("server_configs") ->
boolean(),
#{
default => true,
desc => ?DESC("clean_start"),
hidden => true,
deprecated => {since, "v5.0.16"}
desc => ?DESC("clean_start")
}
)},
{keepalive, mk_duration("MQTT Keepalive.", #{default => "300s"})},
Expand Down Expand Up @@ -143,8 +142,7 @@ fields("ingress") ->
mk(
ref(?MODULE, "ingress_local"),
#{
desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local"),
is_required => false
desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")
}
)}
];
Expand All @@ -161,7 +159,7 @@ fields("ingress_remote") ->
)},
{qos,
mk(
qos(),
emqx_schema:qos(),
#{
default => 1,
desc => ?DESC("ingress_remote_qos")
Expand Down
60 changes: 51 additions & 9 deletions apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl
Expand Up @@ -75,6 +75,7 @@
connect/1,
status/1,
ping/1,
info/1,
send_to_remote/2,
send_to_remote_async/3
]).
Expand Down Expand Up @@ -114,7 +115,7 @@ start_link(Name, BridgeOpts) ->
name => Name,
options => BridgeOpts
}),
Conf = init_config(BridgeOpts),
Conf = init_config(Name, BridgeOpts),
Options = mk_client_options(Conf, BridgeOpts),
case emqtt:start_link(Options) of
{ok, Pid} ->
Expand All @@ -129,13 +130,13 @@ start_link(Name, BridgeOpts) ->
Error
end.

init_config(Opts) ->
init_config(Name, Opts) ->
Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
Subscriptions = maps:get(subscriptions, Opts, undefined),
Forwards = maps:get(forwards, Opts, undefined),
#{
mountpoint => format_mountpoint(Mountpoint),
subscriptions => pre_process_subscriptions(Subscriptions),
subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts),
forwards => pre_process_forwards(Forwards)
}.

Expand All @@ -145,6 +146,16 @@ mk_client_options(Conf, BridgeOpts) ->
Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined),
Subscriptions = maps:get(subscriptions, Conf),
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
CleanStart =
case Subscriptions of
#{remote := _} ->
maps:get(clean_start, BridgeOpts);
undefined ->
%% NOTE
%% We are ignoring the user configuration here because there's currently no reliable way
%% to ensure proper session recovery according to the MQTT spec.
true
end,
Opts = maps:without(
[
address,
Expand All @@ -160,6 +171,7 @@ mk_client_options(Conf, BridgeOpts) ->
Opts#{
msg_handler => mk_client_event_handler(Vars, #{server => Server}),
hosts => [HostPort],
clean_start => CleanStart,
force_ping => true,
proto_ver => maps:get(proto_ver, BridgeOpts, v4)
}.
Expand Down Expand Up @@ -205,10 +217,12 @@ subscribe_remote_topics(_Ref, undefined) ->
stop(Ref) ->
emqtt:stop(ref(Ref)).

info(Ref) ->
emqtt:info(ref(Ref)).

status(Ref) ->
try
Info = emqtt:info(ref(Ref)),
case proplists:get_value(socket, Info) of
case proplists:get_value(socket, info(Ref)) of
Socket when Socket /= undefined ->
connected;
undefined ->
Expand Down Expand Up @@ -282,11 +296,18 @@ format_mountpoint(undefined) ->
format_mountpoint(Prefix) ->
binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).

pre_process_subscriptions(undefined) ->
pre_process_subscriptions(undefined, _, _) ->
undefined;
pre_process_subscriptions(#{local := LC} = Conf) when is_map(Conf) ->
Conf#{local => pre_process_in_out_common(LC)};
pre_process_subscriptions(Conf) when is_map(Conf) ->
pre_process_subscriptions(
#{remote := RC, local := LC} = Conf,
BridgeName,
BridgeOpts
) when is_map(Conf) ->
Conf#{
remote => pre_process_in_remote(RC, BridgeName, BridgeOpts),
local => pre_process_in_out_common(LC)
};
pre_process_subscriptions(Conf, _, _) when is_map(Conf) ->
%% have no 'local' field in the config
undefined.

Expand Down Expand Up @@ -314,6 +335,27 @@ pre_process_conf(Key, Conf) ->
Conf#{Key => Val}
end.

pre_process_in_remote(#{qos := QoSIn} = Conf, BridgeName, BridgeOpts) ->
QoS = downgrade_ingress_qos(QoSIn),
case QoS of
QoSIn ->
ok;
_ ->
?SLOG(warning, #{
msg => "downgraded_unsupported_ingress_qos",
qos_configured => QoSIn,
qos_used => QoS,
name => BridgeName,
options => BridgeOpts
})
end,
Conf#{qos => QoS}.

downgrade_ingress_qos(2) ->
1;
downgrade_ingress_qos(QoS) ->
QoS.

get_pid(Name) ->
case gproc:where(?NAME(Name)) of
Pid when is_pid(Pid) ->
Expand Down
2 changes: 2 additions & 0 deletions changes/v5.0.17/fix-9952.en.md
@@ -0,0 +1,2 @@
Disallow subscribing with QoS 2 for ingress MQTT bridges.
Allow user to configure `clean_start` option for ingress MQTT bridges however.
2 changes: 2 additions & 0 deletions changes/v5.0.17/fix-9952.zh.md
@@ -0,0 +1,2 @@
不允许对 ingress MQTT 网桥的 QoS 2 进行订阅。
但允许用户为 ingress MQTT 桥配置 "clean_start" 选项。