Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class Job(Base, LoggingMixin):

def __init__(self, executor=None, heartrate=None, **kwargs):
# Save init parameters as DB fields
self.heartbeat_failed = False
self.hostname = get_hostname()
if executor:
self.executor = executor
Expand Down Expand Up @@ -217,9 +218,22 @@ def heartbeat(

heartbeat_callback(session)
self.log.debug("[heartbeat]")
self.heartbeat_failed = False
except OperationalError:
Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
self.log.exception("%s heartbeat got an exception", self.__class__.__name__)
if not self.heartbeat_failed:
self.log.exception("%s heartbeat got an exception", self.__class__.__name__)
self.heartbeat_failed = True
if self.is_alive():
self.log.error(
"%s heartbeat failed with error. Scheduler may go into unhealthy state",
self.__class__.__name__,
)
else:
self.log.error(
"%s heartbeat failed with error. Scheduler is in unhealthy state", self.__class__.__name__
)
# self.log.exception("%s heartbeat got an exception", self.__class__.__name__)
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
self.latest_heartbeat = previous_heartbeat

Expand Down