Skip to content

Commit

Permalink
Merge branch 'sf/erts_de_busy_limit' into dev
Browse files Browse the repository at this point in the history
* sf/erts_de_busy_limit:
  Add flag-based setting for the distribution buffer busy limit
  • Loading branch information
rickard-green committed Nov 2, 2010
2 parents 96419fe + 8faf174 commit 158ed71
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 12 deletions.
19 changes: 19 additions & 0 deletions erts/doc/src/erl.xml
Expand Up @@ -906,6 +906,25 @@
<seealso marker="kernel:error_logger#warning_map/0">error_logger(3)</seealso>
for further information.</p>
</item>
<tag><c><![CDATA[+zFlag Value]]></c></tag>
<item>
<p>Miscellaneous flags.</p>
<taglist>
<tag><marker id="+zdbbl"><c>+zdbbl size</c></marker></tag>
<item>
<p>Set the distribution buffer busy limit
(<seealso marker="erlang#system_info_dist_buf_busy_limit">dist_buf_busy_limit</seealso>)
in kilobytes. Valid range is 1-2097151. Default is 128.</p>
<p>A larger buffer limit will allow processes to buffer
more outgoing messages over the distribution. When the
buffer limit has been reached, sending processes will be
suspended until the buffer size has shrunk. The buffer
limit is per distribution channel. A higher limit will
give lower latency and higher throughput at the expense
of higher memory usage.</p>
</item>
</taglist>
</item>
</taglist>
</section>

Expand Down
7 changes: 7 additions & 0 deletions erts/doc/src/erlang.xml
Expand Up @@ -5624,6 +5624,13 @@ true</pre>
The return value will always be <c>false</c> since
the elib_malloc allocator has been removed.</p>
</item>
<tag><marker id="system_info_dist_buf_busy_limit"><c>dist_buf_busy_limit</c></marker></tag>
<item>
<p>Returns the value of the distribution buffer busy limit
in bytes. This limit can be set on startup by passing the
<seealso marker="erl#+zdbbl">+zdbbl</seealso> command line
flag to <c>erl</c>.</p>
</item>
<tag><c>fullsweep_after</c></tag>
<item>
<p>Returns <c>{fullsweep_after, int()}</c> which is the
Expand Down
8 changes: 4 additions & 4 deletions erts/emulator/beam/dist.c
Expand Up @@ -97,6 +97,8 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
#define PASS_THROUGH 'p' /* This code should go */

int erts_is_alive; /* System must be blocked on change */
int erts_dist_buf_busy_limit;


/* distribution trap functions */
Export* dsend2_trap = NULL;
Expand Down Expand Up @@ -1453,8 +1455,6 @@ int erts_net_message(Port *prt,
return -1;
}

#define ERTS_DE_BUSY_LIMIT (128*1024)

