Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed #9387: Add State types for tasks and DAGs #15285

Merged
merged 1 commit into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, with_row_locks
from airflow.utils.state import State
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.types import DagRunType

if TYPE_CHECKING:
Expand Down Expand Up @@ -314,7 +314,9 @@ def generate_run_id(run_type: DagRunType, execution_date: datetime) -> str:
return f"{run_type}__{execution_date.isoformat()}"

@provide_session
def get_task_instances(self, state=None, session=None) -> Iterable[TI]:
def get_task_instances(
self, state: Optional[Iterable[TaskInstanceState]] = None, session=None
) -> Iterable[TI]:
"""Returns the task instances for this dag run"""
tis = session.query(TI).filter(
TI.dag_id == self.dag_id,
Expand Down
2 changes: 1 addition & 1 deletion airflow/typing_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""

try:
# Protocol and TypedDict are only added to typing module starting from
# Literal, Protocol and TypedDict are only added to typing module starting from
# Python 3.8. We can safely remove this shim import after Airflow drops
# support for <3.8
from typing import Literal, Protocol, TypedDict, runtime_checkable # type: ignore
Expand Down
169 changes: 102 additions & 67 deletions airflow/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,70 +16,101 @@
# specific language governing permissions and limitations
# under the License.

from enum import Enum
from typing import Dict, FrozenSet, Tuple

from airflow.settings import STATE_COLORS
from airflow.utils.types import Optional


class State:
class TaskInstanceState(str, Enum):
"""
Static class with task instance states constants and color method to
avoid hardcoding.
Enum that represents all possible states that a Task Instance can be in.

Note that None is also allowed, so always use this in a type hint with Optional.
"""

# scheduler
NONE = None # type: None
REMOVED = "removed"
SCHEDULED = "scheduled"
# Set by the scheduler
# None - Task is created but should not run yet
REMOVED = "removed" # Task vanished from DAG before it ran
SCHEDULED = "scheduled" # Task should run and will be handed to executor soon

# Set by the task instance itself
QUEUED = "queued" # Executor has enqueued the task
RUNNING = "running" # Task is executing
SUCCESS = "success" # Task completed
SHUTDOWN = "shutdown" # External request to shut down
FAILED = "failed" # Task errored out
UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed
SKIPPED = "skipped" # Skipped by branching or some other mechanism
SENSING = "sensing" # Smart sensor offloaded to the sensor DAG

andrewgodwin marked this conversation as resolved.
Show resolved Hide resolved
# set by the executor (t.b.d.)
# LAUNCHED = "launched"
def __str__(self) -> str: # pylint: disable=invalid-str-returned
return self.value


class DagRunState(str, Enum):
"""
Enum that represents all possible states that a DagRun can be in.

These are "shared" with TaskInstanceState in some parts of the code,
so please ensure that their values always match the ones with the
same name in TaskInstanceState.
"""

# set by a task
QUEUED = "queued"
RUNNING = "running"
SUCCESS = "success"
SHUTDOWN = "shutdown" # External request to shut down
FAILED = "failed"
UP_FOR_RETRY = "up_for_retry"
UP_FOR_RESCHEDULE = "up_for_reschedule"
UPSTREAM_FAILED = "upstream_failed"
SKIPPED = "skipped"
SENSING = "sensing"

task_states = (
SUCCESS,
RUNNING,
FAILED,
UPSTREAM_FAILED,
SKIPPED,
UP_FOR_RETRY,
UP_FOR_RESCHEDULE,
QUEUED,
NONE,
SCHEDULED,
SENSING,
REMOVED,
)

dag_states = (
SUCCESS,
RUNNING,
FAILED,

class State:
"""
Static class with task instance state constants and color methods to
avoid hardcoding.
"""

# Backwards-compat constants for code that does not yet use the enum
# These first three are shared by DagRunState and TaskInstanceState
SUCCESS = TaskInstanceState.SUCCESS
RUNNING = TaskInstanceState.RUNNING
FAILED = TaskInstanceState.FAILED

# These are TaskInstanceState only
NONE = None
REMOVED = TaskInstanceState.REMOVED
SCHEDULED = TaskInstanceState.SCHEDULED
QUEUED = TaskInstanceState.QUEUED
SHUTDOWN = TaskInstanceState.SHUTDOWN
UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
SKIPPED = TaskInstanceState.SKIPPED
SENSING = TaskInstanceState.SENSING

task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)

dag_states: Tuple[DagRunState, ...] = (
DagRunState.SUCCESS,
DagRunState.RUNNING,
DagRunState.FAILED,
)

state_color = {
QUEUED: 'gray',
RUNNING: 'lime',
SUCCESS: 'green',
SHUTDOWN: 'blue',
FAILED: 'red',
UP_FOR_RETRY: 'gold',
UP_FOR_RESCHEDULE: 'turquoise',
UPSTREAM_FAILED: 'orange',
SKIPPED: 'pink',
REMOVED: 'lightgrey',
SCHEDULED: 'tan',
NONE: 'lightblue',
SENSING: 'lightseagreen',
state_color: Dict[Optional[TaskInstanceState], str] = {
None: 'lightblue',
TaskInstanceState.QUEUED: 'gray',
TaskInstanceState.RUNNING: 'lime',
TaskInstanceState.SUCCESS: 'green',
TaskInstanceState.SHUTDOWN: 'blue',
TaskInstanceState.FAILED: 'red',
TaskInstanceState.UP_FOR_RETRY: 'gold',
TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
TaskInstanceState.UPSTREAM_FAILED: 'orange',
TaskInstanceState.SKIPPED: 'pink',
TaskInstanceState.REMOVED: 'lightgrey',
TaskInstanceState.SCHEDULED: 'tan',
TaskInstanceState.SENSING: 'lightseagreen',
}
state_color.update(STATE_COLORS) # type: ignore

Expand All @@ -96,17 +127,17 @@ def color_fg(cls, state):
return 'white'
return 'black'

running = frozenset([RUNNING, SENSING])
running: FrozenSet[TaskInstanceState] = frozenset([TaskInstanceState.RUNNING, TaskInstanceState.SENSING])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, surprised this type hint is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may not be, I just like adding explicit type hints where I think it helps the reader of the code understand what's going on.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I also often do that, even if not technically needed, to make it explicit. I think code should be optimised much more for the reader's time than for the writer's time ;).

"""
A list of states indicating that a task is being executed.
"""

finished = frozenset(
finished: FrozenSet[TaskInstanceState] = frozenset(
[
SUCCESS,
FAILED,
SKIPPED,
UPSTREAM_FAILED,
TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED,
TaskInstanceState.SKIPPED,
TaskInstanceState.UPSTREAM_FAILED,
]
)
"""
Expand All @@ -118,29 +149,33 @@ def color_fg(cls, state):
case, it is no longer running.
"""

unfinished = frozenset(
unfinished: FrozenSet[Optional[TaskInstanceState]] = frozenset(
[
NONE,
SCHEDULED,
QUEUED,
RUNNING,
SENSING,
SHUTDOWN,
UP_FOR_RETRY,
UP_FOR_RESCHEDULE,
None,
TaskInstanceState.SCHEDULED,
TaskInstanceState.QUEUED,
TaskInstanceState.RUNNING,
TaskInstanceState.SENSING,
TaskInstanceState.SHUTDOWN,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
]
)
"""
A list of states indicating that a task either has not completed
a run or has not even started.
"""

failed_states = frozenset([FAILED, UPSTREAM_FAILED])
failed_states: FrozenSet[TaskInstanceState] = frozenset(
[TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
)
"""
A list of states indicating that a task or dag is in a failed state.
"""

success_states = frozenset([SUCCESS, SKIPPED])
success_states: FrozenSet[TaskInstanceState] = frozenset(
[TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
)
"""
A list of states indicating that a task or dag is in a success state.
"""
Expand Down