Skip to content

Commit

Permalink
Merge pull request #4525 from apache/prometheus_erlang_dist
Browse files Browse the repository at this point in the history
feat (prometheus): additional metrics to match _system
  • Loading branch information
willholley committed Apr 14, 2023
2 parents 865d5f8 + daecd3f commit 98a356c
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// apache/couchdbci-debian:bullseye-erlang-24.3.4.2
// apache/couchdbci-debian:bullseye-erlang-23.3.4.15
//
"COUCHDB_IMAGE": "apache/couchdbci-debian:bullseye-erlang-25.0.2"
"COUCHDB_IMAGE": "apache/couchdbci-debian:bullseye-erlang-24.3.4.10"
}
},

Expand Down
41 changes: 26 additions & 15 deletions src/chttpd/src/chttpd_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
-export([
handle_node_req/1,
get_stats/0,
run_queues/0
run_queues/0,
message_queues/0,
db_pid_stats/0
]).

-include_lib("couch/include/couch_db.hrl").
Expand Down Expand Up @@ -284,14 +286,13 @@ get_stats() ->
],
{NumberOfGCs, WordsReclaimed, _} = statistics(garbage_collection),
{{input, Input}, {output, Output}} = statistics(io),
{CF, CDU} = db_pid_stats(),
MessageQueues0 = [

{CF, CDU} = db_pid_stats_formatted(),
MessageQueuesHist = [
{couch_file, {CF}},
{couch_db_updater, {CDU}},
{couch_server, couch_server:aggregate_queue_len()},
{index_server, couch_index_server:aggregate_queue_len()}
{couch_db_updater, {CDU}}
],
MessageQueues = MessageQueues0 ++ message_queues(registered()),
MessageQueues = message_queues(),
{SQ, DCQ} = run_queues(),
[
{uptime, couch_app:uptime() div 1000},
Expand All @@ -309,11 +310,15 @@ get_stats() ->
{stale_proc_count, couch_proc_manager:get_stale_proc_count()},
{process_count, erlang:system_info(process_count)},
{process_limit, erlang:system_info(process_limit)},
{message_queues, {MessageQueues}},
{message_queues, {MessageQueuesHist ++ MessageQueues}},
{internal_replication_jobs, mem3_sync:get_backlog()},
{distribution, {get_distribution_stats()}}
].

%% Collects the raw per-process stats from db_pid_stats/0 and runs each
%% half of the result through format_pid_stats/1. The first element
%% corresponds to couch_file processes and the second to couch_db_updater
%% processes (same order as db_pid_stats/0 returns them).
db_pid_stats_formatted() ->
    {CouchFiles, CouchDbUpdaters} = db_pid_stats(),
    FormattedFiles = format_pid_stats(CouchFiles),
    FormattedUpdaters = format_pid_stats(CouchDbUpdaters),
    {FormattedFiles, FormattedUpdaters}.

db_pid_stats() ->
{monitors, M} = process_info(whereis(couch_stats_process_tracker), monitors),
Candidates = [Pid || {process, Pid} <- M],
Expand All @@ -322,7 +327,7 @@ db_pid_stats() ->
{CouchFiles, CouchDbUpdaters}.

db_pid_stats(Mod, Candidates) ->
Mailboxes = lists:foldl(
lists:foldl(
fun(Pid, Acc) ->
case process_info(Pid, [message_queue_len, dictionary]) of
undefined ->
Expand All @@ -342,8 +347,7 @@ db_pid_stats(Mod, Candidates) ->
end,
[],
Candidates
),
format_pid_stats(Mailboxes).
).

format_pid_stats([]) ->
[];
Expand Down Expand Up @@ -385,15 +389,22 @@ get_distribution_stats() ->
erlang:system_info(dist_ctrl)
).

message_queues(Registered) ->
lists:map(
-spec message_queues() ->
[{Name :: atom(), Length :: pos_integer()}].
message_queues() ->
MessageQueuesAgg = [
{couch_server, couch_server:aggregate_queue_len()},
{index_server, couch_index_server:aggregate_queue_len()}
],
MessageQueuesReg = lists:map(
fun(Name) ->
Type = message_queue_len,
{Type, Length} = process_info(whereis(Name), Type),
{Name, Length}
end,
Registered
).
registered()
),
MessageQueuesAgg ++ MessageQueuesReg.

