Skip to content

Commit

Permalink
Add support for setting recvbuf buffer size on a socket.
Browse files Browse the repository at this point in the history
Signed-off-by: Fred Dushin <fred@dushin.net>
  • Loading branch information
fadushin committed Dec 5, 2023
1 parent 8a97964 commit c2356cc
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added support for `crypto:one_time/4,5` on Unix and Pico as well as for `crypto:hash/2` on Pico
- Added ability to configure STM32 Nucleo boards onboard UART->USB-COM using the `-DBOARD=nucleo` cmake option
- Added STM32 cmake option `-DAVM_CFG_CONSOLE=` to select a different uart peripheral for the system console
- Added support for setting the default receive buffer size for sockets via `socket:setopt/3`

## [0.6.0-alpha.1] - 2023-10-09

Expand Down
2 changes: 2 additions & 0 deletions doc/src/programmers-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1804,12 +1804,14 @@ Currently, the following options are supported:
|------------|--------------|-------------|
| `{socket, reuseaddr}` | `boolean()` | Sets `SO_REUSEADDR` on the socket. |
| `{socket, linger}` | `#{onoff => boolean(), linger => non_neg_integer()}` | Sets `SO_LINGER` on the socekt. |
| `{otp, rcvbuf}` | `non_neg_integer()` | Sets the default buffer size (in bytes) on receive calls. This value is only used if the `Length` parameter of the `socket:recv` family of functions has the value `0`; otherwise, the specified non-zero length in the `socket:recv` takes precendence. |

For example:

%% erlang
ok = socket:setopt(Socket, {socket, reuseaddr}, true),
ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}),
ok = socket:setopt(Socket, {otp, rcvbuf}, 1024),

### UDP Socket Programming

Expand Down
170 changes: 115 additions & 55 deletions src/libAtomVM/otp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ struct SocketResource
uint64_t ref_ticks;
int32_t selecting_process_id;
ErlNifMonitor selecting_process_monitor;
size_t buf_size;
};
#elif OTP_SOCKET_LWIP
struct SocketResource
Expand All @@ -166,15 +167,19 @@ struct SocketResource
int linger_sec;
size_t pos;
struct ListHead received_list;
size_t buf_size;
};
#endif

static const char *const addr_atom = ATOM_STR("\x4", "addr");
static const char *const any_atom = ATOM_STR("\x3", "any");
static const char *const invalid_option_atom = ATOM_STR("\xE", "invalid_option");
static const char *const invalid_value_atom = ATOM_STR("\xD", "invalid_value");
static const char *const linger_atom = ATOM_STR("\x6", "linger");
static const char *const loopback_atom = ATOM_STR("\x8", "loopback");
static const char *const onoff_atom = ATOM_STR("\x5", "onoff");
static const char *const port_atom = ATOM_STR("\x4", "port");
static const char *const rcvbuf_atom = ATOM_STR("\x6", "rcvbuf");
static const char *const reuseaddr_atom = ATOM_STR("\x9", "reuseaddr");

#define CLOSED_FD 0
Expand All @@ -199,6 +204,19 @@ static const AtomStringIntPair otp_socket_shutdown_direction_table[] = {
SELECT_INT_DEFAULT(OtpSocketInvalidShutdownDirection)
};

enum otp_socket_setopt_level
{
OtpSocketInvalidSetoptLevel = 0,
OtpSocketSetoptLevelSocket,
OtpSocketSetoptLevelOTP
};

static const AtomStringIntPair otp_socket_setopt_level_table[] = {
{ ATOM_STR("\x6", "socket"), OtpSocketSetoptLevelSocket },
{ ATOM_STR("\x3", "otp"), OtpSocketSetoptLevelOTP },
SELECT_INT_DEFAULT(OtpSocketInvalidSetoptLevel)
};

#define DEFAULT_BUFFER_SIZE 512

#ifndef MIN
Expand Down Expand Up @@ -529,6 +547,7 @@ static term nif_socket_open(Context *ctx, int argc, term argv[])
LWIP_END();
}
#endif
rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;

