Skip to content

Commit

Permalink
Reconnect to epmd
Browse files Browse the repository at this point in the history
When the connection to epmd is lost, try to reconnect every 2s
  • Loading branch information
dotsimon authored and Simon Cornish committed Feb 2, 2021
1 parent b5d5141 commit 180d855
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 7 deletions.
3 changes: 0 additions & 3 deletions erts/doc/src/erl_dist_protocol.xml
Expand Up @@ -430,9 +430,6 @@ io:format("old/unused name ~ts at port ~p, fd = ~p ~n",

<p>where n = <c>Length</c> - 1.</p>

<p>The current implementation of Erlang does not care if the connection
to the EPMD is broken.</p>

<p>The response for a <c>STOP_REQ</c> is as follows:</p>

<table align="left">
Expand Down
6 changes: 4 additions & 2 deletions lib/kernel/doc/src/erl_epmd.xml
Expand Up @@ -56,8 +56,10 @@
<desc>
<p>Registers the node with <c>epmd</c> and tells epmd what port will be
used for the current node. It returns a creation number. This number is
incremented on each register to help with identifying if a node is
reconnecting to epmd.</p>
incremented on each register to help differentiate a new node instance
connecting to epmd with the same name.</p>
<p>After the node has successfully registered with epmd it will automatically
attempt reconnect to the daemon if the connection is broken.</p>
</desc>
</func>

Expand Down
17 changes: 15 additions & 2 deletions lib/kernel/src/erl_epmd.erl
Expand Up @@ -53,13 +53,15 @@

-import(lists, [reverse/1]).

-record(state, {socket, port_no = -1, name = ""}).
-record(state, {socket, port_no = -1, name = "", family}).
-type state() :: #state{}.

-include("inet_int.hrl").
-include("erl_epmd.hrl").
-include_lib("kernel/include/inet.hrl").

-define(RECONNECT_TIME, 2000).

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
Expand Down Expand Up @@ -228,7 +230,8 @@ handle_call({register, Name, PortNo, Family}, _From, State) ->
{alive, Socket, Creation} ->
S = State#state{socket = Socket,
port_no = PortNo,
name = Name},
name = Name,
family = Family},
{reply, {ok, Creation}, S};
Error ->
case init:get_argument(erl_epmd_port) of
Expand Down Expand Up @@ -263,7 +266,17 @@ handle_cast(_, State) ->
-spec handle_info(term(), state()) -> {'noreply', state()}.

