Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@

### Added

- **Automatic Env Reuse for Event Loop Tasks** - Functions defined via `py:exec(Ctx, Code)`
can now be called directly using `py_event_loop:run/3,4`, `create_task/3,4`, and `spawn_task/3,4`
without manual env passing. The process-local environment is automatically detected and used
for function lookup when targeting `__main__` module.

- **PyBuffer API** - Zero-copy WSGI input buffer for streaming HTTP bodies
- `py_buffer:new/0,1` - Create buffer (chunked or with content_length)
- `py_buffer:write/2` - Append data, signals waiting Python readers
Expand Down
211 changes: 207 additions & 4 deletions c_src/py_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@
*/

/** @brief Name for the PyCapsule storing event loop pointer */
static const char *EVENT_LOOP_CAPSULE_NAME = "erlang_python.event_loop";

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Free-threaded Python 3.13t

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Documentation

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.12 / ubuntu-24.04

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Lint

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.13 / ubuntu-24.04

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

/** @brief Module attribute name for storing the event loop */
static const char *EVENT_LOOP_ATTR_NAME = "_loop";

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Free-threaded Python 3.13t

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Documentation

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.12 / ubuntu-24.04

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Lint

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.13 / ubuntu-24.04

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

/* ============================================================================
* Module State Structure
Expand Down Expand Up @@ -502,6 +502,18 @@
}
loop->namespaces_head = NULL;

/* Clean up PID-to-env mappings */
pid_env_mapping_t *mapping = loop->pid_env_head;
while (mapping != NULL) {
pid_env_mapping_t *next = mapping->next;
if (mapping->env != NULL) {
enif_release_resource(mapping->env);
}
enif_free(mapping);
mapping = next;
}
loop->pid_env_head = NULL;

pthread_mutex_unlock(&loop->namespaces_mutex);
PyGILState_Release(gstate);
} else {
Expand All @@ -517,6 +529,18 @@
}
loop->namespaces_head = NULL;

/* Clean up PID-to-env mappings */
pid_env_mapping_t *mapping = loop->pid_env_head;
while (mapping != NULL) {
pid_env_mapping_t *next = mapping->next;
if (mapping->env != NULL) {
enif_release_resource(mapping->env);
}
enif_free(mapping);
mapping = next;
}
loop->pid_env_head = NULL;

pthread_mutex_unlock(&loop->namespaces_mutex);
}
pthread_mutex_destroy(&loop->namespaces_mutex);
Expand Down Expand Up @@ -1128,6 +1152,7 @@

/* Initialize per-process namespace registry */
loop->namespaces_head = NULL;
loop->pid_env_head = NULL;
if (pthread_mutex_init(&loop->namespaces_mutex, NULL) != 0) {
pthread_mutex_destroy(&loop->task_queue_mutex);
enif_ioq_destroy(loop->task_queue);
Expand Down Expand Up @@ -1239,7 +1264,7 @@
if (!enif_get_atom(env, argv[1], atom_buf, sizeof(atom_buf), ERL_NIF_LATIN1)) {
return make_error(env, "invalid_id");
}
strncpy(loop->loop_id, atom_buf, sizeof(loop->loop_id) - 1);

Check warning on line 1267 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

‘__builtin_strncpy’ output may be truncated copying 63 bytes from a string of length 63 [-Wstringop-truncation]

Check warning on line 1267 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

‘__builtin_strncpy’ output may be truncated copying 63 bytes from a string of length 63 [-Wstringop-truncation]
loop->loop_id[sizeof(loop->loop_id) - 1] = '\0';
} else {
size_t copy_len = id_bin.size < sizeof(loop->loop_id) - 1 ?
Expand Down Expand Up @@ -2501,6 +2526,164 @@
return ATOM_OK;
}

/* ============================================================================
* PID-to-Env Mapping Helpers
* ============================================================================ */

