diff --git a/src/libAtomVM/erl_nif.h b/src/libAtomVM/erl_nif.h index 4819c8a36f..c829a15a6f 100644 --- a/src/libAtomVM/erl_nif.h +++ b/src/libAtomVM/erl_nif.h @@ -201,6 +201,8 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj); * Please note that `kqueue(2)` and `poll(2)` behave differently for some * objects, for example for vnodes and EOF. * + * On `esp32`, this is currently implemented using `poll(2)`. + * * @param env current environment * @param event event object (typically a file descriptor) * @param mode select mode (`ERL_NIF_SELECT_READ` and/or `ERL_NIF_SELECT_WRITE`) diff --git a/src/platforms/esp32/components/avm_sys/CMakeLists.txt b/src/platforms/esp32/components/avm_sys/CMakeLists.txt index 02d0220f0e..0aa2bd3e69 100644 --- a/src/platforms/esp32/components/avm_sys/CMakeLists.txt +++ b/src/platforms/esp32/components/avm_sys/CMakeLists.txt @@ -39,7 +39,7 @@ endif() idf_component_register( SRCS ${AVM_SYS_COMPONENT_SRCS} INCLUDE_DIRS "include" - REQUIRES "spi_flash" "soc" "newlib" "pthread" ${ADDITIONAL_COMPONENTS} + REQUIRES "spi_flash" "soc" "newlib" "pthread" "vfs" ${ADDITIONAL_COMPONENTS} PRIV_REQUIRES "libatomvm" "esp_timer" ${ADDITIONAL_PRIV_REQUIRES} ) diff --git a/src/platforms/esp32/components/avm_sys/include/esp32_sys.h b/src/platforms/esp32/components/avm_sys/include/esp32_sys.h index 1437255469..aa60657dff 100644 --- a/src/platforms/esp32/components/avm_sys/include/esp32_sys.h +++ b/src/platforms/esp32/components/avm_sys/include/esp32_sys.h @@ -24,12 +24,14 @@ #include "freertos/FreeRTOS.h" #include #include +#include "esp_pthread.h" #if ESP_IDF_VERSION_MAJOR >= 5 #include #endif #include +#include #include "sys.h" @@ -81,6 +83,13 @@ struct EventListener struct ESP32PlatformData { + pthread_t select_thread; + bool select_thread_exit; + bool eventfd_registered; + int signal_fd; + int ATOMIC select_events_poll_count; + struct pollfd *fds; + // socket_driver EventListener *socket_listener; struct SyncList sockets; diff --git a/src/platforms/esp32/components/avm_sys/sys.c b/src/platforms/esp32/components/avm_sys/sys.c index b90ed15581..f0ebe91a29 100644 --- a/src/platforms/esp32/components/avm_sys/sys.c +++ b/src/platforms/esp32/components/avm_sys/sys.c @@ -40,14 +40,16 @@ #include #include #include +#include #include #include -// introduced starting with 4.4 #if ESP_IDF_VERSION_MAJOR >= 5 #include "esp_chip_info.h" #endif +#include + #ifdef HAVE_SOC_CPU_CORES_NUM #include "soc/soc_caps.h" #endif @@ -61,6 +63,9 @@ static Context *port_driver_create_port(const char *port_name, GlobalContext *global, term opts); +static void *select_thread_loop(void *); +static void select_thread_signal(struct ESP32PlatformData *platform); + // clang-format off static const char *const esp_free_heap_size_atom = "\x14" "esp32_free_heap_size"; static const char *const esp_largest_free_block_atom = "\x18" "esp32_largest_free_block"; @@ -182,6 +187,29 @@ void sys_init_platform(GlobalContext *glb) { struct ESP32PlatformData *platform = malloc(sizeof(struct ESP32PlatformData)); glb->platform_data = platform; + platform->select_thread_exit = false; + platform->select_events_poll_count = -1; + esp_vfs_eventfd_config_t eventfd_config = ESP_VFS_EVENTD_CONFIG_DEFAULT(); + esp_err_t err = esp_vfs_eventfd_register(&eventfd_config); + if (err == ESP_OK) { + platform->eventfd_registered = true; + } else { + if (UNLIKELY(err != ESP_ERR_INVALID_STATE)) { + // Function can return fatal ESP_ERR_NO_MEM + fprintf(stderr, "Cannot register eventfd, unexpected error = %d\n", err); + AVM_ABORT(); + } + platform->eventfd_registered = false; + } + int signal_fd = eventfd(0, 0); + if (UNLIKELY(signal_fd < 0)) { + fprintf(stderr, "Cannot create signal_fd\n"); + AVM_ABORT(); + } + platform->signal_fd = signal_fd; + if (UNLIKELY(pthread_create(&platform->select_thread, NULL, select_thread_loop, glb))) { + AVM_ABORT(); + } #ifndef AVM_NO_SMP // Use the ESP-IDF API to change the default thread attributes // We use the current main thread priority. @@ -208,6 +236,16 @@ void sys_init_platform(GlobalContext *glb) void sys_free_platform(GlobalContext *glb) { struct ESP32PlatformData *platform = glb->platform_data; + platform->select_thread_exit = true; + select_thread_signal(platform); + pthread_join(platform->select_thread, NULL); + close(platform->signal_fd); + if (platform->eventfd_registered) { + if (UNLIKELY(esp_vfs_eventfd_unregister() != ESP_OK)) { + fprintf(stderr, "Cannot unregister eventfd\n"); + AVM_ABORT(); + } + } free(platform); } @@ -560,16 +598,119 @@ void sys_unregister_listener(GlobalContext *global, struct EventListener *listen void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write) { - UNUSED(global); UNUSED(event); UNUSED(is_write); + + struct ESP32PlatformData *platform = global->platform_data; + platform->select_events_poll_count = -1; + select_thread_signal(platform); } void sys_unregister_select_event(GlobalContext *global, ErlNifEvent event, bool is_write) { - UNUSED(global); UNUSED(event); UNUSED(is_write); + + struct ESP32PlatformData *platform = global->platform_data; + platform->select_events_poll_count = -1; + select_thread_signal(platform); +} + +static void *select_thread_loop(void *arg) +{ + GlobalContext *glb = arg; + struct ESP32PlatformData *platform = glb->platform_data; + struct pollfd *fds = malloc(0); + while (!platform->select_thread_exit) { + int select_events_poll_count = platform->select_events_poll_count; + int poll_count = 1; + int fd_index; + if (select_events_poll_count < 0) { + // Means it is dirty and should be rebuilt. + size_t select_events_new_count; + if (select_events_poll_count < 0) { + select_event_count_and_destroy_closed(NULL, NULL, &select_events_new_count, glb); + } else { + select_events_new_count = select_events_poll_count; + } + + fds = realloc(fds, sizeof(struct pollfd) * (poll_count + select_events_new_count)); + + fds[0].fd = platform->signal_fd; + fds[0].events = POLLIN; + fds[0].revents = 0; + + fd_index = poll_count; + + struct ListHead *item; + struct ListHead *select_events = synclist_rdlock(&glb->select_events); + LIST_FOR_EACH (item, select_events) { + struct SelectEvent *select_event = GET_LIST_ENTRY(item, struct SelectEvent, head); + if (select_event->read || select_event->write) { + fds[fd_index].fd = select_event->event; + fds[fd_index].events = (select_event->read ? POLLIN : 0) | (select_event->write ? POLLOUT : 0); + fds[fd_index].revents = 0; + + fd_index++; + } + } + synclist_unlock(&glb->select_events); + + select_events_poll_count = select_events_new_count; + platform->select_events_poll_count = select_events_new_count; + } + + poll_count += select_events_poll_count; + + int nb_descriptors = poll(fds, poll_count, -1); + fd_index = 0; + if (nb_descriptors > 0) { + if ((fds[0].revents & fds[0].events)) { + // We've been signaled + uint64_t ignored; + if (UNLIKELY(read(platform->signal_fd, &ignored, sizeof(ignored)) < 0)) { + fprintf(stderr, "Reading event_fd failed -- errno = %d\n", errno); + AVM_ABORT(); + } + nb_descriptors--; + } + fd_index++; + } + + for (int i = 0; i < select_events_poll_count && nb_descriptors > 0; i++, fd_index++) { + if (!(fds[fd_index].revents & fds[fd_index].events)) { + continue; + } + bool is_read = fds[fd_index].revents & POLLIN; + bool is_write = fds[fd_index].revents & POLLOUT; + fds[fd_index].revents = 0; + nb_descriptors--; + + select_event_notify(fds[fd_index].fd, is_read, is_write, glb); + } + } + + free((void *) fds); + + return NULL; +} + +static void select_thread_signal(struct ESP32PlatformData *platform) +{ + // Write can fail if the counter overflows + // (very unlikely, 2^64) + uint64_t val = 1; + if (UNLIKELY(write(platform->signal_fd, &val, sizeof(val)) < 0)) { + uint64_t ignored; + if (UNLIKELY(read(platform->signal_fd, &ignored, sizeof(ignored)) < 0)) { + fprintf(stderr, "Reading event_fd failed\n"); + AVM_ABORT(); + } + if (UNLIKELY(write(platform->signal_fd, &val, sizeof(val)) < 0)) { + fprintf(stderr, "Writing event_fd failed\n"); + AVM_ABORT(); + } + } } bool event_listener_is_event(EventListener *listener, listener_event_t event) diff --git a/src/platforms/esp32/test/main/test_erl_sources/CMakeLists.txt b/src/platforms/esp32/test/main/test_erl_sources/CMakeLists.txt index 2f6187491e..3bdd57b84b 100644 --- a/src/platforms/esp32/test/main/test_erl_sources/CMakeLists.txt +++ b/src/platforms/esp32/test/main/test_erl_sources/CMakeLists.txt @@ -42,6 +42,7 @@ compile_erlang(test_list_to_binary) compile_erlang(test_md5) compile_erlang(test_monotonic_time) compile_erlang(test_rtc_slow) +compile_erlang(test_select) compile_erlang(test_socket) compile_erlang(test_time_and_processes) compile_erlang(test_tz) @@ -56,6 +57,7 @@ add_custom_command( test_md5.beam test_monotonic_time.beam test_rtc_slow.beam + test_select.beam test_socket.beam test_time_and_processes.beam test_tz.beam @@ -67,6 +69,7 @@ add_custom_command( "${CMAKE_CURRENT_BINARY_DIR}/test_md5.beam" "${CMAKE_CURRENT_BINARY_DIR}/test_monotonic_time.beam" "${CMAKE_CURRENT_BINARY_DIR}/test_rtc_slow.beam" + "${CMAKE_CURRENT_BINARY_DIR}/test_select.beam" "${CMAKE_CURRENT_BINARY_DIR}/test_socket.beam" "${CMAKE_CURRENT_BINARY_DIR}/test_time_and_processes.beam" "${CMAKE_CURRENT_BINARY_DIR}/test_tz.beam" diff --git a/src/platforms/esp32/test/main/test_erl_sources/test_select.erl b/src/platforms/esp32/test/main/test_erl_sources/test_select.erl new file mode 100644 index 0000000000..4a77dc4389 --- /dev/null +++ b/src/platforms/esp32/test/main/test_erl_sources/test_select.erl @@ -0,0 +1,79 @@ +% +% This file is part of AtomVM. +% +% Copyright 2023 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +-module(test_select). + +-export([start/0]). + +% This test relies on a special vfs registered under /pipe. + +start() -> + {ok, WrFd} = atomvm:posix_open("/pipe/0", [o_wronly]), + {ok, RdFd} = atomvm:posix_open("/pipe/0", [o_rdonly]), + % Make sure this test vfs works as expected + {ok, 1} = atomvm:posix_write(WrFd, <<42>>), + {error, eagain} = atomvm:posix_write(WrFd, <<43>>), + {ok, <<42>>} = atomvm:posix_read(RdFd, 1), + {error, eagain} = atomvm:posix_read(RdFd, 1), + + % Write fd should be selectable. + SelectWriteRef = make_ref(), + ok = atomvm:posix_select_write(WrFd, self(), SelectWriteRef), + ok = + receive + {select, WrFd, SelectWriteRef, ready_output} -> ok; + M -> {unexpected, M} + after 200 -> fail + end, + ok = atomvm:posix_select_stop(WrFd), + + % Write and check that rd is selectable fd should be selectable. + {ok, 1} = atomvm:posix_write(WrFd, <<42>>), + SelectReadRef = make_ref(), + ok = atomvm:posix_select_read(RdFd, self(), SelectReadRef), + ok = + receive + {select, RdFd, SelectReadRef, ready_input} -> ok + after 200 -> fail + end, + {ok, <<42>>} = atomvm:posix_read(RdFd, 1), + ok = atomvm:posix_select_read(RdFd, self(), SelectReadRef), + ok = + receive + {select, RdFd, SelectReadRef, _} -> fail + after 200 -> ok + end, + {ok, 1} = atomvm:posix_write(WrFd, <<43>>), + ok = + receive + {select, RdFd, SelectReadRef, ready_input} -> ok; + M2 -> {unexpected, M2} + after 200 -> fail + end, + ok = atomvm:posix_select_stop(RdFd), + ok = + receive + Message -> {unexpected, Message} + after 200 -> ok + end, + + ok = atomvm:posix_close(WrFd), + ok = atomvm:posix_close(RdFd), + ok. diff --git a/src/platforms/esp32/test/main/test_main.c b/src/platforms/esp32/test/main/test_main.c index 2518230d7c..c946e60852 100644 --- a/src/platforms/esp32/test/main/test_main.c +++ b/src/platforms/esp32/test/main/test_main.c @@ -19,6 +19,7 @@ */ #include "unity.h" +#include #include #include @@ -37,6 +38,7 @@ #include #include #include +#include #include #include @@ -228,6 +230,163 @@ TEST_CASE("test_monotonic_time", "[test_run]") TEST_ASSERT(ret_value == OK_ATOM); } +struct pipefs_global_ctx +{ + int max_fd; + uint8_t byte; + bool has_byte; + bool is_select; + int select_nfds; + esp_vfs_select_sem_t select_sem; + fd_set *readfds; + fd_set *writefds; + fd_set readfds_orig; + fd_set writefds_orig; +}; + +static struct pipefs_global_ctx pipefs_global_ctx; + +static ssize_t pipefs_write(int fd, const void *data, size_t size) +{ + UNUSED(fd); + + if (size == 0) { + return 0; + } + if (pipefs_global_ctx.has_byte) { + errno = EAGAIN; + return -1; + } + pipefs_global_ctx.has_byte = true; + pipefs_global_ctx.byte = *((uint8_t *) data); + if (pipefs_global_ctx.is_select) { + // Notify any reader. + bool notify = false; + for (int i = 0; i < pipefs_global_ctx.select_nfds; i++) { + if (FD_ISSET(i, &pipefs_global_ctx.readfds_orig)) { + FD_SET(i, pipefs_global_ctx.readfds); + notify = true; + } + } + if (notify) { + esp_vfs_select_triggered(pipefs_global_ctx.select_sem); + } + } + return 1; +} + +static ssize_t pipefs_read(int fd, void *data, size_t size) +{ + UNUSED(fd); + + if (size == 0) { + return 0; + } + if (!pipefs_global_ctx.has_byte) { + errno = EAGAIN; + return -1; + } + pipefs_global_ctx.has_byte = false; + *((uint8_t *) data) = pipefs_global_ctx.byte; + if (pipefs_global_ctx.is_select) { + // Notify any writer. + bool notify = false; + for (int i = 0; i < pipefs_global_ctx.select_nfds; i++) { + if (FD_ISSET(i, &pipefs_global_ctx.writefds_orig)) { + FD_SET(i, pipefs_global_ctx.writefds); + notify = true; + } + } + if (notify) { + esp_vfs_select_triggered(pipefs_global_ctx.select_sem); + } + } + return 1; +} + +static int pipefs_open(const char *path, int flags, int mode) +{ + return ++pipefs_global_ctx.max_fd; +} + +static int pipefs_close(int fd) +{ + UNUSED(fd); + return 0; +} + +static esp_err_t pipefs_start_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, esp_vfs_select_sem_t sem, void **end_select_args) +{ + // Cannot select twice in parallel + if (pipefs_global_ctx.is_select) { + return ESP_ERR_INVALID_STATE; + } + + pipefs_global_ctx.select_nfds = nfds; + pipefs_global_ctx.readfds = readfds; + pipefs_global_ctx.readfds_orig = *readfds; + pipefs_global_ctx.writefds = writefds; + pipefs_global_ctx.writefds_orig = *writefds; + pipefs_global_ctx.select_sem = sem; + pipefs_global_ctx.is_select = true; + FD_ZERO(readfds); + FD_ZERO(writefds); + FD_ZERO(exceptfds); + + // Notify based on current state. + bool notify = false; + for (int i = 0; i < pipefs_global_ctx.select_nfds; i++) { + if (pipefs_global_ctx.has_byte) { + if (FD_ISSET(i, &pipefs_global_ctx.readfds_orig)) { + FD_SET(i, pipefs_global_ctx.readfds); + notify = true; + } + } else { + if (FD_ISSET(i, &pipefs_global_ctx.writefds_orig)) { + FD_SET(i, pipefs_global_ctx.writefds); + notify = true; + } + } + } + if (notify) { + esp_vfs_select_triggered(pipefs_global_ctx.select_sem); + } + + return ESP_OK; +} + +static esp_err_t pipefs_end_select(void *end_select_args) +{ + if (!pipefs_global_ctx.is_select) { + return ESP_ERR_INVALID_STATE; + } + pipefs_global_ctx.is_select = false; + return ESP_OK; +} + +TEST_CASE("test_select", "[test_run]") +{ + esp_vfs_t pipefs = { + .flags = ESP_VFS_FLAG_DEFAULT, + .write = &pipefs_write, + .open = &pipefs_open, + .read = &pipefs_read, + .close = &pipefs_close, + .start_select = &pipefs_start_select, + .end_select = &pipefs_end_select, + }; + pipefs_global_ctx.has_byte = false; + pipefs_global_ctx.is_select = false; + pipefs_global_ctx.max_fd = 0; + + ESP_ERROR_CHECK(esp_vfs_register("/pipe", &pipefs, NULL)); + + term ret_value = avm_test_case("test_select.beam"); + TEST_ASSERT(ret_value == OK_ATOM); + + esp_vfs_unregister("/pipe"); +} + TEST_CASE("test_time_and_processes", "[test_run]") { term ret_value = avm_test_case("test_time_and_processes.beam");