Skip to content

Commit

Permalink
refactor: Replace StaleJobFailer class with fail_stale_jobs funct…
Browse files Browse the repository at this point in the history
…ion (meltano#6610)
  • Loading branch information
WillDaSilva committed Aug 12, 2022
1 parent f0dd8f9 commit 175d7a1
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 32 deletions.
4 changes: 2 additions & 2 deletions src/meltano/cli/elt.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from meltano.core.db import project_engine
from meltano.core.elt_context import ELTContextBuilder
from meltano.core.job import Job, JobFinder
from meltano.core.job.stale_job_failer import StaleJobFailer
from meltano.core.job.stale_job_failer import fail_stale_jobs
from meltano.core.logging import JobLoggingService, OutputLogger
from meltano.core.plugin import PluginType
from meltano.core.plugin.error import PluginNotFoundError
Expand Down Expand Up @@ -221,7 +221,7 @@ async def dump_file(context_builder, dumpable):


async def _run_job(tracker, project, job, session, context_builder, force=False):
StaleJobFailer(job.job_name).fail_stale_jobs(session)
fail_stale_jobs(session, job.job_name)

if not force:
existing = JobFinder(job.job_name).latest_running(session)
Expand Down
4 changes: 2 additions & 2 deletions src/meltano/cli/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sqlalchemy.orm import Session

from meltano.core.db import project_engine
from meltano.core.job.stale_job_failer import StaleJobFailer
from meltano.core.job.stale_job_failer import fail_stale_jobs
from meltano.core.legacy_tracking import LegacyTracker
from meltano.core.schedule import Schedule
from meltano.core.schedule_service import ScheduleAlreadyExistsError, ScheduleService
Expand Down Expand Up @@ -186,7 +186,7 @@ def list(ctx, format): # noqa: WPS125
_, sessionMaker = project_engine(project) # noqa: N806
session = sessionMaker()
try:
StaleJobFailer().fail_stale_jobs(session)
fail_stale_jobs(session)

if format == "text":
transform_elt_markers = {
Expand Down
4 changes: 2 additions & 2 deletions src/meltano/core/block/extract_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from meltano.core.db import project_engine
from meltano.core.elt_context import PluginContext
from meltano.core.job import Job, JobFinder
from meltano.core.job.stale_job_failer import StaleJobFailer
from meltano.core.job.stale_job_failer import fail_stale_jobs
from meltano.core.logging import JobLoggingService, OutputLogger
from meltano.core.plugin import PluginType
from meltano.core.plugin.project_plugin import ProjectPlugin
Expand Down Expand Up @@ -450,7 +450,7 @@ async def run_with_job(self) -> None:
RunnerError: if failures are encountered during execution or if the underlying pipeline/job is already running.
"""
job = self.context.job
StaleJobFailer(job.job_name).fail_stale_jobs(self.context.session)
fail_stale_jobs(self.context.session, job.job_name)
if not self.context.force:
existing = JobFinder(job.job_name).latest_running(self.context.session)
if existing:
Expand Down
35 changes: 14 additions & 21 deletions src/meltano/core/job/stale_job_failer.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,32 @@
"""Defines StaleJobFailer."""
"""Defines `fail_stale_jobs`."""

from __future__ import annotations

import logging

from sqlalchemy.orm import Session

from .finder import JobFinder

logger = logging.getLogger(__name__)


class StaleJobFailer:
"""Class that will look for stale jobs and mark them as failed."""

def __init__(self, state_id=None):
"""Initialize stale job failer with optional state ID to filter by."""
self.state_id = state_id

def fail_stale_jobs(self, session):
"""Mark all stale jobs as failed."""
for job in self._stale_jobs(session):
self._fail_stale_job(job, session)

def _stale_jobs(self, session):
if self.state_id:
return JobFinder(self.state_id).stale(session)

return JobFinder.all_stale(session)
def fail_stale_jobs(session: Session, state_id: str | None = None) -> None:
"""Mark stale jobs as failed.
def _fail_stale_job(self, job, session):
Args:
session: An ORM DB session.
state_id: If provided, only jobs with this state ID will be failed if stale.
"""
finder = JobFinder.all_stale if state_id is None else JobFinder(state_id).stale
for job in finder(session):
if not job.fail_stale():
return
continue

job.save(session)

# No need to mention state ID if they're all going to be the same.
with_state_id = "" if self.state_id else f" with state ID '{job.job_name}'"
with_state_id = "" if state_id else f" with state ID '{job.job_name}'"

error = job.payload["error"]
logger.info(
Expand Down
8 changes: 3 additions & 5 deletions tests/meltano/core/job/test_stale_job_failer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

from meltano.core.job import Job
from meltano.core.job.stale_job_failer import StaleJobFailer
from meltano.core.job.stale_job_failer import fail_stale_jobs


class TestStaleJobFailer:
Expand Down Expand Up @@ -50,8 +50,7 @@ def test_fail_stale_jobs(
assert stale_job.is_stale()
assert other_stale_job.is_stale()

failer = StaleJobFailer()
failer.fail_stale_jobs(session)
fail_stale_jobs(session)

session.refresh(live_job)
session.refresh(stale_job)
Expand All @@ -75,8 +74,7 @@ def test_fail_stale_jobs_with_state_id(
assert stale_job.is_stale()
assert other_stale_job.is_stale()

failer = StaleJobFailer(state_id=stale_job.job_name)
failer.fail_stale_jobs(session)
fail_stale_jobs(session, state_id=stale_job.job_name)

session.refresh(live_job)
session.refresh(stale_job)
Expand Down

0 comments on commit 175d7a1

Please sign in to comment.