Skip to content

Commit

Permalink
Improve dist diagnostics
Browse files Browse the repository at this point in the history
 * Monitor nodes membership in mem3_distribution and log unexpected `nodedown`
   reasons.

 * Enhance the mem3_distribution gen_server to keep track of the last few
   events for each node. This can help detect nodes flapping or disconnecting
   often.

 * Expose the last few events from mem3_distribution in the .../_system
   endpoint, besides the already existing dist packet stats metrics.

 * Add `ping_nodes(...)` debug function. Use it to ping all connected nodes.
   This can help detect a still connected but slow network connection.

 * Add `dead_nodes(...)` debug function. Use it to detect a partitioned
   cluster, where some nodes may not be connected to each other.
  • Loading branch information
nickva committed Mar 24, 2024
1 parent 77c5418 commit e75b98f
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 30 deletions.
3 changes: 2 additions & 1 deletion src/chttpd/src/chttpd_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ get_stats() ->
{process_limit, erlang:system_info(process_limit)},
{message_queues, {MessageQueuesHist ++ MessageQueues}},
{internal_replication_jobs, mem3_sync:get_backlog()},
{distribution, {get_distribution_stats()}}
{distribution, {get_distribution_stats()}},
{distribution_events, mem3_distribution:events()}
].

db_pid_stats_formatted() ->
Expand Down
82 changes: 80 additions & 2 deletions src/couch/src/couch_debug.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,15 @@
restart/1,
restart_busy/2,
restart_busy/3,
restart_busy/4
restart_busy/4,
dead_nodes/0,
dead_nodes/1,
ping/1,
ping/2,
ping_nodes/0,
ping_nodes/1,
ping_nodes/2,
node_events/0
]).

