Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: HTTP bridge into the emqx_bridge_http application #11253

Merged
merged 8 commits into from
Jul 12, 2023
Merged
3 changes: 2 additions & 1 deletion apps/emqx_authn/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
{emqx_connector, {path, "../emqx_connector"}},
{emqx_mongodb, {path, "../emqx_mongodb"}},
{emqx_redis, {path, "../emqx_redis"}},
{emqx_mysql, {path, "../emqx_mysql"}}
{emqx_mysql, {path, "../emqx_mysql"}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}.

{edoc_opts, [{preprocess, true}]}.
Expand Down
5 changes: 3 additions & 2 deletions apps/emqx_authn/src/emqx_authn.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authn, [
{description, "EMQX Authentication"},
{vsn, "0.1.22"},
{vsn, "0.1.23"},
{modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [
Expand All @@ -15,7 +15,8 @@
jose,
emqx_mongodb,
emqx_redis,
emqx_mysql
emqx_mysql,
emqx_bridge_http
]},
{mod, {emqx_authn_app, []}},
{env, []},
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ common_fields() ->
[
pool_type
],
maps:from_list(emqx_connector_http:fields(config))
maps:from_list(emqx_bridge_http_connector:fields(config))
)
).

Expand Down Expand Up @@ -185,14 +185,14 @@ create(Config0) ->
{Config, State} = parse_config(Config0),
{ok, _Data} = emqx_authn_utils:create_resource(
ResourceId,
emqx_connector_http,
emqx_bridge_http_connector,
Config
),
{ok, State#{resource_id => ResourceId}}.

update(Config0, #{resource_id := ResourceId} = _State) ->
{Config, NState} = parse_config(Config0),
case emqx_authn_utils:update_resource(emqx_connector_http, Config, ResourceId) of
case emqx_authn_utils:update_resource(emqx_bridge_http_connector, Config, ResourceId) of
{error, Reason} ->
error({load_config_error, Reason});
{ok, _} ->
Expand Down
3 changes: 2 additions & 1 deletion apps/emqx_authz/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
{emqx_connector, {path, "../emqx_connector"}},
{emqx_mongodb, {path, "../emqx_mongodb"}},
{emqx_redis, {path, "../emqx_redis"}},
{emqx_mysql, {path, "../emqx_mysql"}}
{emqx_mysql, {path, "../emqx_mysql"}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}.

{shell, [
Expand Down
3 changes: 2 additions & 1 deletion apps/emqx_authz/src/emqx_authz.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
emqx_connector,
emqx_mongodb,
emqx_redis,
emqx_mysql
emqx_mysql,
emqx_bridge_http
]},
{env, []},
{modules, []},
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_authz/src/emqx_authz_api_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ authz_http_common_fields() ->
[
pool_type
],
maps:from_list(emqx_connector_http:fields(config))
maps:from_list(emqx_bridge_http_connector:fields(config))
)
).

Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_authz/src/emqx_authz_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ description() ->
create(Config) ->
NConfig = parse_config(Config),
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
{ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_connector_http, NConfig),
{ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_bridge_http_connector, NConfig),
NConfig#{annotations => #{id => ResourceId}}.

update(Config) ->
NConfig = parse_config(Config),
case emqx_authz_utils:update_resource(emqx_connector_http, NConfig) of
case emqx_authz_utils:update_resource(emqx_bridge_http_connector, NConfig) of
{error, Reason} -> error({load_config_error, Reason});
{ok, Id} -> NConfig#{annotations => #{id => Id}}
end.
Expand Down
2 changes: 2 additions & 0 deletions apps/emqx_authz/src/emqx_authz_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ connector_fields(DB) ->
connector_fields(DB, config).
connector_fields(DB, Fields) when DB =:= redis; DB =:= mysql ->
connector_fields(DB, Fields, emqx);
connector_fields(DB, Fields) when DB =:= http ->
connector_fields(bridge_http_connector, Fields, emqx);
connector_fields(DB, Fields) ->
connector_fields(DB, Fields, emqx_connector).

Expand Down
8 changes: 4 additions & 4 deletions apps/emqx_bridge/src/emqx_bridge_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@
-if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
bridge_to_resource_type(webhook) -> emqx_connector_http;
bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
bridge_to_resource_type(webhook) -> emqx_bridge_http_connector;
bridge_to_resource_type(BridgeType) -> emqx_bridge_enterprise:resource_type(BridgeType).
-else.
bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector;
bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
bridge_to_resource_type(webhook) -> emqx_connector_http.
bridge_to_resource_type(<<"webhook">>) -> emqx_bridge_http_connector;
bridge_to_resource_type(webhook) -> emqx_bridge_http_connector.
-endif.

