Skip to content

Commit

Permalink
_IocpProactor: Synchronise _poll with asyncio, don't hold lock during…
Browse files Browse the repository at this point in the history
… wait
  • Loading branch information
dnadlinger committed Oct 1, 2023
1 parent 5f6580e commit d59585a
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions qasync/_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,32 +138,45 @@ def _poll(self, timeout=None):
if ms >= UINT32_MAX:
raise ValueError("timeout too big")

with QtCore.QMutexLocker(self._lock):
while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
while True:
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
ms = 0

with QtCore.QMutexLocker(self._lock):
err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
self._logger.debug(
"GetQueuedCompletionStatus() returned an unexpected " +
"event: err=%s transferred=%s key=%#x address=%#x",
err, transferred, key, address)
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
ms = 0
continue

if obj in self._stopped_serving:
# FIXME: Seems like this will ultimately call Future.cancel(),
# which then would end up calling self._loop.call_soon() on the
# poller thread, which is not thread-safe.
f.cancel()
# Futures might already be resolved or cancelled
# Don't call the callback if _register() already read the result or
# if the overlapped has been cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))

ms = 0
# FIXME: Need to check logic around self._unregistered here: Do we rely
# on no new events being registered between the GetQueuedCompletionStatus
# call and this?
with QtCore.QMutexLocker(self._lock):
# Remove unregistered futures
for ov in self._unregistered:
self._cache.pop(ov.address, None)
self._unregistered.clear()


@with_logger
Expand Down

0 comments on commit d59585a

Please sign in to comment.