Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for setting rcvbuf buffer size on a socket #907

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added STM32 cmake option `-DAVM_CFG_CONSOLE=` to select a different uart peripheral for the system console
- Added `crypto:strong_rand_bytes/1` using Mbed-TLS (only on generic_unix, ESP32 and RP2040
platforms)
- Added support for setting the default receive buffer size for sockets via `socket:setopt/3`

### Removed

Expand Down
2 changes: 2 additions & 0 deletions doc/src/programmers-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1804,12 +1804,14 @@ Currently, the following options are supported:
|------------|--------------|-------------|
| `{socket, reuseaddr}` | `boolean()` | Sets `SO_REUSEADDR` on the socket. |
| `{socket, linger}` | `#{onoff => boolean(), linger => non_neg_integer()}` | Sets `SO_LINGER` on the socekt. |
| `{otp, rcvbuf}` | `non_neg_integer()` | Sets the default buffer size (in bytes) on receive calls. This value is only used if the `Length` parameter of the `socket:recv` family of functions has the value `0`; otherwise, the specified non-zero length in the `socket:recv` takes precendence. Note that the OTP option value `default` is not currently supported.|

For example:

%% erlang
ok = socket:setopt(Socket, {socket, reuseaddr}, true),
ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}),
ok = socket:setopt(Socket, {otp, rcvbuf}, 1024),

### UDP Socket Programming

Expand Down
171 changes: 116 additions & 55 deletions src/libAtomVM/otp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ struct SocketResource
uint64_t ref_ticks;
int32_t selecting_process_id;
ErlNifMonitor selecting_process_monitor;
size_t buf_size;
pguyot marked this conversation as resolved.
Show resolved Hide resolved
};
#elif OTP_SOCKET_LWIP
struct SocketResource
Expand All @@ -166,15 +167,19 @@ struct SocketResource
int linger_sec;
size_t pos;
struct ListHead received_list;
size_t buf_size;
fadushin marked this conversation as resolved.
Show resolved Hide resolved
};
#endif

static const char *const addr_atom = ATOM_STR("\x4", "addr");
static const char *const any_atom = ATOM_STR("\x3", "any");
static const char *const invalid_option_atom = ATOM_STR("\xE", "invalid_option");
static const char *const invalid_value_atom = ATOM_STR("\xD", "invalid_value");
static const char *const linger_atom = ATOM_STR("\x6", "linger");
static const char *const loopback_atom = ATOM_STR("\x8", "loopback");
static const char *const onoff_atom = ATOM_STR("\x5", "onoff");
static const char *const port_atom = ATOM_STR("\x4", "port");
static const char *const rcvbuf_atom = ATOM_STR("\x6", "rcvbuf");
static const char *const reuseaddr_atom = ATOM_STR("\x9", "reuseaddr");

#define CLOSED_FD 0
Expand All @@ -199,6 +204,19 @@ static const AtomStringIntPair otp_socket_shutdown_direction_table[] = {
SELECT_INT_DEFAULT(OtpSocketInvalidShutdownDirection)
};

enum otp_socket_setopt_level
{
OtpSocketInvalidSetoptLevel = 0,
OtpSocketSetoptLevelSocket,
OtpSocketSetoptLevelOTP
};

static const AtomStringIntPair otp_socket_setopt_level_table[] = {
{ ATOM_STR("\x6", "socket"), OtpSocketSetoptLevelSocket },
{ ATOM_STR("\x3", "otp"), OtpSocketSetoptLevelOTP },
SELECT_INT_DEFAULT(OtpSocketInvalidSetoptLevel)
};

#define DEFAULT_BUFFER_SIZE 512

#ifndef MIN
Expand Down Expand Up @@ -529,6 +547,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[])
LWIP_END();
}
#endif
rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;

if (UNLIKELY(memory_ensure_free(ctx, TERM_BOXED_RESOURCE_SIZE) != MEMORY_GC_OK)) {
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
Expand Down Expand Up @@ -701,6 +720,7 @@ static term nif_socket_close(Context *ctx, int argc, term argv[])
}

#endif
rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;
fadushin marked this conversation as resolved.
Show resolved Hide resolved