-export([
Expand Down Expand Up @@ -85,7 +93,10 @@ help() ->
print_report,
print_report_with_info_width,
restart,
restart_busy
restart_busy,
dead_nodes,
ping_nodes,
node_events
].

-spec help(Function :: function_name()) -> ok.
Expand Down Expand Up @@ -435,6 +446,49 @@ help(analyze_resource_hoggers) ->
along with the number of top processes to include in result, TopN. See `couch_debug:help(resource_hoggers_snapshot)` for an
example and more info.
---
", []);
help(dead_nodes) ->
io:format("
dead_nodes()
dead_nodes(Timeout)
--------------------------------
Get the list of 'dead' nodes, that is node which appears in the
mem3:nodes() list but are not connected to some nodes of the cluster.
---
", []);
help(ping) ->
io:format("
ping(Node)
ping(Node, Timeout)
--------------------------------
Ping a node and return either a time in microseconds or an error term.
---
", []);
help(ping_nodes) ->
io:format("
ping_nodes()
ping_nodes(Timeout)
ping_nodes(Nodes, Timeout)
--------------------------------
Ping the list of currently connected nodes. Return a list of {Node,
Result} tuples where Result is either a time in microseconds or an
error term.
---
", []);
help(node_events) ->
io:format("
node_events()
--------------------------------
Return the list of nodeup/nodedown events for each node in the cluster.
---
", []);
help(Unknown) ->
Expand Down Expand Up @@ -910,6 +964,30 @@ restart_busy(ProcessList, Threshold, DelayInMsec, Property) when
busy(ProcessList, Threshold, Property)
).

%% Debug shortcut: return whatever mem3:dead_nodes/0 reports, using mem3's
%% own default timeout.
dead_nodes() ->
mem3:dead_nodes().

%% Same as dead_nodes/0, but Timeout is passed through unchanged to
%% mem3:dead_nodes/1.
dead_nodes(Timeout) ->
mem3:dead_nodes(Timeout).

%% Debug shortcut: ping a single Node via mem3:ping/1 and return its result
%% unchanged.
ping(Node) ->
mem3:ping(Node).

%% Same as ping/1, but Timeout is passed through unchanged to mem3:ping/2.
ping(Node, Timeout) ->
mem3:ping(Node, Timeout).

%% Debug shortcut: ping nodes via mem3:ping_nodes/0 (mem3 picks both the node
%% list and the timeout).
ping_nodes() ->
mem3:ping_nodes().

%% Same as ping_nodes/0, but Timeout is passed through to mem3:ping_nodes/1.
ping_nodes(Timeout) ->
mem3:ping_nodes(Timeout).

%% Ping an explicit list of Nodes; both arguments are passed through to
%% mem3:ping_nodes/2.
ping_nodes(Nodes, Timeout) ->
mem3:ping_nodes(Nodes, Timeout).

%% Debug shortcut: return the per-node event history kept by the
%% mem3_distribution server (see mem3_distribution:events/0).
node_events() ->
mem3_distribution:events().

%% Pretty print functions

%% Limitations:
Expand Down
135 changes: 116 additions & 19 deletions src/mem3/src/mem3.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
-export([belongs/2, owner/3]).
-export([get_placement/1]).
-export([ping/1, ping/2]).
-export([ping_nodes/0, ping_nodes/1, ping_nodes/2]).
-export([dead_nodes/0, dead_nodes/1]).
-export([db_is_current/1]).
-export([shard_creation_time/1]).
-export([generate_shard_suffix/0]).
Expand All @@ -40,7 +42,6 @@
-export([name/1, node/1, range/1, engine/1]).

-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

-define(PING_TIMEOUT_IN_MS, 60000).

Expand Down Expand Up @@ -423,31 +424,127 @@ engine(Opts) when is_list(Opts) ->
%% Check whether a node is up or down.
%% Side effect: sets up a connection to Node if there isn't one yet.

-spec ping(node()) -> pos_integer() | Error :: term().

%% @doc Ping Node using the default timeout (?PING_TIMEOUT_IN_MS).
%%
%% Returns the round-trip time in microseconds on success, or an error term.
%%
%% This delegates to ping/2 rather than calling ping_nodes([Node]): the
%% one-argument ping_nodes/1 only accepts an integer timeout, so passing it a
%% node list would fail with function_clause.
ping(Node) ->
    ping(Node, ?PING_TIMEOUT_IN_MS).

-spec ping(node(), Timeout :: pos_integer()) -> pos_integer() | Error :: term().

%% @doc Ping a single Node, waiting at most Timeout milliseconds.
%%
%% Implemented on top of ping_nodes/2 with a one-element node list. The
%% result is that node's entry: a time in microseconds on success, otherwise
%% an error term.
ping(Node, Timeout) when is_atom(Node) ->
    % ping_nodes/2 returns one {Node, Result} tuple per requested node, so a
    % single-node request must yield exactly one tuple for this Node.
    [{Node, Res}] = ping_nodes([Node], Timeout),
    Res.

-spec ping_nodes() -> [{node(), pos_integer() | Error :: term()}].

%% Ping every node returned by live_cluster_nodes() using the default
%% ?PING_TIMEOUT_IN_MS timeout.
ping_nodes() ->
ping_nodes(live_cluster_nodes(), ?PING_TIMEOUT_IN_MS).

-spec ping_nodes(Timeout :: pos_integer()) -> [{node(), pos_integer() | Error :: term()}].

%% Ping every node returned by live_cluster_nodes() with a caller-supplied
%% Timeout. The guard only admits positive integers, so a node list passed
%% here by mistake fails with function_clause.
ping_nodes(Timeout) when is_integer(Timeout), Timeout > 0 ->
ping_nodes(live_cluster_nodes(), Timeout).

% Ping all of Nodes concurrently. One monitored helper process is spawned per
% node; it exits with the do_ping/2 result so the outcome arrives as the
% reason of its 'DOWN' message. Returns a sorted list of {Node, Result}
% tuples, where Result is whatever do_ping/2 produced or `timeout` if the
% helper did not finish before the overall deadline.
ping_nodes(Nodes, Timeout) ->
    StartPinger = fun(Node) ->
        {_Pid, MonRef} = spawn_monitor(fun() -> exit(do_ping(Node, Timeout)) end),
        MonRef
    end,
    MonRefs = lists:map(StartPinger, Nodes),
    Pending = maps:from_keys(MonRefs, true),
    Deadline = erlang:monotonic_time(millisecond) + Timeout,
    Results = gather_ping_results(Pending, Deadline, #{}),
    lists:sort([{Node, map_get(MonRef, Results)} || {Node, MonRef} <- lists:zip(Nodes, MonRefs)]).

% Collect the exit reasons of the monitored pinger processes.
%
% Pending is a map of outstanding monitor references, Deadline is an absolute
% erlang:monotonic_time(millisecond) value and Acc maps already-finished
% references to their results. A single absolute deadline is used so the
% total wait is bounded by one Timeout rather than one Timeout per node.
% Any reference still pending once the deadline has passed is demonitored
% (flushing a late 'DOWN') and reported as `timeout`.
%
gather_ping_results(Pending, _Deadline, Acc) when map_size(Pending) == 0 ->
    Acc;
gather_ping_results(Pending, Deadline, Acc) ->
    case Deadline - erlang:monotonic_time(millisecond) of
        Remaining when Remaining >= 0 ->
            receive
                {'DOWN', Ref, _, _, Result} when is_map_key(Ref, Pending) ->
                    gather_ping_results(maps:remove(Ref, Pending), Deadline, Acc#{Ref => Result})
            after min(100, Remaining) ->
                % Wake up at least every 100 msec to re-check the deadline
                gather_ping_results(Pending, Deadline, Acc)
            end;
        _Expired ->
            MarkTimedOut = fun(Ref, true, AccIn) ->
                erlang:demonitor(Ref, [flush]),
                AccIn#{Ref => timeout}
            end,
            maps:fold(MarkTimedOut, Acc, Pending)
    end.

% Nodes reported by mem3_util:live_nodes/0, minus the local node.
live_cluster_nodes() ->
    lists:delete(node(), mem3_util:live_nodes()).

% Ping one node and time the round trip.
%
% Issues the same {is_auth, node()} call to the remote net_kernel that
% net_adm:ping uses. One difference is that net_adm:ping forcibly disconnects
% the node on failure, which is not done here:
% https://github.com/erlang/otp/blob/master/lib/kernel/src/net_adm.erl#L97
%
% Returns the elapsed time in microseconds when the remote side answers
% `yes`, any other reply as-is, the first element of a two-tuple exit reason,
% or {Class, Reason} for any other exception.
do_ping(Node, Timeout) ->
    Started = erlang:monotonic_time(),
    try gen_server:call({net_kernel, Node}, {is_auth, node()}, Timeout) of
        yes ->
            Elapsed = erlang:monotonic_time() - Started,
            erlang:convert_time_unit(Elapsed, native, microsecond);
        Other ->
            Other
    catch
        exit:{Reason, _CallInfo} ->
            Reason;
        Class:Reason ->
            {Class, Reason}
    end.

-spec dead_nodes() -> [node() | Error :: term()].

%% @doc Returns a list of dead nodes from the cluster.
%%
%% A "dead" node is a node which appears in the mem3:nodes() list but is not
%% connected. A node is included in the result if it is considered "dead" by
%% any of the reachable nodes, or if it fails to respond (or times out) when
%% queried by multicall/2.
%%
%% dead_nodes(TimeoutInMSec) will use the timeout for the rpc:multicall. If any
%% node fails to respond in that time, it will be added to the dead nodes
%% response list as well.
%%
%% The default timeout is ?PING_TIMEOUT_IN_MS (60 seconds).
%%
dead_nodes() ->
dead_nodes(?PING_TIMEOUT_IN_MS).

-spec dead_nodes(Timeout :: pos_integer()) -> [node() | Error :: term()].

dead_nodes(Timeout) when is_integer(Timeout), Timeout > 0 ->
    % The goal is to detect overlapping partitions, where not every node is
    % connected to every other node. For example: n1 connects to n2 and n3,
    % but n2 and n3 are not connected to each other. Each node therefore
    % reports which expected nodes it cannot see.
    LocalDead = fun() ->
        Known = ordsets:from_list(mem3:nodes()),
        Connected = ordsets:from_list(mem3_util:live_nodes()),
        ordsets:to_list(ordsets:subtract(Known, Connected))
    end,
    {Replies, Unreachable} = multicall(LocalDead, Timeout),
    % A list reply is that node's view of dead nodes; anything else is an
    % error term and is kept in the result as-is.
    Merge = fun
        (Dead, Acc) when is_list(Dead) -> ordsets:union(Acc, Dead);
        (Error, Acc) -> ordsets:union(Acc, [Error])
    end,
    Combined = lists:foldl(Merge, ordsets:from_list(Unreachable), Replies),
    ordsets:to_list(Combined).

% Evaluate Fun on the local node and on every connected node, waiting at most
% Timeout milliseconds. Fun is wrapped in `catch` so an exception inside it
% comes back as a reply value. Returns rpc:multicall's {Replies, BadNodes}.
multicall(Fun, Timeout) when is_integer(Timeout), Timeout > 0 ->
    Guarded = fun() -> catch Fun() end,
    Targets = [node() | nodes()],
    rpc:multicall(Targets, erlang, apply, [Guarded, []], Timeout).

db_is_current(#shard{name = Name}) ->
db_is_current(Name);
db_is_current(<<"shards/", _/binary>> = Name) ->
Expand Down
69 changes: 64 additions & 5 deletions src/mem3/src/mem3_distribution.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

-export([
start_link/0,
connect_node/1
connect_node/1,
events/0
]).

-export([
Expand All @@ -30,9 +31,13 @@
]).

-define(JITTER_PERCENT, 0.25).
% init + 3 up/down pairs
-define(EVENT_MAX_HISTORY, 7).

-record(st, {
tref
tref,
% #{Node => [Evt, ...]}
events = #{}
}).

start_link() ->
Expand All @@ -41,20 +46,38 @@ start_link() ->
connect_node(Node) when is_atom(Node) ->
net_kernel:connect_node(Node).

% Fetch the per-node event map (#{Node => [Evt, ...]}) from the process
% registered as ?MODULE. This is a synchronous call with no timeout.
events() ->
gen_server:call(?MODULE, events, infinity).

% Start-up sequence:
%   1. attempt an initial connect pass,
%   2. subscribe to nodeup/nodedown messages, asking for the nodedown reason
%      so handle_info/2 can record and log it,
%   3. arm the periodic reconnect timer and seed the event history with an
%      `init` entry for every node already connected.
init(_) ->
    connect(false),
    net_kernel:monitor_nodes(true, [nodedown_reason]),
    St = #st{
        tref = schedule_connect(),
        events = init_events(nodes())
    },
    {ok, St}.

% `events` replies with the current per-node event map. Any other call is
% treated as a bug: the caller gets {bad_call, Msg} and the server stops with
% the same term as its reason.
handle_call(events, _From, #st{events = Events} = St) ->
    {reply, Events, St};
handle_call(Unexpected, _From, #st{} = St) ->
    Error = {bad_call, Unexpected},
    {stop, Error, Error, St}.

% No casts are supported: any cast stops the server with {bad_cast, Msg}.
handle_cast(Msg, #st{} = St) ->
{stop, {bad_cast, Msg}, St}.

% nodeup/nodedown messages arrive because init/1 subscribed with
% net_kernel:monitor_nodes/2. Each one is appended to that node's event
% history; nodedown reasons are also passed to log_node_down/2.
handle_info({nodeup, Node, _Info}, #st{events = Events} = St) ->
    Events1 = update_event(Node, nodeup, null, Events),
    {noreply, St#st{events = Events1}};
handle_info({nodedown, Node, Info}, #st{events = Events} = St) ->
    Reason = couch_util:get_value(nodedown_reason, Info),
    log_node_down(Node, Reason),
    Events1 = update_event(Node, nodedown, Reason, Events),
    {noreply, St#st{events = Events1}};
% Periodic reconnect tick: cancel the previous timer reference, run a connect
% pass and arm the next tick.
handle_info(connect, #st{} = St) ->
    erlang:cancel_timer(St#st.tref),
    ok = connect(true),
    {noreply, St#st{tref = schedule_connect()}};
% Anything else is unexpected and stops the server.
handle_info(Msg, St) ->
    {stop, {bad_info, Msg}, St}.

Expand All @@ -78,6 +101,18 @@ log(_, _, _) ->
% Failed to connect or we don't want to log it
ok.

% Log a warning for unexpected nodedown reasons. Two reasons are considered
% routine and are not logged:
%   connection_closed - received on a regular node shutdown
%   disconnect        - this node requested a disconnect
log_node_down(_Node, Reason) when Reason =:= connection_closed; Reason =:= disconnect ->
    ok;
log_node_down(Node, Reason) ->
    couch_log:warning("~s : node ~s down, reason: ~p", [?MODULE, Node, Reason]).

% Arm a timer that delivers a `connect` message to this process after
% wait_msec() milliseconds. Returns the timer reference from
% erlang:send_after/3.
schedule_connect() ->
erlang:send_after(wait_msec(), self(), connect).

wait_msec() ->
IntervalSec = config:get_integer("cluster", "reconnect_interval_sec", 37),
IntervalMSec = IntervalSec * 1000,
Expand All @@ -87,3 +122,27 @@ jitter(Interval) ->
Jitter = round(Interval * ?JITTER_PERCENT),
% rand:uniform(0) crashes!
rand:uniform(max(1, Jitter)).

% Build the initial event map: every node in Nodes starts with a single
% `init` event stamped with the current system time.
init_events(Nodes) ->
    InitHistory = [make_event(erlang:system_time(second), init, null)],
    maps:from_list([{Node, InitHistory} || Node <- Nodes]).

% Append a new event for Node to EventMap, stamped with the current system
% time. A node's history is kept oldest-first and trimmed to the most recent
% ?EVENT_MAX_HISTORY entries. A node seen for the first time starts with a
% one-element history.
update_event(Node, Event, Reason, EventMap) ->
    Entry = make_event(erlang:system_time(second), Event, Reason),
    Append = fun(Existing) ->
        Extended = Existing ++ [Entry],
        % Drop the oldest entries beyond the history limit
        Excess = max(0, length(Extended) - ?EVENT_MAX_HISTORY),
        lists:nthtail(Excess, Extended)
    end,
    maps:update_with(Node, Append, [Entry], EventMap).

% Build one event entry: [Timestamp, Event] or [Timestamp, Event, Reason].
% Timestamp is the RFC 3339 rendering of TSec (seconds) with a "Z" offset, as
% a binary. A `null` reason is omitted and atom reasons are kept as-is. For a
% non-TCP (custom) dist protocol the reason may be a non-atom, in which case
% it is converted with couch_util:to_binary/1.
make_event(TSec, Event, Reason) when is_integer(TSec), TSec > 0, is_atom(Event) ->
    Timestamp = list_to_binary(calendar:system_time_to_rfc3339(TSec, [{offset, "Z"}])),
    if
        Reason =:= null -> [Timestamp, Event];
        is_atom(Reason) -> [Timestamp, Event, Reason];
        true -> [Timestamp, Event, couch_util:to_binary(Reason)]
    end.
Loading

0 comments on commit e75b98f

Please sign in to comment.