static int
dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
{
Expand Down Expand Up @@ -1540,7 +1540,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
ErtsProcList *plp = NULL;
erts_smp_spin_lock(&dep->qlock);
dep->qsize += size_obuf(obuf);
if (dep->qsize >= ERTS_DE_BUSY_LIMIT)
if (dep->qsize >= erts_dist_buf_busy_limit)
dep->qflgs |= ERTS_DE_QFLG_BUSY;
if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
erts_smp_spin_unlock(&dep->qlock);
Expand Down Expand Up @@ -1911,7 +1911,7 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
if (de_busy && !prt_busy && dep->qsize < ERTS_DE_BUSY_LIMIT) {
if (de_busy && !prt_busy && dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
Expand Down
3 changes: 2 additions & 1 deletion erts/emulator/beam/dist.h
Expand Up @@ -99,7 +99,8 @@ typedef struct {
#define ERTS_DE_IS_CONNECTED(DEP) \
(!ERTS_DE_IS_NOT_CONNECTED((DEP)))


#define ERTS_DE_BUSY_LIMIT (128*1024)
extern int erts_dist_buf_busy_limit;
extern int erts_is_alive;

/*
Expand Down
7 changes: 7 additions & 0 deletions erts/emulator/beam/erl_bif_info.c
Expand Up @@ -2533,6 +2533,13 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1)
BIF_RET(erts_nif_taints(BIF_P));
} else if (ERTS_IS_ATOM_STR("reader_groups_map", BIF_ARG_1)) {
BIF_RET(erts_get_reader_groups_map(BIF_P));
} else if (ERTS_IS_ATOM_STR("dist_buf_busy_limit", BIF_ARG_1)) {
Uint hsz = 0;

(void) erts_bld_uint(NULL, &hsz, erts_dist_buf_busy_limit);
hp = hsz ? HAlloc(BIF_P, hsz) : NULL;
res = erts_bld_uint(&hp, NULL, erts_dist_buf_busy_limit);
BIF_RET(res);
}

BIF_ERROR(BIF_P, BADARG);
Expand Down
25 changes: 23 additions & 2 deletions erts/emulator/beam/erl_init.c
Expand Up @@ -535,7 +535,8 @@ void erts_usage(void)

erts_fprintf(stderr, "-W<i|w> set error logger warnings mapping,\n");
erts_fprintf(stderr, " see error_logger documentation for details\n");

erts_fprintf(stderr, "-zdbbl size set the distribution buffer busy limit in kilobytes\n");
erts_fprintf(stderr, " valid range is [1-%d]\n", INT_MAX/1024);
erts_fprintf(stderr, "\n");
erts_fprintf(stderr, "Note that if the emulator is started with erlexec (typically\n");
erts_fprintf(stderr, "from the erl script), these flags should be specified with +.\n");
Expand Down Expand Up @@ -818,7 +819,7 @@ early_init(int *argc, char **argv) /*
erl_sys_args(argc, argv);

erts_ets_realloc_always_moves = 0;

erts_dist_buf_busy_limit = ERTS_DE_BUSY_LIMIT;
}

#ifndef ERTS_SMP
Expand Down Expand Up @@ -1346,6 +1347,26 @@ erl_start(int argc, char **argv)
}
break;

case 'z': {
char *sub_param = argv[i]+2;
int new_limit;

if (has_prefix("dbbl", sub_param)) {
arg = get_arg(sub_param+4, argv[i+1], &i);
new_limit = atoi(arg);
if (new_limit < 1 || INT_MAX/1024 < new_limit) {
erts_fprintf(stderr, "Invalid dbbl limit: %d\n", new_limit);
erts_usage();
} else {
erts_dist_buf_busy_limit = new_limit*1024;
}
} else {
erts_fprintf(stderr, "bad -z option %s\n", argv[i]);
erts_usage();
}
break;
}

default:
erts_fprintf(stderr, "%s unknown flag %s\n", argv[0], argv[i]);
erts_usage();
Expand Down
93 changes: 91 additions & 2 deletions erts/emulator/test/distribution_SUITE.erl
Expand Up @@ -27,6 +27,7 @@
-export([all/1,
ping/1, bulk_send/1, bulk_send_small/1,
bulk_send_big/1,
bulk_send_bigbig/1,
local_send/1, local_send_small/1, local_send_big/1,
local_send_legal/1, link_to_busy/1, exit_to_busy/1,
lost_exit/1, link_to_dead/1, link_to_dead_new_node/1,
Expand All @@ -50,7 +51,8 @@
-export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0,
roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1,
dist_parallel_sender/3, dist_parallel_receiver/0,
dist_evil_parallel_receiver/0]).
dist_evil_parallel_receiver/0,
sendersender/4, sendersender2/4]).

all(suite) -> [
ping, bulk_send, local_send, link_to_busy, exit_to_busy,
Expand Down Expand Up @@ -121,14 +123,17 @@ bulk_send(doc) ->
"the time. This tests that a process that is suspended on a ",
"busy port will eventually be resumed."];
bulk_send(suite) ->
[bulk_send_small, bulk_send_big].
[bulk_send_small, bulk_send_big, bulk_send_bigbig].

bulk_send_small(Config) when is_list(Config) ->
?line bulk_send(64, 32).

bulk_send_big(Config) when is_list(Config) ->
?line bulk_send(32, 64).

bulk_send_bigbig(Config) when is_list(Config) ->
?line bulk_sendsend(32*5, 4).

bulk_send(Terms, BinSize) ->
?line Dog = test_server:timetrap(test_server:seconds(30)),

Expand All @@ -145,6 +150,53 @@ bulk_send(Terms, BinSize) ->
?line test_server:timetrap_cancel(Dog),
{comment, integer_to_list(trunc(Size/1024/Elapsed+0.5)) ++ " K/s"}.

bulk_sendsend(Terms, BinSize) ->
{Rate1, MonitorCount1} = bulk_sendsend2(Terms, BinSize, 5),
{Rate2, MonitorCount2} = bulk_sendsend2(Terms, BinSize, 995),
Ratio = if MonitorCount2 == 0 -> MonitorCount1 / 1.0;
true -> MonitorCount1 / MonitorCount2
end,
%% A somewhat arbitrary ratio, but hopefully one that will accomodate
%% a wide range of CPU speeds.
true = (Ratio > 8.0),
{comment,
integer_to_list(Rate1) ++ " K/s, " ++
integer_to_list(Rate2) ++ " K/s, " ++
integer_to_list(MonitorCount1) ++ " monitor msgs, " ++
integer_to_list(MonitorCount2) ++ " monitor msgs, " ++
float_to_list(Ratio) ++ " monitor ratio"}.

bulk_sendsend2(Terms, BinSize, BusyBufSize) ->
?line Dog = test_server:timetrap(test_server:seconds(30)),

?line io:format("Sending ~w binaries, each of size ~w K",
[Terms, BinSize]),
?line {ok, NodeRecv} = start_node(bulk_receiver),
?line Recv = spawn(NodeRecv, erlang, apply, [fun receiver/2, [0, 0]]),
?line Bin = list_to_binary(lists:duplicate(BinSize*1024, 253)),
?line Size = Terms*size(Bin),

%% SLF LEFT OFF HERE.
%% When the caller uses small hunks, like 4k via
%% bulk_sendsend(32*5, 4), then (on my laptop at least), we get
%% zero monitor messages. But if we use "+zdbbl 5", then we
%% get a lot of monitor messages. So, if we can count up the
%% total number of monitor messages that we get when running both
%% default busy size and "+zdbbl 5", and if the 5 case gets
%% "many many more" monitor messages, then we know we're working.

?line {ok, NodeSend} = start_node(bulk_sender, "+zdbbl " ++ integer_to_list(BusyBufSize)),
?line _Send = spawn(NodeSend, erlang, apply, [fun sendersender/4, [self(), Recv, Bin, Terms]]),
?line {Elapsed, {TermsN, SizeN}, MonitorCount} =
receive {sendersender, BigRes} ->
BigRes
end,
?line stop_node(NodeRecv),
?line stop_node(NodeSend),

?line test_server:timetrap_cancel(Dog),
{trunc(SizeN/1024/Elapsed+0.5), MonitorCount}.

sender(To, _Bin, 0) ->
To ! {done, self()},
receive
Expand All @@ -155,6 +207,43 @@ sender(To, Bin, Left) ->
To ! {term, Bin},
sender(To, Bin, Left-1).

%% Sender process to be run on a slave node

sendersender(Parent, To, Bin, Left) ->
erlang:system_monitor(self(), [busy_dist_port]),
[spawn(fun() -> sendersender2(To, Bin, Left, false) end) ||
_ <- lists:seq(1,1)],
{USec, {Res, MonitorCount}} =
timer:tc(?MODULE, sendersender2, [To, Bin, Left, true]),
Parent ! {sendersender, {USec/1000000, Res, MonitorCount}}.

sendersender2(To, Bin, Left, SendDone) ->
sendersender3(To, Bin, Left, SendDone, 0).

sendersender3(To, _Bin, 0, SendDone, MonitorCount) ->
if SendDone ->
To ! {done, self()};
true ->
ok
end,
receive
{monitor, _Pid, _Type, _Info} = M ->
sendersender3(To, _Bin, 0, SendDone, MonitorCount + 1)
after 0 ->
if SendDone ->
receive
Any when is_tuple(Any), size(Any) == 2 ->
{Any, MonitorCount}
end;
true ->
exit(normal)
end
end;
sendersender3(To, Bin, Left, SendDone, MonitorCount) ->
To ! {term, Bin},
%%timer:sleep(50),
sendersender3(To, Bin, Left-1, SendDone, MonitorCount).

%% Receiver process to be run on a slave node.

receiver(Terms, Size) ->
Expand Down
22 changes: 21 additions & 1 deletion erts/etc/common/erlexec.c
Expand Up @@ -138,6 +138,12 @@ static char *plusr_val_switches[] = {
NULL
};

/* +z arguments with values */
static char *plusz_val_switches[] = {
"dbbl",
NULL
};


/*
* Define sleep(seconds) in terms of Sleep() on Windows.
Expand Down Expand Up @@ -909,6 +915,20 @@ int main(int argc, char **argv)
i++;
}
break;
case 'z':
if (!is_one_of_strings(&argv[i][2], plusz_val_switches)) {
goto the_default;
} else {
if (i+1 >= argc
|| argv[i+1][0] == '-'
|| argv[i+1][0] == '+')
usage(argv[i]);
argv[i][0] = '-';
add_Eargs(argv[i]);
add_Eargs(argv[i+1]);
i++;
}
break;
default:
the_default:
argv[i][0] = '-'; /* Change +option to -option. */
Expand Down Expand Up @@ -1096,7 +1116,7 @@ usage_aux(void)
"[+l] [+M<SUBSWITCH> <ARGUMENT>] [+P MAX_PROCS] [+R COMPAT_REL] "
"[+r] [+rg READER_GROUPS_LIMIT] [+s SCHEDULER_OPTION] "
"[+S NO_SCHEDULERS:NO_SCHEDULERS_ONLINE] [+T LEVEL] [+V] [+v] "
"[+W<i|w>] [args ...]\n");
"[+W<i|w>] [+z MISC_OPTION] [args ...]\n");
exit(1);
}

Expand Down
24 changes: 22 additions & 2 deletions erts/test/erlexec_SUITE.erl
Expand Up @@ -33,7 +33,7 @@

-export([all/1, init_per_testcase/2, fin_per_testcase/2]).

-export([args_file/1, evil_args_file/1, env/1, args_file_env/1, otp_7461/1, otp_7461_remote/1, otp_8209/1]).
-export([args_file/1, evil_args_file/1, env/1, args_file_env/1, otp_7461/1, otp_7461_remote/1, otp_8209/1, zdbbl_dist_buf_busy_limit/1]).

-include_lib("test_server/include/test_server.hrl").

Expand All @@ -53,7 +53,8 @@ fin_per_testcase(_Case, Config) ->

all(doc) -> [];
all(suite) ->
[args_file, evil_args_file, env, args_file_env, otp_7461, otp_8209].
[args_file, evil_args_file, env, args_file_env, otp_7461, otp_8209,
zdbbl_dist_buf_busy_limit].


otp_8209(doc) ->
Expand Down Expand Up @@ -330,6 +331,25 @@ otp_7461_remote([halt, Pid]) ->
io:format("halt order from ~p to node ~p\n",[Pid,node()]),
halt().

zdbbl_dist_buf_busy_limit(doc) ->
["Check +zdbbl flag"];
zdbbl_dist_buf_busy_limit(suite) ->
[];
zdbbl_dist_buf_busy_limit(Config) when is_list(Config) ->
LimKB = 1122233,
LimB = LimKB*1024,
?line {ok,[[PName]]} = init:get_argument(progname),
?line SNameS = "erlexec_test_02",
?line SName = list_to_atom(SNameS++"@"++
hd(tl(string:tokens(atom_to_list(node()),"@")))),
?line Cmd = PName ++ " -sname "++SNameS++" -setcookie "++
atom_to_list(erlang:get_cookie()) ++
" +zdbbl " ++ integer_to_list(LimKB),
?line open_port({spawn,Cmd},[]),
?line pong = loop_ping(SName,40),
?line LimB = rpc:call(SName,erlang,system_info,[dist_buf_busy_limit]),
?line ok = cleanup_node(SNameS, 10),
ok.


%%
Expand Down

0 comments on commit 158ed71

Please sign in to comment.