Skip to content

Commit

Permalink
Make grace_period_seconds option on K8sPodOperator (#10727)
Browse files Browse the repository at this point in the history
* Make grace_period_seconds option on K8sPodOperator

This PR allows users to choose whether they want to gracefully kill
pods when they delete tasks in the UI or if they would like to
immediately kill them.

* Update airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
  • Loading branch information
dimberman and kaxil committed Sep 4, 2020
1 parent 3f7831c commit 90c1505
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
10 changes: 9 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:type pod_template_file: str
:param priority_class_name: priority class name for the launched Pod
:type priority_class_name: str
:param termination_grace_period: Termination grace period if task killed in UI,
defaults to kubernetes default
:type termination_grace_period: int
"""
template_fields: Iterable[str] = (
'image', 'cmds', 'arguments', 'env_vars', 'config_file', 'pod_template_file')
Expand Down Expand Up @@ -191,6 +194,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
do_xcom_push: bool = False,
pod_template_file: Optional[str] = None,
priority_class_name: Optional[str] = None,
termination_grace_period: Optional[int] = None,
**kwargs):
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
Expand Down Expand Up @@ -235,6 +239,7 @@ def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
self.priority_class_name = priority_class_name
self.pod_template_file = pod_template_file
self.name = self._set_name(name)
self.termination_grace_period = termination_grace_period
self.client = None

@staticmethod
Expand Down Expand Up @@ -450,4 +455,7 @@ def on_kill(self) -> None:
pod: k8s.V1Pod = self.pod
namespace = pod.metadata.namespace
name = pod.metadata.name
self.client.delete_namespaced_pod(name=name, namespace=namespace, grace_period_seconds=0)
kwargs = {}
if self.termination_grace_period is not None:
kwargs = {"grace_period_seconds": self.termination_grace_period}
self.client.delete_namespaced_pod(name=name, namespace=namespace, **kwargs)
4 changes: 2 additions & 2 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,7 @@ def test_on_kill(self,
task_id=name,
in_cluster=False,
do_xcom_push=False,
termination_grace_period=0,
)
context = create_context(k)
monitor_mock.return_value = (State.SUCCESS, None)
Expand All @@ -903,7 +904,6 @@ def test_on_kill(self,
self.assertEqual(pod.status.phase, "Running")
k.on_kill()
with self.assertRaises(ApiException):
# pod should be deleted
client.read_namespaced_pod(name=name, namespace=namespace)
pod = client.read_namespaced_pod(name=name, namespace=namespace)

# pylint: enable=unused-argument

0 comments on commit 90c1505

Please sign in to comment.