Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def task_state(args) -> None:
dag = get_dag(args.subdir, args.dag_id, from_db=True)
task = dag.get_task(task_id=args.task_id)
ti, _ = _get_ti(task, args.map_index, logical_date_or_run_id=args.logical_date_or_run_id)
print(ti.current_state())
print(ti.state)


@cli_utils.action_cli(check_db=False)
Expand Down
16 changes: 0 additions & 16 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1947,22 +1947,6 @@ def mark_success_url(self) -> str:
"&state=success"
)

@provide_session
def current_state(self, session: Session = NEW_SESSION) -> str:
"""
Get the very latest state from the database.

If a session is passed, we use and looking up the state becomes part of the session,
otherwise a new session is used.

sqlalchemy.inspect is used here to get the primary keys ensuring that if they change
it will not regress

:param session: SQLAlchemy ORM Session
"""
filters = (col == getattr(self, col.name) for col in inspect(TaskInstance).primary_key)
return session.query(TaskInstance.state).filter(*filters).scalar()

@provide_session
def error(self, session: Session = NEW_SESSION) -> None:
"""
Expand Down
29 changes: 0 additions & 29 deletions airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,6 @@ def test_set_task_dates(self, dag_maker):
assert op3.start_date == DEFAULT_DATE + datetime.timedelta(days=1)
assert op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9)

def test_current_state(self, create_task_instance, session):
ti = create_task_instance(session=session)
assert ti.current_state(session=session) is None
ti.run()
assert ti.current_state(session=session) == State.SUCCESS

def test_set_dag(self, dag_maker):
"""
Test assigning Operators to Dags, including deferred assignment
Expand Down Expand Up @@ -2250,29 +2244,6 @@ def test_outlet_assets_failed(self, create_task_instance, testing_dag_bundle):
# check that no asset events were generated
assert session.query(AssetEvent).count() == 0

def test_mapped_current_state(self, dag_maker):
with dag_maker(dag_id="test_mapped_current_state") as _:
from airflow.sdk import task

@task()
def raise_an_exception(placeholder: int):
if placeholder == 0:
raise AirflowFailException("failing task")
else:
pass

_ = raise_an_exception.expand(placeholder=[0, 1])

tis = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances
for task_instance in tis:
if task_instance.map_index == 0:
with pytest.raises(AirflowFailException):
task_instance.run()
assert task_instance.current_state() == TaskInstanceState.FAILED
else:
task_instance.run()
assert task_instance.current_state() == TaskInstanceState.SUCCESS

def test_outlet_assets_skipped(self, testing_dag_bundle):
"""
Verify that when we have an outlet asset on a task, and the task
Expand Down