Skip to content

Commit

Permalink
MB-3224: Rate limit ns_port_server log messages to 100/sec
Browse files Browse the repository at this point in the history
This will help us avoid running the node out of RAM with messages in
error_logger's mailbox if we get a spammy port under heavy disk I/O
conditions.

Change-Id: I141c6e3fecec92a895c8a2972578140e1e65337c
Reviewed-on: http://review.membase.org/4143
Reviewed-by: Steve Yen <steve.yen@gmail.com>
Tested-by: Steve Yen <steve.yen@gmail.com>
  • Loading branch information
Sean Lynch authored and steveyen committed Dec 21, 2010
1 parent 2a5e989 commit 1f072e5
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions src/ns_port_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,18 @@ init({Name, _Cmd, _Args, _Opts} = Params) ->
messages = ringbuffer:new(?NUM_MESSAGES)}}.

handle_info({_Port, {data, {_, Msg}}}, State) ->
State1 = log_messages([Msg], State),
{noreply, State1};
{Msgs, Dropped} = fetch_messages(99, 1000),
L = [Msg|Msgs],
%% Store the last messages in case of a crash
Messages = lists:foldl(fun ringbuffer:add/2, State#state.messages, L),
error_logger:info_msg(format_lines(State#state.name, L)),
if Dropped > 0 ->
?log_warning("Dropped ~p log lines from ~p",
[Dropped, State#state.name]);
true ->
ok
end,
{noreply, State#state{messages=Messages}};
handle_info({_Port, {exit_status, 0}}, State) ->
{stop, normal, State};
handle_info({_Port, {exit_status, Status}}, State) ->
Expand Down Expand Up @@ -87,21 +97,31 @@ code_change(_OldVsn, State, _Extra) ->

%% Internal functions

format_lines(Name, Lines) ->
Prefix = io_lib:format("~p~p: ", [Name, self()]),
[[Prefix, Line, $\n] || Line <- Lines].
%% @doc Fetch up to Max messages from the queue, discarding any more
%% received up to Timeout. The goal is to remove messages from the
%% queue as fast as possible if the port is spamming, avoiding
%% spamming the log server.
fetch_messages(Max, Timeout) ->
fetch_messages(Max, Timeout, now(), [], 0).

log_messages(L, State) ->
State1 = State#state{messages=ringbuffer:add(hd(L), State#state.messages)},
fetch_messages(Max, Timeout, Now, L, Dropped) ->
receive
{_Port, {data, {_, Msg}}} ->
log_messages([Msg|L], State1)
after 0 ->
%% split_log will reverse the lines
split_log(format_lines(State#state.name, L)),
State1
{L1, D1} = if length(L) < Max ->
{[Msg|L], Dropped};
true ->
%% Drop the message
{L, Dropped + 1}
end,
fetch_messages(Max, Timeout, Now, L1, D1)
after erlang:max(Timeout - timer:now_diff(now(), Now), 0) ->
{lists:reverse(L), Dropped}
end.

format_lines(Name, Lines) ->
Prefix = io_lib:format("~p~p: ", [Name, self()]),
[[Prefix, Line, $\n] || Line <- Lines].

open_port({_Name, Cmd, Args, OptsIn}) ->
%% Incoming options override existing ones (specified in proplists docs)
Opts0 = OptsIn ++ [{args, Args}, exit_status, {line, 8192},
Expand All @@ -116,18 +136,3 @@ open_port({_Name, Cmd, Args, OptsIn}) ->
Port ! {self(), {command, Data}}
end,
Port.

%% @doc Split the log into <64k chunks. The lines are fed in backwards.
split_log(L) ->
split_log(L, [], 0).

split_log([H|T] = L, M, N) ->
NewLength = N + lists:flatlength(H),
if NewLength > 65000 ->
error_logger:info_msg(lists:flatten(M)),
split_log(L);
true ->
split_log(T, [H|M], NewLength)
end;
split_log([], M, _) ->
error_logger:info_msg(lists:flatten(M)).

0 comments on commit 1f072e5

Please sign in to comment.