Skip to content

Commit

Permalink
Add retries to job heartbeat (apache#37541)
Browse files Browse the repository at this point in the history
* Job heartbeat retries

* Tweak format

* Retry on individual db operations
  • Loading branch information
awdavidson authored and howardyoo committed Mar 18, 2024
1 parent 5ddd34c commit 749280c
Showing 1 changed file with 3 additions and 0 deletions.
3 changes: 3 additions & 0 deletions airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.platform import getuser
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
from airflow.utils.state import JobState
Expand Down Expand Up @@ -302,6 +303,7 @@ def _kill(job_id: str, session: Session = NEW_SESSION) -> Job | JobPydantic:
@staticmethod
@internal_api_call
@provide_session
@retry_db_transaction
def _fetch_from_db(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
if isinstance(job, Job):
# not Internal API
Expand Down Expand Up @@ -342,6 +344,7 @@ def _update_in_db(job: Job | JobPydantic, session: Session = NEW_SESSION):
@staticmethod
@internal_api_call
@provide_session
@retry_db_transaction
def _update_heartbeat(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic:
orm_job: Job | None = session.scalar(select(Job).where(Job.id == job.id).limit(1))
if orm_job is None:
Expand Down

0 comments on commit 749280c

Please sign in to comment.