Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add complete type checking with mypy #352

Merged
merged 16 commits into from
Oct 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .isort.cfg

This file was deleted.

27 changes: 27 additions & 0 deletions .mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[mypy]
files = aiojobs, tests
check_untyped_defs = True
follow_imports_for_stubs = True
disallow_any_decorated = True
disallow_any_generics = True
disallow_any_unimported = True
disallow_incomplete_defs = True
disallow_subclassing_any = True
disallow_untyped_calls = True
disallow_untyped_decorators = True
disallow_untyped_defs = True
enable_error_code = redundant-expr, truthy-bool, ignore-without-code, unused-awaitable
implicit_reexport = False
no_implicit_optional = True
pretty = True
show_column_numbers = True
show_error_codes = True
strict_equality = True
warn_incomplete_stub = True
warn_redundant_casts = True
warn_return_any = True
warn_unreachable = True
warn_unused_ignores = True

[mypy-tests.*]
disallow_any_decorated = False
14 changes: 9 additions & 5 deletions aiojobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@
asyncio applications.

"""
from typing import Optional

from ._scheduler import ExceptionHandler, Scheduler

__version__ = "1.0.0"

from ._scheduler import Scheduler


async def create_scheduler(
*, close_timeout=0.1, limit=100, pending_limit=10000, exception_handler=None
):
*,
close_timeout: Optional[float] = 0.1,
limit: Optional[int] = 100,
pending_limit: int = 10000,
exception_handler: Optional[ExceptionHandler] = None
) -> Scheduler:
if exception_handler is not None and not callable(exception_handler):
raise TypeError(
"A callable object or None is expected, "
Expand All @@ -27,4 +31,4 @@ async def create_scheduler(
)


__all__ = ("create_scheduler",)
__all__ = ("Scheduler", "create_scheduler")
67 changes: 42 additions & 25 deletions aiojobs/_job.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,67 @@
import asyncio
import sys
import traceback
from typing import TYPE_CHECKING, Coroutine, Generic, Optional, TypeVar

import async_timeout

if TYPE_CHECKING:
from ._scheduler import Scheduler
else:
Scheduler = None

class Job:
_source_traceback = None
_closed = False
_explicit = False
_task = None
_T = TypeVar("_T", covariant=True)

def __init__(self, coro, scheduler):

class Job(Generic[_T]):
def __init__(self, coro: Coroutine[object, object, _T], scheduler: Scheduler):
self._coro = coro
self._scheduler = scheduler
self._scheduler: Optional[Scheduler] = scheduler
loop = asyncio.get_running_loop()
self._started = loop.create_future()

if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(2))
self._closed = False
self._explicit = False
self._task: Optional["asyncio.Task[_T]"] = None

tb = traceback.extract_stack(sys._getframe(2)) if loop.get_debug() else None
self._source_traceback = tb

def __repr__(self):
def __repr__(self) -> str:
info = []
if self._closed:
info.append("closed")
elif self._task is None:
info.append("pending")
info = " ".join(info)
if info:
info += " "
return f"<Job {info}coro=<{self._coro}>>"
state = " ".join(info)
if state:
state += " "
return f"<Job {state}coro=<{self._coro}>>"

@property
def active(self):
def active(self) -> bool:
return not self.closed and not self.pending

@property
def pending(self):
def pending(self) -> bool:
return self._task is None and not self.closed

@property
def closed(self):
def closed(self) -> bool:
return self._closed

async def _do_wait(self, timeout):
async def _do_wait(self, timeout: Optional[float]) -> _T:
async with async_timeout.timeout(timeout):
# TODO: add a test for waiting for a pending coro
await self._started
assert self._task is not None # Task should have been created before this.
return await self._task

async def wait(self, *, timeout=None):
async def wait(self, *, timeout: Optional[float] = None) -> _T:
if self._closed:
assert self._task is not None # Task must have been created if closed.
return await self._task
assert self._scheduler is not None # Only removed when not _closed.
self._explicit = True
scheduler = self._scheduler
try:
Expand All @@ -63,21 +73,23 @@ async def wait(self, *, timeout=None):
await self._close(scheduler.close_timeout)
raise

async def close(self, *, timeout=None):
async def close(self, *, timeout: Optional[float] = None) -> None:
if self._closed:
return
assert self._scheduler is not None # Only removed when not _closed.
self._explicit = True
if timeout is None:
timeout = self._scheduler.close_timeout
await self._close(timeout)

async def _close(self, timeout):
async def _close(self, timeout: Optional[float]) -> None:
self._closed = True
if self._task is None:
# the task is closed immediately without actual execution
# it prevents a warning like
# RuntimeWarning: coroutine 'coro' was never awaited
self._start()
assert self._task is not None
self._task.cancel()
# self._scheduler is None after _done_callback()
scheduler = self._scheduler
Expand All @@ -96,19 +108,23 @@ async def _close(self, timeout):
}
if self._source_traceback is not None:
context["source_traceback"] = self._source_traceback
# scheduler is only None if job was already finished, in which case
# there's no timeout. self._scheduler will now be None though.
assert scheduler is not None
scheduler.call_exception_handler(context)
except Exception as exc:
if self._explicit:
raise
self._report_exception(exc)

def _start(self):
def _start(self) -> None:
assert self._task is None
self._task = asyncio.create_task(self._coro)
self._task.add_done_callback(self._done_callback)
self._started.set_result(None)

def _done_callback(self, task):
def _done_callback(self, task: "asyncio.Task[_T]") -> None:
assert self._scheduler is not None
scheduler = self._scheduler
scheduler._done(self)
try:
Expand All @@ -118,11 +134,12 @@ def _done_callback(self, task):
else:
if exc is not None and not self._explicit:
self._report_exception(exc)
scheduler._failed_tasks.put_nowait(task)
scheduler._failed_tasks.put_nowait(task) # type: ignore[arg-type]
Dreamsorcerer marked this conversation as resolved.
Show resolved Hide resolved
self._scheduler = None # drop backref
self._closed = True

def _report_exception(self, exc):
def _report_exception(self, exc: BaseException) -> None:
assert self._scheduler is not None
context = {"message": "Job processing failed", "job": self, "exception": exc}
if self._source_traceback is not None:
context["source_traceback"] = self._source_traceback
Expand Down
80 changes: 51 additions & 29 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,85 @@
import asyncio
from collections.abc import Collection
from typing import (
Any,
Callable,
Collection,
Coroutine,
Dict,
Iterator,
Optional,
Set,
TypeVar,
)

from ._job import Job

_T = TypeVar("_T")
ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None]

class Scheduler(Collection):
def __init__(self, *, close_timeout, limit, pending_limit, exception_handler):
self._jobs = set()

class Scheduler(Collection[Job[object]]):
def __init__(
self,
*,
close_timeout: Optional[float],
limit: Optional[int],
pending_limit: int,
exception_handler: Optional[ExceptionHandler],
):
self._jobs: Set[Job[object]] = set()
self._close_timeout = close_timeout
self._limit = limit
self._exception_handler = exception_handler
self._failed_tasks = asyncio.Queue()
self._failed_tasks: asyncio.Queue[
Optional[asyncio.Task[object]]
] = asyncio.Queue()
self._failed_task = asyncio.create_task(self._wait_failed())
self._pending = asyncio.Queue(maxsize=pending_limit)
self._pending: asyncio.Queue[Job[object]] = asyncio.Queue(maxsize=pending_limit)
self._closed = False

def __iter__(self):
return iter(list(self._jobs))
def __iter__(self) -> Iterator[Job[Any]]:
return iter(self._jobs)

def __len__(self):
def __len__(self) -> int:
return len(self._jobs)

def __contains__(self, job):
return job in self._jobs
def __contains__(self, obj: object) -> bool:
return obj in self._jobs

def __repr__(self):
def __repr__(self) -> str:
info = []
if self._closed:
info.append("closed")
info = " ".join(info)
if info:
info += " "
return f"<Scheduler {info}jobs={len(self)}>"
state = " ".join(info)
if state:
state += " "
return f"<Scheduler {state}jobs={len(self)}>"

@property
def limit(self):
def limit(self) -> Optional[int]:
return self._limit

@property
def pending_limit(self):
def pending_limit(self) -> int:
return self._pending.maxsize

@property
def close_timeout(self):
def close_timeout(self) -> Optional[float]:
return self._close_timeout

@property
def active_count(self):
def active_count(self) -> int:
return len(self._jobs) - self._pending.qsize()

@property
def pending_count(self):
def pending_count(self) -> int:
return self._pending.qsize()

@property
def closed(self):
def closed(self) -> bool:
return self._closed

async def spawn(self, coro):
async def spawn(self, coro: Coroutine[object, object, _T]) -> Job[_T]:
if self._closed:
raise RuntimeError("Scheduling a new job after closing")
job = Job(coro, self)
Expand All @@ -74,7 +96,7 @@ async def spawn(self, coro):
self._jobs.add(job)
return job

async def close(self):
async def close(self) -> None:
if self._closed:
return
self._closed = True # prevent adding new jobs
Expand All @@ -93,24 +115,24 @@ async def close(self):
self._failed_tasks.put_nowait(None)
await self._failed_task

def call_exception_handler(self, context):
def call_exception_handler(self, context: Dict[str, Any]) -> None:
handler = self._exception_handler
if handler is None:
handler = asyncio.get_running_loop().call_exception_handler(context)
else:
handler(self, context)

@property
def exception_handler(self):
def exception_handler(self) -> Optional[ExceptionHandler]:
return self._exception_handler

def _done(self, job):
def _done(self, job: Job[object]) -> None:
self._jobs.discard(job)
if not self.pending_count:
return
# No pending jobs when limit is None
# Safe to subtract.
ntodo = self._limit - self.active_count
ntodo = self._limit - self.active_count # type: ignore[operator]
i = 0
while i < ntodo:
if not self.pending_count:
Expand All @@ -121,7 +143,7 @@ def _done(self, job):
new_job._start()
i += 1

async def _wait_failed(self):
async def _wait_failed(self) -> None:
# a coroutine for waiting failed tasks
# without awaiting for failed tasks async raises a warning
while True:
Expand Down
Loading