Skip to content

Commit

Permalink
Merge d0a827f into 4ca9251
Browse files Browse the repository at this point in the history
  • Loading branch information
WillDaSilva authored May 2, 2024
2 parents 4ca9251 + d0a827f commit 41b3ba1
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 13 deletions.
36 changes: 34 additions & 2 deletions src/apscheduler/_schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from inspect import isbuiltin, isclass, ismethod, ismodule
from logging import Logger, getLogger
from types import TracebackType
from typing import Any, Callable, Iterable, Mapping, cast, overload
from typing import Any, Callable, Iterable, Literal, Mapping, cast, overload
from uuid import UUID, uuid4

import anyio
Expand All @@ -30,7 +30,13 @@
from .. import JobAdded, SerializationError, TaskLookupError
from .._context import current_async_scheduler, current_job
from .._converters import as_enum, as_timedelta
from .._enums import CoalescePolicy, ConflictPolicy, JobOutcome, RunState, SchedulerRole
from .._enums import (
CoalescePolicy,
ConflictPolicy,
JobOutcome,
RunState,
SchedulerRole,
)
from .._events import (
Event,
JobReleased,
Expand Down Expand Up @@ -410,6 +416,7 @@ async def add_schedule(
id: str | None = None,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
paused: bool = False,
job_executor: str | UnsetValue = unset,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
Expand All @@ -427,6 +434,7 @@ async def add_schedule(
based ID will be assigned)
:param args: positional arguments to be passed to the task function
:param kwargs: keyword arguments to be passed to the task function
:param paused: whether the schedule is paused
:param job_executor: name of the job executor to run the task with
:param coalesce: determines what to do when processing the schedule if multiple
fire times have become due for this schedule since the last processing
Expand Down Expand Up @@ -478,6 +486,7 @@ async def add_schedule(
trigger=trigger,
args=args,
kwargs=kwargs,
paused=paused,
coalesce=coalesce,
misfire_grace_time=task.misfire_grace_time
if misfire_grace_time is unset
Expand Down Expand Up @@ -529,6 +538,29 @@ async def remove_schedule(self, id: str) -> None:
self._check_initialized()
await self.data_store.remove_schedules({id})

async def pause_schedule(self, id: str) -> None:
"""Pause the specified schedule."""
self._check_initialized()
await self.data_store.pause_schedules({id})

async def unpause_schedule(
self,
id: str,
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
"""Unpause the specified schedule.
By default, the schedule will be resumed as if it had never been paused, and all
missed runs will be considered misfires. The ``resume_from`` parameter can be
used to specify a different time from which to resume the schedule. The string
``'now'`` can be used as shorthand for ``datetime.now(tz=UTC)``. If
``resume_from`` is not ``None``, then the trigger will be repeatedly advanced
until the next fire time is at or after the specified time.
"""
self._check_initialized()
await self.data_store.unpause_schedules({id}, resume_from=resume_from)

async def add_job(
self,
func_or_task_id: TaskType,
Expand Down
32 changes: 29 additions & 3 deletions src/apscheduler/_schedulers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@
import threading
from collections.abc import MutableMapping, Sequence
from contextlib import ExitStack
from datetime import timedelta
from datetime import datetime, timedelta
from functools import partial
from logging import Logger
from types import TracebackType
from typing import Any, Callable, Iterable, Mapping, overload
from typing import Any, Callable, Iterable, Literal, Mapping, overload
from uuid import UUID

from anyio.from_thread import BlockingPortal, start_blocking_portal

from .. import current_scheduler
from .._enums import CoalescePolicy, ConflictPolicy, RunState, SchedulerRole
from .._enums import (
CoalescePolicy,
ConflictPolicy,
RunState,
SchedulerRole,
)
from .._events import Event, T_Event
from .._structures import Job, JobResult, Schedule, Task
from .._utils import UnsetValue, unset
Expand Down Expand Up @@ -238,6 +243,7 @@ def add_schedule(
id: str | None = None,
args: Iterable | None = None,
kwargs: Mapping[str, Any] | None = None,
paused: bool = False,
job_executor: str | UnsetValue = unset,
coalesce: CoalescePolicy = CoalescePolicy.latest,
misfire_grace_time: float | timedelta | None | UnsetValue = unset,
Expand All @@ -254,6 +260,7 @@ def add_schedule(
id=id,
args=args,
kwargs=kwargs,
paused=paused,
job_executor=job_executor,
coalesce=coalesce,
misfire_grace_time=misfire_grace_time,
Expand All @@ -275,6 +282,25 @@ def remove_schedule(self, id: str) -> None:
self._ensure_services_ready()
self._portal.call(self._async_scheduler.remove_schedule, id)

def pause_schedule(self, id: str) -> None:
self._ensure_services_ready()
self._portal.call(self._async_scheduler.pause_schedule, id)

def unpause_schedule(
self,
id: str,
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
self._ensure_services_ready()
self._portal.call(
partial(
self._async_scheduler.unpause_schedule,
id,
resume_from=resume_from,
)
)

def add_job(
self,
func_or_task_id: TaskType,
Expand Down
2 changes: 2 additions & 0 deletions src/apscheduler/_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Schedule:
:var str task_id: unique identifier of the task to be run on this schedule
:var tuple args: positional arguments to pass to the task callable
:var dict[str, Any] kwargs: keyword arguments to pass to the task callable
:var bool paused: whether the schedule is paused
:var CoalescePolicy coalesce: determines what to do when processing the schedule if
multiple fire times have become due for this schedule since the last processing
:var ~datetime.timedelta | None misfire_grace_time: maximum number of seconds the
Expand Down Expand Up @@ -105,6 +106,7 @@ class Schedule:
kwargs: dict[str, Any] = attrs.field(
eq=False, order=False, converter=dict, default=()
)
paused: bool = attrs.field(eq=False, order=False, default=False)
coalesce: CoalescePolicy = attrs.field(
eq=False,
order=False,
Expand Down
26 changes: 25 additions & 1 deletion src/apscheduler/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextlib import AsyncExitStack
from datetime import datetime
from logging import Logger
from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator
from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, Literal
from uuid import UUID

if sys.version_info >= (3, 11):
Expand Down Expand Up @@ -245,6 +245,30 @@ async def remove_schedules(self, ids: Iterable[str]) -> None:
:param ids: a specific set of schedule IDs to remove
"""

@abstractmethod
async def pause_schedules(self, ids: Iterable[str]) -> None:
"""
Pause the specified schedules.
:param ids: a specific set of schedule IDs to pause
"""

@abstractmethod
async def unpause_schedules(
self,
ids: Iterable[str],
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
"""
Unpause the specified schedules.
:param ids: a specific set of schedule IDs to unpause
:param resume_from: the time to resume the schedules from, or ``'now'`` as a
shorthand for ``datetime.now(tz=UTC)`` or ``None`` to resume from where the
schedule left off which may cause it to misfire
"""

@abstractmethod
async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]:
"""
Expand Down
51 changes: 51 additions & 0 deletions src/apscheduler/datastores/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from __future__ import annotations

from contextlib import AsyncExitStack
from datetime import datetime, timezone
from logging import Logger
from typing import Iterable, Literal

import attrs

from .._enums import ConflictPolicy
from .._retry import RetryMixin
from .._structures import Schedule
from ..abc import DataStore, EventBroker, Serializer
from ..serializers.pickle import PickleSerializer

Expand All @@ -29,6 +33,53 @@ async def start(
self._event_broker = event_broker
self._logger = logger

async def pause_schedules(self, ids: Iterable[str]) -> None:
for schedule in await self.get_schedules(ids):
await self.add_schedule(
attrs.evolve(schedule, paused=True),
ConflictPolicy.replace,
)

def _get_unpaused_next_fire_time(
self,
schedule: Schedule,
resume_from: datetime | Literal["now"] | None,
) -> datetime | None:
if resume_from is None:
return schedule.next_fire_time
if resume_from == "now":
resume_from = datetime.now(tz=timezone.utc)
if (
schedule.next_fire_time is not None
and schedule.next_fire_time >= resume_from
):
return schedule.next_fire_time
try:
while (next_fire_time := schedule.trigger.next()) < resume_from:
pass # Advance `next_fire_time` until its at or past `resume_from`
except TypeError: # The trigger is exhausted
return None
return next_fire_time

async def unpause_schedules(
self,
ids: Iterable[str],
*,
resume_from: datetime | Literal["now"] | None = None,
) -> None:
for schedule in await self.get_schedules(ids):
await self.add_schedule(
attrs.evolve(
schedule,
paused=False,
next_fire_time=self._get_unpaused_next_fire_time(
schedule,
resume_from,
),
),
ConflictPolicy.replace,
)


@attrs.define(kw_only=True)
class BaseExternalDataStore(BaseDataStore, RetryMixin):
Expand Down
7 changes: 5 additions & 2 deletions src/apscheduler/datastores/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,11 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
schedules: list[Schedule] = []
for state in self._schedules:
if state.next_fire_time is None or state.next_fire_time > now:
# The schedule is either paused or not yet due
break
# The schedule is either exhausted or not yet due
continue
elif state.schedule.paused:
# The schedule is paused
continue
elif state.acquired_by is not None:
if state.acquired_by != scheduler_id and now <= state.acquired_until:
# The schedule has been acquired by another scheduler and the
Expand Down
16 changes: 13 additions & 3 deletions src/apscheduler/datastores/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,19 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
lambda: self._schedules.find(
{
"next_fire_time": {"$lte": now},
"$or": [
{"acquired_until": {"$exists": False}},
{"acquired_until": {"$lt": now}},
"$and": [
{
"$or": [
{"paused": {"$exists": False}},
{"paused": False},
]
},
{
"$or": [
{"acquired_until": {"$exists": False}},
{"acquired_until": {"$lt": now}},
]
},
],
},
session=session,
Expand Down
8 changes: 7 additions & 1 deletion src/apscheduler/datastores/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from anyio import CancelScope, to_thread
from sqlalchemy import (
BigInteger,
Boolean,
Column,
DateTime,
Enum,
Expand Down Expand Up @@ -293,6 +294,7 @@ def get_table_definitions(self) -> MetaData:
Column("trigger", LargeBinary),
Column("args", LargeBinary),
Column("kwargs", LargeBinary),
Column("paused", Boolean, nullable=False, server_default=literal(False)),
Column("coalesce", Enum(CoalescePolicy), nullable=False),
Column("misfire_grace_time", interval_type),
Column("max_jitter", interval_type),
Expand Down Expand Up @@ -600,6 +602,7 @@ async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedul
and_(
self._t_schedules.c.next_fire_time.isnot(None),
comparison,
self._t_schedules.c.paused.is_(False),
or_(
self._t_schedules.c.acquired_until.is_(None),
self._t_schedules.c.acquired_until < now,
Expand Down Expand Up @@ -752,7 +755,10 @@ async def get_next_schedule_run_time(self) -> datetime | None:

statenent = (
select(*columns)
.where(self._t_schedules.c.next_fire_time.isnot(None))
.where(
self._t_schedules.c.next_fire_time.isnot(None),
self._t_schedules.c.paused.is_(False),
)
.order_by(self._t_schedules.c.next_fire_time)
.limit(1)
)
Expand Down
Loading

0 comments on commit 41b3ba1

Please sign in to comment.