Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed ESP32 I2C driver resource leaks, half-closed state, and close-during-transmission errors
- Fixed several underallocation issues that could trigger data corruption on `binary:replace`, `zlib:compress` and bsd socket recv code.
- Fixed a bug where `catch` would raise on regular atom results
- Fixed ESP32 socket driver holding the global socket-list lock across blocking TCP connects

## [0.7.0-alpha.1] - 2026-04-06

Expand Down
129 changes: 92 additions & 37 deletions src/platforms/esp32/components/avm_builtins/socket_driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,14 @@ EventListener *socket_events_handler(GlobalContext *glb, EventListener *listener
while (xQueueReceive(netconn_events, &event, 1) == pdTRUE) {
TRACE("Got netconn: %p, len = %d\n", (void *) event.netconn, event.len);
struct SocketData *socket = NULL;
int32_t socket_process_id = 0;
struct ListHead *socket_head;
struct ListHead *socket_list = synclist_rdlock(&platform->sockets);
LIST_FOR_EACH (socket_head, socket_list) {
struct SocketData *current_socket = GET_LIST_ENTRY(socket_head, struct SocketData, sockets_head);
if (current_socket->conn == event.netconn) {
socket = current_socket;
socket_process_id = current_socket->process_id;
break;
}
}
Expand All @@ -319,7 +321,7 @@ EventListener *socket_events_handler(GlobalContext *glb, EventListener *listener
term message = term_alloc_tuple(2, &heap);
term_put_tuple_element(message, 0, globalcontext_make_atom(glb, netconn_event_internal));
term_put_tuple_element(message, 1, term_from_int(event.len));
globalcontext_send_message(glb, socket->process_id, message);
globalcontext_send_message(glb, socket_process_id, message);
END_WITH_STACK_HEAP(heap, glb)
}
}
Expand Down Expand Up @@ -491,27 +493,50 @@ static void do_send_error_reply(Context *ctx, err_t status, uint64_t ref_ticks,
do_send_reply(ctx, error_tuple, ref_ticks, pid);
}

static void accept_conn(Context *ctx, struct TCPServerSocketData *tcp_data, uint64_t ref_ticks, int32_t pid)
static void purge_ready_connections(struct ESP32PlatformData *platform, struct netconn *conn)
{
struct ListHead *item;
struct ListHead *tmp;
MUTABLE_LIST_FOR_EACH (item, tmp, &platform->ready_connections) {
struct ReadyConnection *ready = GET_LIST_ENTRY(item, struct ReadyConnection, ready_connection_head);
if (ready->netconn == conn) {
list_remove(item);
free(ready);
}
}
}

static err_t accept_conn(Context *ctx, struct TCPServerSocketData *tcp_data, uint64_t ref_ticks, int32_t pid)
{
TRACE("Going to accept a TCP connection\n");
GlobalContext *glb = ctx->global;
struct ESP32PlatformData *platform = ctx->global->platform_data;

struct netconn *accepted_conn;

// There is a bug in lwIP: the callbacks may be called before
// netconn_accept is called. Locking the list of sockets is not going
// to help in this case.
struct ListHead *sockets = socket_data_preinit(platform);
// The accept is non-blocking: a spurious ready event (a stale event
// carrying a recycled conn pointer) yields ERR_WOULDBLOCK and the caller
// re-registers the accepter instead of blocking forever.
// Note there is a bug in lwIP: the callbacks may be called before
// netconn_accept is called; events received before the accepted socket
// is published are kept in ready_connections and drained below.
netconn_set_nonblocking(tcp_data->socket_data.conn, 1);
err_t status = netconn_accept(tcp_data->socket_data.conn, &accepted_conn);
if (UNLIKELY(status != ERR_OK)) {
socket_data_postinit(platform);
do_send_error_reply(ctx, status, ref_ticks, pid);
return;
if (status != ERR_WOULDBLOCK) {
do_send_error_reply(ctx, status, ref_ticks, pid);
}
return status;
}

TRACE("accepted conn: %p\n", (void *) accepted_conn);
// Check if it's in the list of ready_connections

Context *new_ctx = context_new(glb);
new_ctx->native_handler = socket_consume_mailbox;

term socket_pid = term_port_from_local_process_id(new_ctx->process_id);

struct ListHead *sockets = socket_data_preinit(platform);

struct ListHead *ready_connections_head;
struct ListHead *tmp;
Expand All @@ -529,11 +554,6 @@ static void accept_conn(Context *ctx, struct TCPServerSocketData *tcp_data, uint
}
}

Context *new_ctx = context_new(glb);
new_ctx->native_handler = socket_consume_mailbox;

term socket_pid = term_port_from_local_process_id(new_ctx->process_id);

struct TCPClientSocketData *new_tcp_data = tcp_client_socket_data_new(new_ctx, accepted_conn, sockets, pid);
socket_data_postinit(platform);
if (IS_NULL_PTR(new_tcp_data)) {
Expand All @@ -559,6 +579,7 @@ static void accept_conn(Context *ctx, struct TCPServerSocketData *tcp_data, uint

do_send_reply(ctx, result_tuple, ref_ticks, pid);

return ERR_OK;
}

static void do_accept(Context *ctx, const GenMessage *gen_message)
Expand All @@ -571,9 +592,14 @@ static void do_accept(Context *ctx, const GenMessage *gen_message)
if (tcp_data->ready_connections) {
TRACE("accepting existing connections.\n");

accept_conn(ctx, tcp_data, ref_ticks, pid);
tcp_data->ready_connections--;
} else {
if (accept_conn(ctx, tcp_data, ref_ticks, pid) == ERR_WOULDBLOCK) {
tcp_data->ready_connections = 0;
} else {
tcp_data->ready_connections--;
return;
}
}
{
struct TCPServerAccepter *accepter = malloc(sizeof(struct TCPServerAccepter));
accepter->accepting_process_pid = pid;
accepter->ref_ticks = ref_ticks;
Expand Down Expand Up @@ -655,8 +681,13 @@ static void do_tcp_server_netconn_event(Context *ctx)
}

if (accepter) {
accept_conn(ctx, tcp_data, accepter->ref_ticks, accepter->accepting_process_pid);
free(accepter);
if (accept_conn(ctx, tcp_data, accepter->ref_ticks, accepter->accepting_process_pid)
== ERR_WOULDBLOCK) {
// Spurious wake-up: keep waiting for a real connection.
list_append(&tcp_data->accepters_list_head, &accepter->accepter_head);
} else {
free(accepter);
}
} else {
tcp_data->ready_connections++;
}
Expand All @@ -672,16 +703,18 @@ static NativeHandlerResult do_receive_data(Context *ctx)
if (socket_data->type == TCPClientSocket) {
// Close socket in case of errors or finish closing if it's closed
// on the other end.
#ifndef AVM_NO_SMP
struct ESP32PlatformData *platform = ctx->global->platform_data;
#endif
synclist_remove(&platform->sockets, &socket_data->sockets_head);
if (UNLIKELY(netconn_close(socket_data->conn) != ERR_OK)) {
TRACE("do_receive_data: netconn_close failed\n");
}
if (UNLIKELY(netconn_delete(socket_data->conn) != ERR_OK)) {
TRACE("do_receive_data: netconn_delete failed\n");
}
struct ListHead *sockets_head = synclist_wrlock(&platform->sockets);
UNUSED(sockets_head);
list_remove(&socket_data->sockets_head);
purge_ready_connections(platform, socket_data->conn);
synclist_unlock(&platform->sockets);
socket_data->conn = NULL;
}
if (socket_data->type == TCPClientSocket && (status == ERR_CLSD || status == ERR_CONN)) {
Expand Down Expand Up @@ -921,32 +954,40 @@ static void do_connect(Context *ctx, const GenMessage *gen_message)

free(address_string);

// Lock list of sockets before the event callback is called
struct ListHead *sockets = socket_data_preinit(platform);
struct netconn *conn = netconn_new_with_proto_and_callback(NETCONN_TCP, 0, socket_callback);
if (IS_NULL_PTR(conn)) {
AVM_ABORT();
}

// Publish the socket before the blocking connect
struct ListHead *sockets = socket_data_preinit(platform);
struct TCPClientSocketData *tcp_data = tcp_client_socket_data_new(ctx, conn, sockets, controlling_process_pid);
if (IS_NULL_PTR(tcp_data)) {
AVM_ABORT();
}
tcp_data->socket_data.active = active;
tcp_data->socket_data.binary = binary;
socket_data_postinit(platform);

status = netconn_connect(conn, &remote_ip, port);
if (UNLIKELY(status != ERR_OK)) {
TRACE("tcp: failed connect: %i\n", status);
// Stop new events first, then unpublish
netconn_delete(conn);
socket_data_postinit(platform);
struct ListHead *sockets_head = synclist_wrlock(&platform->sockets);
UNUSED(sockets_head);
list_remove(&tcp_data->socket_data.sockets_head);
purge_ready_connections(platform, conn);
synclist_unlock(&platform->sockets);
free(tcp_data);
// Mark platform_data as NULL to drop other messages
ctx->platform_data = NULL;
do_send_error_reply(ctx, status, ref_ticks, pid);
return;
}

TRACE("tcp: connected.\n");

struct TCPClientSocketData *tcp_data = tcp_client_socket_data_new(ctx, conn, sockets, controlling_process_pid);
socket_data_postinit(platform);
if (IS_NULL_PTR(tcp_data)) {
AVM_ABORT();
}
tcp_data->socket_data.active = active;
tcp_data->socket_data.binary = binary;

do_send_reply(ctx, OK_ATOM, ref_ticks, pid);
}

Expand Down Expand Up @@ -991,6 +1032,8 @@ static void do_listen(Context *ctx, const GenMessage *gen_message)

err_t status = netconn_bind(conn, IP_ADDR_ANY, port);
if (UNLIKELY(status != ERR_OK)) {
netconn_delete(conn);
socket_data_postinit(platform);
do_send_error_reply(ctx, status, ref_ticks, pid);
return;
}
Expand All @@ -999,12 +1042,16 @@ static void do_listen(Context *ctx, const GenMessage *gen_message)
u16_t nport;
status = netconn_getaddr(conn, &naddr, &nport, 1);
if (UNLIKELY(status != ERR_OK)) {
netconn_delete(conn);
socket_data_postinit(platform);
do_send_error_reply(ctx, status, ref_ticks, pid);
return;
}

status = netconn_listen_with_backlog(conn, backlog);
if (UNLIKELY(status != ERR_OK)) {
netconn_delete(conn);
socket_data_postinit(platform);
do_send_error_reply(ctx, status, ref_ticks, pid);
return;
}
Expand Down Expand Up @@ -1262,11 +1309,14 @@ static void do_close(Context *ctx, const GenMessage *gen_message)
}
err_t delete_res = netconn_delete(socket_data->conn);

struct netconn *closed_conn = socket_data->conn;
socket_data->conn = NULL;
#ifndef AVM_NO_SMP
struct ESP32PlatformData *platform = ctx->global->platform_data;
#endif
synclist_remove(&platform->sockets, &socket_data->sockets_head);
struct ListHead *sockets_head = synclist_wrlock(&platform->sockets);
UNUSED(sockets_head);
list_remove(&socket_data->sockets_head);
purge_ready_connections(platform, closed_conn);
synclist_unlock(&platform->sockets);

if (UNLIKELY(close_disconnect_res != ERR_OK)) {
do_send_error_reply(ctx, close_disconnect_res, ref_ticks, pid);
Expand Down Expand Up @@ -1430,6 +1480,11 @@ static NativeHandlerResult socket_consume_mailbox(Context *ctx)
TRACE("\n");

if (term_is_tuple(msg) && term_get_tuple_element(msg, 0) == globalcontext_make_atom(glb, netconn_event_internal)) {
if (IS_NULL_PTR(ctx->platform_data)) {
// Event for a connection whose init failed and was deleted.
mailbox_remove_message(&ctx->mailbox, &ctx->heap);
continue;
}
int len = term_to_int32(term_get_tuple_element(msg, 1));
NativeHandlerResult result = do_netconn_event(ctx, len);
if (result == NativeTerminate) {
Expand Down
Loading