resource_id(BridgeId) when is_binary(BridgeId) ->
Expand Down
4 changes: 2 additions & 2 deletions apps/emqx_bridge/src/schema/emqx_bridge_schema.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ api_schema(Method) ->
Broker = [
{Type, ref(Mod, Method)}
|| {Type, Mod} <- [
{<<"webhook">>, emqx_bridge_webhook_schema},
{<<"webhook">>, emqx_bridge_http_schema},
{<<"mqtt">>, emqx_bridge_mqtt_schema}
]
],
Expand Down Expand Up @@ -158,7 +158,7 @@ fields(bridges) ->
[
{webhook,
mk(
hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")),
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
#{
desc => ?DESC("bridges_webhook"),
required => false,
Expand Down
3 changes: 2 additions & 1 deletion apps/emqx_bridge_gcp_pubsub/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
{deps, [
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
{emqx_bridge, {path, "../../apps/emqx_bridge"}},
{emqx_bridge_http, {path, "../emqx_bridge_http"}}
]}.

{xref_checks, [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
kernel,
stdlib,
emqx_resource,
emqx_bridge,
emqx_bridge_http,
ehttpc
]},
{env, []},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ handle_result(
) ->
?SLOG(error, #{
msg => "gcp_pubsub_error_response",
request => emqx_connector_http:redact_request(Request),
request => emqx_bridge_http_connector:redact_request(Request),
connector => ResourceId,
status_code => StatusCode,
resp_body => RespBody
Expand All @@ -252,7 +252,7 @@ handle_result(
handle_result({error, #{status_code := StatusCode}} = Result, Request, _QueryMode, ResourceId) ->
?SLOG(error, #{
msg => "gcp_pubsub_error_response",
request => emqx_connector_http:redact_request(Request),
request => emqx_bridge_http_connector:redact_request(Request),
connector => ResourceId,
status_code => StatusCode
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,10 @@ start_echo_http_server() ->
{versions, ['tlsv1.2', 'tlsv1.3']},
{ciphers, ["ECDHE-RSA-AES256-GCM-SHA384", "TLS_CHACHA20_POLY1305_SHA256"]}
] ++ certs(),
{ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
random, HTTPPath, ServerSSLOpts
),
ok = emqx_connector_web_hook_server:set_handler(success_http_handler()),
ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()),
HTTPHost = "localhost",
HostPort = HTTPHost ++ ":" ++ integer_to_list(HTTPPort),
true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort),
Expand All @@ -261,7 +261,7 @@ start_echo_http_server() ->

stop_echo_http_server() ->
os:unsetenv("PUBSUB_EMULATOR_HOST"),
ok = emqx_connector_web_hook_server:stop().
ok = emqx_bridge_http_connector_test_server:stop().

certs() ->
CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
Expand Down Expand Up @@ -983,7 +983,7 @@ t_publish_econnrefused(Config) ->
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
assert_empty_metrics(ResourceId),
ok = emqx_connector_web_hook_server:stop(),
ok = emqx_bridge_http_connector_test_server:stop(),
do_econnrefused_or_timeout_test(Config, econnrefused).

t_publish_timeout(Config) ->
Expand Down Expand Up @@ -1019,7 +1019,7 @@ t_publish_timeout(Config) ->
),
{ok, Rep, State}
end,
ok = emqx_connector_web_hook_server:set_handler(TimeoutHandler),
ok = emqx_bridge_http_connector_test_server:set_handler(TimeoutHandler),
do_econnrefused_or_timeout_test(Config, timeout).

do_econnrefused_or_timeout_test(Config, Error) ->
Expand Down Expand Up @@ -1149,7 +1149,7 @@ t_success_no_body(Config) ->
),
{ok, Rep, State}
end,
ok = emqx_connector_web_hook_server:set_handler(SuccessNoBodyHandler),
ok = emqx_bridge_http_connector_test_server:set_handler(SuccessNoBodyHandler),
Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
Expand Down Expand Up @@ -1187,7 +1187,7 @@ t_failure_with_body(Config) ->
),
{ok, Rep, State}
end,
ok = emqx_connector_web_hook_server:set_handler(FailureWithBodyHandler),
ok = emqx_bridge_http_connector_test_server:set_handler(FailureWithBodyHandler),
Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
Expand Down Expand Up @@ -1225,7 +1225,7 @@ t_failure_no_body(Config) ->
),
{ok, Rep, State}
end,
ok = emqx_connector_web_hook_server:set_handler(FailureNoBodyHandler),
ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
Expand Down Expand Up @@ -1271,7 +1271,7 @@ t_unrecoverable_error(Config) ->
),
{ok, Rep, State}
end,
ok = emqx_connector_web_hook_server:set_handler(FailureNoBodyHandler),
ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler),
Topic = <<"t/topic">>,
{ok, _} = create_bridge(Config),
assert_empty_metrics(ResourceId),
Expand Down
36 changes: 36 additions & 0 deletions apps/emqx_bridge_http/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# EMQX HTTP Broker Bridge

