From 915b1047bb25cd8a3e6833f68bb0e327a0cef8e3 Mon Sep 17 00:00:00 2001 From: Bowrna Date: Thu, 7 Sep 2023 14:25:40 +0530 Subject: [PATCH] Log heartbeat failure traceback once per outage and report job health --- airflow/jobs/job.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index 200d55f023fd8..1f09a95b57b54 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -102,6 +102,7 @@ class Job(Base, LoggingMixin): def __init__(self, executor=None, heartrate=None, **kwargs): # Save init parameters as DB fields + self.heartbeat_failed = False self.hostname = get_hostname() if executor: self.executor = executor @@ -217,9 +218,22 @@ def heartbeat( heartbeat_callback(session) self.log.debug("[heartbeat]") + self.heartbeat_failed = False except OperationalError: Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1) - self.log.exception("%s heartbeat got an exception", self.__class__.__name__) + if not self.heartbeat_failed: + self.log.exception("%s heartbeat got an exception", self.__class__.__name__) + self.heartbeat_failed = True + if self.is_alive(): + self.log.error( + "%s heartbeat failed with error. Scheduler may go into unhealthy state", + self.__class__.__name__, + ) + else: + self.log.error( + "%s heartbeat failed with error. Scheduler is in unhealthy state", self.__class__.__name__ + ) + # self.log.exception("%s heartbeat got an exception", self.__class__.__name__) # We didn't manage to heartbeat, so make sure that the timestamp isn't updated self.latest_heartbeat = previous_heartbeat