Skip to content

Commit

Permalink
[AIRFLOW-5581] Cleanly shutdown KubernetesJobWatcher for safe Schedul…
Browse files Browse the repository at this point in the history
…er shutdown on SIGTERM (#6237)
  • Loading branch information
kpathak13 authored and ashb committed Oct 11, 2019
1 parent c7a5f1f commit 12f916a
Showing 1 changed file with 55 additions and 0 deletions.
55 changes: 55 additions & 0 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,28 @@ def _labels_to_key(self, labels):
)
return None

def _flush_watcher_queue(self):
    """Empty ``watcher_queue`` at shutdown, logging and discarding every pending item.

    Each retrieved item is acknowledged with ``task_done()`` so that a later
    ``watcher_queue.join()`` does not block on it.
    """
    pending = self.watcher_queue
    self.log.debug('Executor shutting down, watcher_queue approx. size=%d', pending.qsize())
    draining = True
    while draining:
        try:
            dropped = pending.get_nowait()
        except Empty:
            # Nothing left to drain.
            draining = False
        else:
            # Ignoring it since it can only have either FAILED or SUCCEEDED pods
            self.log.warning('Executor shutting down, IGNORING watcher task=%s', dropped)
            pending.task_done()

def terminate(self):
    """Stop ``kube_watcher``, drain ``watcher_queue`` and shut down ``_manager``.

    The steps run strictly in this order: the watcher is terminated and joined,
    any items still sitting in ``watcher_queue`` are flushed, the queue is joined,
    and finally the manager is shut down.
    """
    log = self.log
    watcher = self.kube_watcher

    log.debug("Terminating kube_watcher...")
    watcher.terminate()
    watcher.join()
    log.debug("kube_watcher=%s", watcher)

    log.debug("Flushing watcher_queue...")
    self._flush_watcher_queue()
    # Queue should be empty...
    self.watcher_queue.join()

    log.debug("Shutting down manager...")
    self._manager.shutdown()


Expand Down Expand Up @@ -768,9 +787,45 @@ def _change_state(self, key, state, pod_id: str) -> None:
self.log.debug('Could not find key: %s', str(key))
self.event_buffer[key] = state

def _flush_task_queue(self):
    """Empty ``task_queue`` at shutdown without running any of the queued tasks.

    Every item taken off the queue is logged as a warning and acknowledged with
    ``task_done()`` so that a later ``task_queue.join()`` does not block on it.
    """
    backlog = self.task_queue
    self.log.debug('Executor shutting down, task_queue approximate size=%d', backlog.qsize())
    more = True
    while more:
        try:
            skipped = backlog.get_nowait()
        except Empty:
            # Queue is drained; stop looping.
            more = False
        else:
            # This is a new task to run thus ok to ignore.
            self.log.warning('Executor shutting down, will NOT run task=%s', skipped)
            backlog.task_done()

def _flush_result_queue(self):
    """Drain ``result_queue`` at shutdown, applying each pending state change.

    Every queued item is unpacked as ``(key, state, pod_id, resource_version)`` and
    handed to ``self._change_state(key, state, pod_id)``. Any exception raised by
    ``_change_state`` is logged and ignored so that one bad result cannot stop the
    drain. ``task_done()`` is called for every item that was retrieved, even when
    processing it fails, so a later ``result_queue.join()`` does not block.

    An item that does not unpack into exactly four values still raises
    ``ValueError`` (after ``task_done()`` has been called for it).
    """
    self.log.debug('Executor shutting down, result_queue approximate size=%d', self.result_queue.qsize())
    while True:
        # Only the dequeue is guarded by ``except Empty``: an ``Empty`` raised while
        # processing an item must not be mistaken for "queue drained".
        try:
            results = self.result_queue.get_nowait()
        except Empty:
            break

        self.log.warning('Executor shutting down, flushing results=%s', results)
        try:
            key, state, pod_id, resource_version = results
            # ``%s`` rather than ``%d``: Kubernetes resource versions are opaque
            # strings, and ``%d`` fails to format a ``str`` argument.
            self.log.info('Changing state of %s to %s : resource_version=%s', results, state,
                          resource_version)
            try:
                self._change_state(key, state, pod_id)
            except Exception as e:  # pylint: disable=broad-except
                self.log.exception('Ignoring exception: %s when attempting to change state of %s '
                                   'to %s.', e, results, state)
        finally:
            # Always acknowledge the item so result_queue.join() can return.
            self.result_queue.task_done()

def end(self):
"""Called when the executor shuts down"""
self.log.info('Shutting down Kubernetes executor')
self.log.debug('Flushing task_queue...')
self._flush_task_queue()
self.log.debug('Flushing result_queue...')
self._flush_result_queue()
# Both queues should be empty...
self.task_queue.join()
self.result_queue.join()
if self.kube_scheduler:
Expand Down

0 comments on commit 12f916a

Please sign in to comment.