From 1a5c6badec0af3db2f82548b38f45b2372f670dd Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Wed, 24 Apr 2024 12:59:37 -0400 Subject: [PATCH 01/13] Add missing test dependency on `greenlet` --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 2a375731..06e3fcdd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ test = [ "anyio[trio]", "asyncmy >= 0.2.5; python_implementation == 'CPython'", "coverage >= 7", + "greenlet >= 3", "paho-mqtt >= 2.0", "psycopg", "pymongo >= 4", From 89b7dce42ce8b87290f678794cd217cc76089ee1 Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Thu, 25 Apr 2024 13:30:46 -0400 Subject: [PATCH 02/13] Install `asyncio` extra for `sqlalchemy --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 06e3fcdd..70da69bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,7 @@ cbor = ["cbor2 >= 5.0"] mongodb = ["pymongo >= 4"] mqtt = ["paho-mqtt >= 2.0"] redis = ["redis >= 4.4.0"] -sqlalchemy = ["sqlalchemy >= 2.0.19"] +sqlalchemy = ["sqlalchemy[asyncio] >= 2.0.19"] test = [ "APScheduler[cbor,mongodb,mqtt,redis,sqlalchemy]", "asyncpg >= 0.20; python_implementation == 'CPython'", @@ -54,7 +54,6 @@ test = [ "anyio[trio]", "asyncmy >= 0.2.5; python_implementation == 'CPython'", "coverage >= 7", - "greenlet >= 3", "paho-mqtt >= 2.0", "psycopg", "pymongo >= 4", From 427d21d2ffa379c440d81378c798954a1020593b Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Thu, 2 May 2024 15:11:37 -0400 Subject: [PATCH 03/13] feat: Support pausing and unpausing schedules Closes #899 --- src/apscheduler/_schedulers/async_.py | 26 +++- src/apscheduler/_schedulers/sync.py | 17 ++- src/apscheduler/_structures.py | 2 + src/apscheduler/abc.py | 26 +++- src/apscheduler/datastores/base.py | 51 +++++++ src/apscheduler/datastores/memory.py | 7 +- src/apscheduler/datastores/mongodb.py | 16 ++- src/apscheduler/datastores/sqlalchemy.py | 8 +- tests/test_datastores.py | 164 ++++++++++++++++++++++- 9 files changed, 307 insertions(+), 10 deletions(-) diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 21752167..e071e32b 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -30,7 +30,13 @@ from .. import JobAdded, SerializationError, TaskLookupError from .._context import current_async_scheduler, current_job from .._converters import as_enum, as_timedelta -from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState, SchedulerRole +from .._enums import ( + CoalescePolicy, + ConflictPolicy, + JobOutcome, + RunState, + SchedulerRole, +) from .._events import ( Event, JobReleased, @@ -389,6 +395,7 @@ async def add_schedule( id: str | None = None, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + paused: bool = False, job_executor: str | UnsetValue = unset, coalesce: CoalescePolicy = CoalescePolicy.latest, misfire_grace_time: float | timedelta | None | UnsetValue = unset, @@ -406,6 +413,7 @@ async def add_schedule( based ID will be assigned) :param args: positional arguments to be passed to the task function :param kwargs: keyword arguments to be passed to the task function + :param paused: whether the schedule is paused :param job_executor: name of the job executor to run the task with :param coalesce: determines what to do when processing the schedule if multiple fire times have become due for this schedule since the last processing @@ -457,6 +465,7 @@ async def add_schedule( trigger=trigger, args=args, kwargs=kwargs, + paused=paused, coalesce=coalesce, misfire_grace_time=task.misfire_grace_time if misfire_grace_time is unset @@ -508,6 +517,21 @@ async def remove_schedule(self, id: str) -> None: self._check_initialized() await self.data_store.remove_schedules({id}) + async def pause_schedule(self, id: str) -> None: + """Pause the specified schedule.""" + self._check_initialized() + await self.data_store.pause_schedules({id}) + + async def unpause_schedule(self, id: str) -> None: + """Unpause the specified schedule. + + Scheduled runs that would have occurred while the schedule was paused are not + considered misfires. The next schedule run will occur at the next time in the + future the schedule would have run if it had never been paused. + """ + self._check_initialized() + await self.data_store.unpause_schedules({id}) + async def add_job( self, func_or_task_id: TaskType, diff --git a/src/apscheduler/_schedulers/sync.py b/src/apscheduler/_schedulers/sync.py index 8b3bb5f4..9ac8ebc7 100644 --- a/src/apscheduler/_schedulers/sync.py +++ b/src/apscheduler/_schedulers/sync.py @@ -16,7 +16,12 @@ from anyio.from_thread import BlockingPortal, start_blocking_portal from .. import Event, current_scheduler -from .._enums import CoalescePolicy, ConflictPolicy, RunState, SchedulerRole +from .._enums import ( + CoalescePolicy, + ConflictPolicy, + RunState, + SchedulerRole, +) from .._structures import Job, JobResult, Schedule, Task from .._utils import UnsetValue, unset from ..abc import DataStore, EventBroker, JobExecutor, Subscription, Trigger @@ -219,6 +224,7 @@ def add_schedule( id: str | None = None, args: Iterable | None = None, kwargs: Mapping[str, Any] | None = None, + paused: bool = False, job_executor: str | UnsetValue = unset, coalesce: CoalescePolicy = CoalescePolicy.latest, misfire_grace_time: float | timedelta | None | UnsetValue = unset, @@ -235,6 +241,7 @@ def add_schedule( id=id, args=args, kwargs=kwargs, + paused=paused, job_executor=job_executor, coalesce=coalesce, misfire_grace_time=misfire_grace_time, @@ -256,6 +263,14 @@ def remove_schedule(self, id: str) -> None: self._ensure_services_ready() self._portal.call(self._async_scheduler.remove_schedule, id) + def pause_schedule(self, id: str) -> None: + self._ensure_services_ready() + self._portal.call(self._async_scheduler.pause_schedule, id) + + def unpause_schedule(self, id: str) -> None: + self._ensure_services_ready() + self._portal.call(self._async_scheduler.unpause_schedule, id) + def add_job( self, func_or_task_id: TaskType, diff --git a/src/apscheduler/_structures.py b/src/apscheduler/_structures.py index ed9023f2..55e2d9a1 100644 --- a/src/apscheduler/_structures.py +++ b/src/apscheduler/_structures.py @@ -74,6 +74,7 @@ class Schedule: :var str task_id: unique identifier of the task to be run on this schedule :var tuple args: positional arguments to pass to the task callable :var dict[str, Any] kwargs: keyword arguments to pass to the task callable + :var bool paused: whether the schedule is paused :var CoalescePolicy coalesce: determines what to do when processing the schedule if multiple fire times have become due for this schedule since the last processing :var ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the @@ -105,6 +106,7 @@ class Schedule: kwargs: dict[str, Any] = attrs.field( eq=False, order=False, converter=dict, default=() ) + paused: bool = attrs.field(eq=False, order=False, default=False) coalesce: CoalescePolicy = attrs.field( eq=False, order=False, diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py index aaefc8d7..21101ace 100644 --- a/src/apscheduler/abc.py +++ b/src/apscheduler/abc.py @@ -5,7 +5,7 @@ from contextlib import AsyncExitStack from datetime import datetime from logging import Logger -from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator +from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, Literal from uuid import UUID if sys.version_info >= (3, 11): @@ -245,6 +245,30 @@ async def remove_schedules(self, ids: Iterable[str]) -> None: :param ids: a specific set of schedule IDs to remove """ + @abstractmethod + async def pause_schedules(self, ids: Iterable[str]) -> None: + """ + Pause the specified schedules. + + :param ids: a specific set of schedule IDs to pause + """ + + @abstractmethod + async def unpause_schedules( + self, + ids: Iterable[str], + *, + resume_from: datetime | Literal["now"] | None = None, + ) -> None: + """ + Unpause the specified schedules. + + :param ids: a specific set of schedule IDs to unpause + :param resume_from: the time to resume the schedules from, or ``'now'`` as a + shorthand for ``datetime.now(tz=UTC)`` or ``None`` to resume from where the + schedule left off which may cause it to misfire + """ + @abstractmethod async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: """ diff --git a/src/apscheduler/datastores/base.py b/src/apscheduler/datastores/base.py index 4c5bb66e..7ba18e5e 100644 --- a/src/apscheduler/datastores/base.py +++ b/src/apscheduler/datastores/base.py @@ -1,11 +1,15 @@ from __future__ import annotations from contextlib import AsyncExitStack +from datetime import datetime, timezone from logging import Logger +from typing import Iterable, Literal import attrs +from .._enums import ConflictPolicy from .._retry import RetryMixin +from .._structures import Schedule from ..abc import DataStore, EventBroker, Serializer from ..serializers.pickle import PickleSerializer @@ -29,6 +33,53 @@ async def start( self._event_broker = event_broker self._logger = logger + async def pause_schedules(self, ids: Iterable[str]) -> None: + for schedule in await self.get_schedules(ids): + await self.add_schedule( + attrs.evolve(schedule, paused=True), + ConflictPolicy.replace, + ) + + def _get_unpaused_next_fire_time( + self, + schedule: Schedule, + resume_from: datetime | Literal["now"] | None, + ) -> datetime | None: + if resume_from is None: + return schedule.next_fire_time + if resume_from == "now": + resume_from = datetime.now(tz=timezone.utc) + if ( + schedule.next_fire_time is not None + and schedule.next_fire_time >= resume_from + ): + return schedule.next_fire_time + try: + while (next_fire_time := schedule.trigger.next()) < resume_from: + pass # Advance `next_fire_time` until its at or past `resume_from` + except TypeError: # The trigger is exhausted + return None + return next_fire_time + + async def unpause_schedules( + self, + ids: Iterable[str], + *, + resume_from: datetime | Literal["now"] | None = None, + ) -> None: + for schedule in await self.get_schedules(ids): + await self.add_schedule( + attrs.evolve( + schedule, + paused=False, + next_fire_time=self._get_unpaused_next_fire_time( + schedule, + resume_from, + ), + ), + ConflictPolicy.replace, + ) + @attrs.define(kw_only=True) class BaseExternalDataStore(BaseDataStore, RetryMixin): diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py index 40477a9c..883ab3b2 100644 --- a/src/apscheduler/datastores/memory.py +++ b/src/apscheduler/datastores/memory.py @@ -207,8 +207,11 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul schedules: list[Schedule] = [] for state in self._schedules: if state.next_fire_time is None or state.next_fire_time > now: - # The schedule is either paused or not yet due - break + # The schedule is either exhausted or not yet due + continue + elif state.schedule.paused: + # The schedule is paused + continue elif state.acquired_by is not None: if state.acquired_by != scheduler_id and now <= state.acquired_until: # The schedule has been acquired by another scheduler and the diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py index 57687f99..8009d6b2 100644 --- a/src/apscheduler/datastores/mongodb.py +++ b/src/apscheduler/datastores/mongodb.py @@ -366,9 +366,19 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul lambda: self._schedules.find( { "next_fire_time": {"$lte": now}, - "$or": [ - {"acquired_until": {"$exists": False}}, - {"acquired_until": {"$lt": now}}, + "$and": [ + { + "$or": [ + {"paused": {"$exists": False}}, + {"paused": False}, + ] + }, + { + "$or": [ + {"acquired_until": {"$exists": False}}, + {"acquired_until": {"$lt": now}}, + ] + }, ], }, session=session, diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index ddc82ea8..9e726d79 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -17,6 +17,7 @@ from anyio import CancelScope, to_thread from sqlalchemy import ( BigInteger, + Boolean, Column, DateTime, Enum, @@ -293,6 +294,7 @@ def get_table_definitions(self) -> MetaData: Column("trigger", LargeBinary), Column("args", LargeBinary), Column("kwargs", LargeBinary), + Column("paused", Boolean, nullable=False, server_default=literal(False)), Column("coalesce", Enum(CoalescePolicy), nullable=False), Column("misfire_grace_time", interval_type), Column("max_jitter", interval_type), @@ -600,6 +602,7 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul and_( self._t_schedules.c.next_fire_time.isnot(None), comparison, + self._t_schedules.c.paused.is_(False), or_( self._t_schedules.c.acquired_until.is_(None), self._t_schedules.c.acquired_until < now, @@ -752,7 +755,10 @@ async def get_next_schedule_run_time(self) -> datetime | None: statenent = ( select(*columns) - .where(self._t_schedules.c.next_fire_time.isnot(None)) + .where( + self._t_schedules.c.next_fire_time.isnot(None), + self._t_schedules.c.paused.is_(False), + ) .order_by(self._t_schedules.c.next_fire_time) .limit(1) ) diff --git a/tests/test_datastores.py b/tests/test_datastores.py index a9b13fd9..27b61aae 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -4,7 +4,7 @@ from contextlib import AsyncExitStack, asynccontextmanager from datetime import datetime, timedelta, timezone from logging import Logger -from typing import TYPE_CHECKING, AsyncGenerator +from typing import TYPE_CHECKING, AsyncGenerator, Literal import anyio import pytest @@ -28,6 +28,7 @@ ) from apscheduler.abc import DataStore, EventBroker from apscheduler.triggers.date import DateTrigger +from apscheduler.triggers.interval import IntervalTrigger if TYPE_CHECKING: from time_machine import TimeMachineFixture @@ -557,3 +558,164 @@ async def test_next_schedule_run_time(datastore: DataStore, schedules: list[Sche next_schedule_run_time = await datastore.get_next_schedule_run_time() assert next_schedule_run_time == datetime(2020, 9, 13, tzinfo=timezone.utc) + + +async def test_pause_and_unpause_schedule( + datastore: DataStore, + schedules: list[Schedule], +): + trigger_datetime = datetime(2020, 9, 16, tzinfo=timezone.utc) + async with capture_events(datastore, 3, {ScheduleUpdated}) as events: + for schedule in schedules: + await datastore.add_schedule(schedule, ConflictPolicy.exception) + + trigger = DateTrigger(trigger_datetime) + next_fire_time = trigger.next() + schedule = Schedule( + id="s3", + task_id="foo", + trigger=trigger, + args=(), + kwargs={}, + coalesce=CoalescePolicy.earliest, + misfire_grace_time=None, + ) + schedule.next_fire_time = next_fire_time + await datastore.add_schedule(schedule, ConflictPolicy.replace) + + schedules = await datastore.get_schedules({schedule.id}) + assert schedules[0].task_id == "foo" + assert schedules[0].next_fire_time == next_fire_time + assert schedules[0].args == () + assert schedules[0].kwargs == {} + assert schedules[0].paused is False + assert schedules[0].coalesce is CoalescePolicy.earliest + assert schedules[0].misfire_grace_time is None + + await datastore.pause_schedules({schedule.id}) + schedules = await datastore.get_schedules({schedule.id}) + assert schedules[0].paused is True + assert schedules[0].next_fire_time == next_fire_time + + await datastore.unpause_schedules({schedule.id}) + schedules = await datastore.get_schedules({schedule.id}) + assert schedules[0].paused is False + assert schedules[0].next_fire_time == next_fire_time + + event_1 = events.pop(0) + assert event_1.schedule_id == "s3" + assert event_1.task_id == "foo" + assert event_1.next_fire_time == trigger_datetime + + event_2 = events.pop(0) + assert event_2.schedule_id == "s3" + assert event_2.task_id == "foo" + assert event_2.next_fire_time == trigger_datetime + + event_3 = events.pop(0) + assert event_3.schedule_id == "s3" + assert event_3.task_id == "foo" + assert event_3.next_fire_time == trigger_datetime + + assert not events + + +@pytest.mark.skipif( + platform.python_implementation() != "CPython", + reason="time-machine is not available", +) +async def test_acquire_paused_schedules( + datastore: DataStore, + schedules: list[Schedule], + time_machine: TimeMachineFixture, +) -> None: + time_machine.move_to(datetime(2020, 9, 14, tzinfo=timezone.utc)) + + event_types = {ScheduleRemoved, ScheduleUpdated} + async with capture_events(datastore, 3, event_types) as events: + for schedule in schedules: + await datastore.add_schedule(schedule, ConflictPolicy.exception) + await datastore.pause_schedules({x.id for x in schedules}) + + acquired_schedules = await datastore.acquire_schedules("dummy-id", 3) + assert len(acquired_schedules) == 0 + # Check that the first schedule has not had its next fire time nullified + schedules = await datastore.get_schedules() + assert len(schedules) == 3 + schedules.sort(key=lambda s: s.id) + assert schedules[0].id == "s1" + assert schedules[0].next_fire_time is not None + assert schedules[1].id == "s2" + assert schedules[2].id == "s3" + + received_event = events.pop(0) + assert isinstance(received_event, ScheduleUpdated) + assert received_event.schedule_id == "s1" + assert received_event.next_fire_time == datetime(2020, 9, 13, tzinfo=timezone.utc) + + received_event = events.pop(0) + assert isinstance(received_event, ScheduleUpdated) + assert received_event.schedule_id == "s2" + assert received_event.next_fire_time == datetime(2020, 9, 14, tzinfo=timezone.utc) + + received_event = events.pop(0) + assert isinstance(received_event, ScheduleUpdated) + assert received_event.schedule_id == "s3" + assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) + + assert not events + + +@pytest.mark.skipif( + platform.python_implementation() != "CPython", + reason="time-machine is not available", +) +@pytest.mark.parametrize( + ("resume_from", "expected_next_fire_time"), + ( + ( + "now", + datetime(2020, 9, 14, 6, tzinfo=timezone.utc), + ), + ( + datetime(2020, 9, 14, tzinfo=timezone.utc), + datetime(2020, 9, 14, tzinfo=timezone.utc), + ), + ( + datetime(2020, 9, 14, 8, tzinfo=timezone.utc), + datetime(2020, 9, 14, 12, tzinfo=timezone.utc), + ), + ( + datetime(2120, 9, 14, 8, tzinfo=timezone.utc), + datetime(2120, 9, 14, 12, tzinfo=timezone.utc), + ), + ( + None, + datetime(2020, 9, 14, tzinfo=timezone.utc), + ), + ), + ids=( + "resume_from_now", + "resume_from_exactly_now", + "resume_from_later", + "resume_from_much_later", + "resume_from_none", + ), +) +async def test_resume_paused_schedules( + datastore: DataStore, + time_machine: TimeMachineFixture, + resume_from: datetime | Literal["now"] | None, + expected_next_fire_time: datetime, +) -> None: + time_machine.move_to(datetime(2020, 9, 14, tzinfo=timezone.utc)) + + trigger = IntervalTrigger(hours=6.0) + schedule = Schedule(id="s1", task_id="task1", trigger=trigger, paused=True) + schedule.next_fire_time = trigger.next() + await datastore.add_schedule(schedule, ConflictPolicy.exception) + + await datastore.unpause_schedules({schedule.id}, resume_from=resume_from) + schedule = next(iter(await datastore.get_schedules({schedule.id}))) + assert schedule.paused is False + assert schedule.next_fire_time.astimezone(timezone.utc) == expected_next_fire_time From 3b3c996c6ff52c69cf06fc5dc1f7695c5b3af8b8 Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Thu, 2 May 2024 15:27:10 -0400 Subject: [PATCH 04/13] Add `resume_from` to scheduler classes --- src/apscheduler/_schedulers/async_.py | 20 ++++++++++++++------ src/apscheduler/_schedulers/sync.py | 19 +++++++++++++++---- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 6beb9f6e..8a92db3e 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -11,7 +11,7 @@ from inspect import isbuiltin, isclass, ismethod, ismodule from logging import Logger, getLogger from types import TracebackType -from typing import Any, Callable, Iterable, Mapping, cast, overload +from typing import Any, Callable, Iterable, Literal, Mapping, cast, overload from uuid import UUID, uuid4 import anyio @@ -543,15 +543,23 @@ async def pause_schedule(self, id: str) -> None: self._check_initialized() await self.data_store.pause_schedules({id}) - async def unpause_schedule(self, id: str) -> None: + async def unpause_schedule( + self, + id: str, + *, + resume_from: datetime | Literal["now"] | None = None, + ) -> None: """Unpause the specified schedule. - Scheduled runs that would have occurred while the schedule was paused are not - considered misfires. The next schedule run will occur at the next time in the - future the schedule would have run if it had never been paused. + By default, the schedule will be resumed as if it had never been paused, and all + missed runs will be considered misfires. The ``resume_from`` parameter can be + used to specify a different time from which to resume the schedule. The string + ``'now'`` can be used as shorthand for ``datetime.now(tz=UTC)``. If + ``resume_from`` is not ``None``, then the trigger will be repeatedly advanced + until the next fire time is at or after the specified time. """ self._check_initialized() - await self.data_store.unpause_schedules({id}) + await self.data_store.unpause_schedules({id}, resume_from=resume_from) async def add_job( self, diff --git a/src/apscheduler/_schedulers/sync.py b/src/apscheduler/_schedulers/sync.py index 9a83027f..ae75a861 100644 --- a/src/apscheduler/_schedulers/sync.py +++ b/src/apscheduler/_schedulers/sync.py @@ -6,11 +6,11 @@ import threading from collections.abc import MutableMapping, Sequence from contextlib import ExitStack -from datetime import timedelta +from datetime import datetime, timedelta from functools import partial from logging import Logger from types import TracebackType -from typing import Any, Callable, Iterable, Mapping, overload +from typing import Any, Callable, Iterable, Literal, Mapping, overload from uuid import UUID from anyio.from_thread import BlockingPortal, start_blocking_portal @@ -286,9 +286,20 @@ def pause_schedule(self, id: str) -> None: self._ensure_services_ready() self._portal.call(self._async_scheduler.pause_schedule, id) - def unpause_schedule(self, id: str) -> None: + def unpause_schedule( + self, + id: str, + *, + resume_from: datetime | Literal["now"] | None = None, + ) -> None: self._ensure_services_ready() - self._portal.call(self._async_scheduler.unpause_schedule, id) + self._portal.call( + partial( + self._async_scheduler.unpause_schedule, + id, + resume_from=resume_from, + ) + ) def add_job( self, From d0a827f945d49b90ed2425ad2a04462f3a466927 Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Thu, 2 May 2024 15:36:53 -0400 Subject: [PATCH 05/13] Fix tests for Windows --- tests/test_datastores.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 27b61aae..2998397d 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import platform from contextlib import AsyncExitStack, asynccontextmanager from datetime import datetime, timedelta, timezone @@ -712,6 +713,10 @@ async def test_resume_paused_schedules( trigger = IntervalTrigger(hours=6.0) schedule = Schedule(id="s1", task_id="task1", trigger=trigger, paused=True) + + # Ensure the next fire time is not the same as the start time when using "now" + await asyncio.sleep(0.1) + schedule.next_fire_time = trigger.next() await datastore.add_schedule(schedule, ConflictPolicy.exception) From 1dc6666134a5cff1458eff709a7d0cf496110991 Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Thu, 2 May 2024 16:06:16 -0400 Subject: [PATCH 06/13] Update test for pausing and unpausing via the scheduler --- tests/test_schedulers.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/tests/test_schedulers.py b/tests/test_schedulers.py index 34b7adfd..38c3aaf9 100644 --- a/tests/test_schedulers.py +++ b/tests/test_schedulers.py @@ -192,10 +192,10 @@ async def test_configure_task(self, raw_datastore: DataStore) -> None: assert isinstance(event, TaskUpdated) assert event.task_id == "mytask" - async def test_add_remove_schedule( + async def test_add_pause_unpause_remove_schedule( self, raw_datastore: DataStore, timezone: ZoneInfo ) -> None: - send, receive = create_memory_object_stream[Event](3) + send, receive = create_memory_object_stream[Event](5) async with AsyncScheduler(data_store=raw_datastore) as scheduler: scheduler.subscribe(send.send) now = datetime.now(timezone) @@ -210,6 +210,16 @@ async def test_add_remove_schedule( assert schedules[0].id == "foo" assert schedules[0].task_id == f"{__name__}:dummy_async_job" + await scheduler.pause_schedule("foo") + schedule = await scheduler.get_schedule("foo") + assert schedule.paused + assert schedule.next_fire_time == now + + await scheduler.unpause_schedule("foo") + schedule = await scheduler.get_schedule("foo") + assert not schedule.paused + assert schedule.next_fire_time == now + await scheduler.remove_schedule(schedule_id) assert not await scheduler.get_schedules() @@ -224,6 +234,18 @@ async def test_add_remove_schedule( assert event.task_id == f"{__name__}:dummy_async_job" assert event.next_fire_time == now + event = await receive.receive() + assert isinstance(event, ScheduleUpdated) + assert event.schedule_id == "foo" + assert event.task_id == f"{__name__}:dummy_async_job" + assert event.next_fire_time == now + + event = await receive.receive() + assert isinstance(event, ScheduleUpdated) + assert event.schedule_id == "foo" + assert event.task_id == f"{__name__}:dummy_async_job" + assert event.next_fire_time == now + event = await receive.receive() assert isinstance(event, ScheduleRemoved) assert event.schedule_id == "foo" From 695efe690d98134cb942a6edb409e99ac516fb71 Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Thu, 2 May 2024 18:43:16 -0400 Subject: [PATCH 07/13] Apply suggestions from code review --- src/apscheduler/_schedulers/async_.py | 8 +------- src/apscheduler/_schedulers/sync.py | 7 +------ 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 8a92db3e..2e27ea34 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -30,13 +30,7 @@ from .. import JobAdded, SerializationError, TaskLookupError from .._context import current_async_scheduler, current_job from .._converters import as_enum, as_timedelta -from .._enums import ( - CoalescePolicy, - ConflictPolicy, - JobOutcome, - RunState, - SchedulerRole, -) +from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState, SchedulerRole from .._events import ( Event, JobReleased, diff --git a/src/apscheduler/_schedulers/sync.py b/src/apscheduler/_schedulers/sync.py index ae75a861..8630821e 100644 --- a/src/apscheduler/_schedulers/sync.py +++ b/src/apscheduler/_schedulers/sync.py @@ -16,12 +16,7 @@ from anyio.from_thread import BlockingPortal, start_blocking_portal from .. import current_scheduler -from .._enums import ( - CoalescePolicy, - ConflictPolicy, - RunState, - SchedulerRole, -) +from .._enums import CoalescePolicy, ConflictPolicy, RunState, SchedulerRole from .._events import Event, T_Event from .._structures import Job, JobResult, Schedule, Task from .._utils import UnsetValue, unset From 61f70514cface0549ba3cb24774e9a5ad44f24da Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Mon, 6 May 2024 14:24:18 -0400 Subject: [PATCH 08/13] Address review comments --- src/apscheduler/_schedulers/async_.py | 59 ++++++-- src/apscheduler/_schedulers/sync.py | 8 +- src/apscheduler/abc.py | 26 +--- src/apscheduler/datastores/base.py | 51 ------- src/apscheduler/datastores/memory.py | 5 +- src/apscheduler/datastores/sqlalchemy.py | 5 +- tests/test_datastores.py | 169 +---------------------- 7 files changed, 59 insertions(+), 264 deletions(-) diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 2e27ea34..71b462a3 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -532,28 +532,63 @@ async def remove_schedule(self, id: str) -> None: self._check_initialized() await self.data_store.remove_schedules({id}) - async def pause_schedule(self, id: str) -> None: + async def pause_schedule(self, schedule_id: str) -> None: """Pause the specified schedule.""" self._check_initialized() - await self.data_store.pause_schedules({id}) + await self.data_store.add_schedule( + schedule=attrs.evolve(await self.get_schedule(schedule_id), paused=True), + conflict_policy=ConflictPolicy.replace, + ) + + def _get_unpaused_next_fire_time( + self, + schedule: Schedule, + resume_from: datetime | Literal["now"] | None, + ) -> datetime | None: + if resume_from is None: + return schedule.next_fire_time + if resume_from == "now": + resume_from = datetime.now(tz=timezone.utc) + if ( + schedule.next_fire_time is not None + and schedule.next_fire_time >= resume_from + ): + return schedule.next_fire_time + try: + while (next_fire_time := schedule.trigger.next()) < resume_from: + pass # Advance `next_fire_time` until its at or past `resume_from` + except TypeError: # The trigger is exhausted + return None + return next_fire_time async def unpause_schedule( self, - id: str, + schedule_id: str, *, resume_from: datetime | Literal["now"] | None = None, ) -> None: - """Unpause the specified schedule. - - By default, the schedule will be resumed as if it had never been paused, and all - missed runs will be considered misfires. The ``resume_from`` parameter can be - used to specify a different time from which to resume the schedule. The string - ``'now'`` can be used as shorthand for ``datetime.now(tz=UTC)``. If - ``resume_from`` is not ``None``, then the trigger will be repeatedly advanced - until the next fire time is at or after the specified time. + """ + Unpause the specified schedule. + + + :param resume_from: the time to resume the schedules from, or ``'now'`` as a + shorthand for ``datetime.now(tz=UTC)`` or ``None`` to resume from where the + schedule left off which may cause it to misfire + """ self._check_initialized() - await self.data_store.unpause_schedules({id}, resume_from=resume_from) + schedule = await self.get_schedule(schedule_id) + await self.data_store.add_schedule( + schedule=attrs.evolve( + schedule, + paused=False, + next_fire_time=self._get_unpaused_next_fire_time( + schedule, + resume_from, + ), + ), + conflict_policy=ConflictPolicy.replace, + ) async def add_job( self, diff --git a/src/apscheduler/_schedulers/sync.py b/src/apscheduler/_schedulers/sync.py index 8630821e..244ecf65 100644 --- a/src/apscheduler/_schedulers/sync.py +++ b/src/apscheduler/_schedulers/sync.py @@ -277,13 +277,13 @@ def remove_schedule(self, id: str) -> None: self._ensure_services_ready() self._portal.call(self._async_scheduler.remove_schedule, id) - def pause_schedule(self, id: str) -> None: + def pause_schedule(self, schedule_id: str) -> None: self._ensure_services_ready() - self._portal.call(self._async_scheduler.pause_schedule, id) + self._portal.call(self._async_scheduler.pause_schedule, schedule_id) def unpause_schedule( self, - id: str, + schedule_id: str, *, resume_from: datetime | Literal["now"] | None = None, ) -> None: @@ -291,7 +291,7 @@ def unpause_schedule( self._portal.call( partial( self._async_scheduler.unpause_schedule, - id, + schedule_id, resume_from=resume_from, ) ) diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py index cb6a619e..e336caef 100644 --- a/src/apscheduler/abc.py +++ b/src/apscheduler/abc.py @@ -5,7 +5,7 @@ from contextlib import AsyncExitStack from datetime import datetime from logging import Logger -from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, Literal +from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator from uuid import UUID if sys.version_info >= (3, 11): @@ -245,30 +245,6 @@ async def remove_schedules(self, ids: Iterable[str]) -> None: :param ids: a specific set of schedule IDs to remove """ - @abstractmethod - async def pause_schedules(self, ids: Iterable[str]) -> None: - """ - Pause the specified schedules. - - :param ids: a specific set of schedule IDs to pause - """ - - @abstractmethod - async def unpause_schedules( - self, - ids: Iterable[str], - *, - resume_from: datetime | Literal["now"] | None = None, - ) -> None: - """ - Unpause the specified schedules. - - :param ids: a specific set of schedule IDs to unpause - :param resume_from: the time to resume the schedules from, or ``'now'`` as a - shorthand for ``datetime.now(tz=UTC)`` or ``None`` to resume from where the - schedule left off which may cause it to misfire - """ - @abstractmethod async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: """ diff --git a/src/apscheduler/datastores/base.py b/src/apscheduler/datastores/base.py index 7ba18e5e..4c5bb66e 100644 --- a/src/apscheduler/datastores/base.py +++ b/src/apscheduler/datastores/base.py @@ -1,15 +1,11 @@ from __future__ import annotations from contextlib import AsyncExitStack -from datetime import datetime, timezone from logging import Logger -from typing import Iterable, Literal import attrs -from .._enums import ConflictPolicy from .._retry import RetryMixin -from .._structures import Schedule from ..abc import DataStore, EventBroker, Serializer from ..serializers.pickle import PickleSerializer @@ -33,53 +29,6 @@ async def start( self._event_broker = event_broker self._logger = logger - async def pause_schedules(self, ids: Iterable[str]) -> None: - for schedule in await self.get_schedules(ids): - await self.add_schedule( - attrs.evolve(schedule, paused=True), - ConflictPolicy.replace, - ) - - def _get_unpaused_next_fire_time( - self, - schedule: Schedule, - resume_from: datetime | Literal["now"] | None, - ) -> datetime | None: - if resume_from is None: - return schedule.next_fire_time - if resume_from == "now": - resume_from = datetime.now(tz=timezone.utc) - if ( - schedule.next_fire_time is not None - and schedule.next_fire_time >= resume_from - ): - return schedule.next_fire_time - try: - while (next_fire_time := schedule.trigger.next()) < resume_from: - pass # Advance `next_fire_time` until its at or past `resume_from` - except TypeError: # The trigger is exhausted - return None - return next_fire_time - - async def unpause_schedules( - self, - ids: Iterable[str], - *, - resume_from: datetime | Literal["now"] | None = None, - ) -> None: - for schedule in await self.get_schedules(ids): - await self.add_schedule( - attrs.evolve( - schedule, - paused=False, - next_fire_time=self._get_unpaused_next_fire_time( - schedule, - resume_from, - ), - ), - ConflictPolicy.replace, - ) - @attrs.define(kw_only=True) class BaseExternalDataStore(BaseDataStore, RetryMixin): diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py index 883ab3b2..49003b69 100644 --- a/src/apscheduler/datastores/memory.py +++ b/src/apscheduler/datastores/memory.py @@ -207,8 +207,9 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul schedules: list[Schedule] = [] for state in self._schedules: if state.next_fire_time is None or state.next_fire_time > now: - # The schedule is either exhausted or not yet due - continue + # The schedule is either exhausted or not yet due. There will be no + # schedules that are due after this one, so we can stop here. + break elif state.schedule.paused: # The schedule is paused continue diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 9e726d79..17f2ec13 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -32,6 +32,7 @@ Uuid, and_, bindparam, + false, or_, select, ) @@ -602,7 +603,7 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul and_( self._t_schedules.c.next_fire_time.isnot(None), comparison, - self._t_schedules.c.paused.is_(False), + self._t_schedules.c.paused == false(), or_( self._t_schedules.c.acquired_until.is_(None), self._t_schedules.c.acquired_until < now, @@ -757,7 +758,7 @@ async def get_next_schedule_run_time(self) -> datetime | None: select(*columns) .where( self._t_schedules.c.next_fire_time.isnot(None), - self._t_schedules.c.paused.is_(False), + self._t_schedules.c.paused == false(), ) .order_by(self._t_schedules.c.next_fire_time) .limit(1) diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 2998397d..a9b13fd9 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -1,11 +1,10 @@ from __future__ import annotations -import asyncio import platform from contextlib import AsyncExitStack, asynccontextmanager from datetime import datetime, timedelta, timezone from logging import Logger -from typing import TYPE_CHECKING, AsyncGenerator, Literal +from typing import TYPE_CHECKING, AsyncGenerator import anyio import pytest @@ -29,7 +28,6 @@ ) from apscheduler.abc import DataStore, EventBroker from apscheduler.triggers.date import DateTrigger -from apscheduler.triggers.interval import IntervalTrigger if TYPE_CHECKING: from time_machine import TimeMachineFixture @@ -559,168 +557,3 @@ async def test_next_schedule_run_time(datastore: DataStore, schedules: list[Sche next_schedule_run_time = await datastore.get_next_schedule_run_time() assert next_schedule_run_time == datetime(2020, 9, 13, tzinfo=timezone.utc) - - -async def test_pause_and_unpause_schedule( - datastore: DataStore, - schedules: list[Schedule], -): - trigger_datetime = datetime(2020, 9, 16, tzinfo=timezone.utc) - async with capture_events(datastore, 3, {ScheduleUpdated}) as events: - for schedule in schedules: - await datastore.add_schedule(schedule, ConflictPolicy.exception) - - trigger = DateTrigger(trigger_datetime) - next_fire_time = trigger.next() - schedule = Schedule( - id="s3", - task_id="foo", - trigger=trigger, - args=(), - kwargs={}, - coalesce=CoalescePolicy.earliest, - misfire_grace_time=None, - ) - schedule.next_fire_time = next_fire_time - await datastore.add_schedule(schedule, ConflictPolicy.replace) - - schedules = await datastore.get_schedules({schedule.id}) - assert schedules[0].task_id == "foo" - assert schedules[0].next_fire_time == next_fire_time - assert schedules[0].args == () - assert schedules[0].kwargs == {} - assert schedules[0].paused is False - assert schedules[0].coalesce is CoalescePolicy.earliest - assert schedules[0].misfire_grace_time is None - - await datastore.pause_schedules({schedule.id}) - schedules = await datastore.get_schedules({schedule.id}) - assert schedules[0].paused is True - assert schedules[0].next_fire_time == next_fire_time - - await datastore.unpause_schedules({schedule.id}) - schedules = await datastore.get_schedules({schedule.id}) - assert schedules[0].paused is False - assert schedules[0].next_fire_time == next_fire_time - - event_1 = events.pop(0) - assert event_1.schedule_id == "s3" - assert event_1.task_id == "foo" - assert event_1.next_fire_time == trigger_datetime - - event_2 = events.pop(0) - assert event_2.schedule_id == "s3" - assert event_2.task_id == "foo" - assert event_2.next_fire_time == trigger_datetime - - event_3 = events.pop(0) - assert event_3.schedule_id == "s3" - assert event_3.task_id == "foo" - assert event_3.next_fire_time == trigger_datetime - - assert not events - - -@pytest.mark.skipif( - platform.python_implementation() != "CPython", - reason="time-machine is not available", -) -async def test_acquire_paused_schedules( - datastore: DataStore, - schedules: list[Schedule], - time_machine: TimeMachineFixture, -) -> None: - time_machine.move_to(datetime(2020, 9, 14, tzinfo=timezone.utc)) - - event_types = {ScheduleRemoved, ScheduleUpdated} - async with capture_events(datastore, 3, event_types) as events: - for schedule in schedules: - await datastore.add_schedule(schedule, ConflictPolicy.exception) - await datastore.pause_schedules({x.id for x in schedules}) - - acquired_schedules = await datastore.acquire_schedules("dummy-id", 3) - assert len(acquired_schedules) == 0 - # Check that the first schedule has not had its next fire time nullified - schedules = await datastore.get_schedules() - assert len(schedules) == 3 - schedules.sort(key=lambda s: s.id) - assert schedules[0].id == "s1" - assert schedules[0].next_fire_time is not None - assert schedules[1].id == "s2" - assert schedules[2].id == "s3" - - received_event = events.pop(0) - assert isinstance(received_event, ScheduleUpdated) - assert received_event.schedule_id == "s1" - assert received_event.next_fire_time == datetime(2020, 9, 13, tzinfo=timezone.utc) - - received_event = events.pop(0) - assert isinstance(received_event, ScheduleUpdated) - assert received_event.schedule_id == "s2" - assert received_event.next_fire_time == datetime(2020, 9, 14, tzinfo=timezone.utc) - - received_event = events.pop(0) - assert isinstance(received_event, ScheduleUpdated) - assert received_event.schedule_id == "s3" - assert received_event.next_fire_time == datetime(2020, 9, 15, tzinfo=timezone.utc) - - assert not events - - -@pytest.mark.skipif( - platform.python_implementation() != "CPython", - reason="time-machine is not available", -) -@pytest.mark.parametrize( - ("resume_from", "expected_next_fire_time"), - ( - ( - "now", - datetime(2020, 9, 14, 6, tzinfo=timezone.utc), - ), - ( - datetime(2020, 9, 14, tzinfo=timezone.utc), - datetime(2020, 9, 14, tzinfo=timezone.utc), - ), - ( - datetime(2020, 9, 14, 8, tzinfo=timezone.utc), - datetime(2020, 9, 14, 12, tzinfo=timezone.utc), - ), - ( - datetime(2120, 9, 14, 8, tzinfo=timezone.utc), - datetime(2120, 9, 14, 12, tzinfo=timezone.utc), - ), - ( - None, - datetime(2020, 9, 14, tzinfo=timezone.utc), - ), - ), - ids=( - "resume_from_now", - "resume_from_exactly_now", - "resume_from_later", - "resume_from_much_later", - "resume_from_none", - ), -) -async def test_resume_paused_schedules( - datastore: DataStore, - time_machine: TimeMachineFixture, - resume_from: datetime | Literal["now"] | None, - expected_next_fire_time: datetime, -) -> None: - time_machine.move_to(datetime(2020, 9, 14, tzinfo=timezone.utc)) - - trigger = IntervalTrigger(hours=6.0) - schedule = Schedule(id="s1", task_id="task1", trigger=trigger, paused=True) - - # Ensure the next fire time is not the same as the start time when using "now" - await asyncio.sleep(0.1) - - schedule.next_fire_time = trigger.next() - await datastore.add_schedule(schedule, ConflictPolicy.exception) - - await datastore.unpause_schedules({schedule.id}, resume_from=resume_from) - schedule = next(iter(await datastore.get_schedules({schedule.id}))) - assert schedule.paused is False - assert schedule.next_fire_time.astimezone(timezone.utc) == expected_next_fire_time From 63694010055c591003e37dcbeb37cb8bb486f517 Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Wed, 8 May 2024 21:45:12 -0400 Subject: [PATCH 09/13] Address review comments --- src/apscheduler/_schedulers/async_.py | 44 ++++++++++++--------------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 71b462a3..368d2981 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -540,27 +540,6 @@ async def pause_schedule(self, schedule_id: str) -> None: conflict_policy=ConflictPolicy.replace, ) - def _get_unpaused_next_fire_time( - self, - schedule: Schedule, - resume_from: datetime | Literal["now"] | None, - ) -> datetime | None: - if resume_from is None: - return schedule.next_fire_time - if resume_from == "now": - resume_from = datetime.now(tz=timezone.utc) - if ( - schedule.next_fire_time is not None - and schedule.next_fire_time >= resume_from - ): - return schedule.next_fire_time - try: - while (next_fire_time := schedule.trigger.next()) < resume_from: - pass # Advance `next_fire_time` until its at or past `resume_from` - except TypeError: # The trigger is exhausted - return None - return next_fire_time - async def unpause_schedule( self, schedule_id: str, @@ -578,14 +557,29 @@ async def unpause_schedule( """ self._check_initialized() schedule = await self.get_schedule(schedule_id) + + if resume_from is None: + next_fire_time = schedule.next_fire_time + elif resume_from == "now": + resume_from = datetime.now(tz=timezone.utc) + + if ( + schedule.next_fire_time is not None + and schedule.next_fire_time >= resume_from + ): + next_fire_time = schedule.next_fire_time + + # Advance `next_fire_time` until its at or past `resume_from`, or until it's + # exhausted + while next_fire_time := schedule.trigger.next(): + if next_fire_time is None or next_fire_time >= resume_from: + break + await self.data_store.add_schedule( schedule=attrs.evolve( schedule, paused=False, - next_fire_time=self._get_unpaused_next_fire_time( - schedule, - resume_from, - ), + next_fire_time=next_fire_time, ), conflict_policy=ConflictPolicy.replace, ) From e289125b8e86ac86a2a16741a0646df54431d46f Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Wed, 8 May 2024 21:45:18 -0400 Subject: [PATCH 10/13] Update version history --- docs/versionhistory.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 22bf8efc..9b13bb51 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -15,6 +15,7 @@ APScheduler, see the :doc:`migration section `. - **BREAKING** Made publishing ``JobReleased`` events the responsibility of the ``DataStore`` implementation, rather than the scheduler, for consistency with the ``acquire_jobs()`` method +- Added the ability to pause and unpause schedules (PR by @WillDaSilva) - Fixed large parts of ``MongoDBDataStore`` still calling blocking functions in the event loop thread - Fixed JSON serialization of triggers that had been used at least once From 0342393171d96a6fbc8a5b52e4f2380c481930ab Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Wed, 8 May 2024 21:51:08 -0400 Subject: [PATCH 11/13] Use `id` instead of `schedule_id` for consistency --- src/apscheduler/_schedulers/async_.py | 8 ++++---- src/apscheduler/_schedulers/sync.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index 368d2981..e537a0d7 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -532,17 +532,17 @@ async def remove_schedule(self, id: str) -> None: self._check_initialized() await self.data_store.remove_schedules({id}) - async def pause_schedule(self, schedule_id: str) -> None: + async def pause_schedule(self, id: str) -> None: """Pause the specified schedule.""" self._check_initialized() await self.data_store.add_schedule( - schedule=attrs.evolve(await self.get_schedule(schedule_id), paused=True), + schedule=attrs.evolve(await self.get_schedule(id), paused=True), conflict_policy=ConflictPolicy.replace, ) async def unpause_schedule( self, - schedule_id: str, + id: str, *, resume_from: datetime | Literal["now"] | None = None, ) -> None: @@ -556,7 +556,7 @@ async def unpause_schedule( """ self._check_initialized() - schedule = await self.get_schedule(schedule_id) + schedule = await self.get_schedule(id) if resume_from is None: next_fire_time = schedule.next_fire_time diff --git a/src/apscheduler/_schedulers/sync.py b/src/apscheduler/_schedulers/sync.py index 244ecf65..8630821e 100644 --- a/src/apscheduler/_schedulers/sync.py +++ b/src/apscheduler/_schedulers/sync.py @@ -277,13 +277,13 @@ def remove_schedule(self, id: str) -> None: self._ensure_services_ready() self._portal.call(self._async_scheduler.remove_schedule, id) - def pause_schedule(self, schedule_id: str) -> None: + def pause_schedule(self, id: str) -> None: self._ensure_services_ready() - self._portal.call(self._async_scheduler.pause_schedule, schedule_id) + self._portal.call(self._async_scheduler.pause_schedule, id) def unpause_schedule( self, - schedule_id: str, + id: str, *, resume_from: datetime | Literal["now"] | None = None, ) -> None: @@ -291,7 +291,7 @@ def unpause_schedule( self._portal.call( partial( self._async_scheduler.unpause_schedule, - schedule_id, + id, resume_from=resume_from, ) ) From f58ffc656674e2075d96e1d17edfde08623e30a2 Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Wed, 8 May 2024 22:04:06 -0400 Subject: [PATCH 12/13] Add docs for pausing and unpausing schedules --- docs/userguide.rst | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/docs/userguide.rst b/docs/userguide.rst index 4b0348cc..84fe1d63 100644 --- a/docs/userguide.rst +++ b/docs/userguide.rst @@ -332,6 +332,31 @@ the schedule you want to remove as an argument. This is the ID you got from Note that removing a schedule does not cancel any jobs derived from it, but does prevent further jobs from being created from that schedule. +Pausing schedules +----------------- + +To pause a schedule, call :meth:`~Scheduler.pause_schedule`. Pass the identifier of the +schedule you want to pause as an argument. This is the ID you got from +:meth:`~Scheduler.add_schedule`. + +Pausing a schedule prevents any new jobs from being created from it, but does not cancel +any jobs that have already been created from that schedule. + +The schedule can be unpaused by calling :meth:`~Scheduler.unpause_schedule` with the +identifier of the schedule you want to unpause. + +By default the schedule will retain the next fire time it had when it was paused, which +may result in the schedule being considered to have misfired when it is unpaused, +resulting in whatever misfire behavior it has configured +(see :ref:`controlling-how-much-a-job-can-be-started-late` for more details). + +The ``resume_from`` parameter can be used to specify the time from which the schedule +should be resumed. This can be used to avoid the misfire behavior mentioned above. It +can be either a datetime object, or the string ``"now"`` as a convenient shorthand for +the current datetime. If this parameter is provided, the schedules trigger will be +repeatedly advanced to determine a next fire time that is at or after the specified time +to resume from. + Limiting the number of concurrently executing instances of a job ---------------------------------------------------------------- @@ -344,6 +369,8 @@ still running, the later job is terminated with the outcome of To allow more jobs to be concurrently running for a task, pass the desired maximum number as the ``max_running_jobs`` keyword argument to :meth:`~Scheduler.add_schedule`. +.. _controlling-how-much-a-job-can-be-started-late: + Controlling how much a job can be started late ---------------------------------------------- From 56c23ecbfde134b7ad77cc2bb5a133a051b9ac8c Mon Sep 17 00:00:00 2001 From: Will Da Silva Date: Wed, 8 May 2024 22:07:28 -0400 Subject: [PATCH 13/13] Fix refactor --- src/apscheduler/_schedulers/async_.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/apscheduler/_schedulers/async_.py b/src/apscheduler/_schedulers/async_.py index e537a0d7..22c64654 100644 --- a/src/apscheduler/_schedulers/async_.py +++ b/src/apscheduler/_schedulers/async_.py @@ -558,22 +558,22 @@ async def unpause_schedule( self._check_initialized() schedule = await self.get_schedule(id) - if resume_from is None: - next_fire_time = schedule.next_fire_time - elif resume_from == "now": + if resume_from == "now": resume_from = datetime.now(tz=timezone.utc) - if ( + if resume_from is None: + next_fire_time = schedule.next_fire_time + elif ( schedule.next_fire_time is not None and schedule.next_fire_time >= resume_from ): next_fire_time = schedule.next_fire_time - - # Advance `next_fire_time` until its at or past `resume_from`, or until it's - # exhausted - while next_fire_time := schedule.trigger.next(): - if next_fire_time is None or next_fire_time >= resume_from: - break + else: + # Advance `next_fire_time` until its at or past `resume_from`, or until it's + # exhausted + while next_fire_time := schedule.trigger.next(): + if next_fire_time is None or next_fire_time >= resume_from: + break await self.data_store.add_schedule( schedule=attrs.evolve(