Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
WillDaSilva committed May 6, 2024
1 parent 695efe6 commit 61f7051
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 264 deletions.
59 changes: 47 additions & 12 deletions src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,28 +532,63 @@ async def remove_schedule(self, id: str) -> None:
self._check_initialized()
await self.data_store.remove_schedules({id})

async def pause_schedule(self, id: str) -> None:
async def pause_schedule(self, schedule_id: str) -> None:
"""Pause the specified schedule."""
self._check_initialized()
await self.data_store.pause_schedules({id})
await self.data_store.add_schedule(
schedule=attrs.evolve(await self.get_schedule(schedule_id), paused=True),
conflict_policy=ConflictPolicy.replace,
)

def _get_unpaused_next_fire_time(
self,
schedule: Schedule,
resume_from: datetime | Literal["now"] | None,
) -> datetime | None:
if resume_from is None:
return schedule.next_fire_time
if resume_from == "now":
resume_from = datetime.now(tz=timezone.utc)
if (
schedule.next_fire_time is not None
and schedule.next_fire_time >= resume_from
):
return schedule.next_fire_time
try:
while (next_fire_time := schedule.trigger.next()) < resume_from:
pass # Advance `next_fire_time` until its at or past `resume_from`
except TypeError: # The trigger is exhausted
return None
return next_fire_time

async def unpause_schedule(
self,
id: str,
schedule_id: str,
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
"""Unpause the specified schedule.
By default, the schedule will be resumed as if it had never been paused, and all
missed runs will be considered misfires. The ``resume_from`` parameter can be
used to specify a different time from which to resume the schedule. The string
``'now'`` can be used as shorthand for ``datetime.now(tz=UTC)``. If
``resume_from`` is not ``None``, then the trigger will be repeatedly advanced
until the next fire time is at or after the specified time.
"""
Unpause the specified schedule.
:param resume_from: the time to resume the schedules from, or ``'now'`` as a
shorthand for ``datetime.now(tz=UTC)`` or ``None`` to resume from where the
schedule left off which may cause it to misfire
"""
self._check_initialized()
await self.data_store.unpause_schedules({id}, resume_from=resume_from)
schedule = await self.get_schedule(schedule_id)
await self.data_store.add_schedule(
schedule=attrs.evolve(
schedule,
paused=False,
next_fire_time=self._get_unpaused_next_fire_time(
schedule,
resume_from,
),
),
conflict_policy=ConflictPolicy.replace,
)

async def add_job(
self,
Expand Down
8 changes: 4 additions & 4 deletions src/apscheduler/_schedulers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,21 +277,21 @@ def remove_schedule(self, id: str) -> None:
self._ensure_services_ready()
self._portal.call(self._async_scheduler.remove_schedule, id)

def pause_schedule(self, id: str) -> None:
def pause_schedule(self, schedule_id: str) -> None:
self._ensure_services_ready()
self._portal.call(self._async_scheduler.pause_schedule, id)
self._portal.call(self._async_scheduler.pause_schedule, schedule_id)

def unpause_schedule(
self,
id: str,
schedule_id: str,
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
self._ensure_services_ready()
self._portal.call(
partial(
self._async_scheduler.unpause_schedule,
id,
schedule_id,
resume_from=resume_from,
)
)
Expand Down
26 changes: 1 addition & 25 deletions src/apscheduler/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextlib import AsyncExitStack
from datetime import datetime
from logging import Logger
from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, Literal
from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator
from uuid import UUID

if sys.version_info >= (3, 11):
Expand Down Expand Up @@ -245,30 +245,6 @@ async def remove_schedules(self, ids: Iterable[str]) -> None:
:param ids: a specific set of schedule IDs to remove
"""

@abstractmethod
async def pause_schedules(self, ids: Iterable[str]) -> None:
"""
Pause the specified schedules.
:param ids: a specific set of schedule IDs to pause
"""

@abstractmethod
async def unpause_schedules(
self,
ids: Iterable[str],
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
"""
Unpause the specified schedules.
:param ids: a specific set of schedule IDs to unpause
:param resume_from: the time to resume the schedules from, or ``'now'`` as a
shorthand for ``datetime.now(tz=UTC)`` or ``None`` to resume from where the
schedule left off which may cause it to misfire
"""

@abstractmethod
async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
"""
Expand Down
51 changes: 0 additions & 51 deletions src/apscheduler/datastores/base.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
from __future__ import annotations

from contextlib import AsyncExitStack
from datetime import datetime, timezone
from logging import Logger
from typing import Iterable, Literal

import attrs

from .._enums import ConflictPolicy
from .._retry import RetryMixin
from .._structures import Schedule
from ..abc import DataStore, EventBroker, Serializer
from ..serializers.pickle import PickleSerializer

Expand All @@ -33,53 +29,6 @@ async def start(
self._event_broker = event_broker
self._logger = logger

async def pause_schedules(self, ids: Iterable[str]) -> None:
for schedule in await self.get_schedules(ids):
await self.add_schedule(
attrs.evolve(schedule, paused=True),
ConflictPolicy.replace,
)

def _get_unpaused_next_fire_time(
self,
schedule: Schedule,
resume_from: datetime | Literal["now"] | None,
) -> datetime | None:
if resume_from is None:
return schedule.next_fire_time
if resume_from == "now":
resume_from = datetime.now(tz=timezone.utc)
if (
schedule.next_fire_time is not None
and schedule.next_fire_time >= resume_from
):
return schedule.next_fire_time
try:
while (next_fire_time := schedule.trigger.next()) < resume_from:
pass # Advance `next_fire_time` until its at or past `resume_from`
except TypeError: # The trigger is exhausted
return None
return next_fire_time

async def unpause_schedules(
self,
ids: Iterable[str],
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
for schedule in await self.get_schedules(ids):
await self.add_schedule(
attrs.evolve(
schedule,
paused=False,
next_fire_time=self._get_unpaused_next_fire_time(
schedule,
resume_from,
),
),
ConflictPolicy.replace,
)


@attrs.define(kw_only=True)
class BaseExternalDataStore(BaseDataStore, RetryMixin):
Expand Down
5 changes: 3 additions & 2 deletions src/apscheduler/datastores/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
schedules: list[Schedule] = []
for state in self._schedules:
if state.next_fire_time is None or state.next_fire_time > now:
# The schedule is either exhausted or not yet due
continue
# The schedule is either exhausted or not yet due. There will be no
# schedules that are due after this one, so we can stop here.
break
elif state.schedule.paused:
# The schedule is paused
continue
Expand Down
5 changes: 3 additions & 2 deletions src/apscheduler/datastores/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Uuid,
and_,
bindparam,
false,
or_,
select,
)
Expand Down Expand Up @@ -602,7 +603,7 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
and_(
self._t_schedules.c.next_fire_time.isnot(None),
comparison,
self._t_schedules.c.paused.is_(False),
self._t_schedules.c.paused == false(),
or_(
self._t_schedules.c.acquired_until.is_(None),
self._t_schedules.c.acquired_until < now,
Expand Down Expand Up @@ -757,7 +758,7 @@ async def get_next_schedule_run_time(self) -> datetime | None:
select(*columns)
.where(
self._t_schedules.c.next_fire_time.isnot(None),
self._t_schedules.c.paused.is_(False),
self._t_schedules.c.paused == false(),
)
.order_by(self._t_schedules.c.next_fire_time)
.limit(1)
Expand Down
Loading

0 comments on commit 61f7051

Please sign in to comment.