Skip to content

Commit

Permalink
Delete worker on success (apache#7507)
Browse files Browse the repository at this point in the history
Users now have the option to only delete worker pods when they are successful

Co-authored-by: Daniel Imberman <daniel@astronomer.io>
  • Loading branch information
2 people authored and Chris Fei committed Mar 5, 2021
1 parent 5838e1f commit c466532
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 14 deletions.
9 changes: 8 additions & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1750,7 +1750,14 @@
default: "IfNotPresent"
- name: delete_worker_pods
description: |
If True (default), worker pods will be deleted upon termination
If True, all worker pods will be deleted upon termination
version_added: ~
type: string
example: ~
default: "True"
- name: delete_worker_pods_on_success
description: |
If True (default), worker pods will be deleted only on task success
version_added: ~
type: string
example: ~
Expand Down
5 changes: 4 additions & 1 deletion airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -804,9 +804,12 @@ worker_container_repository =
worker_container_tag =
worker_container_image_pull_policy = IfNotPresent

# If True (default), worker pods will be deleted upon termination
# If True, all worker pods will be deleted upon termination
delete_worker_pods = True

# If True (default), worker pods will be deleted only on task success
delete_worker_pods_on_success = True

# Number of Kubernetes Worker Pod creation calls per scheduler loop
worker_pods_creation_batch_size = 1

Expand Down
10 changes: 9 additions & 1 deletion airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ def __init__(self):
self.kube_labels = configuration_dict.get('kubernetes_labels', {})
self.delete_worker_pods = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods')
self.delete_worker_pods_on_success = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods_on_success')
self.worker_pods_creation_batch_size = conf.getint(
self.kubernetes_section, 'worker_pods_creation_batch_size')
self.worker_service_account_name = conf.get(
Expand Down Expand Up @@ -891,7 +893,13 @@ def sync(self):

def _change_state(self, key, state, pod_id, namespace):
if state != State.RUNNING:
if self.kube_config.delete_worker_pods:
if self.kube_config.delete_worker_pods_on_success and state is State.SUCCESS:
if not self.kube_scheduler:
raise AirflowException("The executor should be started first!")
self.kube_scheduler.delete_pod(pod_id, namespace)
elif self.kube_config.delete_worker_pods:
if not self.kube_scheduler:
raise AirflowException("The executor should be started first!")
self.kube_scheduler.delete_pod(pod_id, namespace)
self.log.info('Deleted pod: %s in namespace %s', str(key), str(namespace))
try:
Expand Down
36 changes: 25 additions & 11 deletions tests/contrib/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1041,48 +1041,62 @@ def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_wa
executor._change_state(key, State.RUNNING, 'pod_id', 'default')
self.assertTrue(executor.event_buffer[key] == State.RUNNING)

@mock.patch('airflow.contrib.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
mock_kube_config):
def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = KubernetesExecutor()
executor.start()
test_time = timezone.utcnow()
key = ('dag_id', 'task_id', test_time, 'try_number2')
executor._change_state(key, State.SUCCESS, 'pod_id', 'default')
self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
mock_delete_pod.assert_called_with('pod_id', 'default')
mock_delete_pod.assert_called_once_with('pod_id', 'default')

@mock.patch('airflow.contrib.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher,
mock_kube_config):
def test_change_state_failed_no_deletion(self, mock_delete_pod, mock_get_kube_client,
mock_kubernetes_job_watcher):
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods = False
executor.kube_config.delete_worker_pods_on_success = True
executor.start()
key = ('dag_id', 'task_id', 'ex_time', 'try_number3')
test_time = timezone.utcnow()
key = ('dag_id', 'task_id', test_time, 'try_number3')
executor._change_state(key, State.FAILED, 'pod_id', 'default')
self.assertTrue(executor.event_buffer[key] == State.FAILED)
mock_delete_pod.assert_called_with('pod_id', 'default')
mock_delete_pod.assert_not_called()

@mock.patch('airflow.contrib.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
mock_kubernetes_job_watcher, mock_kube_config):
mock_kubernetes_job_watcher):
test_time = timezone.utcnow()
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods = False
executor.kube_config.delete_worker_pods_on_success = False
executor.start()
key = ('dag_id', 'task_id', test_time, 'try_number2')
executor._change_state(key, State.SUCCESS, 'pod_id', 'default')
self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
mock_delete_pod.assert_not_called()

@mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
mock_kubernetes_job_watcher):
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods_on_success = True

executor.start()
key = ('dag_id', 'task_id', 'ex_time', 'try_number2')
executor._change_state(key, State.FAILED, 'pod_id', 'test-namespace')
self.assertTrue(executor.event_buffer[key] == State.FAILED)
mock_delete_pod.assert_called_once_with('pod_id', 'test-namespace')


if __name__ == '__main__':
unittest.main()

0 comments on commit c466532

Please sign in to comment.