/**
* @brief Register or update an env mapping for a PID
*
* Increments refcount if mapping exists, otherwise creates new mapping.
* Calls enif_keep_resource to keep the env alive.
*
* @param loop Event loop containing the mapping registry
* @param pid PID to register
* @param env_res Environment resource (will be kept via enif_keep_resource)
* @return true on success, false on allocation failure
*/
static bool register_pid_env(erlang_event_loop_t *loop, const ErlNifPid *pid,
void *env_res) {
pthread_mutex_lock(&loop->namespaces_mutex);

/* Check if mapping already exists */
pid_env_mapping_t *mapping = loop->pid_env_head;
while (mapping != NULL) {
if (enif_compare_pids(&mapping->pid, pid) == 0) {
/* Found existing mapping - increment refcount */
mapping->refcount++;
pthread_mutex_unlock(&loop->namespaces_mutex);
return true;
}
mapping = mapping->next;
}

/* Create new mapping */
mapping = enif_alloc(sizeof(pid_env_mapping_t));
if (mapping == NULL) {
pthread_mutex_unlock(&loop->namespaces_mutex);
return false;
}

mapping->pid = *pid;
mapping->env = env_res;
mapping->refcount = 1;
mapping->next = loop->pid_env_head;
loop->pid_env_head = mapping;

/* Keep the resource alive */
enif_keep_resource(env_res);

pthread_mutex_unlock(&loop->namespaces_mutex);
return true;
}

/**
* @brief Look up env for a PID
*
* @param loop Event loop containing the mapping registry
* @param pid PID to look up
* @return Environment resource or NULL if not found
*/
static void *lookup_pid_env(erlang_event_loop_t *loop, const ErlNifPid *pid) {
pthread_mutex_lock(&loop->namespaces_mutex);

pid_env_mapping_t *mapping = loop->pid_env_head;
while (mapping != NULL) {
if (enif_compare_pids(&mapping->pid, pid) == 0) {
void *env_res = mapping->env;
pthread_mutex_unlock(&loop->namespaces_mutex);
return env_res;
}
mapping = mapping->next;
}

pthread_mutex_unlock(&loop->namespaces_mutex);
return NULL;
}

/**
* submit_task_with_env(LoopRef, CallerPid, Ref, Module, Func, Args, Kwargs, EnvRef) -> ok | {error, Reason}
*
* Like submit_task but registers the process-local env for the caller PID.
* The env's globals dict is used for function lookup, allowing functions
* defined via py:exec() to be called from the event loop.
*
* Note: The env resource is stored in a PID->env mapping, not serialized.
* This avoids the issue of resource references not surviving serialization.
*/
ERL_NIF_TERM nif_submit_task_with_env(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;

erlang_event_loop_t *loop;
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
(void **)&loop)) {
return make_error(env, "invalid_loop");
}

if (!loop->task_queue_initialized) {
return make_error(env, "task_queue_not_initialized");
}

/* Validate caller_pid */
ErlNifPid caller_pid;
if (!enif_get_local_pid(env, argv[1], &caller_pid)) {
return make_error(env, "invalid_caller_pid");
}

/* Get and register the env resource */
void *env_res;
if (!enif_get_resource(env, argv[7], get_env_resource_type(), &env_res)) {
return make_error(env, "invalid_env");
}

/* Register the env for this PID (increments refcount if exists) */
if (!register_pid_env(loop, &caller_pid, env_res)) {
return make_error(env, "env_registration_failed");
}

/* Create task tuple: {CallerPid, Ref, Module, Func, Args, Kwargs}
* Note: We use 6-tuple, NOT 7-tuple. The env is looked up by PID. */
ERL_NIF_TERM task_tuple = enif_make_tuple6(env,
argv[1], argv[2], argv[3], argv[4], argv[5], argv[6]);

/* Serialize to binary */
ErlNifBinary task_bin;
if (!enif_term_to_binary(env, task_tuple, &task_bin)) {
return make_error(env, "serialization_failed");
}

