Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/libAtomVM/erl_nif.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj);
* Please note that `kqueue(2)` and `poll(2)` behave differently for some
* objects, for example for vnodes and EOF.
*
* On `esp32`, this is currently implemented using `poll(2)`.
*
* @param env current environment
* @param event event object (typically a file descriptor)
* @param mode select mode (`ERL_NIF_SELECT_READ` and/or `ERL_NIF_SELECT_WRITE`)
Expand Down
2 changes: 1 addition & 1 deletion src/platforms/esp32/components/avm_sys/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ endif()
idf_component_register(
SRCS ${AVM_SYS_COMPONENT_SRCS}
INCLUDE_DIRS "include"
REQUIRES "spi_flash" "soc" "newlib" "pthread" ${ADDITIONAL_COMPONENTS}
REQUIRES "spi_flash" "soc" "newlib" "pthread" "vfs" ${ADDITIONAL_COMPONENTS}
PRIV_REQUIRES "libatomvm" "esp_timer" ${ADDITIONAL_PRIV_REQUIRES}
)

Expand Down
9 changes: 9 additions & 0 deletions src/platforms/esp32/components/avm_sys/include/esp32_sys.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
#include "freertos/FreeRTOS.h"
#include <esp_partition.h>
#include <freertos/queue.h>
#include "esp_pthread.h"

#if ESP_IDF_VERSION_MAJOR >= 5
#include <spi_flash_mmap.h>
#endif

#include <time.h>
#include <sys/poll.h>

#include "sys.h"

Expand Down Expand Up @@ -81,6 +83,13 @@ struct EventListener

