
Do not run check in cets_discovery on nodedown #50

Merged: 13 commits, Mar 4, 2024
src/cets.erl: 29 changes (24 additions, 5 deletions)
@@ -200,7 +200,16 @@
}.
%% Status information returned by `info/1'.

-type handle_down_fun() :: fun((#{remote_pid := server_pid(), table := table_name()}) -> ok).
-type handle_down_fun() :: fun(
(
#{
remote_pid := server_pid(),
remote_node := node(),
table := table_name(),
is_leader := boolean()
}
) -> ok
).
%% Handler function which is called when the remote node goes down.

-type handle_conflict_fun() :: fun((tuple(), tuple()) -> tuple()).
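
For context, a handler matching the extended signature could look like the sketch below. The function name handle_remote_down/1 and the log fields are illustrative, not part of this PR; the only contract from the spec above is that the handler receives this map and returns ok.

handle_remote_down(#{remote_pid := Pid,
                     remote_node := Node,
                     table := Tab,
                     is_leader := IsLeader}) ->
    %% React to a remote table copy going away, e.g. log it for operators.
    logger:warning(#{what => remote_cets_server_down,
                     remote_pid => Pid,
                     remote_node => Node,
                     table => Tab,
                     is_leader => IsLeader}),
    ok.
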
@@ -647,9 +656,12 @@ handle_down2(RemotePid, Reason, State = #{other_servers := Servers, ack_pid := A
case lists:member(RemotePid, Servers) of
true ->
cets_ack:send_remote_down(AckPid, RemotePid),
call_user_handle_down(RemotePid, State),
Servers2 = lists:delete(RemotePid, Servers),
update_node_down_history(RemotePid, Reason, set_other_servers(Servers2, State));
State3 = update_node_down_history(
RemotePid, Reason, set_other_servers(Servers2, State)
),
call_user_handle_down(RemotePid, State3),
State3;
false ->
%% This should not happen
?LOG_ERROR(#{
@@ -896,10 +908,17 @@ handle_get_info(

%% Cleanup
-spec call_user_handle_down(server_pid(), state()) -> ok.
call_user_handle_down(RemotePid, #{tab := Tab, opts := Opts}) ->
call_user_handle_down(RemotePid, #{tab := Tab, opts := Opts, is_leader := IsLeader}) ->
case Opts of
#{handle_down := F} ->
FF = fun() -> F(#{remote_pid => RemotePid, table => Tab}) end,
FF = fun() ->
F(#{
remote_pid => RemotePid,
remote_node => node(RemotePid),
table => Tab,
is_leader => IsLeader
})
end,
Info = #{
task => call_user_handle_down,
table => Tab,
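
Assuming tables are started with cets:start/2 and an options map, as the opts field in the server state suggests, the handler above would be wired up roughly as follows; the table name is hypothetical:

start_my_table() ->
    {ok, _Pid} = cets:start(my_table, #{handle_down => fun handle_remote_down/1}),
    ok.

Note that after this change the user handler is invoked only after the server state has been updated (the remote pid removed from other_servers and the down recorded in the history), rather than before.
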
src/cets_discovery.erl: 69 changes (52 additions, 17 deletions)
@@ -88,7 +88,7 @@
-type get_nodes_result() :: {ok, [node()]} | {error, term()}.
%% Result of `get_nodes/2' call.

-type retry_type() :: initial | after_error | regular.
-type retry_type() :: initial | after_error | regular | after_nodedown.
%% Retry logic type.

-type from() :: {pid(), reference()}.
@@ -107,7 +107,7 @@
phase := initial | regular,
results := [join_result()],
nodes := ordsets:ordset(node()),
%% The nodes that returned pang, sorted
%% The nodes that returned pang or nodedown, sorted
unavailable_nodes := ordsets:ordset(node()),
tables := [atom()],
backend_module := module(),
@@ -216,7 +216,7 @@ wait_for_get_nodes(Server, Timeout) ->
%% @private
-spec init(term()) -> {ok, state()}.
init(Opts) ->
StartTime = erlang:system_time(millisecond),
StartTime = get_time(),
%% Sends nodeup / nodedown
ok = net_kernel:monitor_nodes(true),
Mod = maps:get(backend_module, Opts, cets_discovery_file),
@@ -308,8 +308,6 @@ handle_info({nodeup, Node}, State) ->
{noreply, try_joining(State3)};
handle_info({nodedown, Node}, State) ->
State2 = handle_nodedown(Node, State),
%% Do another check to update unavailable_nodes list
self() ! check,
{noreply, State2};
handle_info({start_time, Node, StartTime}, State) ->
{noreply, handle_receive_start_time(Node, StartTime, State)};
@@ -409,6 +407,9 @@ prune_unavailable_nodes_if_needed(State = #{nodes := Nodes, unavailable_nodes :=
%% Unavailable nodes is a subset of discovered nodes
State#{unavailable_nodes := ordsets:intersection(Nodes, UnNodes)}.

%% We should not ping nodes that have just been disconnected.
%% Let the disconnected node reconnect on its own if it restarts,
%% or reconnect to it after a timeout.
-spec ping_not_connected_nodes([node()]) -> ok.
ping_not_connected_nodes(Nodes) ->
Self = self(),
@@ -454,16 +455,36 @@ choose_retry_type(#{last_get_nodes_result := {error, _}}) ->
after_error;
choose_retry_type(#{phase := initial}) ->
initial;
choose_retry_type(_) ->
regular.
choose_retry_type(State) ->
case last_node_down(State) of
false ->
regular;
Node ->
%% Allow reconnecting after a netsplit, but not too quickly.
GracePeriod = retry_type_to_timeout(after_nodedown),
case get_downtime(Node, State) < GracePeriod of
true ->
after_nodedown;
false ->
regular
end
end.

-spec last_node_down(state()) -> false | node().
last_node_down(#{nodedown_timestamps := Map}) when map_size(Map) =:= 0 ->
false;
last_node_down(#{nodedown_timestamps := Map}) ->
{Node, _TS} = lists:last(lists:keysort(2, maps:to_list(Map))),
Node.

%% Returns the timeout in milliseconds before retrying the get_nodes call.
%% get_nodes is called after add_table without waiting.
%% It is also retried without waiting if should_retry_get_nodes is set to true.
-spec retry_type_to_timeout(retry_type()) -> non_neg_integer().
retry_type_to_timeout(initial) -> timer:seconds(5);
retry_type_to_timeout(after_error) -> timer:seconds(1);
retry_type_to_timeout(regular) -> timer:minutes(5).
retry_type_to_timeout(regular) -> timer:minutes(5);
retry_type_to_timeout(after_nodedown) -> timer:seconds(30).
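
As a rough illustration of how these timeouts drive the next check, the state's timer could be re-armed as in the sketch below; schedule_check/1 is a hypothetical name, the real timer handling lives elsewhere in this module:

schedule_check(State) ->
    ok = cancel_old_timer(State),
    Timeout = retry_type_to_timeout(choose_retry_type(State)),
    %% A nodedown within the last 30 seconds keeps the shorter after_nodedown
    %% interval; otherwise the regular 5-minute interval (or the initial /
    %% after_error ones) applies.
    TimerRef = erlang:send_after(Timeout, self(), check),
    State#{timer_ref := TimerRef}.
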

-spec cancel_old_timer(state()) -> ok.
cancel_old_timer(#{timer_ref := OldRef}) when is_reference(OldRef) ->
@@ -482,8 +503,9 @@ flush_all_checks() ->

-spec do_join(atom(), node()) -> join_result().
do_join(Tab, Node) ->
%% Possible race condition: Node got disconnected
LocalPid = whereis(Tab),
%% That would trigger autoconnect for the first time
%% That could trigger autoconnect if Node is not connected
case rpc:call(Node, erlang, whereis, [Tab]) of
Pid when is_pid(Pid), is_pid(LocalPid) ->
Result = cets_join:join(cets_discovery, #{table => Tab}, LocalPid, Pid),
@@ -560,10 +582,10 @@ has_join_result_for(Node, Table, #{results := Results}) ->

-spec handle_system_info(state()) -> system_info().
handle_system_info(State) ->
State#{verify_ready => verify_ready(State)}.
State#{verify_ready => verify_ready(State), retry_type => choose_retry_type(State)}.

-spec handle_nodedown(node(), state()) -> state().
handle_nodedown(Node, State) ->
handle_nodedown(Node, State = #{unavailable_nodes := UnNodes}) ->
State2 = remember_nodedown_timestamp(Node, State),
{NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2),
?LOG_WARNING(
Expand All @@ -574,7 +596,8 @@ handle_nodedown(Node, State) ->
time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
})
),
State3.
State4 = State3#{unavailable_nodes := ordsets:add_element(Node, UnNodes)},
trigger_verify_ready(State4).

-spec handle_nodeup(node(), state()) -> state().
handle_nodeup(Node, State) ->
@@ -595,13 +618,13 @@ handle_nodeup(Node, State) ->

-spec remember_nodeup_timestamp(node(), state()) -> state().
remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Time = get_time(),
Map2 = Map#{Node => Time},
State#{nodeup_timestamps := Map2}.

-spec remember_nodedown_timestamp(node(), state()) -> state().
remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Time = get_time(),
Map2 = Map#{Node => Time},
State#{nodedown_timestamps := Map2}.

Expand All @@ -617,6 +640,7 @@ calculate_uptime(undefined) ->
calculate_uptime(StartTime) ->
time_since(StartTime).

-spec get_downtime(node(), state()) -> milliseconds() | undefined.
get_downtime(Node, #{nodedown_timestamps := Map}) ->
case maps:get(Node, Map, undefined) of
undefined ->
@@ -633,8 +657,13 @@ set_defined(Key, Value, Map) ->
time_since_startup_in_milliseconds(#{start_time := StartTime}) ->
time_since(StartTime).

time_since(StartTime) ->
erlang:system_time(millisecond) - StartTime.
-spec time_since(integer()) -> integer().
time_since(StartTime) when is_integer(StartTime) ->
get_time() - StartTime.

-spec get_time() -> milliseconds().
get_time() ->
erlang:system_time(millisecond).

send_start_time_to(Node, #{start_time := StartTime}) ->
case erlang:process_info(self(), registered_name) of
@@ -659,4 +688,10 @@ handle_receive_start_time(Node, StartTime, State = #{node_start_timestamps := Ma
%% Restarted node reconnected, this is fine during the rolling updates
ok
end,
State#{node_start_timestamps := maps:put(Node, StartTime, Map)}.
%% Switch to the regular phase once we get contact with another disco server.
%% This affects the check intervals.
State#{
node_start_timestamps := maps:put(Node, StartTime, Map),
phase := regular
}.
src/cets_join.erl: 2 changes (1 addition, 1 deletion)
@@ -100,7 +100,7 @@ join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts) ->
LockRequest = {LockKey, self()},
%% Just lock all nodes, no magic here :)
Nodes = [node() | nodes()],
Retries = 1,
Retries = 0,
%% global could abort the transaction when one of the nodes goes down.
%% This usually happens during startup or an update.
case global:trans(LockRequest, F, Nodes, Retries) of
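
For reference, with Retries set to 0, global:trans/4 makes a single attempt to take the lock and returns the atom aborted if it cannot, so the caller has to handle that outcome. A minimal sketch of the pattern, not the actual cets_join code:

try_with_lock(LockRequest, F, Nodes) ->
    case global:trans(LockRequest, F, Nodes, 0) of
        aborted ->
            %% The lock could not be taken, e.g. because a node went down;
            %% back off and let the caller retry the whole join later.
            {error, lock_aborted};
        Result ->
            Result
    end.
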