Skip to content

Commit

Permalink
Test patch_already_checked directly (and separate from the other tests)
Browse files Browse the repository at this point in the history
The "already_checked" label was added to a few "expected pods" recently, when we changed it to patch even in the case of a successful pod.

Since we are changing the "patch" code to patch with the latest read on the pod that we have (i.e. using the `remote_pod` variable), and no longer the pod object stored on `k.pod`, the label no longer shows up in those tests (that's because in k.pod isn't actually a read of the remote pod, but just happens to get mutated in the patch function before it is used to actually patch the pod).

Further, since the `remote_pod` is a local variable, we can't observe it in tests.  So we have to read the pod using k8s api. _But_, our "find pod" function excludes "already checked" pods!  So we have to make this configurable.

So, now we have a proper integration test for the "already_checked" behavior (there was already a unit test).
  • Loading branch information
dstandish committed May 16, 2022
1 parent ba948fd commit b4f12f8
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 7 deletions.
12 changes: 8 additions & 4 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,9 @@ def client(self) -> CoreV1Api:
kwargs.update(in_cluster=self.in_cluster)
return kube_client.get_kube_client(**kwargs)

def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
def find_pod(self, namespace, context, *, exclude_checked=True) -> Optional[k8s.V1Pod]:
"""Returns an already-running pod for this task instance if one exists."""
label_selector = self._build_find_pod_label_selector(context)
label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
pod_list = self.client.list_namespaced_pod(
namespace=namespace,
label_selector=label_selector,
Expand Down Expand Up @@ -436,10 +436,14 @@ def process_pod_deletion(self, pod):
else:
self.log.info("skipping deleting pod: %s", pod.metadata.name)

def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str:
def _build_find_pod_label_selector(self, context: Optional[dict] = None, *, exclude_checked=True) -> str:
labels = self._get_ti_pod_labels(context, include_try_number=False)
label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())]
return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True,!airflow-worker'
labels_value = ','.join(label_strings)
if exclude_checked:
labels_value += f',{self.POD_CHECKED_KEY}!=True'
labels_value += ',!airflow-worker'
return labels_value

def _set_name(self, name):
if name is None:
Expand Down
55 changes: 52 additions & 3 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,8 @@ def test_config_path_move(self):
context = create_context(k)
k.execute(context)
expected_pod = copy(self.expected_pod)
expected_pod['metadata']['labels']['already_checked'] = 'True'
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
assert expected_pod == actual_pod
assert actual_pod == expected_pod

def test_working_pod(self):
k = KubernetesPodOperator(
Expand Down Expand Up @@ -210,6 +209,57 @@ def test_delete_operator_pod(self):
assert self.expected_pod['spec'] == actual_pod['spec']
assert self.expected_pod['metadata']['labels'] == actual_pod['metadata']['labels']

def test_already_checked_on_success(self):
"""
When ``is_delete_operator_pod=False``, pod should have 'already_checked'
label, whether pod is successful or not.
"""
pod_name = "test-" + str(random.randint(0, 1000000))
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name=pod_name,
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
is_delete_operator_pod=False,
)
context = create_context(k)
k.execute(context)
actual_pod = k.find_pod('default', context, exclude_checked=False)
actual_pod = self.api_client.sanitize_for_serialization(actual_pod)
assert actual_pod['metadata']['labels']['already_checked'] == 'True'

def test_already_checked_on_failure(self):
"""
When ``is_delete_operator_pod=False``, pod should have 'already_checked'
label, whether pod is successful or not.
"""
pod_name = "test-" + str(random.randint(0, 1000000))
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["lalala"],
labels={"foo": "bar"},
name=pod_name,
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
is_delete_operator_pod=False,
)
context = create_context(k)
with pytest.raises(AirflowException):
k.execute(context)
actual_pod = k.find_pod('default', context, exclude_checked=False)
actual_pod = self.api_client.sanitize_for_serialization(actual_pod)
status = next(iter(filter(lambda x: x['name'] == 'base', actual_pod['status']['containerStatuses'])))
assert status['state']['terminated']['reason'] == 'Error'
assert actual_pod['metadata']['labels']['already_checked'] == 'True'

def test_pod_hostnetwork(self):
k = KubernetesPodOperator(
namespace='default',
Expand Down Expand Up @@ -763,7 +813,6 @@ def test_full_pod_spec(self):
'kubernetes_pod_operator': 'True',
'task_id': mock.ANY,
'try_number': '1',
'already_checked': 'True',
}
assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name", value="value")]
assert result == {"hello": "world"}
Expand Down

0 comments on commit b4f12f8

Please sign in to comment.