diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 9b9de71681cf6..5faf1563fe407 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -133,7 +133,7 @@ def _run( for key, value in kube_config.kube_client_request_args.items(): kwargs[key] = value - last_resource_version: Optional[str] = None + last_resource_version: str = "0" if self.multi_namespace_mode: list_worker_pods = functools.partial( watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs @@ -167,7 +167,9 @@ def _run( resource_version=task.metadata.resource_version, event=event, ) - last_resource_version = task.metadata.resource_version + task_resource_version = task.metadata.resource_version + if task_resource_version: + last_resource_version = str(max(int(last_resource_version), int(task_resource_version))) return last_resource_version diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index a677fe598b47f..6b9105f31c1ad 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -866,7 +866,7 @@ def setUp(self): ) self.events = [] - def _run(self): + def _run(self, assert_resource_version=True): with mock.patch('airflow.executors.kubernetes_executor.watch') as mock_watch: mock_watch.Watch.return_value.stream.return_value = self.events latest_resource_version = self.watcher._run( @@ -875,7 +875,9 @@ def _run(self): self.watcher.scheduler_job_id, self.watcher.kube_config, ) - assert self.pod.metadata.resource_version == latest_resource_version + if assert_resource_version: + assert self.pod.metadata.resource_version == latest_resource_version + return latest_resource_version def assert_watcher_queue_called_once_with_state(self, state): self.watcher.watcher_queue.put.assert_called_once_with( @@ -957,3 +959,29 @@ def test_process_error_event_for_raise_if_not_410(self): f"Kubernetes failure for {raw_object['reason']} " f"with code {raw_object['code']} and message: {raw_object['message']}" ) + + def test_last_resource_version_even_if_watch_unsorted(self): + pod1 = k8s.V1Pod( + metadata=k8s.V1ObjectMeta( + name="pod1", + annotations={"airflow-worker": "bar", **self.core_annotations}, + namespace="airflow", + resource_version="900", + ), + status=k8s.V1PodStatus(phase="Running"), + ) + self.events.append({"type": 'MODIFIED', "object": pod1}) + + pod2 = k8s.V1Pod( + metadata=k8s.V1ObjectMeta( + name="pod2", + annotations={"airflow-worker": "bar", **self.core_annotations}, + namespace="airflow", + resource_version="800", + ), + status=k8s.V1PodStatus(phase="Running"), + ) + self.events.append({"type": 'MODIFIED', "object": pod2}) + + resource_version = self._run(False) + assert resource_version == "900"