Skip to content

Commit

Permalink
Merge pull request #11637 from thalesmg/port-scan-mria-check-m-20230919
Browse files Browse the repository at this point in the history
feat: add port scan diagnostics to mria waiting for tables checks
  • Loading branch information
thalesmg committed Sep 22, 2023
2 parents d6a9798 + d6935b6 commit 5e40057
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 3 deletions.
2 changes: 1 addition & 1 deletion apps/emqx/rebar.config
Expand Up @@ -28,7 +28,7 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.14"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.15"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
Expand Down
116 changes: 116 additions & 0 deletions apps/emqx_machine/src/emqx_machine.erl
Expand Up @@ -26,6 +26,13 @@
update_vips/0
]).

-export([open_ports_check/0]).

-ifdef(TEST).
-export([create_plan/0]).
-endif.

-include_lib("kernel/include/inet.hrl").
-include_lib("emqx/include/logger.hrl").

%% @doc EMQX boot entrypoint.
Expand All @@ -42,6 +49,7 @@ start() ->
ok = set_backtrace_depth(),
start_sysmon(),
configure_shard_transports(),
set_mnesia_extra_diagnostic_checks(),
ekka:start(),
ok.

Expand Down Expand Up @@ -94,3 +102,111 @@ configure_shard_transports() ->
end,
maps:to_list(ShardTransports)
).