return OK_ATOM;
}
Expand All @@ -722,6 +742,7 @@ static struct SocketResource *make_accepted_socket_resource(struct tcp_pcb *newp
conn_rsrc_obj->pos = 0;
conn_rsrc_obj->linger_on = false;
conn_rsrc_obj->linger_sec = 0;
conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;
list_init(&conn_rsrc_obj->received_list);

tcp_arg(newpcb, conn_rsrc_obj);
Expand Down Expand Up @@ -1016,62 +1037,101 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
term level_tuple = argv[1];
term value = argv[2];

term opt = term_get_tuple_element(level_tuple, 1);
if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) {
int option_value = (value == TRUE_ATOM);
term level = term_get_tuple_element(level_tuple, 0);
int level_val = interop_atom_term_select_int(otp_socket_setopt_level_table, level, global);
switch (level_val) {

case OtpSocketSetoptLevelSocket: {

term opt = term_get_tuple_element(level_tuple, 1);
if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) {
int option_value = (value == TRUE_ATOM);
#if OTP_SOCKET_BSD
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
#elif OTP_SOCKET_LWIP
LWIP_BEGIN();
if (option_value) {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
} else {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
}
LWIP_END();
return OK_ATOM;
LWIP_BEGIN();
if (option_value) {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
} else {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
}
LWIP_END();
return OK_ATOM;
#endif
} else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) {
term onoff = interop_kv_get_value(value, onoff_atom, ctx->global);
term linger = interop_kv_get_value(value, linger_atom, ctx->global);
VALIDATE_VALUE(linger, term_is_integer);
} else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) {
term onoff = interop_kv_get_value(value, onoff_atom, ctx->global);
term linger = interop_kv_get_value(value, linger_atom, ctx->global);
VALIDATE_VALUE(linger, term_is_integer);

#if OTP_SOCKET_BSD
struct linger sl;
sl.l_onoff = (onoff == TRUE_ATOM);
sl.l_linger = term_to_int(linger);
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
struct linger sl;
sl.l_onoff = (onoff == TRUE_ATOM);
sl.l_linger = term_to_int(linger);
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
#elif OTP_SOCKET_LWIP
rsrc_obj->linger_on = (onoff == TRUE_ATOM);
rsrc_obj->linger_sec = term_to_int(linger);
return OK_ATOM;
rsrc_obj->linger_on = (onoff == TRUE_ATOM);
rsrc_obj->linger_sec = term_to_int(linger);
return OK_ATOM;
#endif
// TODO add more as needed
// int flag = 1;
// int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
// if (UNLIKELY(res != 0)) {
// AVM_LOGW(TAG, "Failed to set TCP_NODELAY.");
// }
} else {
RAISE_ERROR(BADARG_ATOM);
// TODO add more as needed
// int flag = 1;
// int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
// if (UNLIKELY(res != 0)) {
// AVM_LOGW(TAG, "Failed to set TCP_NODELAY.");
// }
} else {
RAISE_ERROR(BADARG_ATOM);
}

case OtpSocketSetoptLevelOTP: {
term opt = term_get_tuple_element(level_tuple, 1);
if (globalcontext_is_term_equal_to_atom_string(global, opt, rcvbuf_atom)) {
// socket:setopt(Socket, {otp, rcvbuf}, BufSize :: non_neg_integer())

// TODO support the atom `default` as a value to roll back to the default buffer size
if (UNLIKELY(!term_is_integer(value))) {
AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value must be an integer");
return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx);
}

avm_int_t buf_size = term_to_int(value);
if (UNLIKELY(buf_size < 0)) {
AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value may not be negative");
return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx);
}

rsrc_obj->buf_size = (size_t) buf_size;
pguyot marked this conversation as resolved.
Show resolved Hide resolved

return OK_ATOM;
} else {
AVM_LOGE(TAG, "socket:setopt: Unsupported otp option");
return make_error_tuple(globalcontext_make_atom(global, invalid_option_atom), ctx);
}
}

default: {
AVM_LOGE(TAG, "socket:setopt: Unsupported level");
RAISE_ERROR(BADARG_ATOM);
}
}
}
}

Expand Down Expand Up @@ -1442,6 +1502,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource));
conn_rsrc_obj->fd = fd;
conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;
TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd);

term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj);
Expand Down Expand Up @@ -1652,8 +1713,7 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_
GlobalContext *global = ctx->global;

int flags = MSG_WAITALL;
// TODO parameterize buffer size
ssize_t res = recvfrom(rsrc_obj->fd, NULL, DEFAULT_BUFFER_SIZE, MSG_PEEK | flags, NULL, NULL);
ssize_t res = recvfrom(rsrc_obj->fd, NULL, rsrc_obj->buf_size, MSG_PEEK | flags, NULL, NULL);
pguyot marked this conversation as resolved.
Show resolved Hide resolved
TRACE("%li bytes available.\n", (long int) res);
if (res < 0) {
AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno);
Expand All @@ -1662,7 +1722,9 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_
TRACE("Peer closed socket %i.\n", rsrc_obj->fd);
return make_error_tuple(CLOSED_ATOM, ctx);
} else {
ssize_t buffer_size = len == 0 ? (ssize_t) res : MIN((size_t) res, len);
// user-supplied len has higher precedence than the default buffer size, but we also
// want the configured default buffer size to be a lower bound on anything we peek
ssize_t buffer_size = MIN(len == 0 ? (ssize_t) rsrc_obj->buf_size : (ssize_t) len, res);

// {ok, Data :: binary()}
// {ok, {Source :: #{addr => Address :: {0..255, 0..255, 0..255, 0..255}, port => Port :: non_neg_integer()}, Data :: binary()}}
Expand Down Expand Up @@ -1703,9 +1765,8 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs

GlobalContext *global = ctx->global;

// TODO plumb through buffer size
size_t buffer_size = len == 0 ? DEFAULT_BUFFER_SIZE : len;
uint8_t *buffer = malloc(buffer_size);
size_t buffer_size = len == 0 ? rsrc_obj->buf_size : len;
pguyot marked this conversation as resolved.
Show resolved Hide resolved
uint8_t *buffer = (uint8_t *) malloc(buffer_size);
term payload = term_invalid_term();

if (IS_NULL_PTR(buffer)) {
Expand Down
Loading