Skip to content
Browse files

shrinking of HTTP drains

Inactive drains (those that have been disconnected for too long) have their message buffers shrunk down, and resized back to the configured size when the drain comes back online.
  • Loading branch information...
1 parent 4b39434 commit a4f2df539620f4f39991aa16f002ecdd2c37d4ab @ferd ferd committed Oct 29, 2013
Showing with 122 additions and 8 deletions.
  1. +40 −7 src/logplex_http_drain.erl
  2. +82 −1 test/logplex_http_drain_SUITE.erl
View
47 src/logplex_http_drain.erl
@@ -27,6 +27,8 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("ex_uri/include/ex_uri.hrl").
-define(HIBERNATE_TIMEOUT, 5000).
+-define(SHRINK_TIMEOUT, timer:minutes(5)).
+-define(SHRINK_BUF_SIZE, 10).
-type drop_info() :: {erlang:timestamp(), pos_integer()}.
@@ -39,7 +41,10 @@
client :: pid(),
out_q = queue:new() :: queue(),
reconnect_tref :: reference() | 'undefined',
- drop_info :: drop_info() | 'undefined'
+ drop_info :: drop_info() | 'undefined',
+ %% Last time we connected or successfully sent data
+ last_good_time :: 'undefined' | erlang:timestamp(),
+ service = normal :: 'normal' | 'degraded'
}).
-record(frame, {frame :: iolist(),
@@ -235,6 +240,8 @@ handle_info(Info, StateName, State) ->
try_connect(State = #state{uri=Uri,
drain_id=DrainId,
channel_id=ChannelId,
+ buf=Buf,
+ service=Status,
client=undefined}) ->
{Scheme, Host, Port} = connection_info(Uri),
ConnectStart = os:timestamp(),
@@ -247,7 +254,13 @@ try_connect(State = #state{uri=Uri,
?INFO("drain_id=~p channel_id=~p dest=~s at=try_connect "
"attempt=success connect_time=~p",
log_info(State, [ltcy(ConnectStart, ConnectEnd)])),
- ready_to_send(State#state{client=Pid});
+ case Status of
+ normal ->
+ ok;
+ degraded ->
+ logplex_drain_buffer:resize_msg_buffer(Buf, default_buf_size())
+ end,
+ ready_to_send(State#state{client=Pid, service=normal});
Why ->
ConnectEnd = os:timestamp(),
?WARN("drain_id=~p channel_id=~p dest=~s at=try_connect "
@@ -257,7 +270,10 @@ try_connect(State = #state{uri=Uri,
end.
%% @private
-http_fail(State = #state{client = Client}) ->
+http_fail(State = #state{client=Client,
+ buf=Buf,
+ last_good_time=LastGood,
+ service=Status}) ->
%% Close any existing client connection.
NewState = case Client of
Pid when is_pid(Pid) ->
@@ -266,12 +282,24 @@ http_fail(State = #state{client = Client}) ->
undefined ->
State
end,
+ Service = case {(is_tuple(LastGood) andalso tuple_size(LastGood) =:= 3 andalso
+ now_to_msec(LastGood) < now_to_msec(os:timestamp())-?SHRINK_TIMEOUT)
+ orelse LastGood =:= undefined,
+ Status} of
+ {true, normal} ->
+ logplex_drain_buffer:resize_msg_buffer(Buf, ?SHRINK_BUF_SIZE),
+ degraded;
+ {_, _} ->
+ normal
+ end,
%% We hibernate only when we need to reconnect with a timer. The timer
%% acts as a rate limiter! If you remove the timer, you must re-think
%% the hibernation.
case set_reconnect_timer(NewState) of
- NewState -> {next_state, disconnected, NewState};
- ReconnectState -> {next_state, disconnected, ReconnectState, hibernate}
+ NewState ->
+ {next_state, disconnected, NewState#state{service=Service}};
+ ReconnectState ->
+ {next_state, disconnected, ReconnectState#state{service=Service}, hibernate}
end.
%% @private
@@ -435,7 +463,8 @@ lost_msgs(Lost, S=#state{drop_info={TS,Dropped}}) ->
%% @private
%% if we had failures, they should have been delivered with this frame
-sent_frame(#frame{msg_count=Count, loss_count=Lost}, State=#state{drop_info=Drop}) ->
+sent_frame(#frame{msg_count=Count, loss_count=Lost}, State0=#state{drop_info=Drop}) ->
+ State = State0#state{last_good_time=os:timestamp()},
msg_stat(drain_delivered, Count, State),
logplex_realtime:incr(drain_delivered, Count),
case {Lost,Drop} of
@@ -557,7 +586,11 @@ ltcy(Start, End) ->
%% @private
%% Attach a drain buffer to this drain when none is running yet (the
%% clause only matches buf = 'undefined'). The buffer is started at the
%% full configured capacity (default_buf_size/0) and will send 'notify'
%% messages to this process when new data arrives.
start_drain_buffer(State = #state{channel_id=ChannelId,
                                  buf = undefined}) ->
    Size = default_buf_size(),
    {ok, Buf} = logplex_drain_buffer:start_link(ChannelId, self(),
                                                notify, Size),
    State#state{buf = Buf}.
+
+now_to_msec({Mega,Sec,_}) -> (Mega*1000000 + Sec)*1000.
+
+default_buf_size() -> logplex_app:config(http_drain_buffer_size, 1024).
View
83 test/logplex_http_drain_SUITE.erl
@@ -1,13 +1,28 @@
-module(logplex_http_drain_SUITE).
-include_lib("common_test/include/ct.hrl").
+-include_lib("ex_uri/include/ex_uri.hrl").
-compile(export_all).
%% All test groups executed by this suite.
all() ->
    [{group, overflow},
     {group, drain_buf}].
%% Test case groupings: 'overflow' covers full-buffer delivery
%% scenarios; 'drain_buf' covers drain-buffer lifecycle behaviour
%% (buffer restart, and shrink/resize of inactive drains).
groups() -> [{overflow, [], [full_buffer_success, full_buffer_fail,
                             full_buffer_temp_fail, full_stack]},
             {drain_buf, [], [restart_drain_buf, shrink]}].
+
%% Local mirror of the #state{} record defined in
%% src/logplex_http_drain.erl, so white-box tests can construct and
%% pattern-match drain state directly. Field names, order and defaults
%% must stay in sync with the record in the source module.
%% NOTE(review): the builtin type queue() is deprecated in later OTP
%% releases in favour of queue:queue() -- confirm the target OTP
%% version still accepts it.
-record(state, {drain_id :: logplex_drain:id(),
                drain_tok :: logplex_drain:token(),
                channel_id :: logplex_channel:id(),
                uri :: #ex_uri{},
                buf :: pid(),
                client :: pid(),
                out_q = queue:new() :: queue(),
                reconnect_tref :: reference() | 'undefined',
                drop_info,
                %% Last time we connected or successfully sent data
                last_good_time :: 'undefined' | erlang:timestamp(),
                service = normal :: 'normal' | 'degraded'
               }).
init_per_suite(Config) ->
set_os_vars(),
@@ -85,6 +100,36 @@ init_per_testcase(full_stack, Config) ->
unlink(Pid),
[{channel, ChannelId}, {drain_id, DrainId}, {drain_tok, DrainTok},
{uri, URI}, {drain,Pid}, {client, Tab} | Config];
%% Setup for the 'shrink' case: a drain wired to a mocked buffer and a
%% mocked HTTP client whose request outcome the test can flip at will.
init_per_testcase(shrink, Config) ->
    %% Drain data
    ChannelId = 1337,
    DrainId = 2198712,
    DrainTok = "d.12931-e21-312213-12321",
    {ok,URI,_} = ex_uri:decode("http://example.org:80"),
    %% --- Mocks ---
    %% Drain buffer
    Ref = mock_drain_buffer(),
    %% HTTP Client: connections succeed by default (overridden in-test)
    meck:new(logplex_http_client, [passthrough]),
    meck:expect(logplex_http_client, start_link,
                fun(_Drain, _Channel, _Uri, _Scheme, _Host, _Port, _Timeout) ->
                    {ok, self()}
                end),
    meck:expect(logplex_http_client, close, fun(_Pid) -> ok end),
    %% We make failure controllable by helper functions.
    %% Rube Goldberg-esque, but makes writing the actual test
    %% a bit simpler. Succeeds by default.
    Tab = client_call_init(),
    meck:expect(logplex_http_client, raw_request,
                fun(_Pid, _Req, _Timeout) ->
                    Status = client_call_status(Tab),
                    {ok, Status, []}
                end),
    %% Starting the drain (unlinked so a drain crash doesn't kill CT)
    {ok, Pid} = logplex_http_drain:start_link(ChannelId, DrainId, DrainTok, URI),
    unlink(Pid),
    [{channel, ChannelId}, {drain_id, DrainId}, {drain_tok, DrainTok},
     {uri, URI}, {buffer, Ref}, {drain,Pid}, {client, Tab} | Config];
init_per_testcase(_, Config) ->
%% Drain data
ChannelId = 1337,
@@ -152,6 +197,11 @@ mock_drain_buffer() ->
fun(_Buf, _Bytes, _Fun) ->
ok
end),
+ meck:expect(logplex_drain_buffer, resize_msg_buffer,
+ fun(_Buf, Size) when Size > 0 ->
+ ct:pal("CALLED: (~p, ~p)", [_Buf, Size]),
+ ok
+ end),
Id.
client_call_init() ->
@@ -354,6 +404,37 @@ restart_drain_buf(Config) ->
ct:pal("New buf ~p", [Buf1]),
false = Buf0 =:= Buf1.
%% White-box test of buffer shrinking: a drain disconnected for longer
%% than the shrink timeout gets its buffer resized down to the degraded
%% size (10), then resized back up to the configured size once it
%% reconnects successfully.
shrink(Config) ->
    Buf = ?config(buffer, Config),
    %% last_good_time = {0,0,0} is the epoch, so the drain looks like it
    %% has been disconnected far longer than the shrink timeout.
    State1 = #state{last_good_time={0,0,0},
                    buf = Buf,
                    uri = ?config(uri, Config),
                    service=normal},
    meck:expect(logplex_http_client, start_link,
                fun(_Drain, _Channel, _Uri, _Scheme, _Host, _Port, _Timeout) ->
                    {error, mocked}
                end),
    %% Simulate an event according to which we've been disconnected
    %% for a long time. This goes -> cancel_reconnect -> try_connect.
    %% This in turn tries to connect, which fails because of the mock above.
    %% This in turn calls http_fail/1, which will call resize on the buffer.
    Res1 = logplex_http_drain:disconnected({logplex_drain_buffer, Buf, new_data}, State1),
    %% This remains disconnected and degraded the service
    {next_state, disconnected, State2=#state{service=degraded}, hibernate} = Res1,
    %% also called the drain buffer for a resize down to 10
    wait_for_mocked_call(logplex_drain_buffer, resize_msg_buffer, ['_',10], 1, 1000),
    %% This worked, so let's see the opposite -- that the size is brought back up
    %% to whatever configured value we have:
    Val = logplex_app:config(http_drain_buffer_size, 1024),
    meck:expect(logplex_http_client, start_link,
                fun(_Drain, _Channel, _Uri, _Scheme, _Host, _Port, _Timeout) ->
                    {ok, self()}
                end),
    Res2 = logplex_http_drain:disconnected({logplex_drain_buffer, Buf, new_data}, State2),
    {next_state, connected, #state{service=normal}, _} = Res2,
    %% The buffer was resized
    wait_for_mocked_call(logplex_drain_buffer, resize_msg_buffer, ['_',Val], 1, 1000).
+
%%% HELPERS
%% Block until Mod:Fun has been called NumCalls times with Args, or give
%% up after Time milliseconds.
%% NOTE(review): assumes the 6-arity clause (not visible here) measures
%% its deadline in microseconds against os:timestamp() -- confirm.
wait_for_mocked_call(Mod, Fun, Args, NumCalls, Time) ->
    DeadlineUs = Time * 1000,
    wait_for_mocked_call(Mod, Fun, Args, NumCalls, DeadlineUs, os:timestamp()).

0 comments on commit a4f2df5

Please sign in to comment.
Something went wrong with that request. Please try again.