Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6885] Delete worker on success #7507

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1733,7 +1733,14 @@
default: "IfNotPresent"
- name: delete_worker_pods
description: |
If True (default), worker pods will be deleted upon termination
If True, all worker pods will be deleted upon termination
version_added: ~
type: string
example: ~
default: "True"
- name: delete_worker_pods_on_success
description: |
If True (default), worker pods will be deleted on task success, even when delete_worker_pods is False
version_added: ~
type: string
example: ~
Expand Down
5 changes: 4 additions & 1 deletion airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,12 @@ worker_container_tag =
# The imagePullPolicy of the Kubernetes Image for the Worker to Run
worker_container_image_pull_policy = IfNotPresent

# If True (default), worker pods will be deleted upon termination
# If True, all worker pods will be deleted upon termination
delete_worker_pods = True

# If True (default), worker pods will be deleted on task success,
# even when delete_worker_pods is False
delete_worker_pods_on_success = True

# Number of Kubernetes Worker Pod creation calls per scheduler loop.
# Note that the current default of "1" will only launch a single pod
# per-heartbeat. It is HIGHLY recommended that users increase this
Expand Down
8 changes: 7 additions & 1 deletion airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def __init__(self): # pylint: disable=too-many-statements
self.kube_labels = configuration_dict.get('kubernetes_labels', {})
self.delete_worker_pods = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods')
self.delete_worker_pods_on_success = conf.getboolean(
self.kubernetes_section, 'delete_worker_pods_on_success')
self.worker_pods_creation_batch_size = conf.getint(
self.kubernetes_section, 'worker_pods_creation_batch_size')
self.worker_service_account_name = conf.get(
Expand Down Expand Up @@ -879,7 +881,11 @@ def _change_state(self,
pod_id: str,
namespace: str) -> None:
if state != State.RUNNING:
if self.kube_config.delete_worker_pods:
if self.kube_config.delete_worker_pods_on_success and state is State.SUCCESS:
if not self.kube_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
self.kube_scheduler.delete_pod(pod_id, namespace)
elif self.kube_config.delete_worker_pods:
dimberman marked this conversation as resolved.
Show resolved Hide resolved
if not self.kube_scheduler:
raise AirflowException(NOT_STARTED_MESSAGE)
dimberman marked this conversation as resolved.
Show resolved Hide resolved
self.kube_scheduler.delete_pod(pod_id, namespace)
Expand Down
27 changes: 25 additions & 2 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,21 @@ def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed_no_deletion(
    self,
    mock_delete_pod,
    mock_get_kube_client,
    mock_kubernetes_job_watcher
):
    """A FAILED task's pod is kept when only delete-on-success is enabled."""
    executor = KubernetesExecutor()
    # Blanket deletion off, success-only deletion on: a FAILED pod must survive.
    executor.kube_config.delete_worker_pods = False
    executor.kube_config.delete_worker_pods_on_success = True
    executor.start()
    test_time = timezone.utcnow()
    key = ('dag_id', 'task_id', test_time, 'try_number3')
    executor._change_state(key, State.FAILED, 'pod_id', 'default')
    # assertEqual reports both values on failure, unlike assertTrue(a == b).
    self.assertEqual(executor.event_buffer[key], State.FAILED)
    mock_delete_pod.assert_not_called()
dimberman marked this conversation as resolved.
Show resolved Hide resolved
# pylint: enable=unused-argument

@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
Expand All @@ -274,12 +281,28 @@ def test_change_state_skip_pod_deletion(self, mock_delete_pod, mock_get_kube_cli
test_time = timezone.utcnow()
executor = KubernetesExecutor()
executor.kube_config.delete_worker_pods = False
executor.kube_config.delete_worker_pods_on_success = False

executor.start()
key = ('dag_id', 'task_id', test_time, 'try_number2')
executor._change_state(key, State.SUCCESS, 'pod_id', 'default')
self.assertTrue(executor.event_buffer[key] == State.SUCCESS)
mock_delete_pod.assert_not_called()

@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod')
def test_change_state_failed_pod_deletion(self, mock_delete_pod, mock_get_kube_client,
                                          mock_kubernetes_job_watcher):
    """A FAILED task's pod is deleted when blanket pod deletion is enabled."""
    executor = KubernetesExecutor()
    # Set explicitly instead of relying on the configured default, so the test
    # still exercises the blanket-deletion branch if that default ever changes.
    executor.kube_config.delete_worker_pods = True
    executor.kube_config.delete_worker_pods_on_success = True

    executor.start()
    key = ('dag_id', 'task_id', 'ex_time', 'try_number2')
    executor._change_state(key, State.FAILED, 'pod_id', 'test-namespace')
    # assertEqual reports both values on failure, unlike assertTrue(a == b).
    self.assertEqual(executor.event_buffer[key], State.FAILED)
    mock_delete_pod.assert_called_once_with('pod_id', 'test-namespace')


if __name__ == '__main__':
    # Run the test suite when this module is executed directly as a script.
    unittest.main()