handle_info({tcp_closed, Socket}, State) when State#state.socket =:= Socket ->
erlang:send_after(?RECONNECT_TIME, self(), reconnect),
{noreply, State#state{socket = -1}};
handle_info(reconnect, State) when State#state.socket =:= -1 ->
case do_register_node(State#state.name, State#state.port_no, State#state.family) of
{alive, Socket, _Creation} ->
%% ignore the received creation
{noreply, State#state{socket = Socket}};
_Error ->
erlang:send_after(?RECONNECT_TIME, self(), reconnect),
{noreply, State}
end;
handle_info(_, State) ->
{noreply, State}.

Expand Down
88 changes: 88 additions & 0 deletions lib/kernel/test/erl_distribution_SUITE.erl
Expand Up @@ -30,6 +30,7 @@
nodenames/1, hostnames/1,
illegal_nodenames/1, hidden_node/1,
dyn_node_name/1,
epmd_reconnect/1,
setopts/1,
table_waste/1, net_setuptime/1,
inet_dist_options_options/1,
Expand All @@ -54,6 +55,7 @@
tick_serv_test/2, tick_serv_test1/1,
run_remote_test/1,
dyn_node_name_do/2,
epmd_reconnect_do/2,
setopts_do/2,
keep_conn/1, time_ping/1]).

Expand All @@ -64,6 +66,8 @@
-export([pinger/1]).

-define(DUMMY_NODE,dummy@test01).
-define(ALT_EPMD_PORT, "12321").
-define(ALT_EPMD_CMD, "epmd -port "++?ALT_EPMD_PORT).

%%-----------------------------------------------------------------
%% The distribution is mainly tested in the big old test_suite.
Expand All @@ -82,6 +86,7 @@ all() ->
tick, tick_change, nodenames, hostnames, illegal_nodenames,
connect_node,
dyn_node_name,
epmd_reconnect,
hidden_node, setopts,
table_waste, net_setuptime, inet_dist_options_options,
{group, monitor_nodes},
Expand Down Expand Up @@ -117,9 +122,15 @@ init_per_testcase(TC, Config) when TC == hostnames;
file:make_dir("hostnames_nodedir"),
file:write_file("hostnames_nodedir/ignore_core_files",""),
Config;
init_per_testcase(epmd_reconnect, Config) ->
[] = os:cmd(?ALT_EPMD_CMD++" -relaxed_command_check -daemon"),
Config;
init_per_testcase(Func, Config) when is_atom(Func), is_list(Config) ->
Config.

end_per_testcase(epmd_reconnect, _Config) ->
os:cmd(?ALT_EPMD_CMD++" -kill"),
ok;
end_per_testcase(_Func, _Config) ->
ok.

Expand Down Expand Up @@ -427,6 +438,83 @@ tick_cli_test1(Node) ->
end
end.

epmd_reconnect(Config) when is_list(Config) ->
NodeNames = [N1,N2,N3] = get_nodenames(3, ?FUNCTION_NAME),
Nodes = [atom_to_list(full_node_name(NN)) || NN <- NodeNames],

DCfg = "-epmd_port "++?ALT_EPMD_PORT,

{_N1F,Port1} = start_node_unconnected(DCfg, N1, ?MODULE, run_remote_test,
["epmd_reconnect_do", atom_to_list(node()), "1" | Nodes]),
{_N2F,Port2} = start_node_unconnected(DCfg, N2, ?MODULE, run_remote_test,
["epmd_reconnect_do", atom_to_list(node()), "2" | Nodes]),
{_N3F,Port3} = start_node_unconnected(DCfg, N3, ?MODULE, run_remote_test,
["epmd_reconnect_do", atom_to_list(node()), "3" | Nodes]),
Ports = [Port1, Port2, Port3],

ok = reap_ports(Ports),

ok.

reap_ports([]) ->
ok;
reap_ports(Ports) ->
case (receive M -> M end) of
{Port, Message} ->
case lists:member(Port, Ports) andalso Message of
{data,String} ->
io:format("~p: ~s\n", [Port, String]),
reap_ports(Ports);
{exit_status,0} ->
reap_ports(Ports -- [Port])
end
end.

epmd_reconnect_do(_Node, ["1", Node1, Node2, Node3]) ->
Names = [Name || Name <- [hd(string:tokens(Node, "@")) || Node <- [Node1, Node2, Node3]]],
%% wait until all nodes are registered
ok = wait_for_names(Names),
"Killed" ++_ = os:cmd(?ALT_EPMD_CMD++" -kill"),
open_port({spawn, ?ALT_EPMD_CMD}, []),
%% check that all nodes reregister with epmd
ok = wait_for_names(Names),
lists:foreach(fun(Node) ->
ANode = list_to_atom(Node),
pong = net_adm:ping(ANode),
{epmd_reconnect_do, ANode} ! {stop, Node1, Node}
end, [Node2, Node3]),
ok;
epmd_reconnect_do(_Node, ["2", Node1, Node2, _Node3]) ->
register(epmd_reconnect_do, self()),
receive {stop, Node1, Node2} ->
ok
after 7000 ->
exit(timeout)
end;
epmd_reconnect_do(_Node, ["3", Node1, _Node2, Node3]) ->
register(epmd_reconnect_do, self()),
receive {stop, Node1, Node3} ->
ok
after 7000 ->
exit(timeout)
end.

wait_for_names(Names) ->
%% wait for up to 3 seconds (the current retry timer in erl_epmd is 2s)
wait_for_names(lists:sort(Names), 30, 100).

wait_for_names(Names, N, Wait) when N > 0 ->
try
{ok, Info} = erl_epmd:names(),
Names = lists:sort([Name || {Name, _Port} <- Info]),
ok
catch
error:{badmatch, _} ->
timer:sleep(Wait),
wait_for_names(Names, N-1, Wait)
end.


dyn_node_name(Config) when is_list(Config) ->
%%run_dist_configs(fun dyn_node_name/2, Config).
dyn_node_name("", Config).
Expand Down

0 comments on commit 180d855

Please sign in to comment.