%% Hands mria the extra diagnostic checks contributed by this module;
%% currently that is only the open-ports probe.
set_mnesia_extra_diagnostic_checks() ->
    %% NOTE(review): the middle element (`ok') looks like the result a passing
    %% check is expected to return -- confirm against mria_config.
    OpenPortsCheck = {check_open_ports, ok, fun ?MODULE:open_ports_check/0},
    mria_config:set_extra_mnesia_diagnostic_checks([OpenPortsCheck]),
    ok.

-define(PORT_PROBE_TIMEOUT, 10_000).
%% @doc Probes the TCP ports of every other node known to the local mnesia
%% schema and reports whether they are reachable.
%%
%% Returns `ok' when no probed node reports a status other than `ok'.
%% Otherwise returns a map describing the problem: either the per-node results
%% (see `verify_results/1') or, if building the plan or probing raised, a map
%% with the keys `msg', `exception', `reason' and `stacktrace'.
open_ports_check() ->
    %% 2 ports to check: ekka/epmd and gen_rpc; `do_check/1' probes them one
    %% after another, hence twice the per-port timeout plus some slack.
    Timeout = 2 * ?PORT_PROBE_TIMEOUT + 5_000,
    %% The plan is built inside the protected expression on purpose: a failure
    %% while planning (name resolution, gen_rpc config lookup, epmd query) is
    %% reported as a diagnostic map rather than crashing the caller.
    try emqx_utils:pmap(fun do_check/1, create_plan(), Timeout) of
        Results ->
            verify_results(Results)
    catch
        Kind:Reason:Stacktrace ->
            #{
                msg => "error probing ports",
                exception => Kind,
                reason => Reason,
                stacktrace => Stacktrace
            }
    end.

%% Condenses the per-node probe results into the value reported by the check:
%% `ok' when no entry carries a status other than `ok', otherwise a map holding
%% all results (keyed by node) together with a human-readable message.
verify_results(Results) ->
    HasFailed = fun
        ({_Node, #{status := Status}}) -> Status =/= ok;
        (_Other) -> false
    end,
    case lists:any(HasFailed, Results) of
        false ->
            %% all ok
            ok;
        true ->
            #{
                results => maps:from_list(Results),
                msg => "some ports are unreachable"
            }
    end.

%% Builds the probing plan: one `{Node, PlanMap}' entry per node listed in the
%% mnesia schema other than the local node. Each plan map carries the
%% addresses the node name resolved to (`resolved_ips') and the TCP ports that
%% should be probed (`ports_to_check').
create_plan() ->
    PlanFor = fun(Peer) ->
        Addrs = node_to_ips(Peer),
        {_GenRPCMod, GenRPCPort} = gen_rpc_helper:get_client_config_per_node(Peer),
        %% either `[]' or `[Port]', so it can serve as the list tail below
        MaybeEkkaPort = get_ekka_epmd_port(Addrs),
        Entry = #{
            resolved_ips => Addrs,
            ports_to_check => [GenRPCPort | MaybeEkkaPort]
        },
        {Peer, Entry}
    end,
    %% expected core nodes according to mnesia schema
    Peers = mnesia:system_info(db_nodes) -- [node()],
    [PlanFor(Peer) || Peer <- Peers].

%% Asks the EPMD instance at the first resolved address which names it has
%% registered and picks the EMQX node's port among them. Returns `[Port]' on
%% success and `[]' when there is no address to query, the query does not
%% return `{ok, _}', or no matching name is registered.
get_ekka_epmd_port([]) ->
    %% failed to get?
    [];
get_ekka_epmd_port([FirstIP | _Others]) ->
    %% we're currently only checking the first IP, if there are many
    case erl_epmd:names(FirstIP) of
        {ok, Registered} ->
            choose_emqx_epmd_port(Registered);
        _NotOk ->
            []
    end.

%% Picks the port of the first registered name that starts with "emqx" (which
%% filters out remsh nodes) and treats it as the epmd/ekka port. Returns
%% `[Port]', or `[]' when no such name is present.
choose_emqx_epmd_port(NamePorts) ->
    IsEmqxName = fun
        ({"emqx" ++ _, _Port}) -> true;
        ({_Name, _Port}) -> false
    end,
    case lists:search(IsEmqxName, NamePorts) of
        {value, {_Name, Port}} ->
            [Port];
        false ->
            []
    end.

%% Executes one plan entry and returns it annotated with a `status':
%%   * `failed_to_resolve_ip' - the plan has no resolved address;
%%   * `ok'                   - every planned port accepted a connection;
%%   * `bad_ports'            - at least one did not; `open_ports' then maps
%%                              each probed port to `true' / `false'.
do_check({Node, #{resolved_ips := []} = Plan}) ->
    {Node, Plan#{status => failed_to_resolve_ip}};
do_check({Node, #{resolved_ips := [IP | _]} = Plan}) ->
    %% check other IPs too?
    Ports = maps:get(ports_to_check, Plan),
    %% ports are probed one after another, in plan order
    Probes = [{Port, is_tcp_port_open(IP, Port)} || Port <- Ports],
    AllOpen = lists:all(fun({_Port, IsOpen}) -> IsOpen end, Probes),
    case AllOpen of
        true ->
            {Node, Plan#{status => ok}};
        false ->
            {Node, Plan#{status => bad_ports, open_ports => maps:from_list(Probes)}}
    end.

%% Resolves the host part of a node name (everything after the `@') to its
%% IPv4 addresses. Returns the address list from the host entry, or `[]' when
%% the lookup does not succeed.
node_to_ips(Node) ->
    NodeName = atom_to_binary(Node),
    %% strip the "name@" prefix; what is left is a hostname or an IP literal
    Host = re:replace(NodeName, <<"^.+@">>, <<"">>, [{return, list}]),
    Lookup = inet:gethostbyname(Host, inet),
    case Lookup of
        {ok, #hostent{} = HostEnt} ->
            HostEnt#hostent.h_addr_list;
        _LookupFailed ->
            []
    end.

%% Tells whether a TCP connection to `IP:Port' can be established within
%% ?PORT_PROBE_TIMEOUT milliseconds. A successfully opened socket is closed
%% again right away; any other connect result counts as "not open".
is_tcp_port_open(IP, Port) ->
    ConnectResult = gen_tcp:connect(IP, Port, [], ?PORT_PROBE_TIMEOUT),
    case ConnectResult of
        {ok, Socket} ->
            gen_tcp:close(Socket),
            true;
        _ConnectFailed ->
            false
    end.
62 changes: 62 additions & 0 deletions apps/emqx_machine/test/emqx_machine_SUITE.erl
Expand Up @@ -22,6 +22,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

all() -> emqx_common_test_helpers:all(?MODULE).

Expand Down Expand Up @@ -67,6 +68,15 @@ end_per_suite(_Config) ->
%% Per-testcase setup.
%%   * t_custom_shard_transports: remembers the current value of the
%%     `custom_shard_transports' application env under `old_config'.
%%   * t_open_ports_check: starts a three-node cluster (two cores and one
%%     replicant) and stores the started nodes under `nodes'.
%%   * any other testcase: leaves the config untouched.
init_per_testcase(t_custom_shard_transports, Config) ->
    Previous = application:get_env(emqx_machine, custom_shard_transports),
    [{old_config, Previous} | Config];
init_per_testcase(t_open_ports_check = TestCase, Config) ->
    Apps = [emqx],
    NodeRoles = [
        {emqx_machine_SUITE1, core},
        {emqx_machine_SUITE2, core},
        {emqx_machine_SUITE3, replicant}
    ],
    ClusterSpec = [{Name, #{role => Role, apps => Apps}} || {Name, Role} <- NodeRoles],
    StartOpts = #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)},
    Started = emqx_cth_cluster:start(ClusterSpec, StartOpts),
    [{nodes, Started} | Config];
init_per_testcase(_TestCase, Config) ->
    Config.

Expand All @@ -80,6 +90,10 @@ end_per_testcase(t_custom_shard_transports, Config) ->
application:unset_env(emqx_machine, custom_shard_transports)
end,
ok;
end_per_testcase(t_open_ports_check, Config) ->
Nodes = ?config(nodes, Config),
ok = emqx_cth_cluster:stop(Nodes),
ok;
end_per_testcase(_TestCase, _Config) ->
ok.

Expand Down Expand Up @@ -112,3 +126,51 @@ t_node_status(_Config) ->
},
jsx:decode(JSON)
).

%% Checks the open-ports diagnostic on a cluster of two cores and a replicant:
%% each core plans to probe exactly the other core (two ports, one resolved
%% address), the replicant plans nothing, and every node reports `ok' while
%% all nodes are up. After one core is stopped, the remaining core reports
%% that node's ports as bad while the replicant still reports `ok'.
t_open_ports_check(Config) ->
    [Core1, Core2, Replicant] = ?config(nodes, Config),
    PlanOn = fun(N) -> erpc:call(N, emqx_machine, create_plan, []) end,
    CheckOn = fun(N) -> erpc:call(N, emqx_machine, open_ports_check, []) end,

    %% plans while the whole cluster is up
    Core1Plan = PlanOn(Core1),
    ?assertMatch(
        [{Core2, #{ports_to_check := [_GenRPC0, _Ekka0], resolved_ips := [_]}}],
        Core1Plan
    ),
    %% keep the concrete ports around for the assertion after Core2 is stopped
    [{Core2, #{ports_to_check := [GenRPCPort, EkkaPort], resolved_ips := [_]}}] = Core1Plan,
    ?assertMatch(
        [{Core1, #{ports_to_check := [_GenRPC1, _Ekka1], resolved_ips := [_]}}],
        PlanOn(Core2)
    ),
    ?assertMatch(
        [],
        PlanOn(Replicant)
    ),

    %% every node is reachable, so every check passes
    lists:foreach(
        fun(N) -> ?assertEqual(ok, CheckOn(N)) end,
        [Core1, Core2, Replicant]
    ),

    ok = emqx_cth_cluster:stop_node(Core2),

    %% the replicant has an empty plan and is unaffected by the stopped core
    ?assertEqual(ok, CheckOn(Replicant)),
    ?assertMatch(
        #{
            msg := "some ports are unreachable",
            results :=
                #{
                    Core2 :=
                        #{
                            open_ports := #{
                                GenRPCPort := _,
                                EkkaPort := _
                            },
                            ports_to_check := [_, _],
                            resolved_ips := [_],
                            status := bad_ports
                        }
                }
        },
        CheckOn(Core1),
        #{core2 => Core2}
    ),

    ok.
3 changes: 3 additions & 0 deletions changes/ce/feat-11637.en.md
@@ -0,0 +1,3 @@
Added an extra diagnostic to help debug issues when mnesia is waiting for tables.

Updated libraries: `ekka` -> 0.15.15, `mria` -> 0.6.4.
2 changes: 1 addition & 1 deletion mix.exs
Expand Up @@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.7", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-1", override: true},
{:ekka, github: "emqx/ekka", tag: "0.15.14", override: true},
{:ekka, github: "emqx/ekka", tag: "0.15.15", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "3.1.0", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.13", override: true},
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Expand Up @@ -62,7 +62,7 @@
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}}
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-1"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.14"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.15"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.1.0"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.13"}}}
Expand Down

0 comments on commit 5e40057

Please sign in to comment.