Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use "remote" pod when patching KPO pod as "checked" #23676

Merged
merged 3 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,9 @@ def client(self) -> CoreV1Api:
kwargs.update(in_cluster=self.in_cluster)
return kube_client.get_kube_client(**kwargs)

def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
def find_pod(self, namespace, context, *, exclude_checked=True) -> Optional[k8s.V1Pod]:
"""Returns an already-running pod for this task instance if one exists."""
label_selector = self._build_find_pod_label_selector(context)
label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
pod_list = self.client.list_namespaced_pod(
namespace=namespace,
label_selector=label_selector,
Expand Down Expand Up @@ -412,7 +412,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
if not self.is_delete_operator_pod:
with _suppress(Exception):
self.patch_already_checked(pod)
self.patch_already_checked(remote_pod)
if pod_phase != PodPhase.SUCCEEDED:
if self.log_events_on_failure:
with _suppress(Exception):
Expand All @@ -436,10 +436,14 @@ def process_pod_deletion(self, pod):
else:
self.log.info("skipping deleting pod: %s", pod.metadata.name)

def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str:
def _build_find_pod_label_selector(self, context: Optional[dict] = None, *, exclude_checked=True) -> str:
labels = self._get_ti_pod_labels(context, include_try_number=False)
label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())]
return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True,!airflow-worker'
labels_value = ','.join(label_strings)
if exclude_checked:
labels_value += f',{self.POD_CHECKED_KEY}!=True'
labels_value += ',!airflow-worker'
return labels_value

def _set_name(self, name):
if name is None:
Expand Down
55 changes: 52 additions & 3 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,8 @@ def test_config_path_move(self):
context = create_context(k)
k.execute(context)
expected_pod = copy(self.expected_pod)
expected_pod['metadata']['labels']['already_checked'] = 'True'
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
assert expected_pod == actual_pod
assert actual_pod == expected_pod

def test_working_pod(self):
k = KubernetesPodOperator(
Expand Down Expand Up @@ -210,6 +209,57 @@ def test_delete_operator_pod(self):
assert self.expected_pod['spec'] == actual_pod['spec']
assert self.expected_pod['metadata']['labels'] == actual_pod['metadata']['labels']

def test_already_checked_on_success(self):
"""
When ``is_delete_operator_pod=False``, pod should have 'already_checked'
label, whether pod is successful or not.
"""
pod_name = "test-" + str(random.randint(0, 1000000))
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name=pod_name,
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
is_delete_operator_pod=False,
)
context = create_context(k)
k.execute(context)
actual_pod = k.find_pod('default', context, exclude_checked=False)
actual_pod = self.api_client.sanitize_for_serialization(actual_pod)
assert actual_pod['metadata']['labels']['already_checked'] == 'True'

def test_already_checked_on_failure(self):
"""
When ``is_delete_operator_pod=False``, pod should have 'already_checked'
label, whether pod is successful or not.
"""
pod_name = "test-" + str(random.randint(0, 1000000))
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["lalala"],
labels={"foo": "bar"},
name=pod_name,
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
is_delete_operator_pod=False,
)
context = create_context(k)
with pytest.raises(AirflowException):
k.execute(context)
actual_pod = k.find_pod('default', context, exclude_checked=False)
actual_pod = self.api_client.sanitize_for_serialization(actual_pod)
status = next(iter(filter(lambda x: x['name'] == 'base', actual_pod['status']['containerStatuses'])))
assert status['state']['terminated']['reason'] == 'Error'
assert actual_pod['metadata']['labels']['already_checked'] == 'True'

def test_pod_hostnetwork(self):
k = KubernetesPodOperator(
namespace='default',
Expand Down Expand Up @@ -763,7 +813,6 @@ def test_full_pod_spec(self):
'kubernetes_pod_operator': 'True',
'task_id': mock.ANY,
'try_number': '1',
'already_checked': 'True',
}
assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name", value="value")]
assert result == {"hello": "world"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ def test_volume_mount(self):
expected_pod['spec']['volumes'] = [
{'name': 'test-volume', 'persistentVolumeClaim': {'claimName': 'test-volume'}}
]
expected_pod['metadata']['labels']['already_checked'] = 'True'
assert expected_pod == actual_pod

def test_run_as_user_root(self):
Expand Down