From 501d5ad15e0e33a3ba7db7f4aaed45d8cc7e4dcf Mon Sep 17 00:00:00 2001 From: pratiksha badheka Date: Wed, 20 May 2026 22:38:12 -0700 Subject: [PATCH] fix key argument type of execute_async function --- .../providers/amazon/aws/executors/batch/batch_executor.py | 5 ++--- .../cncf/kubernetes/executors/test_kubernetes_executor.py | 7 ++++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py index 835dfe2e1c4d7..811e279da90ba 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py @@ -54,6 +54,7 @@ AllBatchConfigKeys, BatchJob, BatchJobCollection, + BatchJobWorkloadKey, BatchQueuedJob, ) from airflow.utils.state import State @@ -402,9 +403,7 @@ def _describe_jobs(self, job_ids) -> list[BatchJob]: all_jobs.extend(describe_workloads_response["jobs"]) return all_jobs - def execute_async( - self, key: TaskInstanceKey | str, command: CommandType, queue=None, executor_config=None - ): + def execute_async(self, key: BatchJobWorkloadKey, command: CommandType, queue=None, executor_config=None): """Save the workload to be executed in the next sync using Boto3's RunTask API.""" if executor_config and "command" in executor_config: raise ValueError('Executor Config should never override "command"') diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 9269c4debd05f..861737b5d0ab0 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -808,7 +808,12 @@ def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job try: assert executor.event_buffer == {} executor.execute_async( - key=("dag", "task", timezone.utcnow(), 1), + key=TaskInstanceKey( + dag_id="dag", + task_id="task", + run_id="run", + try_number=1, + ), queue=None, command=["airflow", "tasks", "run", "true", "some_parameter"], executor_config=k8s.V1Pod(