%% Workaround for https://bugs.erlang.org/browse/ERL-1355
run_queues() ->
Expand Down
205 changes: 172 additions & 33 deletions src/couch_prometheus/src/couch_prometheus_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-import(couch_prometheus_util, [
couch_to_prom/3,
to_prom/4,
to_prom/2,
to_prom_summary/2
]).

Expand Down Expand Up @@ -110,11 +111,13 @@ get_system_stats() ->
get_uptime_stat(),
get_io_stats(),
get_message_queue_stats(),
get_db_pid_stats(),
get_run_queue_stats(),
get_vm_stats(),
get_ets_stats(),
get_internal_replication_jobs_stat(),
get_membership_stat()
get_membership_stat(),
get_distribution_stats()
]).

get_uptime_stat() ->
Expand Down Expand Up @@ -198,8 +201,7 @@ get_io_stats() ->
].

get_message_queue_stats() ->
QFun = fun(Name) -> {Name, message_queue_len(whereis(Name))} end,
Queues = lists:map(QFun, registered()),
Queues = chttpd_node:message_queues(),
QueueLens = lists:map(fun({_, Len}) -> Len end, Queues),
QueueLenByLabel = lists:map(fun({Name, Len}) -> {[{queue_name, Name}], Len} end, Queues),
[
Expand All @@ -221,33 +223,181 @@ get_message_queue_stats() ->
to_prom(erlang_message_queue_size, gauge, "size of message queue", QueueLenByLabel)
].

%% Returns the number of messages waiting in the mailbox of Pid.
%% `undefined` (e.g. the result of whereis/1 for an unregistered name)
%% counts as an empty queue, and so does any process_info/2 answer other
%% than {message_queue_len, N} (process_info/2 returns `undefined` for a
%% process that is no longer alive).
message_queue_len(undefined) ->
    0;
message_queue_len(Pid) when is_pid(Pid) ->
    queue_len_from_info(erlang:process_info(Pid, message_queue_len)).

%% Extracts the length from a process_info/2 result, defaulting to 0.
queue_len_from_info({message_queue_len, Len}) ->
    Len;
queue_len_from_info(_NoInfo) ->
    0.
%% Builds the Prometheus summaries for the message queues of couch_file
%% and couch_db_updater processes, in that order. The per-process values
%% come from chttpd_node:db_pid_stats/0; each list is rendered by
%% pid_to_prom_summary/3.
get_db_pid_stats() ->
    {CouchFileQueues, DbUpdaterQueues} = chttpd_node:db_pid_stats(),
    Summaries = [
        {
            "erlang_message_queue_couch_file",
            "size of message queue across couch_file processes",
            CouchFileQueues
        },
        {
            "erlang_message_queue_couch_db_updater",
            "size of message queue across couch_db_updater processes",
            DbUpdaterQueues
        }
    ],
    [pid_to_prom_summary(Metric, Desc, Queues) || {Metric, Desc, Queues} <- Summaries].

%% Renders a list of mailbox sizes as a Prometheus summary.
%%
%% Metric    - metric name as a string; the suffixes "_sum", "_count",
%%             "_min" and "_max" are appended to it for the extra series.
%% Desc      - description passed through to to_prom/4.
%% Mailboxes - list of values to summarise; an empty list yields no output.
%%
%% The result is the summary (0.5 / 0.9 / 0.99 quantiles) followed by the
%% sum, count, min and max entries.
pid_to_prom_summary(_Metric, _Desc, []) ->
    [];
