Skip to content

Commit

Permalink
Fix "Event loop closed" error on Windows
Browse files Browse the repository at this point in the history
  For more information look at
  aio-libs/aiohttp#4324
  • Loading branch information
gofff committed Sep 13, 2021
1 parent 31f13ad commit e50aab8
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
28 changes: 26 additions & 2 deletions yddg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,31 @@ async def aenumerate(
i += 1


async def path_list_agen(
path_list: List[Any]) -> AsyncGenerator[Any, None]:
async def path_list_agen(path_list: List[Any]) -> AsyncGenerator[Any, None]:
for path in path_list:
yield path


def event_loop_closed_workaround() -> None:
import sys

# thanks for workaround:
# https://github.com/aio-libs/aiohttp/issues/4324#issuecomment-733884349
if (sys.platform.startswith("win")
and sys.version_info[2] < 10):
from asyncio.proactor_events import _ProactorBasePipeTransport
from functools import wraps

def silence_del(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except RuntimeError as e:
if str(e) != 'Event loop is closed':
raise

return wrapper

_ProactorBasePipeTransport.__del__ = silence_del(
_ProactorBasePipeTransport.__del__)
15 changes: 14 additions & 1 deletion yddg/data_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ async def flush(queue: Union[T.YDiskPathQueue, T.ItemQueue],


class YDDataGenerator(Iterable):

def __init__(
self,
urls: List[str],
Expand Down Expand Up @@ -70,6 +69,12 @@ async def __path_extracting(self) -> None:
await self.paths_queue.put(None)

async def start(self) -> None:
# To silence 'Event loop is closed' RuntimeError in Windows
# and Python < 3.10
# For more information read:
# https://github.com/aio-libs/aiohttp/issues/4324
const.event_loop_closed_workaround()

self.path_extract_task = asyncio.ensure_future(
self.__path_extracting())
self.item_extract_task = asyncio.ensure_future(
Expand All @@ -91,6 +96,14 @@ async def stop(self) -> None:
await self.paths_queue.join()
await self.item_queue.join()

if (self.path_extract_task is not None
and not self.path_extract_task.cancelled()):
self.path_extract_task.cancel()

if (self.item_extract_task is not None
and not self.item_extract_task.cancelled()):
self.item_extract_task.cancel()

async def __anext__(self):
item = await self.item_queue.get()
self.item_queue.task_done()
Expand Down

0 comments on commit e50aab8

Please sign in to comment.