diff --git a/oper8/watch_manager/python_watch_manager/threads/__init__.py b/oper8/watch_manager/python_watch_manager/threads/__init__.py index cc9521d..ba5e345 100644 --- a/oper8/watch_manager/python_watch_manager/threads/__init__.py +++ b/oper8/watch_manager/python_watch_manager/threads/__init__.py @@ -1,6 +1,7 @@ """Import the ThreadBase and subclasses""" # Local from .base import ThreadBase +from .heartbeat import HeartbeatThread from .reconcile import ReconcileThread from .timer import TimerThread from .watch import WatchThread diff --git a/oper8/watch_manager/python_watch_manager/threads/heartbeat.py b/oper8/watch_manager/python_watch_manager/threads/heartbeat.py new file mode 100644 index 0000000..8991eba --- /dev/null +++ b/oper8/watch_manager/python_watch_manager/threads/heartbeat.py @@ -0,0 +1,52 @@ +""" +Thread class that will dump a heartbeat to a file periodically +""" + +# Standard +from datetime import datetime, timedelta + +# Local +from ....exceptions import assert_config +from ..utils import parse_time_delta +from .timer import TimerThread + + +class HeartbeatThread(TimerThread): + """The HeartbeatThread acts as a pulse for the PythonWatchManager. + + This thread will periodically dump the value of "now" to a file which can be + read by an observer such as a liveness/readiness probe to ensure that the + manager is functioning well. + """ + + # This format is designed to be read using `date -d $(cat heartbeat.txt)` + # using the GNU date utility + # CITE: https://www.gnu.org/software/coreutils/manual/html_node/Examples-of-date.html + _DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + + def __init__(self, heartbeat_file: str, heartbeat_period: str): + """Initialize with the file location for the heartbeat output + + Args: + heartbeat_file: str + The fully-qualified path to the heartbeat file + heartbeat_period: str + Time delta string representing period delay between beats. + NOTE: The GNU `date` utility cannot parse sub-seconds easily, so + the expected configuration for this is to be >= 1s + """ + self._heartbeat_file = heartbeat_file + self._offset = parse_time_delta(heartbeat_period) + assert_config( + self._offset >= timedelta(seconds=1), + "heartbeat_period must be >= 1s", + ) + super().__init__(name="heartbeat_thread") + self.put_event(datetime.now(), self._run_heartbeat) + + def _run_heartbeat(self): + """Run the heartbeat dump to the heartbeat file and put the next beat""" + now = datetime.now() + with open(self._heartbeat_file, "w") as handle: + handle.write(now.strftime(self._DATE_FORMAT)) + self.put_event(now + self._offset, self._run_heartbeat) diff --git a/oper8/watch_manager/python_watch_manager/threads/timer.py b/oper8/watch_manager/python_watch_manager/threads/timer.py index 0dc92e7..eaf2253 100644 --- a/oper8/watch_manager/python_watch_manager/threads/timer.py +++ b/oper8/watch_manager/python_watch_manager/threads/timer.py @@ -24,9 +24,9 @@ class TimerThread(ThreadBase, metaclass=Singleton): to threading.Timer stdlib class except that it uses one shared thread for all events instead of a thread per event.""" - def __init__(self): + def __init__(self, name: Optional[str] = None): """Initialize a priorityqueue like object and a synchronization object""" - super().__init__(name="timer_thread", daemon=True) + super().__init__(name=name or "timer_thread", daemon=True) # Use a heap queue instead of a queue.PriorityQueue as we're already handling # synchronization with the notify condition