[AIRFLOW-6885] Delete worker on success (#7507)
Users now have the option to delete worker pods only when their tasks complete successfully.
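
A minimal sketch (not part of this commit) of how the two [kubernetes] options combine once a task's pod leaves the RUNNING state, mirroring the executor change below; the helper should_delete_pod is hypothetical and exists only for illustration.

# Hedged sketch, not Airflow source: mirrors the branching this commit adds to
# KubernetesExecutor._change_state. `should_delete_pod` is a hypothetical helper.
from airflow.utils.state import State

def should_delete_pod(state, delete_worker_pods, delete_worker_pods_on_success):
    """Decide whether a worker pod should be removed once its task has finished."""
    if state == State.RUNNING:
        return False  # the pod is still doing work, leave it alone
    if delete_worker_pods_on_success and state == State.SUCCESS:
        return True   # new behaviour: successful pods are always cleaned up
    return delete_worker_pods  # otherwise fall back to the existing option

# Example policy: keep failed pods around for debugging, clean up successful ones.
assert should_delete_pod(State.SUCCESS, False, True) is True
assert should_delete_pod(State.FAILED, False, True) is False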

Co-authored-by: Daniel Imberman <daniel@astronomer.io>
dimberman and astro-sql-decorator committed Mar 19, 2020
1 parent 92ca751 commit d027b87
Showing 4 changed files with 44 additions and 5 deletions.
9 changes: 8 additions & 1 deletion airflow/config_templates/config.yml
@@ -1733,7 +1733,14 @@
default: "IfNotPresent"
- name: delete_worker_pods
description: |
If True (default), worker pods will be deleted upon termination
If True, all worker pods will be deleted upon termination
version_added: ~
type: string
example: ~
default: "True"
- name: delete_worker_pods_on_success
description: |
If True (default), worker pods will be deleted only on task success
version_added: ~
type: string
example: ~
5 changes: 4 additions & 1 deletion airflow/config_templates/default_airflow.cfg
@@ -801,9 +801,12 @@ worker_container_tag =
# The imagePullPolicy of the Kubernetes Image for the Worker to Run
worker_container_image_pull_policy = IfNotPresent

# If True (default), worker pods will be deleted upon termination
# If True, all worker pods will be deleted upon termination
delete_worker_pods = True

# If True (default), worker pods will be deleted only on task success
delete_worker_pods_on_success = True

# Number of Kubernetes Worker Pod creation calls per scheduler loop.
# Note that the current default of "1" will only launch a single pod
# per-heartbeat. It is HIGHLY recommended that users increase this
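
As a usage note, a hedged sketch follows: besides editing airflow.cfg as above, these options can also be set through Airflow's standard AIRFLOW__<SECTION>__<KEY> environment-variable overrides; the particular values chosen here are only an example, not defaults introduced by this commit.

# Hedged usage sketch: toggle the new option via Airflow's environment-variable
# override mechanism (AIRFLOW__<SECTION>__<KEY>). Example policy: keep pods from
# failed tasks for debugging, but clean up pods whose tasks succeeded.
import os

os.environ["AIRFLOW__KUBERNETES__DELETE_WORKER_PODS"] = "False"
os.environ["AIRFLOW__KUBERNETES__DELETE_WORKER_PODS_ON_SUCCESS"] = "True"

from airflow.configuration import conf

# Mirrors how the executor reads the setting (see kubernetes_executor.py below).
assert conf.getboolean("kubernetes", "delete_worker_pods_on_success") is True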
8 changes: 7 additions & 1 deletion airflow/executors/kubernetes_executor.py
@@ -98,6 +98,8 @@ def __init__(self): # pylint: disable=too-many-statements
self.kube_labels = configuration_dict.get('kubernetes_labels', {})
self.delete_worker_pods = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods')
self.delete_worker_pods_on_success = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods_on_success')
self.worker_pods_creation_batch_size = conf.getint(
self.kubernetes_section, 'worker_pods_creation_batch_size')
self.worker_service_account_name = conf.get(
@@ -879,7 +881,11 @@ def _change_state(self,
pod_id: str,
namespace: str) -> None:
if state != State.RUNNING:
if self.kube_config.delete_worker_pods:
if self.kube_config.delete_worker_pods_on_success and state is State.SUCCESS:
if not self.kube_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
self.kube_scheduler.delete_pod(pod_id, namespace)
elif self.kube_config.delete_worker_pods:
if not self.kube_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
self.kube_scheduler.delete_pod(pod_id, namespace)
27 changes: 25 additions & 2 deletions tests/executors/test_kubernetes_executor.py
@@ -256,14 +256,21 @@ def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
def test_change_state_failed_no_deletion(
self,
mock_delete_pod,
mock_get_kube_client,
mock_kubernetes_job_watcher
):
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods = False
executor.kube_config.delete_worker_pods_on_success = True
executor.start()
test_time = timezone.utcnow()
key = ('dag_id', 'task_id', test_time, 'try_number3')
executor._change_state(key, State.FAILED, 'pod_id', 'default')
self.assertTrue(executor.event_buffer[key] == State.FAILED)
mock_delete_pod.assert_called_once_with('pod_id', 'default')
mock_delete_pod.assert_not_called()
# pylint: enable=unused-argument

@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@@ -274,12 +281,28 @@ def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_cli
test_time = timezone.utcnow()
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods = False
executor.kube_config.delete_worker_pods_on_success = False

executor.start()
key = ('dag_id', 'task_id', test_time, 'try_number2')
executor._change_state(key, State.SUCCESS, 'pod_id', 'default')
self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
mock_delete_pod.assert_not_called()

@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
mock_kubernetes_job_watcher):
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods_on_success = True

executor.start()
key = ('dag_id', 'task_id', 'ex_time', 'try_number2')
executor._change_state(key, State.FAILED, 'pod_id', 'test-namespace')
self.assertTrue(executor.event_buffer[key] == State.FAILED)
mock_delete_pod.assert_called_once_with('pod_id', 'test-namespace')


if __name__ == '__main__':
unittest.main()
