More flexible TCP Syslog drain shrinking

Shrinking now depends on the number of messages in the buffer, the presence of dropped messages, and the number of sequential failures, rather than on time alone.
commit ffe84cb118a027ab242f114b530d2e6f75b442eb (parent 52cfe2f)
ferd authored
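Condensed from the drain diff below, the new shrink decision reads roughly as follows; this is a restatement for readability, not the exact file contents:

%% Shrink only when the buffer is not already at the shrunken size,
%% is currently full, has dropped at least one message, and the drain
%% has failed more than ?SHRINK_TRIES (10) reconnect attempts in a row.
maybe_shrink(Buf, Tries) ->
    case logplex_msg_buffer:max_size(Buf) =:= ?SHRINK_BUF_SIZE of
        true  -> Buf;
        false ->
            case full =:= logplex_msg_buffer:full(Buf) andalso
                 logplex_msg_buffer:lost(Buf) > 0 andalso
                 Tries > ?SHRINK_TRIES of
                true  -> logplex_msg_buffer:resize(?SHRINK_BUF_SIZE, Buf);
                false -> Buf
            end
    end.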
src/logplex_msg_buffer.erl (1 changed line)
@@ -29,6 +29,7 @@
,push_ext/2
,len/1
,max_size/1
+ ,full/1
,empty/1
,pop/1
,resize/2
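The only buffer change visible here is the new full/1 export. Judging from how the drain and the test suite use it below (they match on the atoms full and have_space), a plausible body is a simple comparison of the current length against the configured maximum; this is a sketch under that assumption, not the module's actual code:

%% Hypothetical implementation of the newly exported full/1: report
%% whether the buffer has reached its configured maximum size.
full(Buf) ->
    case len(Buf) >= max_size(Buf) of
        true  -> full;
        false -> have_space
    end.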
src/logplex_tcpsyslog_drain.erl (22 changed lines)
@@ -14,7 +14,7 @@
-define(SEND_TIMEOUT_MSG, send_timeout).
-define(SEND_TIMEOUT, timer:seconds(4)).
-define(HIBERNATE_TIMEOUT, 5000).
--define(SHRINK_TIMEOUT, timer:minutes(5)).
+-define(SHRINK_TRIES, 10).
-define(SHRINK_BUF_SIZE, 10).
@@ -391,13 +391,13 @@ reconnect(State = #state{failures = 0, last_good_time=T})
_EnoughTime ->
do_reconnect(State)
end;
-reconnect(State = #state{failures = F, last_good_time=LastGood, buf=Buf}) ->
+reconnect(State = #state{failures = F, buf=Buf}) ->
Max = logplex_app:config(tcp_syslog_backoff_max, 300),
BackOff = case length(integer_to_list(Max, 2)) of
MaxExp when F > MaxExp -> Max;
_ -> 1 bsl F
end,
- NewBuf = maybe_shrink(Buf, LastGood),
+ NewBuf = maybe_shrink(Buf, F),
%% We hibernate only when we need to reconnect with a timer. The timer
%% acts as a rate limiter! If you remove the timer, you must re-think
%% the hibernation.
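For context, the backoff arithmetic in this hunk (unchanged by the commit) caps the exponential wait: length(integer_to_list(Max, 2)) is the number of binary digits of Max, so once the failure count F exceeds that, the configured maximum is used instead of 1 bsl F. A standalone version of the same computation, assuming the default tcp_syslog_backoff_max of 300 seconds:

%% Sketch of the backoff rule used above. With Max = 300,
%% integer_to_list(300, 2) is "100101100" (9 digits), so failures 1..9
%% back off for 2, 4, ..., 512 seconds and anything beyond that uses 300.
backoff_seconds(F, Max) ->
    case length(integer_to_list(Max, 2)) of
        MaxExp when F > MaxExp -> Max;
        _ -> 1 bsl F
    end.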
@@ -566,16 +566,18 @@ maybe_resize(Buf) ->
false -> Buf
end.
-maybe_shrink(Buf, LastGood) ->
- case logplex_msg_buffer:max_size(Buf) =:= ?SHRINK_BUF_SIZE of
+maybe_shrink(Buf, Tries) ->
+ Max = logplex_msg_buffer:max_size(Buf),
+ case Max =:= ?SHRINK_BUF_SIZE of
true ->
Buf;
false ->
%% Shrink if we have never connected before or the last update time
- %% is more than ?SHRINK_TIMEOUT milliseconds old
- case (is_tuple(LastGood) andalso tuple_size(LastGood) =:= 3 andalso
- now_to_msec(LastGood) < now_to_msec(os:timestamp())-?SHRINK_TIMEOUT)
- orelse LastGood =:= undefined of
+ %% is more than ?SHRINK_TRIES old, and if the buffer is
+ %% currently full and dropping data
+ case full =:= logplex_msg_buffer:full(Buf) andalso
+ logplex_msg_buffer:lost(Buf) > 0 andalso
+ Tries > ?SHRINK_TRIES of
true ->
logplex_msg_buffer:resize(?SHRINK_BUF_SIZE, Buf);
false ->
@@ -583,6 +585,4 @@ maybe_shrink(Buf, LastGood) ->
end
end.
-now_to_msec({Mega,Sec,_}) -> (Mega*1000000 + Sec)*1000.
-
default_buf_size() -> logplex_app:config(tcp_drain_buffer_size, 1024).
test/logplex_tcp_drain_SUITE.erl (30 changed lines)
@@ -191,6 +191,7 @@ shrink(Config) ->
logplex_msg_buffer:new(10000),
[Msg(integer_to_binary(N)) || N <- lists:seq(1,10000)]
),
+ HalfBuff = logplex_msg_buffer:resize(20000, FullBuff),
Ref = make_ref(),
State0 = #state{
drain_id=1337,
@@ -208,27 +209,40 @@ shrink(Config) ->
%% failed and will set up a backoff + timer, and attempt to shrink the
%% drains. Note that this is different than if we had received the timer
%% timeout event (which would have called do_reconnect)
+ %%
+ %% First try, don't resize because we don't have enough failures
{next_state, disconnected, State1, _} = logplex_tcpsyslog_drain:disconnected({post,Msg("a")}, State0),
#state{buf=Buf1} = State1,
- 10 = logplex_msg_buffer:len(Buf1),
+ 10000 = logplex_msg_buffer:len(Buf1),
+ 10000 = logplex_msg_buffer:max_size(Buf1),
+ full = logplex_msg_buffer:full(Buf1),
+ %% then we don't resize because the buffer isn't full/hasn't lost data
+ {next_state, disconnected, State2, _} = logplex_tcpsyslog_drain:disconnected({post,Msg("a")}, State0#state{buf=HalfBuff, failures=100}),
+ #state{buf=Buf2} = State2,
+ 20000 = logplex_msg_buffer:max_size(Buf2),
+ have_space = logplex_msg_buffer:full(Buf2),
+ %% Then we lose because we have all the good criteria
+ {next_state, disconnected, State3, _} = logplex_tcpsyslog_drain:disconnected({post,Msg("a")}, State0#state{failures=100}),
+ #state{buf=Buf3} = State3,
+ 10 = logplex_msg_buffer:len(Buf3),
%% 9990 drops + the one triggered by {post, Msg}
- {{loss_indication, 9991, _}, _} = logplex_msg_buffer:pop(Buf1),
- 10 = logplex_msg_buffer:len(logplex_msg_buffer:push(msg, Buf1)),
+ {{loss_indication, 9991, _}, _} = logplex_msg_buffer:pop(Buf3),
+ 10 = logplex_msg_buffer:len(logplex_msg_buffer:push(msg, Buf3)),
%% Buffer seems sane. Now let's reconnect and see if it gets resized.
%% The server we used accepts only one connection and this one should
%% be established.
- State2 = State1#state{sock=undefined, failures = 0, reconnect_tref = Ref},
+ State4 = State3#state{sock=undefined, failures = 0, reconnect_tref = Ref},
%% force a reconnect. This calls 'do_reconnect', which
%% should succeed, then try to send data (which should succeed),
%% then go to 'sending'
- {next_state, sending, State3} = logplex_tcpsyslog_drain:disconnected({timeout, Ref, reconnect}, State2),
- #state{buf=Buf2} = State3,
- 0 = logplex_msg_buffer:len(Buf2), % all stuff was sent
+ {next_state, sending, State5} = logplex_tcpsyslog_drain:disconnected({timeout, Ref, reconnect}, State4),
+ #state{buf=Buf4} = State5,
+ 0 = logplex_msg_buffer:len(Buf4), % all stuff was sent
%% resized to configured default
Default = logplex_app:config(tcp_drain_buffer_size, 1024),
Default = logplex_msg_buffer:len(lists:foldl(
fun(M,Buf) -> logplex_msg_buffer:push(M,Buf) end,
- Buf2,
+ Buf4,
[Msg(integer_to_binary(N)) || N <- lists:seq(1,10000)]
)).
upgrades/v69.9_v69.10/live_upgrade.erl (2 changed lines)
@@ -23,7 +23,7 @@ UpgradeNode = fun () ->
%% resume all drains before going for the stateless drain buffer update
l(logplex_msg_buffer),
l(logplex_tcpsyslog_drain),
- [logplex_tcpsyslog_drain:resize_msg_buffer(Pid,1024)
+ [catch logplex_tcpsyslog_drain:resize_msg_buffer(Pid,1024)
|| {Pid, tcpsyslog} <- gproc:lookup_local_properties(drain_type)],
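The added catch presumably guards against an individual drain call failing (for example, a pid that has already exited) and aborting the whole comprehension mid-upgrade. A minimal sketch of the pattern, where Pids stands in for the list of drain pids:

%% A crash from any single drain becomes an {'EXIT', Reason} term instead
%% of taking down the upgrade shell, so the remaining drains still get
%% their buffers resized.
[catch logplex_tcpsyslog_drain:resize_msg_buffer(Pid, 1024) || Pid <- Pids].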