Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: function_clause when sending messages to bridges #9687

Merged
merged 2 commits into from Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 7 additions & 4 deletions apps/emqx_bridge/src/emqx_bridge.erl
Expand Up @@ -363,10 +363,13 @@ get_matched_egress_bridges(Topic) ->

get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) ->
Acc;
get_matched_bridge_id(BType, #{local_topic := Filter}, Topic, BName, Acc) when
?EGRESS_DIR_BRIDGES(BType)
->
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc);
get_matched_bridge_id(BType, Conf, Topic, BName, Acc) when ?EGRESS_DIR_BRIDGES(BType) ->
case maps:get(local_topic, Conf, undefined) of
undefined ->
Acc;
Filter ->
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc)
end;
get_matched_bridge_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) ->
do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc);
get_matched_bridge_id(kafka, #{producer := #{mqtt := #{topic := Filter}}}, Topic, BName, Acc) ->
Expand Down
49 changes: 49 additions & 0 deletions apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
Expand Up @@ -305,6 +305,55 @@ t_http_crud_apis(Config) ->
),
ok.

t_http_bridges_local_topic(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),

%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name1 = <<"t_http_bridges_with_local_topic1">>,
Name2 = <<"t_http_bridges_without_local_topic1">>,
%% create one http bridge with local_topic
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name1)
),
%% and we create another one without local_topic
{ok, 201, _} = request(
post,
uri(["bridges"]),
maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name2))
),
BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name1),
BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name2),
%% Send an message to emqx and the message should be forwarded to the HTTP server.
%% This is to verify we can have 2 bridges with and without local_topic fields
%% at the same time.
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
?assert(
receive
{http_server, received, #{
method := <<"POST">>,
path := <<"/path1">>,
body := Body
}} ->
true;
Msg ->
ct:pal("error: http got unexpected request: ~p", [Msg]),
false
after 100 ->
false
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID1]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID2]), []),
ok.

t_check_dependent_actions_on_delete(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
Expand Down
2 changes: 2 additions & 0 deletions changes/v5.0.14/fix-9687.en.md
@@ -0,0 +1,2 @@
Fix the problem that sending messages to data-bridges failed because of incorrect handling of some data-bridges without `local_topic` field configured.
Before this change, if some bridges have configured the `local_topic` field but others have not, a `function_clause` error will occur when forwarding messages to the data-bridges.
2 changes: 2 additions & 0 deletions changes/v5.0.14/fix-9687.zh.md
@@ -0,0 +1,2 @@
修复由于某些数据桥接未配置 `local_topic` 字段,导致的所有数据桥接无法发送消息。
在此改动之前,如果有些桥接设置了 `local_topic` 字段而有些没有设置,数据桥接转发消息时会出现 `function_clause` 的错误。