Permalink
Browse files

Add tests for TCP drain resizing

  • Loading branch information...
1 parent bfa68f6 commit 6cd725a033a1d287c3f989bddb2faae7d75d5f7a @ferd ferd committed Oct 25, 2013
Showing with 96 additions and 11 deletions.
  1. +96 −11 test/logplex_tcp_drain_SUITE.erl
@@ -5,9 +5,9 @@
-define(PORT, 9601).
-define(DRAIN_BUFFER_SIZE,5).
-all() -> [full_stack].
+all() -> [full_stack, {group, with_tcp_server}].
-groups() -> [].
+groups() -> [{with_tcp_server,[],[shrink]}].
%%%%%%%%%%%%%%%%%%%%%%%
%%% SETUP / TEADOWN %%%
@@ -49,6 +49,8 @@ init_per_group(_, Config) ->
end_per_group(_, _Config) ->
ok.
+init_per_testcase(shrink, Config) ->
+ with_tcp_server([{channel, 1337} | Config]);
init_per_testcase(_, Config) ->
application:set_env(logplex, tcp_drain_buffer_size, 5),
Port = start_server(Config),
@@ -61,6 +63,8 @@ init_per_testcase(_, Config) ->
unlink(Pid),
[{channel,ChannelId},{port,Port},{drain,Pid} | Config].
+end_per_testcase(shrink, Config) ->
+ end_tcp_server(Config);
end_per_testcase(_, Config) ->
Drain = ?config(drain,Config),
erlang:monitor(process, Drain),
@@ -86,14 +90,6 @@ set_os_vars() ->
{"LOGPLEX_COOKIE", "ct test"}
]].
-start_server(_Config) ->
- {ok, Listen} = gen_tcp:listen(?PORT, [binary, {active,false}, {reuseaddr,true}]),
- Listen.
-
-stop_server(Config) ->
- Port = ?config(port, Config),
- catch gen_tcp:close(Port).
-
mock_drain_buffer() ->
Id = self(),
@@ -109,6 +105,40 @@ mock_drain_buffer() ->
end),
Id.
+%% start_server starts a listener and hands the port over, whereas
+%% with_tcp_server starts a real TCP server good for one request being accepted
+start_server(_Config) ->
+ {ok, Listen} = gen_tcp:listen(?PORT, [binary, {active,false}, {reuseaddr,true}]),
+ Listen.
+
+stop_server(Config) ->
+ Port = ?config(port, Config),
+ catch gen_tcp:close(Port).
+
+with_tcp_server(Config) ->
+ Self = self(),
+ Server = fun() ->
+ {ok, Listen} = gen_tcp:listen(0,[]),
+ {ok, Port} = inet:port(Listen),
+ Self ! {endpoint, self(), {{127,0,0,1}, Port}},
+ {ok, _Accept} = gen_tcp:accept(Listen),
+ timer:sleep(infinity)
+ end,
+ Pid = spawn_link(Server),
+ receive
+ {endpoint, Pid, {Ip, Port}} ->
+ [{endpoint, {Pid, Ip, Port}},
+ {channel, 1337} | Config]
+ after 5000 ->
+ error(bad_server)
+ end.
+
+end_tcp_server(Config) ->
+ {Pid, _, _} = ?config(endpoint, Config),
+ unlink(Pid),
+ exit(Pid, kill),
+ Config.
+
%%%%%%%%%%%%%
%%% TESTS %%%
@@ -141,7 +171,6 @@ full_stack(Config) ->
end),
{ok, Sock} = gen_tcp:accept(Listen, 5000),
Logs = receive_logs(Sock, 7),
- ct:pal("Res: ~p",[Logs]),
{match, _} = re:run(Logs, "mymsg1"),
nomatch = re:run(Logs, "mymsg2"),
{match, _} = re:run(Logs, "L10.*1 messages? dropped"),
@@ -151,6 +180,62 @@ full_stack(Config) ->
{match, _} = re:run(Logs, "mymsg6"),
{match, _} = re:run(Logs, "mymsg7").
+shrink(Config) ->
+ Msg = fun(M) -> {user, debug, logplex_syslog_utils:datetime(now),
+ "fakehost", "erlang", M}
+ end,
+ {_, Ip, Port} = ?config(endpoint, Config),
+ ChannelId = ?config(channel, Config),
+ FullBuff = lists:foldl(
+ fun(M,Buf) -> logplex_msg_buffer:push(M,Buf) end,
+ logplex_msg_buffer:new(10000),
+ [Msg(integer_to_binary(N)) || N <- lists:seq(1,10000)]
+ ),
+ Ref = make_ref(),
+ State0 = #state{
+ drain_id=1337,
+ drain_tok= <<"some token">>,
+ channel_id=ChannelId,
+ host=Ip,
+ port=Port,
+ buf = FullBuff,
+ reconnect_tref = Ref,
+ failures=1,
+ last_good_time = {0,0,0}
+ },
+ %% Posting will prompt a reconnection. Because our reference is invalid
+ %% *and* because we've had a failure before, the drain will assume we just
+ %% failed and will set up a backoff + timer, and attempt to shrink the
+ %% drains. Note that this is different than if we had received the timer
+ %% timeout event (which would have called do_reconnect)
+ {next_state, disconnected, State1, _} = logplex_tcpsyslog_drain:disconnected({post,Msg("a")}, State0),
+ #state{buf=Buf1} = State1,
+ 10 = logplex_msg_buffer:len(Buf1),
+ %% 9990 drops + the one triggered by {post, Msg}
+ {{loss_indication, 9991, _}, _} = logplex_msg_buffer:pop(Buf1),
+ 10 = logplex_msg_buffer:len(logplex_msg_buffer:push(msg, Buf1)),
+ %% Buffer seems sane. Now let's reconnect and see if it gets resized.
+ %% The server we used accepts only one connection and this one should
+ %% be established.
+ State2 = State1#state{sock=undefined, failures = 0, reconnect_tref = Ref},
+ %% force a reconnect. This calls 'do_reconnect', which
+ %% should succeed, then try to send data (which should succeed),
+ %% then go to 'sending'
+ {next_state, sending, State3} = logplex_tcpsyslog_drain:disconnected({timeout, Ref, reconnect}, State2),
+ #state{buf=Buf2} = State3,
+ 0 = logplex_msg_buffer:len(Buf2), % all stuff was sent
+ %% resized to configured default
+ Default = logplex_app:config(tcp_drain_buffer_size, 1024),
+ Default = logplex_msg_buffer:len(lists:foldl(
+ fun(M,Buf) -> logplex_msg_buffer:push(M,Buf) end,
+ Buf2,
+ [Msg(integer_to_binary(N)) || N <- lists:seq(1,10000)]
+ )).
+
+
+
+
+
%%%%%%%%%%%%%
%%% UTILS %%%
%%%%%%%%%%%%%

0 comments on commit 6cd725a

Please sign in to comment.