-
Notifications
You must be signed in to change notification settings - Fork 225
/
recurring_task.py
44 lines (34 loc) · 1.55 KB
/
recurring_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from __future__ import annotations
import asyncio
from logging import getLogger
from typing import TYPE_CHECKING, Callable
if TYPE_CHECKING:
from datetime import timedelta
logger = getLogger(__name__)
class RecurringTask:
"""Class for creating and managing recurring tasks.
Attributes:
func: The function to be executed repeatedly.
delay: The time delay (in seconds) between function calls.
task: The underlying task object.
"""
def __init__(self, func: Callable, delay: timedelta) -> None:
logger.debug(f'Calling RecurringTask.__init__(func={func.__name__}, delay={delay})...')
self.func = func
self.delay = delay
self.task: asyncio.Task | None = None
async def _wrapper(self) -> None:
"""Internal method that repeatedly executes the provided function with the specified delay."""
sleep_time_secs = self.delay.total_seconds()
while True:
await self.func() if asyncio.iscoroutinefunction(self.func) else self.func()
await asyncio.sleep(sleep_time_secs)
def start(self) -> None:
"""Start the recurring task execution."""
self.task = asyncio.create_task(self._wrapper(), name=f'Task-recurring-{self.func.__name__}')
async def stop(self) -> None:
"""Stop the recurring task execution."""
if self.task:
self.task.cancel()
# Ensure the task has a chance to properly handle the cancellation and any potential exceptions.
await asyncio.gather(self.task, return_exceptions=True)