if (UNLIKELY(memory_ensure_free(ctx, TERM_BOXED_RESOURCE_SIZE) != MEMORY_GC_OK)) {
AVM_LOGW(TAG, "Failed to allocate memory: %s:%i.", __FILE__, __LINE__);
Expand Down Expand Up @@ -701,6 +720,7 @@ static term nif_socket_close(Context *ctx, int argc, term argv[])
}

#endif
rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;

return OK_ATOM;
}
Expand All @@ -722,6 +742,7 @@ static struct SocketResource *make_accepted_socket_resource(struct tcp_pcb *newp
conn_rsrc_obj->pos = 0;
conn_rsrc_obj->linger_on = false;
conn_rsrc_obj->linger_sec = 0;
conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;
list_init(&conn_rsrc_obj->received_list);

tcp_arg(newpcb, conn_rsrc_obj);
Expand Down Expand Up @@ -1016,62 +1037,100 @@ static term nif_socket_setopt(Context *ctx, int argc, term argv[])
term level_tuple = argv[1];
term value = argv[2];

term opt = term_get_tuple_element(level_tuple, 1);
if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) {
int option_value = (value == TRUE_ATOM);
term level = term_get_tuple_element(level_tuple, 0);
int level_val = interop_atom_term_select_int(otp_socket_setopt_level_table, level, global);
switch (level_val) {

case OtpSocketSetoptLevelSocket: {

term opt = term_get_tuple_element(level_tuple, 1);
if (globalcontext_is_term_equal_to_atom_string(global, opt, reuseaddr_atom)) {
int option_value = (value == TRUE_ATOM);
#if OTP_SOCKET_BSD
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_REUSEADDR, &option_value, sizeof(int));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
#elif OTP_SOCKET_LWIP
LWIP_BEGIN();
if (option_value) {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
} else {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
}
LWIP_END();
return OK_ATOM;
LWIP_BEGIN();
if (option_value) {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_set_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_set_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
} else {
if (rsrc_obj->socket_state & SocketStateTCP) {
ip_reset_option(rsrc_obj->tcp_pcb, SOF_REUSEADDR);
} else {
ip_reset_option(rsrc_obj->udp_pcb, SOF_REUSEADDR);
}
}
LWIP_END();
return OK_ATOM;
#endif
} else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) {
term onoff = interop_kv_get_value(value, onoff_atom, ctx->global);
term linger = interop_kv_get_value(value, linger_atom, ctx->global);
VALIDATE_VALUE(linger, term_is_integer);
} else if (globalcontext_is_term_equal_to_atom_string(global, opt, linger_atom)) {
term onoff = interop_kv_get_value(value, onoff_atom, ctx->global);
term linger = interop_kv_get_value(value, linger_atom, ctx->global);
VALIDATE_VALUE(linger, term_is_integer);

#if OTP_SOCKET_BSD
struct linger sl;
sl.l_onoff = (onoff == TRUE_ATOM);
sl.l_linger = term_to_int(linger);
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
struct linger sl;
sl.l_onoff = (onoff == TRUE_ATOM);
sl.l_linger = term_to_int(linger);
int res = setsockopt(rsrc_obj->fd, SOL_SOCKET, SO_LINGER, &sl, sizeof(sl));
if (UNLIKELY(res != 0)) {
return make_errno_tuple(ctx);
} else {
return OK_ATOM;
}
#elif OTP_SOCKET_LWIP
rsrc_obj->linger_on = (onoff == TRUE_ATOM);
rsrc_obj->linger_sec = term_to_int(linger);
return OK_ATOM;
rsrc_obj->linger_on = (onoff == TRUE_ATOM);
rsrc_obj->linger_sec = term_to_int(linger);
return OK_ATOM;
#endif
// TODO add more as needed
// int flag = 1;
// int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
// if (UNLIKELY(res != 0)) {
// AVM_LOGW(TAG, "Failed to set TCP_NODELAY.");
// }
} else {
RAISE_ERROR(BADARG_ATOM);
// TODO add more as needed
// int flag = 1;
// int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
// if (UNLIKELY(res != 0)) {
// AVM_LOGW(TAG, "Failed to set TCP_NODELAY.");
// }
} else {
RAISE_ERROR(BADARG_ATOM);
}

case OtpSocketSetoptLevelOTP: {
term opt = term_get_tuple_element(level_tuple, 1);
if (globalcontext_is_term_equal_to_atom_string(global, opt, rcvbuf_atom)) {
// socket:setopt(Socket, {otp, rcvbuf}, BufSize :: non_neg_integer())

if (UNLIKELY(!term_is_integer(value))) {
AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value must be an integer");
return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx);
}

avm_int_t buf_size = term_to_int(value);
if (UNLIKELY(buf_size < 0)) {
AVM_LOGE(TAG, "socket:setopt: otp rcvbuf value may not be negative");
return make_error_tuple(globalcontext_make_atom(global, invalid_value_atom), ctx);
}

rsrc_obj->buf_size = (size_t) buf_size;

return OK_ATOM;
} else {
AVM_LOGE(TAG, "socket:setopt: Unsupported otp option");
return make_error_tuple(globalcontext_make_atom(global, invalid_option_atom), ctx);
}
}

