Skip to content

Commit

Permalink
fix(emqx_connector): report errors in on_start handler
Browse files Browse the repository at this point in the history
  • Loading branch information
sstrigler committed Jun 13, 2023
1 parent 8ba5a54 commit b2a5065
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 28 deletions.
25 changes: 23 additions & 2 deletions apps/emqx_bridge/test/emqx_bridge_testlib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,13 @@ create_bridge_api(Config) ->

create_bridge_api(Config, Overrides) ->
BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config),
BridgeName = ?config(bridge_name, Config),
BridgeConfig0 = ?config(bridge_config, Config),
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name},
create_bridge_api(BridgeType, BridgeName, BridgeConfig).

create_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
Expand Down Expand Up @@ -164,6 +167,24 @@ update_bridge_api(Config, Overrides) ->
ct:pal("bridge update result: ~p", [Res]),
Res.

op_bridge_api(Op, BridgeType, BridgeName) ->
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of
{ok, {Status, Headers, Body}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
{error, {Status, Headers, Body}} ->
{error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
Error ->
Error
end,
ct:pal("bridge op result: ~p", [Res]),
Res.

probe_bridge_api(Config) ->
probe_bridge_api(Config, _Overrides = #{}).

Expand Down
78 changes: 56 additions & 22 deletions apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

-define(BRIDGE_TYPE, <<"webhook">>).
-define(BRIDGE_NAME, atom_to_binary(?MODULE)).

all() ->
emqx_common_test_helpers:all(?MODULE).

Expand All @@ -36,15 +39,13 @@ groups() ->

init_per_suite(_Config) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
[].

end_per_suite(_Config) ->
ok = emqx_config:put([bridges], #{}),
ok = emqx_config:put_raw([bridges], #{}),
ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]),
ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
_ = application:stop(emqx_bridge),
Expand All @@ -53,10 +54,22 @@ end_per_suite(_Config) ->
suite() ->
[{timetrap, {seconds, 60}}].

init_per_testcase(t_bad_bridge_config, Config) ->
Config;
init_per_testcase(t_send_async_connection_timeout, Config) ->
ResponseDelayMS = 500,
Server = start_http_server(#{response_delay_ms => ResponseDelayMS}),
[{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config];
init_per_testcase(_TestCase, Config) ->
Config.
Server = start_http_server(#{response_delay_ms => 0}),
[{http_server, Server} | Config].

end_per_testcase(_TestCase, _Config) ->
end_per_testcase(_TestCase, Config) ->
case ?config(http_server, Config) of
undefined -> ok;
Server -> stop_http_server(Server)
end,
emqx_bridge_testlib:delete_all_bridges(),
emqx_common_test_helpers:call_janitor(),
ok.

Expand All @@ -65,13 +78,14 @@ end_per_testcase(_TestCase, _Config) ->
%% (Orginally copied from emqx_bridge_api_SUITE)
%%------------------------------------------------------------------------------
start_http_server(HTTPServerConfig) ->
ct:pal("Start server\n"),
process_flag(trap_exit, true),
Parent = self(),
ct:pal("Starting server for ~p", [Parent]),
{ok, {Port, Sock}} = listen_on_random_port(),
Acceptor = spawn(fun() ->
accept_loop(Sock, Parent, HTTPServerConfig)
end),
ct:pal("Started server on port ~p", [Port]),
timer:sleep(100),
#{port => Port, sock => Sock, acceptor => Acceptor}.

Expand Down Expand Up @@ -160,8 +174,8 @@ parse_http_request_assertive(ReqStr0) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

bridge_async_config(#{port := Port} = Config) ->
Type = maps:get(type, Config, <<"webhook">>),
Name = maps:get(name, Config, atom_to_binary(?MODULE)),
Type = maps:get(type, Config, ?BRIDGE_TYPE),
Name = maps:get(name, Config, ?BRIDGE_NAME),
PoolSize = maps:get(pool_size, Config, 1),
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1),
Expand Down Expand Up @@ -217,8 +231,8 @@ parse_and_check(ConfigString, BridgeType, Name) ->
RetConfig.

make_bridge(Config) ->
Type = <<"webhook">>,
Name = atom_to_binary(?MODULE),
Type = ?BRIDGE_TYPE,
Name = ?BRIDGE_NAME,
BridgeConfig = bridge_async_config(Config#{
name => Name,
type => Type
Expand All @@ -236,16 +250,15 @@ make_bridge(Config) ->

%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed.
%% When the connection time out all the queued requests where dropped in
t_send_async_connection_timeout(_Config) ->
ResponseDelayMS = 90,
#{port := Port} = Server = start_http_server(#{response_delay_ms => 900}),
% Port = 9000,
t_send_async_connection_timeout(Config) ->
ResponseDelayMS = ?config(response_delay_ms, Config),
#{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
connect_timeout => 10_000,
request_timeout => ResponseDelayMS * 2,
resource_request_ttl => "infinity"
}),
NumberOfMessagesToSend = 10,
Expand All @@ -257,11 +270,10 @@ t_send_async_connection_timeout(_Config) ->
ct:pal("Sent messages\n"),
MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void),
receive_request_notifications(MessageIDs, ResponseDelayMS),
stop_http_server(Server),
ok.

t_async_free_retries(_Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}),
t_async_free_retries(Config) ->
#{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
Expand All @@ -285,8 +297,8 @@ t_async_free_retries(_Config) ->
do_t_async_retries(Context, {error, {shutdown, normal}}, Fn),
ok.

t_async_common_retries(_Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}),
t_async_common_retries(Config) ->
#{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
Expand Down Expand Up @@ -323,6 +335,28 @@ t_async_common_retries(_Config) ->
do_t_async_retries(Context, {error, something_else}, FnFail),
ok.

t_bad_bridge_config(_Config) ->
BridgeConfig = bridge_async_config(#{port => 12345}),
?assertMatch(
{ok,
{{_, 201, _}, _Headers, #{
<<"status">> := <<"disconnected">>,
<<"status_reason">> := <<"Connection refused">>
}}},
emqx_bridge_testlib:create_bridge_api(
?BRIDGE_TYPE,
?BRIDGE_NAME,
BridgeConfig
)
),
%% try `/start` bridge
?assertMatch(
{error, {{_, 400, _}, _Headers, #{<<"message">> := <<"Connection refused">>}}},
emqx_bridge_testlib:op_bridge_api("start", ?BRIDGE_TYPE, ?BRIDGE_NAME)
),
ok.

%% helpers
do_t_async_retries(TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext,
persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0),
Expand Down
29 changes: 25 additions & 4 deletions apps/emqx_connector/src/emqx_connector_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,31 @@ on_start(
base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined))
},
case ehttpc_sup:start_pool(InstId, PoolOpts) of
{ok, _} -> {ok, State};
{error, {already_started, _}} -> {ok, State};
{error, Reason} -> {error, Reason}
case start_pool(InstId, PoolOpts) of
ok ->
case do_get_status(InstId, ConnectTimeout) of
ok ->
{ok, State};
Error ->
ok = ehttpc_sup:stop_pool(InstId),
Error
end;
Error ->
Error
end.

start_pool(PoolName, PoolOpts) ->
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
{ok, _} ->
ok;
{error, {already_started, _}} ->
?SLOG(warning, #{
msg => "emqx_connector_on_start_already_started",
pool_name => PoolName
}),
ok;
Error ->
Error
end.

on_stop(InstId, _State) ->
Expand Down
2 changes: 2 additions & 0 deletions apps/emqx_connector/test/emqx_connector_http_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ wrap_auth_headers_test_() ->
fun() ->
meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}),
meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end),
meck:expect(ehttpc, workers, 1, [{self, self()}]),
meck:expect(ehttpc, health_check, 2, ok),
meck:expect(ehttpc_pool, pick_worker, 1, self()),
meck:expect(emqx_resource, allocate_resource, 3, ok),
[ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource]
Expand Down

0 comments on commit b2a5065

Please sign in to comment.