Skip to content

Commit

Permalink
feat(mongodb): add configurable option to override legacy protocol usage
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Aug 14, 2023
1 parent 65aee88 commit d93e1bb
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 9 deletions.
2 changes: 1 addition & 1 deletion apps/emqx_authn/src/emqx_authn.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authn, [
{description, "EMQX Authentication"},
{vsn, "0.1.24"},
{vsn, "0.1.25"},
{modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [
Expand Down
58 changes: 54 additions & 4 deletions apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ group_tests() ->
t_payload_template,
t_collection_template,
t_mongo_date_rule_engine_functions,
t_get_status_server_selection_too_short
t_get_status_server_selection_too_short,
t_use_legacy_protocol_option
].

groups() ->
Expand Down Expand Up @@ -180,6 +181,7 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
" replica_set_name = rs0\n"
" servers = [~p]\n"
" w_mode = safe\n"
" use_legacy_protocol = auto\n"
" database = mqtt\n"
" resource_opts = {\n"
" query_mode = ~s\n"
Expand All @@ -205,6 +207,7 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
" collection = mycol\n"
" servers = [~p]\n"
" w_mode = safe\n"
" use_legacy_protocol = auto\n"
" database = mqtt\n"
" resource_opts = {\n"
" query_mode = ~s\n"
Expand All @@ -230,6 +233,7 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
" collection = mycol\n"
" server = ~p\n"
" w_mode = safe\n"
" use_legacy_protocol = auto\n"
" database = mqtt\n"
" resource_opts = {\n"
" query_mode = ~s\n"
Expand Down Expand Up @@ -286,10 +290,8 @@ clear_db(Config) ->
mongo_api:disconnect(Client).

find_all(Config) ->
Type = mongo_type_bin(?config(mongo_type, Config)),
Name = ?config(mongo_name, Config),
#{<<"collection">> := Collection} = ?config(mongo_config, Config),
ResourceID = emqx_bridge_resource:resource_id(Type, Name),
ResourceID = resource_id(Config),
emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}).

find_all_wait_until_non_empty(Config) ->
Expand Down Expand Up @@ -340,6 +342,27 @@ probe_bridge_api(Config, Overrides) ->
ct:pal("bridge probe result: ~p", [Res]),
Res.

resource_id(Config) ->
Type0 = ?config(mongo_type, Config),
Name = ?config(mongo_name, Config),
Type = mongo_type_bin(Type0),
emqx_bridge_resource:resource_id(Type, Name).

get_worker_pids(Config) ->
ResourceID = resource_id(Config),
%% abusing health check api a bit...
GetWorkerPid = fun(TopologyPid) ->
mongoc:transaction_query(TopologyPid, fun(#{pool := WorkerPid}) -> WorkerPid end)
end,
{ok, WorkerPids = [_ | _]} =
emqx_resource_pool:health_check_workers(
ResourceID,
GetWorkerPid,
5_000,
#{return_values => true}
),
WorkerPids.

%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -494,3 +517,30 @@ t_get_status_server_selection_too_short(Config) ->
emqx_utils_json:decode(Body)
),
ok.

t_use_legacy_protocol_option(Config) ->
ResourceID = resource_id(Config),
{ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"true">>}),
?retry(
_Interval0 = 200,
_NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceID))
),
WorkerPids0 = get_worker_pids(Config),
Expected0 = maps:from_keys(WorkerPids0, true),
LegacyOptions0 = maps:from_list([{Pid, mc_utils:use_legacy_protocol(Pid)} || Pid <- WorkerPids0]),
?assertEqual(Expected0, LegacyOptions0),
{ok, _} = delete_bridge(Config),

{ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"false">>}),
?retry(
_Interval0 = 200,
_NAttempts0 = 20,
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceID))
),
WorkerPids1 = get_worker_pids(Config),
Expected1 = maps:from_keys(WorkerPids1, false),
LegacyOptions1 = maps:from_list([{Pid, mc_utils:use_legacy_protocol(Pid)} || Pid <- WorkerPids1]),
?assertEqual(Expected1, LegacyOptions1),

ok.
2 changes: 1 addition & 1 deletion apps/emqx_machine/src/emqx_machine.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{id, "emqx_machine"},
{description, "The EMQX Machine"},
% strict semver, bump manually!
{vsn, "0.2.11"},
{vsn, "0.2.12"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, emqx_ctl]},
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_mongodb/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
{erl_opts, [debug_info]}.
{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
, {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.20"}}}
, {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.21"}}}
]}.
2 changes: 1 addition & 1 deletion apps/emqx_mongodb/src/emqx_mongodb.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, emqx_mongodb, [
{description, "EMQX MongoDB Connector"},
{vsn, "0.1.1"},
{vsn, "0.1.2"},
{registered, []},
{applications, [
kernel,
Expand Down
7 changes: 7 additions & 0 deletions apps/emqx_mongodb/src/emqx_mongodb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ mongo_fields() ->
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun emqx_connector_schema_lib:username/1},
{password, fun emqx_connector_schema_lib:password/1},
{use_legacy_protocol,
hoconsc:mk(hoconsc:enum([auto, true, false]), #{
default => auto,
desc => ?DESC("use_legacy_protocol")
})},
{auth_source, #{
type => binary(),
required => false,
Expand Down Expand Up @@ -429,6 +434,8 @@ init_worker_options([{w_mode, V} | R], Acc) ->
init_worker_options(R, [{w_mode, V} | Acc]);
init_worker_options([{r_mode, V} | R], Acc) ->
init_worker_options(R, [{r_mode, V} | Acc]);
init_worker_options([{use_legacy_protocol, V} | R], Acc) ->
init_worker_options(R, [{use_legacy_protocol, V} | Acc]);
init_worker_options([_ | R], Acc) ->
init_worker_options(R, Acc);
init_worker_options([], Acc) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_retainer/src/emqx_retainer.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{application, emqx_retainer, [
{description, "EMQX Retainer"},
% strict semver, bump manually!
{vsn, "5.0.16"},
{vsn, "5.0.17"},
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]},
Expand Down
1 change: 1 addition & 0 deletions changes/ce/feat-11429.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added option to configure detection of legacy protocol in MondoDB connectors and bridges.
6 changes: 6 additions & 0 deletions rel/i18n/emqx_mongodb.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,10 @@ wait_queue_timeout.desc:
wait_queue_timeout.label:
"""Wait Queue Timeout"""

use_legacy_protocol.desc:
"""Whether to use MongoDB's legacy protocol for communicating with the database. The default is to attempt to automatically determine if the newer protocol is supported."""

use_legacy_protocol.label:
"""Use legacy protocol"""

}

0 comments on commit d93e1bb

Please sign in to comment.