default: {
AVM_LOGE(TAG, "socket:setopt: Unsupported level");
RAISE_ERROR(BADARG_ATOM);
}
}
}
}

Expand Down Expand Up @@ -1442,6 +1501,7 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
struct SocketResource *conn_rsrc_obj = enif_alloc_resource(socket_resource_type, sizeof(struct SocketResource));
conn_rsrc_obj->fd = fd;
conn_rsrc_obj->selecting_process_id = INVALID_PROCESS_ID;
conn_rsrc_obj->buf_size = DEFAULT_BUFFER_SIZE;
TRACE("nif_socket_accept: Created socket on accept fd=%i\n", rsrc_obj->fd);

term obj = enif_make_resource(erl_nif_env_from_context(ctx), conn_rsrc_obj);
Expand Down Expand Up @@ -1652,8 +1712,7 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_
GlobalContext *global = ctx->global;

int flags = MSG_WAITALL;
// TODO parameterize buffer size
ssize_t res = recvfrom(rsrc_obj->fd, NULL, DEFAULT_BUFFER_SIZE, MSG_PEEK | flags, NULL, NULL);
ssize_t res = recvfrom(rsrc_obj->fd, NULL, rsrc_obj->buf_size, MSG_PEEK | flags, NULL, NULL);
TRACE("%li bytes available.\n", (long int) res);
if (res < 0) {
AVM_LOGI(TAG, "Unable to receive data on fd %i. errno=%i", rsrc_obj->fd, errno);
Expand All @@ -1662,7 +1721,9 @@ static term nif_socket_recv_with_peek(Context *ctx, struct SocketResource *rsrc_
TRACE("Peer closed socket %i.\n", rsrc_obj->fd);
return make_error_tuple(CLOSED_ATOM, ctx);
} else {
ssize_t buffer_size = len == 0 ? (ssize_t) res : MIN((size_t) res, len);
// user-supplied len has higher precedence than the default buffer size, but we also
// want the configured default buffer size to be a lower bound on anything we peek
ssize_t buffer_size = MIN(len == 0 ? (ssize_t) rsrc_obj->buf_size : (ssize_t) len, res);

// {ok, Data :: binary()}
// {ok, {Source :: #{addr => Address :: {0..255, 0..255, 0..255, 0..255}, port => Port :: non_neg_integer()}, Data :: binary()}}
Expand Down Expand Up @@ -1703,9 +1764,8 @@ static term nif_socket_recv_without_peek(Context *ctx, struct SocketResource *rs

GlobalContext *global = ctx->global;

// TODO plumb through buffer size
size_t buffer_size = len == 0 ? DEFAULT_BUFFER_SIZE : len;
uint8_t *buffer = malloc(buffer_size);
size_t buffer_size = len == 0 ? rsrc_obj->buf_size : len;
uint8_t *buffer = (uint8_t *) malloc(buffer_size);
term payload = term_invalid_term();

if (IS_NULL_PTR(buffer)) {
Expand Down
Loading

0 comments on commit c2356cc

Please sign in to comment.