pid_to_prom_summary(Metric, Desc, Mailboxes) ->
    Ordered = lists:sort(Mailboxes),
    Total = length(Ordered),
    % Picks the element at the (rounded) rank for the given fraction.
    % With a non-empty list the rank is always between 1 and Total.
    QuantileAt = fun(Label, Fraction) ->
        {[{quantile, Label}], lists:nth(round(Total * Fraction), Ordered)}
    end,
    Quantiles = [
        QuantileAt(<<"0.5">>, 0.5),
        QuantileAt(<<"0.9">>, 0.9),
        QuantileAt(<<"0.99">>, 0.99)
    ],
    Extras = [
        to_prom(Metric ++ [Suffix], Value)
     || {Suffix, Value} <- [
            {"_sum", lists:sum(Ordered)},
            {"_count", Total},
            {"_min", hd(Ordered)},
            {"_max", lists:last(Ordered)}
        ]
    ],
    to_prom(Metric, summary, Desc, Quantiles) ++ Extras.

get_run_queue_stats() ->
%% Workaround for https://bugs.erlang.org/browse/ERL-1355
{Normal, Dirty} =
case erlang:system_info(dirty_cpu_schedulers) > 0 of
false ->
{statistics(run_queue), 0};
true ->
[DCQ | SQs] = lists:reverse(statistics(run_queue_lengths)),
{lists:sum(SQs), DCQ}
end,
{SQ, DCQ} = chttpd_node:run_queues(),
[
to_prom(erlang_scheduler_queues, gauge, "the total size of all normal run queues", Normal),
to_prom(erlang_scheduler_queues, gauge, "the total size of all normal run queues", SQ),
to_prom(
erlang_dirty_cpu_scheduler_queues,
gauge,
"the total size of all dirty CPU scheduler run queues",
Dirty
DCQ
)
].

% Gets the socket stats for the specified distribution socket and folds
% them into MapAcc, inverting the result of inet:getstat/1: the returned
% map is keyed on the stat_option and each value is a list of
% {Labels, Value} pairs, one per node, e.g.
% #{
%   recv_oct => [{[{node, 'node2@127.0.0.1'}], 30609}],
%   recv_cnt => [{[{node, 'node2@127.0.0.1'}], 123}],
%   ...
% }
% MapAcc must already contain a key for every stat_option reported by
% inet:getstat/1 (maps:update_with/3 raises {badkey, Key} otherwise).
%
% Where there is an error fetching the socket stats -- either an
% {error, Reason} return (e.g. {error, einval} for a closed socket) or
% an exception raised by inet:getstat/1 -- no result is added for the
% specified node and MapAcc is returned unchanged.
-spec get_sock_stats({Node, Socket}, MapAcc) ->
    #{OptionValue := [{[{node, Node}], Value}]}
when
    Node :: node(),
    Socket :: inet:socket(),
    OptionValue :: inet:stat_option(),
    Value :: integer(),
    MapAcc :: #{OptionValue := [{[{node, Node}], Value}]}.
get_sock_stats({Node, Socket}, MapAcc) ->
    try inet:getstat(Socket) of
        {ok, Stats} ->
            % For each Key/Value pair in Stats, append an entry for the
            % current Node to the list already stored under that key.
            Labels = [{node, Node}],
            lists:foldl(
                fun({StatOption, Value}, Map0) ->
                    maps:update_with(
                        StatOption,
                        fun(Entries) -> Entries ++ [{Labels, Value}] end,
                        Map0
                    )
                end,
                MapAcc,
                Stats
            );
        {error, _Reason} ->
            % inet:getstat/1 reports failures as a return value. Without
            % this clause the unmatched result raises try_clause, which
            % the catch below does NOT cover (the `of` section is
            % unprotected), taking down the whole stats collection.
            MapAcc
    catch
        _:_ ->
            % deliberate best effort: no result for this node
            MapAcc
    end.

