Skip to content

Commit

Permalink
Merge pull request #9585 from sstrigler/EMQX-8571-bring-back-connecto…
Browse files Browse the repository at this point in the history
…rs-test-as-bridge-test

feat(emqx_bridge): add /bridges_probe API endpoint
  • Loading branch information
sstrigler committed Dec 22, 2022
2 parents 71d40c0 + 6131ee5 commit 32cb181
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 7 deletions.
6 changes: 3 additions & 3 deletions apps/emqx/src/emqx_map_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ deep_convert(Val, _, _Args) ->

-spec unsafe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}.
unsafe_atom_key_map(Map) ->
covert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
convert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).

-spec binary_key_map(map()) -> map().
binary_key_map(Map) ->
Expand All @@ -167,7 +167,7 @@ binary_key_map(Map) ->

-spec safe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}.
safe_atom_key_map(Map) ->
covert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end).
convert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end).

-spec jsonable_map(map() | list()) -> map() | list().
jsonable_map(Map) ->
Expand Down Expand Up @@ -221,7 +221,7 @@ binary_string(Val) ->
Val.

%%---------------------------------------------------------------------------
covert_keys_to_atom(BinKeyMap, Conv) ->
convert_keys_to_atom(BinKeyMap, Conv) ->
deep_convert(
BinKeyMap,
fun
Expand Down
16 changes: 16 additions & 0 deletions apps/emqx_bridge/i18n/emqx_bridge_api.conf
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,20 @@ NOTE:不允许在单节点上启用/禁用 Bridge"""
}
}

desc_api9 {
desc {
en: """
Test creating a new bridge by given ID </br>
The ID must be of format '{type}:{name}'
"""
zh: """
通过给定的 ID 测试创建一个新的桥接。 </br>
ID 的格式必须为 ’{type}:{name}”
"""
}
label: {
en: "Test Bridge Creation"
zh: "测试桥接创建"
}
}
}
77 changes: 75 additions & 2 deletions apps/emqx_bridge/src/emqx_bridge_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
'/bridges/:id'/2,
'/bridges/:id/operation/:operation'/2,
'/nodes/:node/bridges/:id/operation/:operation'/2,
'/bridges/:id/reset_metrics'/2
'/bridges/:id/reset_metrics'/2,
'/bridges_probe'/2
]).

-export([lookup_from_local_node/2]).
Expand Down Expand Up @@ -68,7 +69,8 @@ paths() ->
"/bridges/:id",
"/bridges/:id/operation/:operation",
"/nodes/:node/bridges/:id/operation/:operation",
"/bridges/:id/reset_metrics"
"/bridges/:id/reset_metrics",
"/bridges_probe"
].

error_schema(Code, Message) when is_atom(Code) ->
Expand Down Expand Up @@ -386,6 +388,23 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
}
}
};
schema("/bridges_probe") ->
#{
'operationId' => '/bridges_probe',
post => #{
tags => [<<"bridges">>],
desc => ?DESC("desc_api9"),
summary => <<"Test creating bridge">>,
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_bridge_schema:post_request(),
bridge_info_examples(post)
),
responses => #{
204 => <<"Test bridge OK">>,
400 => error_schema(['TEST_FAILED'], "bridge test failed")
}
}
}.

'/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
Expand Down Expand Up @@ -462,6 +481,60 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
end
).

'/bridges_probe'(post, Request) ->
RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"},
case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
{ok, #{body := #{<<"type">> := ConnType} = Params}} ->
case do_probe(ConnType, maps:remove(<<"type">>, Params)) of
ok ->
{204};
{error, Error} ->
{400, error_msg('TEST_FAILED', Error)}
end;
BadRequest ->
BadRequest
end.

do_probe(ConnType, Params) ->
case test_connection(host_and_port(ConnType, Params)) of
ok ->
emqx_bridge_resource:create_dry_run(ConnType, Params);
Error ->
Error
end.

host_and_port(mqtt, #{<<"server">> := Server}) ->
Server;
host_and_port(webhook, #{<<"url">> := Url}) ->
{BaseUrl, _Path} = parse_url(Url),
{ok, #{host := Host, port := Port}} = emqx_http_lib:uri_parse(BaseUrl),
{Host, Port};
host_and_port(_Unknown, _) ->
undefined.

%% [TODO] remove in EMQX-8588 when resource manager handles things more elegantly
test_connection(undefined) ->
%% be friendly, it might fail later on with a 'timeout' error.
ok;
test_connection({Host, Port}) ->
case gen_tcp:connect(Host, Port, [], 5000) of
{ok, TestSocket} -> gen_tcp:close(TestSocket);
Error -> Error
end.

parse_url(Url) ->
case string:split(Url, "//", leading) of
[Scheme, UrlRem] ->
case string:split(UrlRem, "/", leading) of
[HostPort, Path] ->
{iolist_to_binary([Scheme, "//", HostPort]), Path};
[HostPort] ->
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
end;
[Url] ->
error({invalid_url, Url})
end.

lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
Nodes = mria_mnesia:running_nodes(),
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
Expand Down
6 changes: 4 additions & 2 deletions apps/emqx_bridge/src/emqx_bridge_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,16 @@ recreate(Type, Name, Conf, Opts) ->
Opts
).

create_dry_run(Type, Conf) ->
create_dry_run(Type, Conf0) ->
TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
Conf = emqx_map_lib:safe_atom_key_map(Conf0),
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
ParseConf = parse_confs(bin(Type), TmpPath, ConfNew),
Res = emqx_resource:create_dry_run_local(
bridge_to_resource_type(Type), ConfNew
bridge_to_resource_type(Type), ParseConf
),
_ = maybe_clear_certs(TmpPath, ConfNew),
Res
Expand Down
46 changes: 46 additions & 0 deletions apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,52 @@ t_reset_bridges(Config) ->
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).

-define(MQTT_BRIDGE(Server), #{
<<"server">> => Server,
<<"username">> => <<"user1">>,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v5">>,
<<"ssl">> => #{<<"enable">> => false},
<<"type">> => <<"mqtt">>,
<<"name">> => <<"mqtt_egress_test_bridge">>
}).

t_bridges_probe(Config) ->
Port = ?config(port, Config),
URL = ?URL(Port, "some_path"),

{ok, 204, <<>>} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
),

%% second time with same name is ok since no real bridge created
{ok, 204, <<>>} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
),

{ok, 400, _} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME)
),

{ok, 204, _} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"127.0.0.1:1883">>)
),

{ok, 400, _} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"127.0.0.1:2883">>)
),
ok.

request(Method, Url, Body) ->
request(<<"bridge_admin">>, Method, Url, Body).

Expand Down
2 changes: 2 additions & 0 deletions changes/v5.0.13-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

- Return `204` instead of `200` for `PUT /authenticator/:id` [#9434](https://github.com/emqx/emqx/pull/9434/).

- `/bridges_probe` API endpoint to test params for creating a new data bridge [#9585](https://github.com/emqx/emqx/pull/9585).

## Bug fixes

- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487).
2 changes: 2 additions & 0 deletions changes/v5.0.13-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

- 现在,`PUT /authenticator/:id` 将会返回 204 而不再是 200 [#9434](https://github.com/emqx/emqx/pull/9434/)

- [FIXME] `/bridges_probe` API 端点用于测试创建新数据桥的参数 [#9585](https://github.com/emqx/emqx/pull/9585)

## 修复

- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)

0 comments on commit 32cb181

Please sign in to comment.