Skip to content
Permalink
Browse files
Prevent KubernetesJobWatcher getting stuck on resource too old (#23521)
* Prevent KubernetesJobWatcher getting stuck on resource too old

If the watch fails because "resource too old" the
KubernetesJobWatcher should not retry with the same resource version
as that will end up in loop where there is no progress.

* Reset ResourceVersion().resource_version to 0
  • Loading branch information
ecerulm committed May 11, 2022
1 parent cfa95af commit dee05b2ebca6ab66f1b447837e11fe204f98b2df
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
@@ -109,6 +109,8 @@ def run(self) -> None:
time.sleep(1)
except Exception:
self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
self.resource_version = "0"
ResourceVersion().resource_version = "0"
raise
else:
self.log.warning(
@@ -288,6 +290,7 @@ def _health_check_kube_watcher(self):
self.log.error(
'Error while health checking kube watcher process. Process died for unknown reasons'
)
ResourceVersion().resource_version = "0"
self.kube_watcher = self._make_kube_watcher()

def run_next(self, next_job: KubernetesJobType) -> None:
@@ -39,6 +39,7 @@
AirflowKubernetesScheduler,
KubernetesExecutor,
KubernetesJobWatcher,
ResourceVersion,
create_pod_id,
get_base_pod_from_template,
)
@@ -957,3 +958,36 @@ def test_process_error_event_for_raise_if_not_410(self):
f"Kubernetes failure for {raw_object['reason']} "
f"with code {raw_object['code']} and message: {raw_object['message']}"
)

def test_recover_from_resource_too_old(self):
# too old resource
mock_underscore_run = mock.MagicMock()

def effect():
yield '500'
while True:
yield Exception('sentinel')

mock_underscore_run.side_effect = effect()

self.watcher._run = mock_underscore_run

with mock.patch('airflow.executors.kubernetes_executor.get_kube_client'):
try:
# self.watcher._run() is mocked and return "500" as last resource_version
self.watcher.run()
except Exception as e:
assert e.args == ('sentinel',)

# both resource_version should be 0 after _run raises and exception
assert self.watcher.resource_version == '0'
assert ResourceVersion().resource_version == '0'

# check that in the next run, _run is invoked with resource_version = 0
mock_underscore_run.reset_mock()
try:
self.watcher.run()
except Exception as e:
assert e.args == ('sentinel',)

mock_underscore_run.assert_called_once_with(mock.ANY, '0', mock.ANY, mock.ANY)

0 comments on commit dee05b2

Please sign in to comment.