Under which category would you file this issue?
Task SDK
Apache Airflow version
3.2.2
What happened and how to reproduce it?
In an Airflow multi-team setup (AIRFLOW__CORE__MULTI_TEAM = 'true'), the retry callback cannot access the team_a_retry_call_back connection, while the team_a_success_call_back and team_a_failure_call_back connections work correctly in their respective success and failure callbacks. This happens because, when a task is retried, the record of the failed attempt is moved from the TaskInstance table to the TaskInstanceHistory table, and the team_name resolution logic (get_team_name_dep) does not query the TaskInstanceHistory table.
As a result, retry callbacks cannot resolve the team_name, leading to failures when accessing team-specific connections and configurations.
Steps to Reproduce:
Prerequisites:
- Ensure AIRFLOW__CORE__MULTI_TEAM is set to 'true' in the Airflow configuration.
- Define a dag_bundle_config_list to segregate DAGs by teams:

```python
dag_bundle_config_list = [
    {
        "name": "team_a_dags",
        "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
        "kwargs": {"path": "/opt/airflow/dags/team_a"},
        "team_name": "team_a",
    },
    {
        "name": "team_b_dags",
        "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
        "kwargs": {"path": "/opt/airflow/dags/team_b"},
        "team_name": "team_b",
    },
    {
        "name": "shared_dags",
        "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
        "kwargs": {"path": "/opt/airflow/dags/shared"},
    },
]
```
Connection Setup:
Ensure the following connections are scoped to team_a:
Connection ID | Type | Host | Description | Team Name
-- | -- | -- | -- | --
team_a_failure_call_back | http | team_a_failure_call_back | Failure callback connection | team_a
team_a_retry_call_back | http | team_a_retry_call_back | Retry callback connection | team_a
team_a_success_call_back | http | (empty) | Success callback connection | team_a
Steps:
- Create a DAG for team_a with the following tasks:
  - Success task (success_task): uses a success callback with the team_a_success_call_back connection.
  - Failure task (failure_task): uses a failure callback with the team_a_failure_call_back connection.
  - Retry task (retry_task): uses a retry callback with the team_a_retry_call_back connection.
- Execute the DAG:
  - Allow success_task to succeed and trigger its success callback.
  - Force failure_task to fail immediately and trigger its failure callback.
  - Allow retry_task to fail and retry, triggering its retry callback.
- Observe the behavior of retry_task during its retries.
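Observed Behavior:
- The success callback of success_task accesses the team_a_success_call_back connection.
- The failure callback of failure_task accesses the team_a_failure_call_back connection.
- The retry callback of retry_task cannot access the team_a_retry_call_back connection, because the get_team_name_dep logic fails to resolve the team_name for tasks moved to the TaskInstanceHistory table during retries.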
What you think should happen instead?
Expected Behavior:
The team_name should be resolved correctly during retries, even if the task has been moved to the TaskInstanceHistory table.
The retry callback should successfully access the team_a_retry_call_back connection.
Root Cause:
The get_team_name_dep logic only queries the TaskInstance table to resolve the team_name. When tasks are moved to the TaskInstanceHistory table during retries, they become inaccessible to the current query, resulting in a failure to resolve the team_name and access the necessary connections.
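The sketch below models the failure outside Airflow so the mechanism is easy to see. It is a simplified, hypothetical model: the Row type, the TEAM_BY_DAG mapping and all helper functions are illustrative and are not Airflow APIs. It assumes, as this report describes, that on retry the failed attempt is archived to history under its old id, the live TaskInstance row continues under a new id, and the token used by the retry callback still carries the old id.

```python
import uuid
from dataclasses import dataclass, replace

# Hypothetical, heavily simplified model of the retry lifecycle described in
# this report. It is NOT Airflow's implementation, only the shape of the problem.


@dataclass(frozen=True)
class Row:
    id: str
    dag_id: str
    try_number: int


# Stands in for the DagModel -> DagBundleModel -> Team joins.
TEAM_BY_DAG = {"team_a_dag": "team_a"}

task_instance: dict[str, Row] = {}
task_instance_history: dict[str, Row] = {}


def start_first_try(dag_id: str) -> str:
    row = Row(id=str(uuid.uuid4()), dag_id=dag_id, try_number=1)
    task_instance[row.id] = row
    return row.id  # the id the worker's token would carry for this attempt


def prepare_for_retry(ti_id: str) -> str:
    # Assumption: archive the failed attempt under its old id,
    # then continue the live row under a fresh id.
    row = task_instance.pop(ti_id)
    task_instance_history[row.id] = row
    new_row = replace(row, id=str(uuid.uuid4()), try_number=row.try_number + 1)
    task_instance[new_row.id] = new_row
    return new_row.id


def resolve_team_ti_only(token_id: str):
    # Mirrors the current behaviour: only the live table is consulted.
    row = task_instance.get(token_id)
    return TEAM_BY_DAG.get(row.dag_id) if row else None


def resolve_team_with_fallback(token_id: str):
    # Mirrors the suggested fix: fall back to the history table.
    row = task_instance.get(token_id) or task_instance_history.get(token_id)
    return TEAM_BY_DAG.get(row.dag_id) if row else None


token_id = start_first_try("team_a_dag")
prepare_for_retry(token_id)  # per this report, this happens before the retry callback resolves its team
print(resolve_team_ti_only(token_id))        # None
print(resolve_team_with_fallback(token_id))  # team_a
```

The only difference between the two resolvers is the second dictionary lookup, which is the same "fall back to TaskInstanceHistory" idea proposed below.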
Suggested Fix:
Update get_team_name_dep Logic:
Modify the get_team_name_dep function in airflow-core/src/airflow/api_fastapi/execution_api/security.py to query both the TaskInstance and TaskInstanceHistory tables. If a team_name is not found in the TaskInstance table, the function should fall back to the TaskInstanceHistory table.
Code Update:
```python
from sqlalchemy import select, union_all

# TaskInstance, TaskInstanceHistory, DagModel, DagBundleModel and Team are
# assumed to be imported as they already are in security.py.


def get_team_name_dep(token):
    # Resolve the team through the live TaskInstance row
    stmt = (
        select(Team.name)
        .select_from(TaskInstance)
        .join(DagModel, DagModel.dag_id == TaskInstance.dag_id)
        .join(DagBundleModel, DagBundleModel.name == DagModel.bundle_name)
        .join(DagBundleModel.teams)
        .where(TaskInstance.id == token.id)
    )
    # Resolve the team through the archived attempt in TaskInstanceHistory
    stmt_history = (
        select(Team.name)
        .select_from(TaskInstanceHistory)
        .join(DagModel, DagModel.dag_id == TaskInstanceHistory.dag_id)
        .join(DagBundleModel, DagBundleModel.name == DagModel.bundle_name)
        .join(DagBundleModel.teams)
        .where(TaskInstanceHistory.task_instance_id == token.id)
    )
    # Combine both queries with UNION ALL and keep the first row. Both branches
    # reach the team through the same dag_id -> bundle -> team path, so any
    # returned row carries the same team name.
    return union_all(stmt, stmt_history).limit(1)
```
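To check the UNION ALL fallback outside Airflow, the sketch below runs the equivalent SQL against an in-memory SQLite database. The schema is hypothetical and reduced to the columns the lookup needs; in particular the dag_bundle_team association table and the ids ti-try-1 / ti-try-2 are illustrative names, not Airflow's actual tables or data. It assumes the retried attempt's id lives only in task_instance_history, as described in this report.

```python
import sqlite3

# Hypothetical, simplified schema: only the columns the team lookup needs.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE team (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE dag_bundle (name TEXT PRIMARY KEY);
CREATE TABLE dag_bundle_team (dag_bundle_name TEXT, team_id INTEGER);
CREATE TABLE dag (dag_id TEXT PRIMARY KEY, bundle_name TEXT);
CREATE TABLE task_instance (id TEXT PRIMARY KEY, dag_id TEXT);
CREATE TABLE task_instance_history (task_instance_id TEXT, dag_id TEXT);

INSERT INTO team VALUES (1, 'team_a');
INSERT INTO dag_bundle VALUES ('team_a_dags');
INSERT INTO dag_bundle_team VALUES ('team_a_dags', 1);
INSERT INTO dag VALUES ('team_a_dag', 'team_a_dags');
-- After the retry: attempt ti-try-1 is archived and the live row has id ti-try-2.
INSERT INTO task_instance VALUES ('ti-try-2', 'team_a_dag');
INSERT INTO task_instance_history VALUES ('ti-try-1', 'team_a_dag');
""")

# Same join path as get_team_name_dep: dag -> bundle -> team.
TEAM_PATH = """
JOIN dag d ON d.dag_id = src.dag_id
JOIN dag_bundle b ON b.name = d.bundle_name
JOIN dag_bundle_team bt ON bt.dag_bundle_name = b.name
JOIN team t ON t.id = bt.team_id
"""

TI_ONLY = f"SELECT t.name FROM task_instance src {TEAM_PATH} WHERE src.id = :tid"
WITH_HISTORY = (
    f"{TI_ONLY} UNION ALL "
    f"SELECT t.name FROM task_instance_history src {TEAM_PATH} "
    "WHERE src.task_instance_id = :tid LIMIT 1"  # LIMIT applies to the whole compound
)


def lookup(sql, token_id):
    row = conn.execute(sql, {"tid": token_id}).fetchone()
    return row[0] if row else None


print(lookup(TI_ONLY, "ti-try-1"))       # None: the old id is gone from task_instance
print(lookup(WITH_HISTORY, "ti-try-1"))  # team_a: resolved via task_instance_history
print(lookup(WITH_HISTORY, "ti-try-2"))  # team_a: the current attempt still resolves
```

UNION ALL does not guarantee row order, but because only the matching branch returns rows and both branches share the same join path, LIMIT 1 is enough here; a strict "TaskInstance first" priority would need an explicit ordering column.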
Operating System
No response
Deployment
Docker-Compose
Apache Airflow Provider(s)
No response
Versions of Apache Airflow Providers
No response
Official Helm Chart version
Not Applicable
Kubernetes Version
No response
Helm Chart configuration
No response
Docker Image customizations
NA
Anything else?
No response
Are you willing to submit PR?
Code of Conduct