Skip to content

Commit

Permalink
Add State types for tasks and DAGs (#15285)
Browse files Browse the repository at this point in the history
This adds TaskState and DagState enum types that contain all possible states, makes all other core state constants derive their values from them, and adds a couple of initial type hints that use the new enums (with the plan being that we can add significantly more later).

closes: #9387
(cherry picked from commit 2b7c596)
  • Loading branch information
Andrew Godwin authored and jhtimmins committed Aug 13, 2021
1 parent 6ebbf82 commit dd94647
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 70 deletions.
6 changes: 4 additions & 2 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, with_row_locks
from airflow.utils.state import State
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.types import DagRunType

if TYPE_CHECKING:
Expand Down Expand Up @@ -302,7 +302,9 @@ def generate_run_id(run_type: DagRunType, execution_date: datetime) -> str:
return f"{run_type}__{execution_date.isoformat()}"

@provide_session
def get_task_instances(self, state=None, session=None) -> Iterable[TI]:
def get_task_instances(
self, state: Optional[Iterable[TaskInstanceState]] = None, session=None
) -> Iterable[TI]:
"""Returns the task instances for this dag run"""
tis = session.query(TI).filter(
TI.dag_id == self.dag_id,
Expand Down
2 changes: 1 addition & 1 deletion airflow/typing_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""

try:
# Protocol and TypedDict are only added to typing module starting from
# Literal, Protocol and TypedDict are only added to typing module starting from
# python 3.8 we can safely remove this shim import after Airflow drops
# support for <3.8
from typing import Literal, Protocol, TypedDict, runtime_checkable # type: ignore
Expand Down
169 changes: 102 additions & 67 deletions airflow/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,70 +16,101 @@
# specific language governing permissions and limitations
# under the License.

from enum import Enum
from typing import Dict, FrozenSet, Tuple

from airflow.settings import STATE_COLORS
from airflow.utils.types import Optional


class State:
class TaskInstanceState(str, Enum):
"""
Static class with task instance states constants and color method to
avoid hardcoding.
Enum that represents all possible states that a Task Instance can be in.
Note that None is also allowed, so always use this in a type hint with Optional.
"""

# scheduler
NONE = None # type: None
REMOVED = "removed"
SCHEDULED = "scheduled"
# Set by the scheduler
# None - Task is created but should not run yet
REMOVED = "removed" # Task vanished from DAG before it ran
SCHEDULED = "scheduled" # Task should run and will be handed to executor soon

# Set by the task instance itself
QUEUED = "queued" # Executor has enqueued the task
RUNNING = "running" # Task is executing
SUCCESS = "success" # Task completed
SHUTDOWN = "shutdown" # External request to shut down
FAILED = "failed" # Task errored out
UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed
SKIPPED = "skipped" # Skipped by branching or some other mechanism
SENSING = "sensing" # Smart sensor offloaded to the sensor DAG

# set by the executor (t.b.d.)
# LAUNCHED = "launched"
def __str__(self) -> str: # pylint: disable=invalid-str-returned
return self.value


class DagRunState(str, Enum):
"""
Enum that represents all possible states that a DagRun can be in.
These are "shared" with TaskInstanceState in some parts of the code,
so please ensure that their values always match the ones with the
same name in TaskInstanceState.
"""

# set by a task
QUEUED = "queued"
RUNNING = "running"
SUCCESS = "success"
SHUTDOWN = "shutdown" # External request to shut down
FAILED = "failed"
UP_FOR_RETRY = "up_for_retry"
UP_FOR_RESCHEDULE = "up_for_reschedule"
UPSTREAM_FAILED = "upstream_failed"
SKIPPED = "skipped"
SENSING = "sensing"

task_states = (
SUCCESS,
RUNNING,
FAILED,
UPSTREAM_FAILED,
SKIPPED,
UP_FOR_RETRY,
UP_FOR_RESCHEDULE,
QUEUED,
NONE,
SCHEDULED,
SENSING,
REMOVED,
)

dag_states = (
SUCCESS,
RUNNING,
FAILED,

class State:
"""
Static class with task instance state constants and color methods to
avoid hardcoding.
"""

# Backwards-compat constants for code that does not yet use the enum
# These first three are shared by DagRunState and TaskInstanceState
SUCCESS = TaskInstanceState.SUCCESS
RUNNING = TaskInstanceState.RUNNING
FAILED = TaskInstanceState.FAILED

# These are TaskInstanceState only
NONE = None
REMOVED = TaskInstanceState.REMOVED
SCHEDULED = TaskInstanceState.SCHEDULED
QUEUED = TaskInstanceState.QUEUED
SHUTDOWN = TaskInstanceState.SHUTDOWN
UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
SKIPPED = TaskInstanceState.SKIPPED
SENSING = TaskInstanceState.SENSING

task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)

dag_states: Tuple[DagRunState, ...] = (
DagRunState.SUCCESS,
DagRunState.RUNNING,
DagRunState.FAILED,
)

state_color = {
QUEUED: 'gray',
RUNNING: 'lime',
SUCCESS: 'green',
SHUTDOWN: 'blue',
FAILED: 'red',
UP_FOR_RETRY: 'gold',
UP_FOR_RESCHEDULE: 'turquoise',
UPSTREAM_FAILED: 'orange',
SKIPPED: 'pink',
REMOVED: 'lightgrey',
SCHEDULED: 'tan',
NONE: 'lightblue',
SENSING: 'lightseagreen',
state_color: Dict[Optional[TaskInstanceState], str] = {
None: 'lightblue',
TaskInstanceState.QUEUED: 'gray',
TaskInstanceState.RUNNING: 'lime',
TaskInstanceState.SUCCESS: 'green',
TaskInstanceState.SHUTDOWN: 'blue',
TaskInstanceState.FAILED: 'red',
TaskInstanceState.UP_FOR_RETRY: 'gold',
TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
TaskInstanceState.UPSTREAM_FAILED: 'orange',
TaskInstanceState.SKIPPED: 'pink',
TaskInstanceState.REMOVED: 'lightgrey',
TaskInstanceState.SCHEDULED: 'tan',
TaskInstanceState.SENSING: 'lightseagreen',
}
state_color.update(STATE_COLORS) # type: ignore

Expand All @@ -96,17 +127,17 @@ def color_fg(cls, state):
return 'white'
return 'black'

running = frozenset([RUNNING, SENSING])
running: FrozenSet[TaskInstanceState] = frozenset([TaskInstanceState.RUNNING, TaskInstanceState.SENSING])
"""
A list of states indicating that a task is being executed.
"""

finished = frozenset(
finished: FrozenSet[TaskInstanceState] = frozenset(
[
SUCCESS,
FAILED,
SKIPPED,
UPSTREAM_FAILED,
TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED,
TaskInstanceState.SKIPPED,
TaskInstanceState.UPSTREAM_FAILED,
]
)
"""
Expand All @@ -118,29 +149,33 @@ def color_fg(cls, state):
case, it is no longer running.
"""

unfinished = frozenset(
unfinished: FrozenSet[Optional[TaskInstanceState]] = frozenset(
[
NONE,
SCHEDULED,
QUEUED,
RUNNING,
SENSING,
SHUTDOWN,
UP_FOR_RETRY,
UP_FOR_RESCHEDULE,
None,
TaskInstanceState.SCHEDULED,
TaskInstanceState.QUEUED,
TaskInstanceState.RUNNING,
TaskInstanceState.SENSING,
TaskInstanceState.SHUTDOWN,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
]
)
"""
A list of states indicating that a task either has not completed
a run or has not even started.
"""

failed_states = frozenset([FAILED, UPSTREAM_FAILED])
failed_states: FrozenSet[TaskInstanceState] = frozenset(
[TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
)
"""
A list of states indicating that a task or dag is a failed state.
"""

success_states = frozenset([SUCCESS, SKIPPED])
success_states: FrozenSet[TaskInstanceState] = frozenset(
[TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
)
"""
A list of states indicating that a task or dag is a success state.
"""
Expand Down

0 comments on commit dd94647

Please sign in to comment.