Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions c_src/py_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@
*/

/** @brief Name for the PyCapsule storing event loop pointer */
static const char *EVENT_LOOP_CAPSULE_NAME = "erlang_python.event_loop";

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Documentation

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Lint

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Free-threaded Python 3.13t

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.12 / ubuntu-24.04

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.13 / ubuntu-24.04

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

Check warning on line 95 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

‘EVENT_LOOP_CAPSULE_NAME’ defined but not used [-Wunused-variable]

/** @brief Module attribute name for storing the event loop */
static const char *EVENT_LOOP_ATTR_NAME = "_loop";

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Documentation

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Lint

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / Free-threaded Python 3.13t

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.12 / ubuntu-24.04

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / OTP 27.0 / Python 3.13 / ubuntu-24.04

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

Check warning on line 98 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

‘EVENT_LOOP_ATTR_NAME’ defined but not used [-Wunused-variable]

/* ============================================================================
* Module State Structure
Expand Down Expand Up @@ -1087,6 +1087,9 @@
/* Initialize fields */
memset(loop, 0, sizeof(erlang_event_loop_t));

/* Initialize pending_capacity (memset zeros it, but we need the initial value) */
loop->pending_capacity = INITIAL_PENDING_CAPACITY;

if (pthread_mutex_init(&loop->mutex, NULL) != 0) {
enif_release_resource(loop);
return make_error(env, "mutex_init_failed");
Expand Down Expand Up @@ -1264,7 +1267,7 @@
if (!enif_get_atom(env, argv[1], atom_buf, sizeof(atom_buf), ERL_NIF_LATIN1)) {
return make_error(env, "invalid_id");
}
strncpy(loop->loop_id, atom_buf, sizeof(loop->loop_id) - 1);

Check warning on line 1270 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.12

‘__builtin_strncpy’ output may be truncated copying 63 bytes from a string of length 63 [-Wstringop-truncation]

Check warning on line 1270 in c_src/py_event_loop.c

View workflow job for this annotation

GitHub Actions / ASan / Python 3.13

‘__builtin_strncpy’ output may be truncated copying 63 bytes from a string of length 63 [-Wstringop-truncation]
loop->loop_id[sizeof(loop->loop_id) - 1] = '\0';
} else {
size_t copy_len = id_bin.size < sizeof(loop->loop_id) - 1 ?
Expand Down Expand Up @@ -6410,6 +6413,9 @@
/* Initialize fields */
memset(loop, 0, sizeof(erlang_event_loop_t));

/* Initialize pending_capacity (memset zeros it, but we need the initial value) */
loop->pending_capacity = INITIAL_PENDING_CAPACITY;

if (pthread_mutex_init(&loop->mutex, NULL) != 0) {
enif_release_resource(loop);
PyErr_SetString(PyExc_RuntimeError, "Failed to initialize mutex");
Expand Down Expand Up @@ -7446,6 +7452,9 @@
/* Initialize fields */
memset(loop, 0, sizeof(erlang_event_loop_t));

/* Initialize pending_capacity (memset zeros it, but we need the initial value) */
loop->pending_capacity = INITIAL_PENDING_CAPACITY;

if (pthread_mutex_init(&loop->mutex, NULL) != 0) {
enif_release_resource(loop);
return -1;
Expand Down
5 changes: 5 additions & 0 deletions c_src/py_event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ typedef struct erlang_event_loop {
uint32_t interp_id;

/* ========== Async Task Queue (uvloop-inspired) ========== */
/*
* Future optimization: Replace serialized task queue with native MPSC
* ring buffer to avoid enif_term_to_binary/enif_binary_to_term overhead.
* See task_entry_t/task_ring_t design in optimization plan.
*/

/** @brief Python ErlangEventLoop instance (direct ref, no thread-local) */
PyObject *py_loop;
Expand Down
15 changes: 12 additions & 3 deletions priv/_erlang_impl/_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1339,10 +1339,19 @@ async def _run_and_send(coro, caller_pid, ref):

try:
result = await coro
erlang.send(caller_pid, (async_result, ref, (ok, result)))
try:
erlang.send(caller_pid, (async_result, ref, (ok, result)))
except erlang.ProcessError:
pass # Caller gone, nothing to do
except asyncio.CancelledError:
erlang.send(caller_pid, (async_result, ref, (error, 'cancelled')))
try:
erlang.send(caller_pid, (async_result, ref, (error, 'cancelled')))
except erlang.ProcessError:
pass # Caller gone, nothing to do
except Exception as e:
import traceback
tb = traceback.format_exc()
erlang.send(caller_pid, (async_result, ref, (error, f'{type(e).__name__}: {e}\n{tb}')))
try:
erlang.send(caller_pid, (async_result, ref, (error, f'{type(e).__name__}: {e}\n{tb}')))
except erlang.ProcessError:
pass # Caller gone, nothing to do
129 changes: 78 additions & 51 deletions priv/_erlang_impl/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,39 @@ async def _start(self):
self._loop.call_soon(self._protocol.connection_made, self)
self._loop.add_reader(self._fileno, self._read_ready)

# Maximum reads per callback to avoid starving other events
_max_reads_per_call = 16

def _read_ready(self):
"""Called when data is available to read."""
"""Called when data is available to read.

Drains socket until EAGAIN with a budget to avoid starvation.
"""
if self._conn_lost:
return
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
self._fatal_error(exc, 'Fatal read error')
return

if data:
self._protocol.data_received(data)
else:
# Connection closed (EOF received)
self._loop.remove_reader(self._fileno)
keep_open = self._protocol.eof_received()
# If eof_received returns False/None, close the transport
if not keep_open:
self._closing = True
self._conn_lost += 1
self._call_connection_lost(None)
for _ in range(self._max_reads_per_call):
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
# EAGAIN - no more data available
return
except Exception as exc:
self._fatal_error(exc, 'Fatal read error')
return

if data:
self._protocol.data_received(data)
else:
# Connection closed (EOF received)
self._loop.remove_reader(self._fileno)
keep_open = self._protocol.eof_received()
# If eof_received returns False/None, close the transport
if not keep_open:
self._closing = True
self._conn_lost += 1
self._call_connection_lost(None)
return

def write(self, data):
"""Write data to the transport."""
Expand Down Expand Up @@ -122,30 +132,38 @@ def write(self, data):

self._buffer.extend(data)

# Maximum writes per callback to avoid starving other events
_max_writes_per_call = 16

def _write_ready_cb(self):
"""Called when socket is ready for writing."""
remaining = len(self._buffer) - self._buffer_offset
if remaining <= 0:
self._loop.remove_writer(self._fileno)
if self._closing:
self._call_connection_lost(None)
return
"""Called when socket is ready for writing.

Drains buffer until EAGAIN with a budget to avoid starvation.
"""
for _ in range(self._max_writes_per_call):
remaining = len(self._buffer) - self._buffer_offset
if remaining <= 0:
self._loop.remove_writer(self._fileno)
if self._closing:
self._call_connection_lost(None)
return

try:
# Use memoryview with offset for O(1) access to remaining data
data_view = memoryview(self._buffer)[self._buffer_offset:]
n = self._sock.send(data_view)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
self._loop.remove_writer(self._fileno)
self._fatal_error(exc, 'Fatal write error')
return
try:
# Use memoryview with offset for O(1) access to remaining data
data_view = memoryview(self._buffer)[self._buffer_offset:]
n = self._sock.send(data_view)
except (BlockingIOError, InterruptedError):
# EAGAIN - socket buffer full
return
except Exception as exc:
self._loop.remove_writer(self._fileno)
self._fatal_error(exc, 'Fatal write error')
return

if n:
self._buffer_offset += n # O(1) offset update instead of O(n) deletion
if n:
self._buffer_offset += n # O(1) offset update instead of O(n) deletion

# Check if buffer is fully consumed
# Check if buffer is fully consumed after budget exhausted
if self._buffer_offset >= len(self._buffer):
# Reset buffer when fully consumed
self._buffer = self._buffer_factory()
Expand Down Expand Up @@ -258,6 +276,9 @@ class ErlangDatagramTransport(transports.DatagramTransport):

max_size = 256 * 1024 # 256 KB

# Maximum reads per callback to avoid starving other events
_max_reads_per_call = 16

def __init__(self, loop, sock, protocol, address=None, extra=None):
super().__init__(extra)
self._loop = loop
Expand All @@ -282,21 +303,27 @@ async def _start(self):
self._loop.add_reader(self._fileno, self._read_ready)

def _read_ready(self):
"""Called when data is available to read."""
"""Called when data is available to read.

Drains socket until EAGAIN with a budget to avoid starvation.
"""
if self._conn_lost:
return
try:
data, addr = self._sock.recvfrom(self.max_size)
except (BlockingIOError, InterruptedError):
return
except OSError as exc:
self._protocol.error_received(exc)
return
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on datagram transport')
return

self._protocol.datagram_received(data, addr)
for _ in range(self._max_reads_per_call):
try:
data, addr = self._sock.recvfrom(self.max_size)
except (BlockingIOError, InterruptedError):
# EAGAIN - no more data available
return
except OSError as exc:
self._protocol.error_received(exc)
return
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on datagram transport')
return

self._protocol.datagram_received(data, addr)

def sendto(self, data, addr=None):
"""Send data to the transport."""
Expand Down
3 changes: 2 additions & 1 deletion src/py.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,8 @@ deactivate_venv() ->
%% Returns a map with venv_path and site_packages, or none if no venv is active.
-spec venv_info() -> {ok, map() | none} | {error, term()}.
venv_info() ->
Code = <<"({'active': True, 'venv_path': __import__('sys')._active_venv, 'site_packages': __import__('sys')._venv_site_packages, 'sys_path': __import__('sys').path} if hasattr(__import__('sys'), '_active_venv') else {'active': False})">>,
%% Check both attributes exist to handle partial activation/deactivation state
Code = <<"({'active': True, 'venv_path': __import__('sys')._active_venv, 'site_packages': __import__('sys')._venv_site_packages, 'sys_path': __import__('sys').path} if (hasattr(__import__('sys'), '_active_venv') and hasattr(__import__('sys'), '_venv_site_packages')) else {'active': False})">>,
eval(Code).

%% @private
Expand Down
28 changes: 23 additions & 5 deletions src/py_event_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ handle_cast(_Msg, State) -> {noreply, State}.
handle_info({select, FdRes, _Ref, ready_input}, State) ->
py_nif:handle_fd_event_and_reselect(FdRes, read),
%% Trigger event processing after FD event dispatch
self() ! task_ready,
maybe_send_task_ready(),
{noreply, State};

handle_info({select, FdRes, _Ref, ready_output}, State) ->
py_nif:handle_fd_event_and_reselect(FdRes, write),
%% Trigger event processing after FD event dispatch
self() ! task_ready,
maybe_send_task_ready(),
{noreply, State};

handle_info({start_timer, _LoopRef, DelayMs, CallbackId, TimerRef}, State) ->
Expand Down Expand Up @@ -86,7 +86,7 @@ handle_info({timeout, TimerRef}, State) ->
NewTimers = maps:remove(TimerRef, Timers),
%% Trigger event processing after timer dispatch
%% This ensures _run_once is called to handle the timer callback
self() ! task_ready,
maybe_send_task_ready(),
{noreply, State#state{timers = NewTimers}}
end;

Expand All @@ -96,6 +96,8 @@ handle_info({select, _FdRes, _Ref, cancelled}, State) -> {noreply, State};
%% This is sent via enif_send when a new async task is submitted.
%% Uses a drain-until-empty loop to handle tasks submitted during processing.
handle_info(task_ready, #state{loop_ref = LoopRef} = State) ->
%% Clear the pending flag - we're processing now
put(task_ready_pending, false),
drain_tasks_loop(LoopRef),
{noreply, State};

Expand All @@ -121,7 +123,9 @@ drain_tasks_loop(LoopRef) ->
ok ->
%% Check if more task_ready messages arrived during processing
receive
task_ready -> drain_tasks_loop(LoopRef)
task_ready ->
put(task_ready_pending, false),
drain_tasks_loop(LoopRef)
after 0 ->
ok
end;
Expand All @@ -130,7 +134,7 @@ drain_tasks_loop(LoopRef) ->
%% Send task_ready to self and return, allowing the gen_server
%% to process other messages (select, timers) before continuing.
%% This prevents starvation under sustained task traffic.
self() ! task_ready,
maybe_send_task_ready(),
ok;
{error, py_loop_not_set} ->
ok;
Expand All @@ -141,3 +145,17 @@ drain_tasks_loop(LoopRef) ->
error_logger:warning_msg("py_event_worker: task processing failed: ~p~n", [Reason]),
ok
end.

%% @doc Send task_ready message only if one isn't already pending.
%% Uses process dictionary to coalesce multiple wakeup requests.
maybe_send_task_ready() ->
case get(task_ready_pending) of
true ->
%% Already pending, no need to send another
ok;
_ ->
%% No pending message, send one and mark as pending
put(task_ready_pending, true),
self() ! task_ready,
ok
end.
Loading