Skip to content

Commit

Permalink
Fix waiting the base container when reading the logs of other contain…
Browse files Browse the repository at this point in the history
…ers (#33092)

* Fix waiting the base container when reading the logs of other containers

* add condition for await_container_completion to fix unit tests
  • Loading branch information
hussein-awala committed Aug 4, 2023
1 parent 900ad8c commit d31c775
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
4 changes: 3 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/pod.py
Expand Up @@ -590,7 +590,9 @@ def execute_sync(self, context: Context):
container_logs=self.container_logs,
follow_logs=True,
)
else:
if not self.get_logs or (
self.container_logs is not True and self.base_container_name not in self.container_logs
):
self.pod_manager.await_container_completion(
pod=self.pod, container_name=self.base_container_name
)
Expand Down
40 changes: 40 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Expand Up @@ -1232,6 +1232,46 @@ def test_task_skip_when_pod_exit_with_certain_code(
with pytest.raises(expected_exc):
self.run_pod(k)

@patch(f"{POD_MANAGER_CLASS}.extract_xcom")
@patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
@patch(f"{POD_MANAGER_CLASS}.await_container_completion")
@patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs")
@patch(HOOK_CLASS)
def test_get_logs_but_not_for_base_container(
self,
hook_mock,
mock_fetch_log,
mock_await_container_completion,
mock_await_xcom_sidecar,
mock_extract_xcom,
):
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task",
do_xcom_push=True,
container_logs=["some_init_container"],
get_logs=True,
)
mock_extract_xcom.return_value = "{}"
remote_pod_mock = MagicMock()
remote_pod_mock.status.phase = "Succeeded"
self.await_pod_mock.return_value = remote_pod_mock
pod = self.run_pod(k)

# check that the base container is not included in the logs
mock_fetch_log.assert_called_once_with(
pod=pod, container_logs=["some_init_container"], follow_logs=True
)
# check that KPO waits for the base container to complete before proceeding to extract XCom
mock_await_container_completion.assert_called_once_with(pod=pod, container_name="base")
# check that we wait for the xcom sidecar to start before extracting XCom
mock_await_xcom_sidecar.assert_called_once_with(pod=pod)


class TestSuppress:
def test__suppress(self, caplog):
Expand Down

0 comments on commit d31c775

Please sign in to comment.