This application enables EMQX to connect to any HTTP API, conforming to the
HTTP standard. The connection is established via the [HTTP][1] bridge abstraction,
which facilitates the unidirectional flow of data from EMQX to the HTTP API
(egress).

Users can define a rule and efficiently transfer data to a remote HTTP API
utilizing [EMQX Rules][2].

# Documentation

- For instructions on how to use the EMQX dashboard to set up an egress bridge,
refer to [Bridge Data into HTTP API][3].

- To understand the EMQX rules engine, please refer to [EMQX Rules][2].

# HTTP APIs

We provide a range of APIs for bridge management. For more detailed
information, refer to [API Docs -Bridges][4].

# Contributing

For those interested in contributing, please consult our
[contributing guide](../../CONTRIBUTING.md).

# License

This software is under the Apache License 2.0. For more details, see
[LICENSE](../../APL.txt).

[1]: https://tools.ietf.org/html/rfc2616
[2]: https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html
[3]: https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-webhook.html
[4]: https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges
10 changes: 10 additions & 0 deletions apps/emqx_bridge_http/rebar.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}}
]}.
thalesmg marked this conversation as resolved.
Show resolved Hide resolved

{shell, [
{apps, [emqx_bridge_http]}
]}.
9 changes: 9 additions & 0 deletions apps/emqx_bridge_http/src/emqx_bridge_http.app.src
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{application, emqx_bridge_http, [
{description, "EMQX HTTP Bridge and Connector Application"},
{vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib, emqx_connector, emqx_resource, emqx_bridge, ehttpc]},
{env, []},
{modules, []},
{links, []}
]}.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_connector_http).
-module(emqx_bridge_http_connector).

-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_webhook_schema).
-module(emqx_bridge_http_schema).

-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
Expand Down Expand Up @@ -68,7 +68,7 @@ basic_config() ->
)}
] ++ webhook_creation_opts() ++
proplists:delete(
max_retries, emqx_connector_http:fields(config)
max_retries, emqx_bridge_http_connector:fields(config)
).

request_config() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_bridge_webhook_SUITE).
-module(emqx_bridge_http_SUITE).

%% This suite should contains testcases that are specific for the webhook
%% bridge. There are also some test cases that implicitly tests the webhook
Expand Down Expand Up @@ -64,18 +64,18 @@ init_per_testcase(t_send_async_connection_timeout, Config) ->
init_per_testcase(t_path_not_found, Config) ->
HTTPPath = <<"/nonexisting/path">>,
ServerSSLOpts = false,
{ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
_Port = random, HTTPPath, ServerSSLOpts
),
ok = emqx_connector_web_hook_server:set_handler(not_found_http_handler()),
ok = emqx_bridge_http_connector_test_server:set_handler(not_found_http_handler()),
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
init_per_testcase(t_too_many_requests, Config) ->
HTTPPath = <<"/path">>,
ServerSSLOpts = false,
{ok, {HTTPPort, _Pid}} = emqx_connector_web_hook_server:start_link(
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
_Port = random, HTTPPath, ServerSSLOpts
),
ok = emqx_connector_web_hook_server:set_handler(too_many_requests_http_handler()),
ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()),
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
init_per_testcase(_TestCase, Config) ->
Server = start_http_server(#{response_delay_ms => 0}),
Expand All @@ -85,7 +85,7 @@ end_per_testcase(TestCase, _Config) when
TestCase =:= t_path_not_found;
TestCase =:= t_too_many_requests
->
ok = emqx_connector_web_hook_server:stop(),
ok = emqx_bridge_http_connector_test_server:stop(),
persistent_term:erase({?MODULE, times_called}),
emqx_bridge_testlib:delete_all_bridges(),
emqx_common_test_helpers:call_janitor(),
Expand Down Expand Up @@ -552,7 +552,7 @@ do_t_async_retries(TestContext, Error, Fn) ->
Attempts + 1
end,
emqx_common_test_helpers:with_mock(
emqx_connector_http,
emqx_bridge_http_connector,
reply_delegator,
fun(Context, ReplyFunAndArgs, Result) ->
Attempts = GetAndBump(),
Expand Down