% Exposes the Erlang distribution socket statistics as Prometheus metrics.
%
% Each distribution metric has a different type, so each one is exposed as
% its own metric with the erlang node as a label. This is the inverse of
% the structure returned by inet:getstat/1.
%
% The stats are accumulated into a map keyed on the socket stat_option
% (https://www.erlang.org/doc/man/inet.html#getstat-2) where the value is
% a list of labels/value pairs for that stat, e.g.
%   recv_oct => [{[{node, 'node2@127.0.0.1'}], 30609}, {[{node, 'node3@127.0.0.1'}], 28392}]
%   recv_cnt => [{[{node, 'node2@127.0.0.1'}], 123}, {[{node, 'node3@127.0.0.1'}], 134}]
get_distribution_stats() ->
    % {StatOption, MetricName, MetricType, Description}; the order of this
    % table is the order of the metrics in the output.
    Metrics = [
        {recv_oct, erlang_distribution_recv_oct_bytes_total, counter,
            "Number of bytes received by the socket."},
        {recv_cnt, erlang_distribution_recv_cnt_packets_total, counter,
            "number of packets received by the socket."},
        {recv_max, erlang_distribution_recv_max_bytes, gauge,
            "size of the largest packet, in bytes, received by the socket."},
        {recv_avg, erlang_distribution_recv_avg_bytes, gauge,
            "average size of packets, in bytes, received by the socket."},
        {recv_dvi, erlang_distribution_recv_dvi_bytes, gauge,
            "average packet size deviation, in bytes, received by the socket."},
        {send_oct, erlang_distribution_send_oct_bytes_total, counter,
            "Number of bytes sent by the socket."},
        {send_cnt, erlang_distribution_send_cnt_packets_total, counter,
            "number of packets sent by the socket."},
        {send_max, erlang_distribution_send_max_bytes, gauge,
            "size of the largest packet, in bytes, sent by the socket."},
        {send_avg, erlang_distribution_send_avg_bytes, gauge,
            "average size of packets, in bytes, sent by the socket."},
        {send_pend, erlang_distribution_send_pend_bytes, gauge,
            "number of bytes waiting to be sent by the socket."}
    ],
    % Start every stat with an empty instance list so that each key is
    % present even when no node contributes a value.
    EmptyStats = maps:from_list([{StatOption, []} || {StatOption, _, _, _} <- Metrics]),
    Connections = erlang:system_info(dist_ctrl),
    Collected = lists:foldl(fun get_sock_stats/2, EmptyStats, Connections),
    [
        to_prom(Name, Type, Desc, maps:get(StatOption, Collected))
     || {StatOption, Name, Type, Desc} <- Metrics
    ].

Expand All @@ -271,17 +421,6 @@ update_refresh_timer() ->

-include_lib("couch/include/couch_eunit.hrl").

%% Exercises message_queue_len/1 with a live pid, `undefined` and a pid
%% whose process has already exited.
message_queue_len_test() ->
    % Make sure our own mailbox holds at least one message.
    self() ! refresh,
    OwnLen = message_queue_len(self()),
    ?assert(OwnLen >= 1),
    ?assertEqual(0, message_queue_len(undefined)),
    % Spawn a process that exits immediately and wait for its 'DOWN'
    % message so the pid is known to be dead before it is queried.
    {DeadPid, MonRef} = spawn_monitor(fun() -> ok end),
    receive
        {'DOWN', MonRef, process, DeadPid, _Reason} ->
            ok
    end,
    ?assertEqual(0, message_queue_len(DeadPid)).

drain_refresh_messages_test() ->
self() ! refresh,
{messages, Mq0} = erlang:process_info(self(), messages),
Expand Down
3 changes: 3 additions & 0 deletions src/couch_prometheus/src/couch_prometheus_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
couch_to_prom/3,
to_bin/1,
to_prom/4,
to_prom/2,
to_prom_summary/2
]).

Expand Down Expand Up @@ -124,6 +125,8 @@ type_def(Metric, Type, Desc) ->

% support creating a metric series with multiple label/values.
% Instances is of the form [{[{LabelName, LabelValue}], Value}, ...]
to_prom(_Metric, _Type, _Desc, []) ->
[];
to_prom(Metric, Type, Desc, Instances) when is_list(Instances) ->
TypeStr = type_def(Metric, Type, Desc),
[TypeStr] ++ lists:flatmap(fun(Inst) -> to_prom(Metric, Inst) end, Instances);
Expand Down

0 comments on commit 98a356c

Please sign in to comment.