diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 7c318fc499ed6..2cb336c8ea737 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1306,6 +1306,7 @@ def _on_term(signum, frame): try: result = _execute_task(context=context, ti=ti, log=log) + log.info("::group::Post Execute") except Exception: import jinja2 @@ -1325,22 +1326,24 @@ def _on_term(signum, frame): # Send update only if value changed (e.g., user set context variables during execution) if ti.rendered_map_index and ti.rendered_map_index != previous_rendered_map_index: SUPERVISOR_COMMS.send(msg=SetRenderedMapIndex(rendered_map_index=ti.rendered_map_index)) - finally: - log.info("::group::Post Execute") _push_xcom_if_needed(result, ti, log) msg, state = _handle_current_task_success(context, ti) except DownstreamTasksSkipped as skip: + log.info("::group::Post Execute") log.info("Skipping downstream tasks.") tasks_to_skip = skip.tasks if isinstance(skip.tasks, list) else [skip.tasks] SUPERVISOR_COMMS.send(msg=SkipDownstreamTasks(tasks=tasks_to_skip)) msg, state = _handle_current_task_success(context, ti) except DagRunTriggerException as drte: + log.info("::group::Post Execute") msg, state = _handle_trigger_dag_run(drte, context, ti, log) except TaskDeferred as defer: + log.info("::group::Post Execute") msg, state = _defer_task(defer, ti, log) except AirflowSkipException as e: + log.info("::group::Post Execute") if e.args: log.info("Skipping task.", reason=e.args[0]) msg = TaskState( @@ -1350,6 +1353,7 @@ def _on_term(signum, frame): ) state = TaskInstanceState.SKIPPED except AirflowRescheduleException as reschedule: + log.info("::group::Post Execute") log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE") msg = RescheduleTask( reschedule_date=reschedule.reschedule_date, end_date=datetime.now(tz=timezone.utc) @@ -1359,6 +1363,7 @@ def _on_term(signum, frame): # If AirflowFailException is raised, task should not retry. # If a sensor in reschedule mode reaches timeout, task should not retry. log.exception("Task failed with exception") + log.info("::group::Post Execute") ti.end_date = datetime.now(tz=timezone.utc) msg = TaskState( state=TaskInstanceState.FAILED, @@ -1370,6 +1375,7 @@ def _on_term(signum, frame): except (AirflowTaskTimeout, AirflowException, AirflowRuntimeError) as e: # We should allow retries if the task has defined it. log.exception("Task failed with exception") + log.info("::group::Post Execute") msg, state = _apply_retry_policy_or_default(ti, e, log, context) error = e except AirflowTaskTerminated as e: @@ -1377,6 +1383,7 @@ def _on_term(signum, frame): # updated already be another UI API. So, these exceptions should ideally never be thrown. # If these are thrown, we should mark the TI state as failed. log.exception("Task failed with exception") + log.info("::group::Post Execute") ti.end_date = datetime.now(tz=timezone.utc) msg = TaskState( state=TaskInstanceState.FAILED, @@ -1388,10 +1395,12 @@ def _on_term(signum, frame): except SystemExit as e: # SystemExit needs to be retried if they are eligible. log.error("Task exited", exit_code=e.code) + log.info("::group::Post Execute") msg, state = _apply_retry_policy_or_default(ti, e, log, context) error = e except BaseException as e: log.exception("Task failed with exception") + log.info("::group::Post Execute") msg, state = _apply_retry_policy_or_default(ti, e, log, context) error = e finally: