diff --git a/c_src/py_callback.c b/c_src/py_callback.c index b7eaa74..f593b01 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -1276,6 +1276,168 @@ PyTypeObject ErlangPidType = { .tp_doc = "Opaque Erlang process identifier", }; +/* ============================================================================ + * ErlangRef - opaque Erlang reference type + * + * Stores the reference as a serialized binary for round-trip through Python. + * ============================================================================ */ + +static void ErlangRef_dealloc(ErlangRefObject *self) { + if (self->data != NULL) { + PyMem_Free(self->data); + } + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject *ErlangRef_repr(ErlangRefObject *self) { + return PyUnicode_FromFormat("", self->size); +} + +static PyObject *ErlangRef_richcompare(PyObject *a, PyObject *b, int op) { + if (!Py_IS_TYPE(b, &ErlangRefType)) { + Py_RETURN_NOTIMPLEMENTED; + } + ErlangRefObject *ra = (ErlangRefObject *)a; + ErlangRefObject *rb = (ErlangRefObject *)b; + + int cmp = 0; + if (ra->size != rb->size) { + cmp = (ra->size < rb->size) ? -1 : 1; + } else { + cmp = memcmp(ra->data, rb->data, ra->size); + } + + switch (op) { + case Py_EQ: return PyBool_FromLong(cmp == 0); + case Py_NE: return PyBool_FromLong(cmp != 0); + default: Py_RETURN_NOTIMPLEMENTED; + } +} + +static Py_hash_t ErlangRef_hash(ErlangRefObject *self) { + /* Simple hash of the serialized data */ + Py_hash_t h = 0; + for (size_t i = 0; i < self->size; i++) { + h = h * 31 + self->data[i]; + } + if (h == -1) h = -2; + return h; +} + +PyTypeObject ErlangRefType = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "erlang.Ref", + .tp_basicsize = sizeof(ErlangRefObject), + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_dealloc = (destructor)ErlangRef_dealloc, + .tp_repr = (reprfunc)ErlangRef_repr, + .tp_richcompare = ErlangRef_richcompare, + .tp_hash = (hashfunc)ErlangRef_hash, + .tp_doc = "Opaque Erlang reference", +}; + +/* ============================================================================ + * ErlangAtom - Python type for Erlang atoms + * + * Erlang atoms are symbols (like Ruby symbols or Lisp symbols). This type + * allows Python code to explicitly create atoms for message passing. + * ============================================================================ */ + +static void ErlangAtom_dealloc(ErlangAtomObject *self) { + if (self->name != NULL) { + PyMem_Free(self->name); + } + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject *ErlangAtom_repr(ErlangAtomObject *self) { + return PyUnicode_FromFormat("", self->name); +} + +static PyObject *ErlangAtom_str(ErlangAtomObject *self) { + return PyUnicode_FromString(self->name); +} + +static PyObject *ErlangAtom_richcompare(PyObject *a, PyObject *b, int op) { + if (!Py_IS_TYPE(b, &ErlangAtomType)) { + Py_RETURN_NOTIMPLEMENTED; + } + ErlangAtomObject *aa = (ErlangAtomObject *)a; + ErlangAtomObject *ab = (ErlangAtomObject *)b; + + int cmp = strcmp(aa->name, ab->name); + + switch (op) { + case Py_EQ: return PyBool_FromLong(cmp == 0); + case Py_NE: return PyBool_FromLong(cmp != 0); + case Py_LT: return PyBool_FromLong(cmp < 0); + case Py_LE: return PyBool_FromLong(cmp <= 0); + case Py_GT: return PyBool_FromLong(cmp > 0); + case Py_GE: return PyBool_FromLong(cmp >= 0); + default: Py_RETURN_NOTIMPLEMENTED; + } +} + +static Py_hash_t ErlangAtom_hash(ErlangAtomObject *self) { + /* Use Python's string hash for consistency */ + PyObject *str = PyUnicode_FromString(self->name); + if (str == NULL) return -1; + Py_hash_t h = PyObject_Hash(str); + Py_DECREF(str); + return h; +} + +PyTypeObject ErlangAtomType = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "erlang.Atom", + .tp_basicsize = sizeof(ErlangAtomObject), + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_dealloc = (destructor)ErlangAtom_dealloc, + .tp_repr = (reprfunc)ErlangAtom_repr, + .tp_str = (reprfunc)ErlangAtom_str, + .tp_richcompare = ErlangAtom_richcompare, + .tp_hash = (hashfunc)ErlangAtom_hash, + .tp_doc = "Erlang atom (symbol)", +}; + +/** + * erlang.atom(name) - Create an Erlang atom + * + * Args: name (string) + * Returns: ErlangAtomObject + */ +static PyObject *erlang_atom_impl(PyObject *self, PyObject *args) { + (void)self; + const char *name; + Py_ssize_t name_len; + + if (!PyArg_ParseTuple(args, "s#", &name, &name_len)) { + return NULL; + } + + /* Validate atom name length (Erlang limit is 255 bytes) */ + if (name_len > 255) { + PyErr_SetString(PyExc_ValueError, "Atom name too long (max 255 bytes)"); + return NULL; + } + + ErlangAtomObject *obj = PyObject_New(ErlangAtomObject, &ErlangAtomType); + if (obj == NULL) { + return NULL; + } + + obj->name = PyMem_Malloc(name_len + 1); + if (obj->name == NULL) { + Py_DECREF(obj); + return PyErr_NoMemory(); + } + + memcpy(obj->name, name, name_len); + obj->name[name_len] = '\0'; + + return (PyObject *)obj; +} + /* ============================================================================ * ScheduleMarker - marker type for explicit scheduler release * @@ -2002,7 +2164,9 @@ static PyObject *erlang_send_impl(PyObject *self, PyObject *args) { } /* Fire-and-forget send */ - if (!enif_send(NULL, &pid->pid, msg_env, msg)) { + int send_result = enif_send(NULL, &pid->pid, msg_env, msg); + + if (!send_result) { enif_free_env(msg_env); PyErr_SetString(get_current_process_error(), "Failed to send message: process may not exist"); @@ -2698,6 +2862,11 @@ static PyMethodDef ErlangModuleMethods[] = { "Call a registered Erlang function.\n\n" "Usage: erlang.call('func_name', arg1, arg2, ...)\n" "Returns: The result from the Erlang function."}, + {"_atom", erlang_atom_impl, METH_VARARGS, + "Internal: Create an Erlang atom.\n\n" + "Usage: erlang._atom('name')\n" + "Returns: An ErlangAtom object that converts to an Erlang atom.\n" + "NOTE: Use erlang.atom() wrapper instead for safety limits."}, {"send", erlang_send_impl, METH_VARARGS, "Send a message to an Erlang process (fire-and-forget).\n\n" "Usage: erlang.send(pid, term)\n" @@ -2840,6 +3009,16 @@ static int create_erlang_module(void) { return -1; } + /* Initialize ErlangRef type */ + if (PyType_Ready(&ErlangRefType) < 0) { + return -1; + } + + /* Initialize ErlangAtom type */ + if (PyType_Ready(&ErlangAtomType) < 0) { + return -1; + } + /* Initialize ScheduleMarker type */ if (PyType_Ready(&ScheduleMarkerType) < 0) { return -1; @@ -2911,6 +3090,22 @@ static int create_erlang_module(void) { return -1; } + /* Add ErlangRef type to module */ + Py_INCREF(&ErlangRefType); + if (PyModule_AddObject(module, "Ref", (PyObject *)&ErlangRefType) < 0) { + Py_DECREF(&ErlangRefType); + Py_DECREF(module); + return -1; + } + + /* Add ErlangAtom type to module */ + Py_INCREF(&ErlangAtomType); + if (PyModule_AddObject(module, "Atom", (PyObject *)&ErlangAtomType) < 0) { + Py_DECREF(&ErlangAtomType); + Py_DECREF(module); + return -1; + } + /* Add ScheduleMarker type to module */ Py_INCREF(&ScheduleMarkerType); if (PyModule_AddObject(module, "ScheduleMarker", (PyObject *)&ScheduleMarkerType) < 0) { diff --git a/c_src/py_convert.c b/c_src/py_convert.c index a0ccd48..1257e0e 100644 --- a/c_src/py_convert.c +++ b/c_src/py_convert.c @@ -339,6 +339,29 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) { return enif_make_pid(env, &pid_obj->pid); } + /* Handle ErlangRef → Erlang reference (deserialize from binary) */ + if (Py_IS_TYPE(obj, &ErlangRefType)) { + ErlangRefObject *ref_obj = (ErlangRefObject *)obj; + ERL_NIF_TERM result; + if (enif_binary_to_term(env, ref_obj->data, ref_obj->size, &result, 0) > 0) { + return result; + } + /* Failed to deserialize - return undefined */ + return enif_make_atom(env, "undefined"); + } + + /* Handle ErlangAtom → Erlang atom */ + if (Py_IS_TYPE(obj, &ErlangAtomType)) { + ErlangAtomObject *atom_obj = (ErlangAtomObject *)obj; + ERL_NIF_TERM atom_term; + /* Try existing atom first (no new allocation) */ + if (enif_make_existing_atom(env, atom_obj->name, &atom_term, ERL_NIF_LATIN1)) { + return atom_term; + } + /* Atom doesn't exist yet, create it */ + return enif_make_atom(env, atom_obj->name); + } + /* Handle NumPy arrays by converting to Python list first */ if (is_numpy_ndarray(obj)) { PyObject *tolist = PyObject_CallMethod(obj, "tolist", NULL); @@ -603,6 +626,31 @@ static PyObject *term_to_py(ErlNifEnv *env, ERL_NIF_TERM term) { return PyBuffer_from_resource(pybuf, pybuf); } + /* Check for reference - serialize to binary for round-trip. + * IMPORTANT: This must come AFTER all resource checks, because NIF + * resource terms also satisfy enif_is_ref() but should be handled + * as their specific resource type (PyObj, Channel, Buffer, etc). */ + if (enif_is_ref(env, term)) { + ErlNifBinary bin; + if (enif_term_to_binary(env, term, &bin)) { + ErlangRefObject *obj = PyObject_New(ErlangRefObject, &ErlangRefType); + if (obj == NULL) { + enif_release_binary(&bin); + return NULL; + } + obj->data = PyMem_Malloc(bin.size); + if (obj->data == NULL) { + Py_DECREF(obj); + enif_release_binary(&bin); + return NULL; + } + memcpy(obj->data, bin.data, bin.size); + obj->size = bin.size; + enif_release_binary(&bin); + return (PyObject *)obj; + } + } + /* Fallback: return None for unknown types */ Py_RETURN_NONE; } diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index af8281c..e3e89d0 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -144,6 +144,15 @@ static ErlNifPid g_global_shared_router; static bool g_global_shared_router_valid = false; static pthread_mutex_t g_global_router_mutex = PTHREAD_MUTEX_INITIALIZER; +/* Global shared worker for scalable I/O model. + * Used by dispatch_timer to send task_ready, ensuring process_ready_tasks + * is called after timer events. This centralizes the wakeup mechanism + * so both router-dispatched and worker-dispatched timers work correctly. + */ +static ErlNifPid g_global_shared_worker; +static bool g_global_shared_worker_valid = false; +static pthread_mutex_t g_global_worker_mutex = PTHREAD_MUTEX_INITIALIZER; + /* ============================================================================ * Per-Interpreter Reactor Cache * ============================================================================ @@ -1786,6 +1795,9 @@ ERL_NIF_TERM nif_dispatch_callback(ErlNifEnv *env, int argc, * dispatch_timer(LoopRef, CallbackId) -> ok * * Called when a timer expires. + * Adds timer event to pending queue and sends task_ready to worker + * to trigger process_ready_tasks. This ensures _run_once is called + * to handle the timer callback. */ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { @@ -1804,6 +1816,14 @@ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc, event_loop_add_pending(loop, EVENT_TYPE_TIMER, callback_id, -1); + /* Note: We rely on event_loop_add_pending signaling the condition variable + * to wake up poll_events_wait. This works for both: + * - erlang.run() inside py:exec: Python loop is waiting on poll_events_wait + * - create_task: The worker is triggered by its own timer handling + * + * We don't send task_ready to the global worker here because dispatch_timer + * may be called on a loop different from the one the global worker manages. */ + return ATOM_OK; } @@ -2535,10 +2555,17 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, */ atomic_store(&loop->task_wake_pending, false); - /* OPTIMIZATION: Check task count BEFORE acquiring GIL - * This avoids expensive GIL acquisition when there's nothing to do */ + /* OPTIMIZATION: Check if there's any work BEFORE acquiring GIL + * This avoids expensive GIL acquisition when there's nothing to do. + * + * We need to check BOTH: + * - task_count: new tasks submitted via submit_task + * - pending_count: timer/FD events dispatched via dispatch_timer/handle_fd_event + * + * If either has work, we need to proceed and call _run_once. */ uint_fast64_t task_count = atomic_load(&loop->task_count); - if (task_count == 0) { + int pending_count = atomic_load(&loop->pending_count); + if (task_count == 0 && pending_count == 0) { return ATOM_OK; /* Nothing to process, skip GIL entirely */ } @@ -2598,10 +2625,10 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, pthread_mutex_unlock(&loop->task_queue_mutex); - /* If no tasks were dequeued, return early (no GIL needed) */ - if (num_tasks == 0) { - return ATOM_OK; - } + /* NOTE: We do NOT return early here even if num_tasks == 0. + * We may have pending timer/FD events that need _run_once to process. + * The first check (task_count == 0 && pending_count == 0) at the start + * of this function already handles the case where there's truly no work. */ /* ======================================================================== * PHASE 2: Process all tasks WITH GIL (Python operations) @@ -2670,8 +2697,37 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, /* Lazy loop creation (uvloop-style): create Python loop on first use */ if (!loop->py_loop_valid || loop->py_loop == NULL) { - /* Create new event loop via asyncio policy (triggers ErlangEventLoop.__init__) */ - PyObject *new_loop = PyObject_CallMethod(asyncio, "new_event_loop", NULL); + /* Create ErlangEventLoop directly instead of via asyncio.new_event_loop(). + * This is necessary because dirty NIF scheduler threads don't have the + * event loop policy set. asyncio.new_event_loop() would create a + * SelectorEventLoop instead of our ErlangEventLoop. */ + PyObject *erlang_loop_mod = PyImport_ImportModule("_erlang_impl._loop"); + if (erlang_loop_mod == NULL) { + PyErr_Clear(); + erlang_loop_mod = PyImport_ImportModule("erlang_loop"); + } + if (erlang_loop_mod == NULL) { + PyErr_Clear(); + for (int i = 0; i < num_tasks; i++) { + enif_free_env(tasks[i].term_env); + } + PyGILState_Release(gstate); + return make_error(env, "loop_module_import_failed"); + } + + PyObject *loop_class = PyObject_GetAttrString(erlang_loop_mod, "ErlangEventLoop"); + Py_DECREF(erlang_loop_mod); + if (loop_class == NULL) { + PyErr_Clear(); + for (int i = 0; i < num_tasks; i++) { + enif_free_env(tasks[i].term_env); + } + PyGILState_Release(gstate); + return make_error(env, "loop_class_not_found"); + } + + PyObject *new_loop = PyObject_CallNoArgs(loop_class); + Py_DECREF(loop_class); if (new_loop == NULL) { PyErr_Clear(); for (int i = 0; i < num_tasks; i++) { @@ -2681,7 +2737,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, return make_error(env, "loop_creation_failed"); } - /* Set as current event loop */ + /* Set as current event loop for this thread */ PyObject *set_result = PyObject_CallMethod(asyncio, "set_event_loop", "O", new_loop); Py_XDECREF(set_result); @@ -2850,6 +2906,7 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, enif_free_env(term_env); continue; } + /* Copy PID */ pid_obj->pid = caller_pid; /* Convert ref to Python */ @@ -2912,19 +2969,72 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc, /* NOTE: We don't DECREF asyncio and run_and_send here because they're cached * in the loop structure. They'll be freed when the loop is destroyed. */ - /* Run one iteration of the event loop only if coroutines were scheduled. - * For sync functions (like math.sqrt), results are sent directly via enif_send - * and we don't need to drive the Python event loop. + /* Check if Python loop is already running (e.g., from erlang.run() in py:exec). + * If so, skip calling _run_once - the running loop will handle events itself + * when poll_events_wait returns. Calling _run_once on a running loop is not + * safe because _run_once is not reentrant. */ + PyObject *is_running = PyObject_CallMethod(loop->py_loop, "is_running", NULL); + if (is_running != NULL) { + int running = PyObject_IsTrue(is_running); + Py_DECREF(is_running); + if (running) { + /* Loop is already running - just signal it and clean up. + * The pending events were already added by dispatch_timer/handle_fd_event, + * and the condition variable was signaled. The running loop will wake up + * and process them. */ + if (events_module != NULL) { + Py_XDECREF(old_running_loop); + Py_DECREF(events_module); + } + PyGILState_Release(gstate); + return ATOM_OK; + } + } else { + PyErr_Clear(); + } + + /* Run the event loop until there's no more immediate work. + * + * We need to keep calling _run_once because: + * 1. First call may schedule timers (coroutine hits await asyncio.sleep) + * 2. Timer dispatch adds callback to _ready queue + * 3. Next _run_once processes the _ready queue (resumes coroutine) + * 4. Coroutine may complete and send result, or schedule more work * - * Pass timeout_hint=0 so we don't block - we just added work that needs - * processing immediately. This is a uvloop-style optimization. */ - if (coros_scheduled > 0) { + * We loop until both pending_count AND Python's _ready queue are empty. + * Pass timeout_hint=0 so we don't block. */ + int current_pending = atomic_load(&loop->pending_count); + int py_ready = 0; + int iterations = 0; + const int max_iterations = 100; /* Safety limit */ + + /* Loop while there's work: new coroutines, pending events, OR ready callbacks */ + while ((coros_scheduled > 0 || current_pending > 0 || py_ready > 0) && iterations < max_iterations) { + iterations++; + PyObject *run_result = PyObject_CallMethod(loop->py_loop, "_run_once", "i", 0); if (run_result != NULL) { Py_DECREF(run_result); } else { + PyErr_Print(); PyErr_Clear(); + break; + } + + /* Check if Python loop has more ready callbacks */ + PyObject *ready_len = PyObject_CallMethod(loop->py_loop, "_get_ready_count", NULL); + py_ready = 0; + if (ready_len != NULL) { + py_ready = (int)PyLong_AsLong(ready_len); + Py_DECREF(ready_len); + if (PyErr_Occurred()) { + PyErr_Clear(); + py_ready = 0; + } } + + current_pending = atomic_load(&loop->pending_count); + coros_scheduled = 0; /* Already processed on first iteration */ } /* Restore original event loop context before releasing GIL */ @@ -5453,6 +5563,28 @@ ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc, return ATOM_OK; } +/** + * Set the shared worker PID for task_ready notifications. + * This worker receives task_ready messages from dispatch_timer and other + * event sources to trigger process_ready_tasks. + */ +ERL_NIF_TERM nif_set_shared_worker(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + ErlNifPid worker_pid; + if (!enif_get_local_pid(env, argv[0], &worker_pid)) { + return make_error(env, "invalid_pid"); + } + + pthread_mutex_lock(&g_global_worker_mutex); + g_global_shared_worker = worker_pid; + g_global_shared_worker_valid = true; + pthread_mutex_unlock(&g_global_worker_mutex); + + return ATOM_OK; +} + /* Python function: _poll_events(timeout_ms) -> num_events */ static PyObject *py_poll_events(PyObject *self, PyObject *args) { (void)self; @@ -6044,6 +6176,20 @@ static void loop_capsule_destructor(PyObject *capsule) { } } +/** + * Destructor for global loop capsules. + * Only releases reference - does NOT signal shutdown since the global + * loop is shared and managed by Erlang, not Python. + */ +static void global_loop_capsule_destructor(PyObject *capsule) { + erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer( + capsule, LOOP_CAPSULE_NAME); + if (loop != NULL) { + /* Only release the reference, don't shutdown */ + enif_release_resource(loop); + } +} + /* Python function: _loop_new() -> capsule */ static PyObject *py_loop_new(PyObject *self, PyObject *args) { (void)self; @@ -6252,6 +6398,120 @@ static PyObject *py_set_global_loop_ref(PyObject *self, PyObject *args) { Py_RETURN_NONE; } +/** + * Python function: _clear_loop_ref(capsule) + * + * Clear the Python loop reference from an event loop capsule. + * Should be called when the Python loop is closed to allow + * creating a new loop later. + */ +static PyObject *py_clear_loop_ref(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + return NULL; + } + + /* Clear the Python loop reference */ + if (loop->py_loop != NULL) { + Py_DECREF(loop->py_loop); + loop->py_loop = NULL; + } + loop->py_loop_valid = false; + + Py_RETURN_NONE; +} + +/* Python function: _get_global_loop_capsule() -> capsule + * + * Returns a capsule for the global interpreter event loop. + * This is the loop created by Erlang that has has_worker=true. + * Python's ErlangEventLoop should use this capsule instead of creating + * a new one, so that timers and FD events are properly dispatched to + * the worker which triggers process_ready_tasks. + */ +static PyObject *py_get_global_loop_capsule(PyObject *self, PyObject *args) { + (void)self; + (void)args; + + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL) { + PyErr_SetString(PyExc_RuntimeError, "Global event loop not initialized"); + return NULL; + } + + /* Keep the resource alive while capsule exists */ + enif_keep_resource(loop); + + return PyCapsule_New(loop, LOOP_CAPSULE_NAME, global_loop_capsule_destructor); +} + +/** + * Python function: _has_loop_ref(capsule) -> bool + * + * Check if a loop capsule has an ACTIVE Python loop reference. + * Returns True only if there's a valid loop that is currently RUNNING. + * This prevents multiple concurrent loops while allowing sequential + * loop replacement (e.g., between test cases). + * + * The key insight is that the event confusion bug occurs when multiple + * loops are running simultaneously. A non-running loop (even if not + * explicitly closed) can be safely replaced. + */ +static PyObject *py_has_loop_ref(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + return NULL; + } + + if (loop->py_loop_valid && loop->py_loop != NULL) { + /* Check if the existing loop is running - only block if running */ + PyObject *is_running = PyObject_CallMethod(loop->py_loop, "is_running", NULL); + if (is_running != NULL) { + int running = PyObject_IsTrue(is_running); + Py_DECREF(is_running); + if (running) { + /* Loop is still running - prevent concurrent loop creation */ + Py_RETURN_TRUE; + } + } else { + /* Error calling is_running - clear error and check is_closed as fallback */ + PyErr_Clear(); + } + + /* Loop exists but is not running - check if closed for cleanup */ + PyObject *is_closed = PyObject_CallMethod(loop->py_loop, "is_closed", NULL); + if (is_closed != NULL) { + int closed = PyObject_IsTrue(is_closed); + Py_DECREF(is_closed); + if (closed) { + /* Loop is closed, clean up reference */ + Py_DECREF(loop->py_loop); + loop->py_loop = NULL; + loop->py_loop_valid = false; + } + } else { + PyErr_Clear(); + } + /* Not running, allow replacement */ + Py_RETURN_FALSE; + } + Py_RETURN_FALSE; +} + /* Python function: _run_once_native_for(capsule, timeout_ms) -> [(callback_id, event_type), ...] */ static PyObject *py_run_once_for(PyObject *self, PyObject *args) { (void)self; @@ -6653,14 +6913,27 @@ static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) { return NULL; } - if (!event_loop_ensure_router(loop)) { + /* For timer scheduling, we need to use the global interpreter loop which + * has the worker process. The capsule's loop may be a Python-created loop + * that doesn't have has_worker set, which would cause timer dispatches + * to go to the router instead of the worker, breaking the event loop flow. + * + * The global loop (created by Erlang) has has_worker=true and its worker + * properly triggers process_ready_tasks after timer dispatch. */ + erlang_event_loop_t *target_loop = get_interpreter_event_loop(); + if (target_loop == NULL) { + /* Fall back to capsule's loop if global not available */ + target_loop = loop; + } + + if (!event_loop_ensure_router(target_loop)) { PyErr_SetString(PyExc_RuntimeError, "Event loop has no router or worker"); return NULL; } if (delay_ms < 0) delay_ms = 0; - uint64_t timer_ref_id = atomic_fetch_add(&loop->next_callback_id, 1); + uint64_t timer_ref_id = atomic_fetch_add(&target_loop->next_callback_id, 1); ErlNifEnv *msg_env = enif_alloc_env(); if (msg_env == NULL) { @@ -6668,8 +6941,8 @@ static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) { return NULL; } - /* Include loop resource in message so router dispatches to correct loop */ - ERL_NIF_TERM loop_term = enif_make_resource(msg_env, loop); + /* Include the target loop resource in message so dispatch goes to correct loop */ + ERL_NIF_TERM loop_term = enif_make_resource(msg_env, target_loop); ERL_NIF_TERM msg = enif_make_tuple5( msg_env, @@ -6681,7 +6954,7 @@ static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) { ); /* Use worker_pid when available for scalable I/O */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; + ErlNifPid *target_pid = target_loop->has_worker ? &target_loop->worker_pid : &target_loop->router_pid; int send_result = enif_send(NULL, target_pid, msg_env, msg); enif_free_env(msg_env); @@ -6861,6 +7134,9 @@ static PyMethodDef PyEventLoopMethods[] = { {"_cancel_timer", py_cancel_timer, METH_VARARGS, "Cancel an Erlang timer"}, /* Handle-based API (takes explicit loop capsule) */ {"_loop_new", py_loop_new, METH_NOARGS, "Create a new event loop, returns capsule"}, + {"_get_global_loop_capsule", py_get_global_loop_capsule, METH_NOARGS, "Get capsule for global event loop"}, + {"_has_loop_ref", py_has_loop_ref, METH_VARARGS, "Check if loop capsule has Python loop reference"}, + {"_clear_loop_ref", py_clear_loop_ref, METH_VARARGS, "Clear Python loop reference from C struct"}, {"_loop_destroy", py_loop_destroy, METH_VARARGS, "Destroy an event loop"}, {"_set_loop_ref", py_set_loop_ref, METH_VARARGS, "Store Python loop reference in C struct"}, {"_set_global_loop_ref", py_set_global_loop_ref, METH_VARARGS, "Store Python loop reference in global loop"}, diff --git a/c_src/py_event_loop.h b/c_src/py_event_loop.h index 52762ed..1d4c5a8 100644 --- a/c_src/py_event_loop.h +++ b/c_src/py_event_loop.h @@ -934,6 +934,22 @@ int py_event_loop_init_python(ErlNifEnv *env, erlang_event_loop_t *loop); ERL_NIF_TERM nif_set_python_event_loop(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +/** + * @brief Set the shared router PID for per-loop created loops + * + * NIF: set_shared_router(RouterPid) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Set the shared worker PID for task_ready notifications + * + * NIF: set_shared_worker(WorkerPid) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_set_shared_worker(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + /** * @brief Create and register the py_event_loop Python module * diff --git a/c_src/py_nif.c b/c_src/py_nif.c index db0bcd4..422faca 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -6583,6 +6583,7 @@ static ErlNifFunc nif_funcs[] = { {"set_python_event_loop", 1, nif_set_python_event_loop, 0}, {"set_isolation_mode", 1, nif_set_isolation_mode, 0}, {"set_shared_router", 1, nif_set_shared_router, 0}, + {"set_shared_worker", 1, nif_set_shared_worker, 0}, /* ASGI optimizations */ {"asgi_build_scope", 1, nif_asgi_build_scope, ERL_NIF_DIRTY_JOB_IO_BOUND}, diff --git a/c_src/py_nif.h b/c_src/py_nif.h index 20e8418..3aa4d7f 100644 --- a/c_src/py_nif.h +++ b/c_src/py_nif.h @@ -1373,6 +1373,21 @@ extern PyObject *ProcessErrorException; typedef struct { PyObject_HEAD; ErlNifPid pid; } ErlangPidObject; extern PyTypeObject ErlangPidType; +/** @brief Python type for opaque Erlang references (stored as serialized binary) */ +typedef struct { + PyObject_HEAD; + unsigned char *data; /* Serialized reference data */ + size_t size; /* Size of serialized data */ +} ErlangRefObject; +extern PyTypeObject ErlangRefType; + +/** @brief Python type for Erlang atoms (stored as string) */ +typedef struct { + PyObject_HEAD; + char *name; /* Atom name (null-terminated) */ +} ErlangAtomObject; +extern PyTypeObject ErlangAtomType; + /** @brief Cached numpy.ndarray type for fast isinstance checks (NULL if numpy unavailable) */ extern PyObject *g_numpy_ndarray_type; diff --git a/priv/_erlang_impl/__init__.py b/priv/_erlang_impl/__init__.py index ee56b0b..3be6b2f 100644 --- a/priv/_erlang_impl/__init__.py +++ b/priv/_erlang_impl/__init__.py @@ -44,6 +44,7 @@ loop.run_until_complete(main()) """ +import os import sys import asyncio import time @@ -82,12 +83,60 @@ 'Channel', 'reply', 'ChannelClosed', + 'atom', ] +# Atom caching with configurable limit to prevent BEAM atom table exhaustion. +# The BEAM VM has a hard limit (~1M atoms) and crashes when exceeded. +# This provides a Python-level safety valve well under that limit. +_MAX_USER_ATOMS = int(os.environ.get('ERLANG_PYTHON_MAX_ATOMS', '10000')) +_atom_cache = {} + # Re-export for uvloop API compatibility EventLoopPolicy = ErlangEventLoopPolicy +def atom(name): + """Create an Erlang atom with safety limit. + + Atoms in Erlang are permanent and the BEAM VM has a hard limit + (~1M atoms). This function provides a Python-level cache with + a configurable limit to prevent atom table exhaustion from + untrusted Python code. + + Args: + name: The atom name as a string. + + Returns: + An ErlangAtom object that converts to an Erlang atom. + + Raises: + RuntimeError: If the atom limit is reached. + + The limit can be configured via the ERLANG_PYTHON_MAX_ATOMS + environment variable (default: 10000). + + Example: + >>> import erlang + >>> ok = erlang.atom('ok') + >>> error = erlang.atom('error') + """ + if name in _atom_cache: + return _atom_cache[name] + + if len(_atom_cache) >= _MAX_USER_ATOMS: + raise RuntimeError( + f"Atom limit ({_MAX_USER_ATOMS}) reached. " + "Set ERLANG_PYTHON_MAX_ATOMS env var to increase." + ) + + # Import erlang module to access internal _atom function + import erlang as _erlang + result = _erlang._atom(name) + _atom_cache[name] = result + return result + + def get_event_loop_policy() -> ErlangEventLoopPolicy: """Get an Erlang event loop policy instance. diff --git a/priv/_erlang_impl/_loop.py b/priv/_erlang_impl/_loop.py index 1329bf7..e75013b 100644 --- a/priv/_erlang_impl/_loop.py +++ b/priv/_erlang_impl/_loop.py @@ -70,7 +70,7 @@ class ErlangEventLoop(asyncio.AbstractEventLoop): # Use __slots__ for faster attribute access and reduced memory __slots__ = ( - '_pel', '_loop_capsule', + '_pel', '_loop_capsule', '_uses_global_capsule', '_readers', '_writers', '_callbacks_by_cid', # callback_id -> (callback, args, event_type) for O(1) dispatch '_fd_resources', # fd -> fd_key (shared fd_resource_t per fd) @@ -114,8 +114,33 @@ def __init__(self): # Fallback for testing without actual NIF self._pel = _MockNifModule() - # Create isolated loop capsule - self._loop_capsule = self._pel._loop_new() + # Use the global loop capsule instead of creating a new one. + # The global loop (created by Erlang) has has_worker=true, which ensures + # that timers and FD events are dispatched to the worker process. + # The worker triggers process_ready_tasks which calls _run_once. + # Without this, Python-created loops would have their own pending queues + # that never get processed because the worker doesn't know about them. + self._uses_global_capsule = False + if hasattr(self._pel, '_get_global_loop_capsule'): + try: + self._loop_capsule = self._pel._get_global_loop_capsule() + self._uses_global_capsule = True + # Check if another loop already owns this capsule. + # Only one ErlangEventLoop per interpreter is supported. + if hasattr(self._pel, '_has_loop_ref') and self._pel._has_loop_ref(self._loop_capsule): + raise RuntimeError( + "An ErlangEventLoop already exists for this interpreter. " + "Only one loop per interpreter is supported." + ) + except RuntimeError as e: + # Re-raise our "already exists" error + if "already exists" in str(e): + raise + # Fall back to creating a new loop if global not available + self._loop_capsule = self._pel._loop_new() + self._uses_global_capsule = False + else: + self._loop_capsule = self._pel._loop_new() # Store reference to this Python loop in the C struct # This enables process_ready_tasks to access the loop directly @@ -304,11 +329,21 @@ def close(self): self._default_executor.shutdown(wait=True) self._default_executor = None - # Destroy loop capsule - try: - self._pel._loop_destroy(self._loop_capsule) - except Exception: - pass + # Clear loop ref to allow creating a new loop later. + # This is important for the global capsule case where the capsule + # persists but a new Python loop may be created. + if self._loop_capsule is not None and hasattr(self._pel, '_clear_loop_ref'): + try: + self._pel._clear_loop_ref(self._loop_capsule) + except Exception: + pass + + # Destroy loop capsule (but not if using shared global capsule) + if not self._uses_global_capsule: + try: + self._pel._loop_destroy(self._loop_capsule) + except Exception: + pass self._loop_capsule = None async def shutdown_asyncgens(self): @@ -1058,6 +1093,13 @@ def _dispatch(self, callback_id, event_type): if not handle._cancelled: self._ready_append(handle) + def _get_ready_count(self): + """Return the number of ready callbacks. + + Used by C code to check if there's more work to do. + """ + return len(self._ready) + def _check_closed(self): """Raise an error if the loop is closed.""" if self._closed: @@ -1281,16 +1323,26 @@ async def _run_and_send(coro, caller_pid, ref): ref: A reference to include in the result message The result message format is: - ('async_result', ref, ('ok', result)) - on success - ('async_result', ref, ('error', error_str)) - on failure + (async_result, ref, (ok, result)) - on success + (async_result, ref, (error, error_str)) - on failure + + Note: Uses cached atom() to create atoms for message keys, since Python + strings become Erlang binaries but the await function expects atoms. """ import erlang + from . import atom # Use cached version from _erlang_impl + + # Create atoms for message keys (strings become binaries, await expects atoms) + async_result = atom('async_result') + ok = atom('ok') + error = atom('error') + try: result = await coro - erlang.send(caller_pid, ('async_result', ref, ('ok', result))) + erlang.send(caller_pid, (async_result, ref, (ok, result))) except asyncio.CancelledError: - erlang.send(caller_pid, ('async_result', ref, ('error', 'cancelled'))) + erlang.send(caller_pid, (async_result, ref, (error, 'cancelled'))) except Exception as e: import traceback tb = traceback.format_exc() - erlang.send(caller_pid, ('async_result', ref, ('error', f'{type(e).__name__}: {e}\n{tb}'))) + erlang.send(caller_pid, (async_result, ref, (error, f'{type(e).__name__}: {e}\n{tb}'))) diff --git a/src/py.erl b/src/py.erl index 72e7b15..f748487 100644 --- a/src/py.erl +++ b/src/py.erl @@ -1040,11 +1040,18 @@ activate_venv_with_site_packages(VenvBin, SitePackages) -> {ok, _} = eval(<<"setattr(__import__('sys'), '_active_venv', vp)">>, #{vp => VenvBin}), {ok, _} = eval(<<"setattr(__import__('sys'), '_venv_site_packages', sp)">>, #{sp => SitePackages}), %% Add site-packages and process .pth files (editable installs) - ok = exec(<<"import site as _site, sys as _sys\n" - "_b = frozenset(_sys.path)\n" - "_site.addsitedir(_sys._venv_site_packages)\n" - "_sys.path[:] = [p for p in _sys.path if p not in _b] + [p for p in _sys.path if p in _b]\n" - "del _site, _sys, _b\n">>), + %% Note: We embed the site-packages path directly since exec doesn't support + %% variables and sys attributes may not persist across calls in subinterpreters + SitePackagesStr = binary_to_list(SitePackages), + ExecCode = iolist_to_binary([ + <<"import site as _site, sys as _sys\n">>, + <<"_sp = '">>, escape_python_string(SitePackagesStr), <<"'\n">>, + <<"_b = frozenset(_sys.path)\n">>, + <<"_site.addsitedir(_sp)\n">>, + <<"_sys.path[:] = [p for p in _sys.path if p not in _b] + [p for p in _sys.path if p in _b]\n">>, + <<"del _site, _sys, _b, _sp\n">> + ]), + ok = exec(ExecCode), ok; {ok, false} -> {error, {invalid_venv, SitePackages}}; @@ -1052,6 +1059,13 @@ activate_venv_with_site_packages(VenvBin, SitePackages) -> Error end. +%% @private Escape a string for embedding in Python code +escape_python_string(Str) -> + lists:flatmap(fun($') -> "\\'"; + ($\\) -> "\\\\"; + (C) -> [C] + end, Str). + %% @doc Deactivate the current virtual environment. %% Restores sys.path to its original state. -spec deactivate_venv() -> ok | {error, term()}. diff --git a/src/py_event_loop.erl b/src/py_event_loop.erl index 3d2803a..de7904b 100644 --- a/src/py_event_loop.erl +++ b/src/py_event_loop.erl @@ -290,6 +290,8 @@ init([]) -> {ok, WorkerPid} = py_event_worker:start_link(WorkerId, LoopRef), ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), ok = py_nif:event_loop_set_id(LoopRef, WorkerId), + %% Set global shared worker for dispatch_timer task_ready notifications + ok = py_nif:set_shared_worker(WorkerPid), %% Also start legacy router for backward compatibility {ok, RouterPid} = py_event_router:start_link(LoopRef), @@ -356,7 +358,9 @@ handle_call(get_loop, _From, #state{loop_ref = undefined} = State) -> {ok, WorkerPid} = py_event_worker:start_link(WorkerId, LoopRef), ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid), ok = py_nif:event_loop_set_id(LoopRef, WorkerId), + ok = py_nif:set_shared_worker(WorkerPid), {ok, RouterPid} = py_event_router:start_link(LoopRef), + ok = py_nif:set_shared_router(RouterPid), ok = py_nif:set_python_event_loop(LoopRef), NewState = State#state{ loop_ref = LoopRef, diff --git a/src/py_event_worker.erl b/src/py_event_worker.erl index b1aa877..efa455e 100644 --- a/src/py_event_worker.erl +++ b/src/py_event_worker.erl @@ -45,10 +45,14 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({select, FdRes, _Ref, ready_input}, State) -> py_nif:handle_fd_event_and_reselect(FdRes, read), + %% Trigger event processing after FD event dispatch + self() ! task_ready, {noreply, State}; handle_info({select, FdRes, _Ref, ready_output}, State) -> py_nif:handle_fd_event_and_reselect(FdRes, write), + %% Trigger event processing after FD event dispatch + self() ! task_ready, {noreply, State}; handle_info({start_timer, _LoopRef, DelayMs, CallbackId, TimerRef}, State) -> @@ -80,6 +84,9 @@ handle_info({timeout, TimerRef}, State) -> {_ErlTimerRef, CallbackId} -> py_nif:dispatch_timer(LoopRef, CallbackId), NewTimers = maps:remove(TimerRef, Timers), + %% Trigger event processing after timer dispatch + %% This ensures _run_once is called to handle the timer callback + self() ! task_ready, {noreply, State#state{timers = NewTimers}} end; @@ -127,6 +134,9 @@ drain_tasks_loop(LoopRef) -> ok; {error, py_loop_not_set} -> ok; + {error, task_queue_not_initialized} -> + %% Loop is being destroyed, ignore + ok; {error, Reason} -> error_logger:warning_msg("py_event_worker: task processing failed: ~p~n", [Reason]), ok diff --git a/src/py_nif.erl b/src/py_nif.erl index 917aef3..a6c8836 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -152,6 +152,7 @@ set_python_event_loop/1, set_isolation_mode/1, set_shared_router/1, + set_shared_worker/1, %% ASGI optimizations asgi_build_scope/1, asgi_run/5, @@ -1060,6 +1061,13 @@ set_isolation_mode(_Mode) -> set_shared_router(_RouterPid) -> ?NIF_STUB. +%% @doc Set the shared worker PID for task_ready notifications. +%% The worker receives task_ready messages from dispatch_timer and other +%% event sources to trigger process_ready_tasks. +-spec set_shared_worker(pid()) -> ok | {error, term()}. +set_shared_worker(_WorkerPid) -> + ?NIF_STUB. + %%% ============================================================================ %%% ASGI Optimizations %%% ============================================================================ diff --git a/test/py_async_task_SUITE.erl b/test/py_async_task_SUITE.erl index db14c0a..9548b1f 100644 --- a/test/py_async_task_SUITE.erl +++ b/test/py_async_task_SUITE.erl @@ -17,6 +17,7 @@ test_async_coroutine/1, test_async_with_args/1, test_async_sleep/1, + test_timer_event_triggering/1, %% Error handling tests test_async_error/1, test_invalid_module/1, @@ -54,6 +55,7 @@ all() -> test_async_coroutine, test_async_with_args, test_async_sleep, + test_timer_event_triggering, %% Error handling tests test_async_error, test_invalid_module, @@ -176,6 +178,45 @@ test_async_sleep(_Config) -> true = abs(R - float(N)) < 0.0001 end, Results). +test_timer_event_triggering(_Config) -> + %% Test that timer events properly trigger event loop processing. + %% + %% This verifies the fix for the timer event triggering issue where + %% asyncio.sleep would never complete because dispatch_timer added + %% events to pending_head but nothing called _run_once to process them. + %% + %% The fix ensures that after dispatching timer/FD events, the worker + %% sends task_ready to itself to trigger _run_once processing. + %% + %% Uses test_async_task module which has async functions with asyncio.sleep. + + %% Test simple_task which uses asyncio.sleep(0.01) + ct:log("Testing simple async task with asyncio.sleep..."), + Ref1 = py_event_loop:create_task(test_async_task, simple_task, []), + Result1 = py_event_loop:await(Ref1, 5000), + ct:log("simple_task result: ~p", [Result1]), + {ok, <<"hello from async">>} = Result1, + + %% Test task_with_args which uses asyncio.sleep(0.01) + ct:log("Testing async task with args and asyncio.sleep..."), + Ref2 = py_event_loop:create_task(test_async_task, task_with_args, [10, 32]), + Result2 = py_event_loop:await(Ref2, 5000), + ct:log("task_with_args result: ~p", [Result2]), + {ok, 42} = Result2, + + %% Test concurrent async tasks with sleep + ct:log("Testing concurrent async tasks with asyncio.sleep..."), + Refs = [py_event_loop:create_task(test_async_task, task_with_args, [N, N]) + || N <- lists:seq(1, 5)], + Results = [py_event_loop:await(Ref, 5000) || Ref <- Refs], + ct:log("Concurrent results: ~p", [Results]), + + %% Verify all completed with correct values (N + N) + Expected = [{ok, N * 2} || N <- lists:seq(1, 5)], + Expected = Results, + + ct:log("timer_event_triggering test: all asyncio.sleep operations completed"). + %% ============================================================================ %% Error handling tests %% ============================================================================