Permalink
Browse files

Merge branch 'sv-socket-active-n'

OTP-11368

* sv-socket-active-n:
  Update preloaded
  add {active,N} socket option for TCP, UDP, and SCTP
  • Loading branch information...
garazdawi committed Oct 7, 2013
2 parents 036727c + 1e6c600 commit 5ee95f1f10f879a9caec67248c27e74dacbf276d
@@ -86,6 +86,13 @@
#endif
typedef unsigned long long llu_t;
#ifndef INT16_MIN
#define INT16_MIN (-32768)
#endif
#ifndef INT16_MAX
#define INT16_MAX (32767)
#endif
#ifdef __WIN32__
#define STRNCASECMP strncasecmp
@@ -581,6 +588,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_PASSIVE 0 /* false */
#define INET_ACTIVE 1 /* true */
#define INET_ONCE 2 /* true; active once then passive */
#define INET_MULTI 3 /* true; active N then passive */
/* INET_REQ_GETSTATUS enumeration */
#define INET_F_OPEN 0x0001
@@ -925,6 +933,7 @@ typedef struct {
inet_async_op op_queue[INET_MAX_ASYNC]; /* call queue */
int active; /* 0 = passive, 1 = active, 2 = active once */
Sint16 active_count; /* counter for {active,N} */
int stype; /* socket type:
SOCK_STREAM/SOCK_DGRAM/SOCK_SEQPACKET */
int sprotocol; /* socket protocol:
@@ -1160,22 +1169,36 @@ static int packet_inet_output(udp_descriptor* udesc, HANDLE event);
static int async_ref = 0; /* async reference id generator */
#define NEW_ASYNC_ID() ((async_ref++) & 0xffff)
/* check for transition from active to passive */
#define INET_CHECK_ACTIVE_TO_PASSIVE(inet) \
do { \
if ((inet)->active == INET_ONCE) \
(inet)->active = INET_PASSIVE; \
else if ((inet)->active == INET_MULTI && --((inet)->active_count) == 0) { \
(inet)->active = INET_PASSIVE; \
packet_passive_message(inet); \
} \
} while (0)
static ErlDrvTermData am_ok;
static ErlDrvTermData am_tcp;
static ErlDrvTermData am_udp;
static ErlDrvTermData am_error;
static ErlDrvTermData am_einval;
static ErlDrvTermData am_inet_async;
static ErlDrvTermData am_inet_reply;
static ErlDrvTermData am_timeout;
static ErlDrvTermData am_closed;
static ErlDrvTermData am_tcp_passive;
static ErlDrvTermData am_tcp_closed;
static ErlDrvTermData am_tcp_error;
static ErlDrvTermData am_udp_passive;
static ErlDrvTermData am_udp_error;
static ErlDrvTermData am_empty_out_q;
static ErlDrvTermData am_ssl_tls;
#ifdef HAVE_SCTP
static ErlDrvTermData am_sctp;
static ErlDrvTermData am_sctp_passive;
static ErlDrvTermData am_sctp_error;
static ErlDrvTermData am_true;
static ErlDrvTermData am_false;
@@ -1185,6 +1208,7 @@ static ErlDrvTermData am_list;
static ErlDrvTermData am_binary;
static ErlDrvTermData am_active;
static ErlDrvTermData am_once;
static ErlDrvTermData am_multi;
static ErlDrvTermData am_buffer;
static ErlDrvTermData am_linger;
static ErlDrvTermData am_recbuf;
@@ -3336,6 +3360,34 @@ static int packet_binary_message
return erl_drv_output_term(desc->dport, spec, i);
}
/*
** active mode message: send active-to-passive transition message
** {tcp_passive, S} or
** {udp_passive, S} or
** {sctp_passive, S}
*/
static int packet_passive_message(inet_descriptor* desc)
{
ErlDrvTermData spec[6];
int i = 0;
DEBUGF(("packet_passive_message(%ld):\r\n", (long)desc->port));
if (desc->sprotocol == IPPROTO_TCP)
i = LOAD_ATOM(spec, i, am_tcp_passive);
else {
#ifdef HAVE_SCTP
i = LOAD_ATOM(spec, i, IS_SCTP(desc) ? am_sctp_passive : am_udp_passive);
#else
i = LOAD_ATOM(spec, i, am_udp_passive);
#endif
}
i = LOAD_PORT(spec, i, desc->dport);
i = LOAD_TUPLE(spec, i, 2);
ASSERT(i <= 6);
return erl_drv_output_term(desc->dport, spec, i);
}
/*
** send active message {udp_error|sctp_error, S, Error}
*/
@@ -3378,7 +3430,7 @@ static int tcp_reply_data(tcp_descriptor* desc, char* buf, int len)
int code;
const char* body = buf;
int bodylen = len;
packet_get_body(desc->inet.htype, &body, &bodylen);
if (desc->inet.deliver == INET_DELIVER_PORT) {
@@ -3396,8 +3448,7 @@ static int tcp_reply_data(tcp_descriptor* desc, char* buf, int len)
if (code < 0)
return code;
if (desc->inet.active == INET_ONCE)
desc->inet.active = INET_PASSIVE;
INET_CHECK_ACTIVE_TO_PASSIVE(INETP(desc));
return code;
}
@@ -3424,8 +3475,7 @@ tcp_reply_binary_data(tcp_descriptor* desc, ErlDrvBinary* bin, int offs, int len
}
if (code < 0)
return code;
if (desc->inet.active == INET_ONCE)
desc->inet.active = INET_PASSIVE;
INET_CHECK_ACTIVE_TO_PASSIVE(INETP(desc));
return code;
}
@@ -3448,8 +3498,7 @@ packet_reply_binary_data(inet_descriptor* desc, unsigned int hsz,
code = packet_binary_message(desc, bin, offs, len, extra);
if (code < 0)
return code;
if (desc->active == INET_ONCE)
desc->active = INET_PASSIVE;
INET_CHECK_ACTIVE_TO_PASSIVE(desc);
return code;
}
}
@@ -3494,6 +3543,7 @@ sock_init(void) /* May be called multiple times. */
#ifdef HAVE_SCTP
static void inet_init_sctp(void) {
INIT_ATOM(sctp);
INIT_ATOM(sctp_passive);
INIT_ATOM(sctp_error);
INIT_ATOM(true);
INIT_ATOM(false);
@@ -3503,6 +3553,7 @@ static void inet_init_sctp(void) {
INIT_ATOM(binary);
INIT_ATOM(active);
INIT_ATOM(once);
INIT_ATOM(multi);
INIT_ATOM(buffer);
INIT_ATOM(linger);
INIT_ATOM(recbuf);
@@ -3628,12 +3679,15 @@ static int inet_init()
INIT_ATOM(tcp);
INIT_ATOM(udp);
INIT_ATOM(error);
INIT_ATOM(einval);
INIT_ATOM(inet_async);
INIT_ATOM(inet_reply);
INIT_ATOM(timeout);
INIT_ATOM(closed);
INIT_ATOM(tcp_passive);
INIT_ATOM(tcp_closed);
INIT_ATOM(tcp_error);
INIT_ATOM(udp_passive);
INIT_ATOM(udp_error);
INIT_ATOM(empty_out_q);
INIT_ATOM(ssl_tls);
@@ -5508,8 +5562,25 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
case INET_LOPT_ACTIVE:
DEBUGF(("inet_set_opts(%ld): s=%d, ACTIVE=%d\r\n",
(long)desc->port, desc->s,ival));
(long)desc->port, desc->s, ival));
desc->active = ival;
if (desc->active == INET_MULTI) {
long ac = desc->active_count;
Sint16 nval = get_int16(ptr);
ptr += 2;
len -= 2;
ac += nval;
if (ac > INT16_MAX || ac < INT16_MIN)
return -1;
desc->active_count += nval;
if (desc->active_count < 0)
desc->active_count = 0;
if (desc->active_count == 0) {
desc->active = INET_PASSIVE;
packet_passive_message(desc);
}
} else
desc->active_count = 0;
if ((desc->stype == SOCK_STREAM) && (desc->active != INET_PASSIVE) &&
(desc->state == INET_STATE_CLOSED)) {
tcp_closed_message((tcp_descriptor *) desc);
@@ -5820,7 +5891,8 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
/* XXX fprintf(stderr,"desc->htype == %d, old_htype == %d,
desc->active == %d, old_active == %d\r\n",(int)desc->htype,
(int) old_htype, (int) desc->active, (int) old_active );*/
return 1+(desc->htype == old_htype && desc->active == INET_ONCE);
return 1+(desc->htype == old_htype &&
(desc->active == INET_ONCE || desc->active == INET_MULTI));
}
return 0;
}
@@ -5953,6 +6025,21 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len)
case INET_LOPT_ACTIVE:
desc->active = get_int32(curr); curr += 4;
if (desc->active == INET_MULTI) {
long ac = desc->active_count;
Sint16 nval = get_int16(curr); curr += 2;
ac += nval;
if (ac > INT16_MAX || ac < INT16_MIN)
return -1;
desc->active_count += nval;
if (desc->active_count < 0)
desc->active_count = 0;
if (desc->active_count == 0) {
desc->active = INET_PASSIVE;
packet_passive_message(desc);
}
} else
desc->active_count = 0;
res = 0;
continue;
@@ -6475,6 +6562,11 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
case INET_LOPT_ACTIVE:
*ptr++ = opt;
put_int32(desc->active, ptr);
if (desc->active == INET_MULTI) {
PLACE_FOR(2,ptr);
put_int16(desc->active_count, ptr);
ptr += 2;
}
continue;
case INET_LOPT_PACKET:
*ptr++ = opt;
@@ -6847,7 +6939,10 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc,
}
case INET_LOPT_ACTIVE:
{
PLACE_FOR(spec, i, 2*LOAD_ATOM_CNT + LOAD_TUPLE_CNT);
if (desc->active == INET_MULTI)
PLACE_FOR(spec, i, LOAD_ATOM_CNT + LOAD_INT_CNT + LOAD_TUPLE_CNT);
else
PLACE_FOR(spec, i, 2*LOAD_ATOM_CNT + LOAD_TUPLE_CNT);
i = LOAD_ATOM (spec, i, am_active);
switch (desc->active)
{
@@ -6860,6 +6955,9 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc,
case INET_ONCE :
{ i = LOAD_ATOM (spec, i, am_once); break; }
case INET_MULTI :
{ i = LOAD_INT(spec, i, desc->active_count); break; }
default: ASSERT (0);
}
i = LOAD_TUPLE (spec, i, 2);
@@ -7656,6 +7754,7 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol)
socket */
desc->deliver = INET_DELIVER_TERM; /* standard term format */
desc->active = INET_PASSIVE; /* start passive */
desc->active_count = 0;
desc->oph = NULL;
desc->opt = NULL;
Binary file not shown.
@@ -1237,7 +1237,8 @@ type_opt_1(buffer) -> int;
type_opt_1(active) ->
{enum,[{false, ?INET_PASSIVE},
{true, ?INET_ACTIVE},
{once, ?INET_ONCE}]};
{once, ?INET_ONCE},
{multi, ?INET_MULTI}]};
type_opt_1(packet) ->
{enum,[{0, ?TCP_PB_RAW},
{1, ?TCP_PB_1},
@@ -1716,11 +1717,14 @@ encode_opt_val(Opts) ->
Reason -> {error,Reason}
end.
%% {active, once} and {active, N} are specially optimized because they will
%% be used for every packet or every N packets, not only once when
%% initializing the socket. Measurements show that this optimization is
%% worthwhile.
enc_opt_val([{active,once}|Opts], Acc) ->
%% Specially optimized because {active,once} will be used for
%% every packet, not only once when initializing the socket.
%% Measurements show that this optimization is worthwhile.
enc_opt_val(Opts, [<<?INET_LOPT_ACTIVE:8,?INET_ONCE:32>>|Acc]);
enc_opt_val([{active,N}|Opts], Acc) when is_integer(N), N < 32768, N >= -32768 ->
enc_opt_val(Opts, [<<?INET_LOPT_ACTIVE:8,?INET_MULTI:32,N:16>>|Acc]);
enc_opt_val([{raw,P,O,B}|Opts], Acc) ->
enc_opt_val(Opts, Acc, raw, {P,O,B});
enc_opt_val([{Opt,Val}|Opts], Acc) ->
@@ -1810,6 +1814,14 @@ dec_opt_val([]) -> [].
dec_opt_val(Buf, raw, Type) ->
{{P,O,B},T} = dec_value(Type, Buf),
[{raw,P,O,B}|dec_opt_val(T)];
dec_opt_val(Buf, active, Type) ->
case dec_value(Type, Buf) of
{multi,[M0,M1|T]} ->
<<N:16>> = list_to_binary([M0,M1]),
[{active,N}|dec_opt_val(T)];
{Val,T} ->
[{active,Val}|dec_opt_val(T)]
end;
dec_opt_val(Buf, Opt, Type) ->
{Val,T} = dec_value(Type, Buf),
[{Opt,Val}|dec_opt_val(T)].
@@ -496,9 +496,11 @@
orthogonal to the sets of TCP, UDP and generic INET options:
only those options which are explicitly listed below are allowed
for SCTP sockets. Options can be set on the socket using
<c>gen_sctp:open/1,2</c> or <c>inet:setopts/2</c>,
retrieved using <c>inet:getopts/2</c>, and when calling
<c>gen_sctp:connect/4,5</c> options can be changed.</p>
<seealso marker="#open/1"><c>gen_sctp:open/1,2</c></seealso>
or <seealso marker="inet#setopts/2"><c>inet:setopts/2</c></seealso>,
retrieved using <seealso marker="inet#getopts/2"><c>inet:getopts/2</c></seealso>,
and when calling <seealso marker="#connect/4"><c>gen_sctp:connect/4,5</c></seealso>
options can be changed.</p>
<marker id="option-binary"></marker>
<marker id="option-list"></marker>
<taglist>
@@ -507,7 +509,7 @@
<p>Determines the type of data returned from <c>gen_sctp:recv/1,2</c>.</p>
<marker id="option-active"></marker>
</item>
<tag><c>{active, true|false|once}</c></tag>
<tag><c>{active, true|false|once|N}</c></tag>
<item>
<list type="bulleted">
<item>
@@ -524,11 +526,28 @@
</item>
<item>
<p>If <c>once</c>, only one message is automatically placed
in the message queue, after that the mode is automatically
re-set to passive. This provides flow control as well as
in the message queue, and after that the mode is automatically
reset to passive. This provides flow control as well as
the possibility for the receiver to listen for its incoming
SCTP data interleaved with other inter-process messages.</p>
</item>
<item>
<p>If <c>active</c> is specified as an integer <c>N</c> in the
range -32768 to 32767 (inclusive), then that number is added to
the socket's count of the number of data messages to be
delivered to the controlling process. If the result of the
addition would be negative, the count is set to 0. Once the
count reaches 0, either through the delivery of messages or by
being explicitly set with <seealso
marker="inet#setopts/2">inet:setopts/2</seealso>, the socket's
mode is automatically reset to passive (<c>{active,
false}</c>) mode. When a socket in this active mode transitions to
passive mode, the message <c>{sctp_passive, Socket}</c> is sent
to the controlling process to notify it that if it wants to
receive more data messages from the socket, it must call
<seealso marker="inet#setopts/2">inet:setopts/2</seealso> to set
the socket back into an active mode.</p>
</item>
</list>
</item>
<tag><c>{tos, integer()}</c></tag>
@@ -148,6 +148,12 @@ do_recv(Sock, Bs) ->
as messages:</p>
<code type="none">
{tcp, Socket, Data}</code>
<p>If the socket is in <c>{active, N}</c> mode (see <seealso marker="inet#setopts/2">
inet:setopts/2</seealso> for details) and its message counter
drops to 0, the following message is delivered to indicate that the
socket has transitioned to passive (<c>{active, false}</c>) mode:</p>
<code type="none">
{tcp_passive, Socket}</code>
<p>If the socket is closed, the following message is delivered:</p>
<code type="none">
{tcp_closed, Socket}</code>
Oops, something went wrong.

0 comments on commit 5ee95f1

Please sign in to comment.