/* Thread-safe enqueue */
pthread_mutex_lock(&loop->task_queue_mutex);
int enq_result = enif_ioq_enq_binary(loop->task_queue, &task_bin, 0);
pthread_mutex_unlock(&loop->task_queue_mutex);

if (enq_result != 1) {
enif_release_binary(&task_bin);
return make_error(env, "enqueue_failed");
}

/* Increment task count */
atomic_fetch_add(&loop->task_count, 1);

/* Coalesced wakeup (uvloop-style) */
if (loop->has_worker) {
if (!atomic_exchange(&loop->task_wake_pending, true)) {
ErlNifEnv *msg_env = enif_alloc_env();
if (msg_env != NULL) {
if (ATOM_TASK_READY == 0) {
ATOM_TASK_READY = enif_make_atom(msg_env, "task_ready");
}
ERL_NIF_TERM msg = enif_make_atom(msg_env, "task_ready");
enif_send(NULL, &loop->worker_pid, msg_env, msg);
enif_free_env(msg_env);
}
}
}

return ATOM_OK;
}

/**
* Maximum tasks to dequeue in one batch before acquiring GIL.
* This bounds memory usage while still amortizing GIL acquisition cost.
Expand Down Expand Up @@ -2792,7 +2975,8 @@
/* Extract: {CallerPid, Ref, Module, Func, Args, Kwargs} */
int arity;
const ERL_NIF_TERM *tuple_elems;
if (!enif_get_tuple(term_env, task_term, &arity, &tuple_elems) || arity != 6) {
if (!enif_get_tuple(term_env, task_term, &arity, &tuple_elems) ||
arity != 6) {
enif_free_env(term_env);
continue;
}
Expand All @@ -2810,6 +2994,9 @@
continue;
}

/* Look up env by PID (registered via submit_task_with_env) */
py_env_resource_t *task_env = (py_env_resource_t *)lookup_pid_env(loop, &caller_pid);

/* Convert module/func to C strings */
char *module_name = enif_alloc(module_bin.size + 1);
char *func_name = enif_alloc(func_bin.size + 1);
Expand All @@ -2824,11 +3011,27 @@
memcpy(func_name, func_bin.data, func_bin.size);
func_name[func_bin.size] = '\0';

/* Look up namespace for caller process (only exists if they called exec/eval) */
/* Look up namespace for caller process (used for reentrant calls) */
process_namespace_t *ns = lookup_process_namespace(loop, &caller_pid);

/* Look up function (checks process namespace for __main__, then cache/import) */
PyObject *func = get_function_for_task(loop, ns, module_name, func_name);
/* Look up function - check task_env first, then process namespace, then import */
PyObject *func = NULL;

/* First, check the passed env's globals (from py:exec) */
if (task_env != NULL && task_env->globals != NULL) {
if (strcmp(module_name, "__main__") == 0 ||
strcmp(module_name, "_process_") == 0) {
func = PyDict_GetItemString(task_env->globals, func_name);
if (func != NULL) {
Py_INCREF(func);
}
}
}

/* Fallback to process namespace and cache/import */
if (func == NULL) {
func = get_function_for_task(loop, ns, module_name, func_name);
}

enif_free(module_name);
enif_free(func_name);
Expand Down
39 changes: 39 additions & 0 deletions c_src/py_event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@ typedef struct process_namespace {
struct process_namespace *next;
} process_namespace_t;

/**
* @struct pid_env_mapping_t
* @brief Mapping from Erlang PID to process-local Python environment
*
* Used to pass env resources across the task queue without serialization.
* The env is kept alive by enif_keep_resource until the mapping is removed.
*/
typedef struct pid_env_mapping {
/** @brief PID of the owning Erlang process */
ErlNifPid pid;

/** @brief Environment resource (kept via enif_keep_resource) */
void *env; /* py_env_resource_t* - forward declared to avoid header deps */

/** @brief Reference count for this mapping (multiple tasks may use it) */
int refcount;

/** @brief Next mapping in linked list */
struct pid_env_mapping *next;
} pid_env_mapping_t;