struct ESP32PlatformData
{
pthread_t select_thread;
bool select_thread_exit;
bool eventfd_registered;
int signal_fd;
int ATOMIC select_events_poll_count;
struct pollfd *fds;

// socket_driver
EventListener *socket_listener;
struct SyncList sockets;
Expand Down
147 changes: 144 additions & 3 deletions src/platforms/esp32/components/avm_sys/sys.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@
#include <esp_log.h>
#include <esp_partition.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <sys/socket.h>

// introduced starting with 4.4
#if ESP_IDF_VERSION_MAJOR >= 5
#include "esp_chip_info.h"
#endif

#include <esp_vfs_eventfd.h>

#ifdef HAVE_SOC_CPU_CORES_NUM
#include "soc/soc_caps.h"
#endif
Expand All @@ -61,6 +63,9 @@

static Context *port_driver_create_port(const char *port_name, GlobalContext *global, term opts);

static void *select_thread_loop(void *);
static void select_thread_signal(struct ESP32PlatformData *platform);

// clang-format off
static const char *const esp_free_heap_size_atom = "\x14" "esp32_free_heap_size";
static const char *const esp_largest_free_block_atom = "\x18" "esp32_largest_free_block";
Expand Down Expand Up @@ -182,6 +187,29 @@ void sys_init_platform(GlobalContext *glb)
{
struct ESP32PlatformData *platform = malloc(sizeof(struct ESP32PlatformData));
glb->platform_data = platform;
platform->select_thread_exit = false;
platform->select_events_poll_count = -1;
esp_vfs_eventfd_config_t eventfd_config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
esp_err_t err = esp_vfs_eventfd_register(&eventfd_config);
if (err == ESP_OK) {
platform->eventfd_registered = true;
} else {
if (UNLIKELY(err != ESP_ERR_INVALID_STATE)) {
// Function can return fatal ESP_ERR_NO_MEM
fprintf(stderr, "Cannot register eventfd, unexpected error = %d\n", err);
AVM_ABORT();
}
platform->eventfd_registered = false;
}
int signal_fd = eventfd(0, 0);
if (UNLIKELY(signal_fd < 0)) {
fprintf(stderr, "Cannot create signal_fd\n");
AVM_ABORT();
}
platform->signal_fd = signal_fd;
if (UNLIKELY(pthread_create(&platform->select_thread, NULL, select_thread_loop, glb))) {
AVM_ABORT();
}
#ifndef AVM_NO_SMP
// Use the ESP-IDF API to change the default thread attributes
// We use the current main thread priority.
Expand All @@ -208,6 +236,16 @@ void sys_init_platform(GlobalContext *glb)
void sys_free_platform(GlobalContext *glb)
{
struct ESP32PlatformData *platform = glb->platform_data;
platform->select_thread_exit = true;
select_thread_signal(platform);
pthread_join(platform->select_thread, NULL);
close(platform->signal_fd);
if (platform->eventfd_registered) {
if (UNLIKELY(esp_vfs_eventfd_unregister() != ESP_OK)) {
fprintf(stderr, "Cannot unregister eventfd\n");
AVM_ABORT();
}
}
free(platform);
}

Expand Down Expand Up @@ -560,16 +598,119 @@ void sys_unregister_listener(GlobalContext *global, struct EventListener *listen

void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write)
{
UNUSED(global);
UNUSED(event);
UNUSED(is_write);

struct ESP32PlatformData *platform = global->platform_data;
platform->select_events_poll_count = -1;
select_thread_signal(platform);
}

void sys_unregister_select_event(GlobalContext *global, ErlNifEvent event, bool is_write)
{
UNUSED(global);
UNUSED(event);
UNUSED(is_write);

struct ESP32PlatformData *platform = global->platform_data;
platform->select_events_poll_count = -1;
select_thread_signal(platform);
}

static void *select_thread_loop(void *arg)
{
GlobalContext *glb = arg;
struct ESP32PlatformData *platform = glb->platform_data;
struct pollfd *fds = malloc(0);
while (!platform->select_thread_exit) {
int select_events_poll_count = platform->select_events_poll_count;
int poll_count = 1;
int fd_index;
if (select_events_poll_count < 0) {
// Means it is dirty and should be rebuilt.
size_t select_events_new_count;
if (select_events_poll_count < 0) {
select_event_count_and_destroy_closed(NULL, NULL, &select_events_new_count, glb);
} else {
select_events_new_count = select_events_poll_count;
}

fds = realloc(fds, sizeof(struct pollfd) * (poll_count + select_events_new_count));

fds[0].fd = platform->signal_fd;
fds[0].events = POLLIN;
fds[0].revents = 0;

fd_index = poll_count;

struct ListHead *item;
struct ListHead *select_events = synclist_rdlock(&glb->select_events);
LIST_FOR_EACH (item, select_events) {
struct SelectEvent *select_event = GET_LIST_ENTRY(item, struct SelectEvent, head);
if (select_event->read || select_event->write) {
fds[fd_index].fd = select_event->event;
fds[fd_index].events = (select_event->read ? POLLIN : 0) | (select_event->write ? POLLOUT : 0);
fds[fd_index].revents = 0;

fd_index++;
}
}
synclist_unlock(&glb->select_events);

select_events_poll_count = select_events_new_count;
platform->select_events_poll_count = select_events_new_count;
}

poll_count += select_events_poll_count;

int nb_descriptors = poll(fds, poll_count, -1);
fd_index = 0;
if (nb_descriptors > 0) {
if ((fds[0].revents & fds[0].events)) {
// We've been signaled
uint64_t ignored;
if (UNLIKELY(read(platform->signal_fd, &ignored, sizeof(ignored)) < 0)) {
fprintf(stderr, "Reading event_fd failed -- errno = %d\n", errno);
AVM_ABORT();
}
nb_descriptors--;
}
fd_index++;
}

for (int i = 0; i < select_events_poll_count && nb_descriptors > 0; i++, fd_index++) {
if (!(fds[fd_index].revents & fds[fd_index].events)) {
continue;
}
bool is_read = fds[fd_index].revents & POLLIN;
bool is_write = fds[fd_index].revents & POLLOUT;
fds[fd_index].revents = 0;
nb_descriptors--;

select_event_notify(fds[fd_index].fd, is_read, is_write, glb);
}
}

free((void *) fds);

return NULL;
}

static void select_thread_signal(struct ESP32PlatformData *platform)
{
// Write can fail if the counter overflows
// (very unlikely, 2^64)
uint64_t val = 1;
if (UNLIKELY(write(platform->signal_fd, &val, sizeof(val)) < 0)) {
uint64_t ignored;
if (UNLIKELY(read(platform->signal_fd, &ignored, sizeof(ignored)) < 0)) {
fprintf(stderr, "Reading event_fd failed\n");
AVM_ABORT();
}
if (UNLIKELY(write(platform->signal_fd, &val, sizeof(val)) < 0)) {
fprintf(stderr, "Writing event_fd failed\n");
AVM_ABORT();
}
}
}

bool event_listener_is_event(EventListener *listener, listener_event_t event)
Expand Down
3 changes: 3 additions & 0 deletions src/platforms/esp32/test/main/test_erl_sources/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ compile_erlang(test_list_to_binary)
compile_erlang(test_md5)
compile_erlang(test_monotonic_time)
compile_erlang(test_rtc_slow)
compile_erlang(test_select)
compile_erlang(test_socket)
compile_erlang(test_time_and_processes)
compile_erlang(test_tz)
Expand All @@ -56,6 +57,7 @@ add_custom_command(
test_md5.beam
test_monotonic_time.beam
test_rtc_slow.beam
test_select.beam
test_socket.beam
test_time_and_processes.beam
test_tz.beam
Expand All @@ -67,6 +69,7 @@ add_custom_command(
"${CMAKE_CURRENT_BINARY_DIR}/test_md5.beam"
"${CMAKE_CURRENT_BINARY_DIR}/test_monotonic_time.beam"
"${CMAKE_CURRENT_BINARY_DIR}/test_rtc_slow.beam"
"${CMAKE_CURRENT_BINARY_DIR}/test_select.beam"
"${CMAKE_CURRENT_BINARY_DIR}/test_socket.beam"
"${CMAKE_CURRENT_BINARY_DIR}/test_time_and_processes.beam"
"${CMAKE_CURRENT_BINARY_DIR}/test_tz.beam"
Expand Down
79 changes: 79 additions & 0 deletions src/platforms/esp32/test/main/test_erl_sources/test_select.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
%
% This file is part of AtomVM.
%
% Copyright 2023 Paul Guyot <pguyot@kallisys.net>
%
% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
% You may obtain a copy of the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS,
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
% See the License for the specific language governing permissions and
% limitations under the License.
%
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
%

-module(test_select).

-export([start/0]).

% This test relies on a special vfs registered under /pipe.

start() ->
{ok, WrFd} = atomvm:posix_open("/pipe/0", [o_wronly]),
{ok, RdFd} = atomvm:posix_open("/pipe/0", [o_rdonly]),
% Make sure this test vfs works as expected
{ok, 1} = atomvm:posix_write(WrFd, <<42>>),
{error, eagain} = atomvm:posix_write(WrFd, <<43>>),
{ok, <<42>>} = atomvm:posix_read(RdFd, 1),
{error, eagain} = atomvm:posix_read(RdFd, 1),

% Write fd should be selectable.
SelectWriteRef = make_ref(),
ok = atomvm:posix_select_write(WrFd, self(), SelectWriteRef),
ok =
receive
{select, WrFd, SelectWriteRef, ready_output} -> ok;
M -> {unexpected, M}
after 200 -> fail
end,
ok = atomvm:posix_select_stop(WrFd),

% Write and check that rd is selectable fd should be selectable.
{ok, 1} = atomvm:posix_write(WrFd, <<42>>),
SelectReadRef = make_ref(),
ok = atomvm:posix_select_read(RdFd, self(), SelectReadRef),
ok =
receive
{select, RdFd, SelectReadRef, ready_input} -> ok
after 200 -> fail
end,
{ok, <<42>>} = atomvm:posix_read(RdFd, 1),
ok = atomvm:posix_select_read(RdFd, self(), SelectReadRef),
ok =
receive
{select, RdFd, SelectReadRef, _} -> fail
after 200 -> ok
end,
{ok, 1} = atomvm:posix_write(WrFd, <<43>>),
ok =
receive
{select, RdFd, SelectReadRef, ready_input} -> ok;
M2 -> {unexpected, M2}
after 200 -> fail
end,
ok = atomvm:posix_select_stop(RdFd),
ok =
receive
Message -> {unexpected, Message}
after 200 -> ok
end,

ok = atomvm:posix_close(WrFd),
ok = atomvm:posix_close(RdFd),
ok.
Loading