/** @brief Event types for pending callbacks */
typedef enum {
EVENT_TYPE_READ = 1,
Expand Down Expand Up @@ -372,6 +393,12 @@ typedef struct erlang_event_loop {

/** @brief Mutex protecting namespace registry */
pthread_mutex_t namespaces_mutex;

/* ========== PID-to-Env Mapping Registry ========== */
/* Protected by namespaces_mutex (shared with namespace registry) */

/** @brief Head of PID-to-env mapping linked list */
pid_env_mapping_t *pid_env_head;
} erlang_event_loop_t;

/* ============================================================================
Expand Down Expand Up @@ -624,6 +651,18 @@ ERL_NIF_TERM nif_dispatch_sleep_complete(ErlNifEnv *env, int argc,
ERL_NIF_TERM nif_submit_task(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Submit an async task with process-local env (thread-safe)
*
* Like submit_task but includes an env resource reference. The env's globals
* dict is used for function lookup, allowing functions defined via py:exec()
* to be called from the event loop.
*
* NIF: submit_task_with_env(LoopRef, CallerPid, Ref, Module, Func, Args, Kwargs, EnvRef) -> ok | {error, Reason}
*/
ERL_NIF_TERM nif_submit_task_with_env(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Process all pending tasks from the task queue
*
Expand Down
28 changes: 7 additions & 21 deletions c_src/py_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ ErlNifResourceType *INLINE_CONTINUATION_RESOURCE_TYPE = NULL;
/* Process-local Python environment resource type */
ErlNifResourceType *PY_ENV_RESOURCE_TYPE = NULL;

/* Getter for PY_ENV_RESOURCE_TYPE (used by py_event_loop.c) */
ErlNifResourceType *get_env_resource_type(void) {
return PY_ENV_RESOURCE_TYPE;
}

_Atomic uint32_t g_context_id_counter = 1;

/* ============================================================================
Expand All @@ -80,27 +85,7 @@ _Atomic uint32_t g_context_id_counter = 1;
* resource destructor frees the Python dicts.
*/

/**
* @struct py_env_resource_t
* @brief Process-local Python environment (globals/locals)
*
* Stored in process dictionary as py_local_env. When the process exits,
* Erlang GC drops the reference, triggering the destructor which frees
* the Python dicts.
*
* Each env is bound to a specific interpreter (identified by interp_id).
* The dicts must be freed in the same interpreter that created them.
*/
typedef struct {
/** @brief Global namespace dictionary */
PyObject *globals;
/** @brief Local namespace dictionary (same as globals for module-level execution) */
PyObject *locals;
/** @brief Interpreter ID that owns these dicts (0 = main interpreter) */
int64_t interp_id;
/** @brief Pool slot index (-1 for main interpreter) */
int pool_slot;
} py_env_resource_t;
/* py_env_resource_t is now defined in py_nif.h */

/**
* @brief Destructor for py_env_resource_t
Expand Down Expand Up @@ -6534,6 +6519,7 @@ static ErlNifFunc nif_funcs[] = {
{"event_loop_run_async", 7, nif_event_loop_run_async, ERL_NIF_DIRTY_JOB_IO_BOUND},
/* Async task queue NIFs (uvloop-inspired) */
{"submit_task", 7, nif_submit_task, 0}, /* Thread-safe, no GIL needed */
{"submit_task_with_env", 8, nif_submit_task_with_env, 0}, /* With process-local env */
{"process_ready_tasks", 1, nif_process_ready_tasks, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"event_loop_set_py_loop", 2, nif_event_loop_set_py_loop, 0},
/* Per-process namespace NIFs */
Expand Down
Loading
Loading