Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ def execute_sync(self, context: Context):
if self.get_logs:
self.pod_manager.fetch_requested_container_logs(
pod=self.pod,
container_logs=self.container_logs,
containers=self.container_logs,
follow_logs=True,
)
if not self.get_logs or (
Expand Down
69 changes: 43 additions & 26 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ def fetch_container_logs(
Between when the pod starts and when its logs become available, there might be a delay because the CSR
has not been approved and signed yet. In such a situation, an ApiException is thrown. This is why we
retry on this specific exception.

:meta private:
"""

@tenacity.retry(
Expand Down Expand Up @@ -476,53 +478,68 @@ def consume_logs(
else: # follow requested, but container is done
break

def fetch_requested_container_logs(
self, pod: V1Pod, container_logs: Iterable[str] | str | Literal[True], follow_logs=False
) -> None:
"""
Follow the logs of containers in the specified pod and publish it to airflow logging.

Returns when all the containers exit.
"""
all_containers = self.get_container_names(pod)
if all_containers:
if isinstance(container_logs, str):
def _reconcile_requested_log_containers(
    self, requested: Iterable[str] | str | bool, actual: list[str], pod_name
) -> list[str]:
    """
    Return the requested containers that actually exist in the pod.

    Problems are reported through ``self.log.error`` instead of being raised, so the
    caller always receives a (possibly empty) list.

    :param requested: a single container name, an iterable of container names, or
        ``True`` to select every container in ``actual``. ``False`` is rejected.
    :param actual: names of the containers present in the pod.
    :param pod_name: name of the pod; used only in log messages.
    :return: names of the containers whose logs should be fetched. For ``True`` this is
        a copy of ``actual``; for an iterable the order of ``requested`` is kept.
    """
    if not actual:
        self.log.error("Could not retrieve containers for the pod: %s", pod_name)
        return []

    # ``str`` has to be tested before ``Iterable``: a string is itself iterable and would
    # otherwise be matched character by character.
    if isinstance(requested, str):
        # fetch logs only for requested container if only one container is provided
        if requested in actual:
            return [requested]
        self.log.error(
            "container %s whose logs were requested not found in the pod %s",
            requested,
            pod_name,
        )
        return []

    if isinstance(requested, bool):
        # if True is provided, get logs for all the containers
        if requested is True:
            return list(actual)
        self.log.error(
            "False is not a valid value for container_logs",
        )
        return []

    if not isinstance(requested, Iterable):
        self.log.error("Invalid type %s specified for container names input parameter", type(requested))
        return []

    # a sequence of containers was provided: keep those present in the pod, report the rest
    containers_to_log = []
    for container in requested:
        if container in actual:
            containers_to_log.append(container)
        else:
            self.log.error(
                "Container %s whose logs were requested not found in the pod %s",
                container,
                pod_name,
            )
    return containers_to_log

def fetch_requested_container_logs(
    self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], follow_logs: bool = False
) -> None:
    """
    Fetch the logs of the requested containers in the pod and publish them to airflow logging.

    The request is first reconciled against the containers present in the pod; only the
    containers that survive that step are processed, one after another. Returns once
    ``fetch_container_logs`` has been called for each of them.

    :param pod: the pod whose container logs are fetched.
    :param containers: a single container name, an iterable of container names, or
        ``True`` to select every container of the pod.
    :param follow_logs: forwarded as ``follow`` to ``fetch_container_logs``.

    :meta private:
    """
    containers_to_log = self._reconcile_requested_log_containers(
        requested=containers,
        actual=self.get_container_names(pod),
        pod_name=pod.metadata.name,
    )
    for container_name in containers_to_log:
        self.fetch_container_logs(pod=pod, container_name=container_name, follow=follow_logs)

def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
"""
Expand Down
4 changes: 1 addition & 3 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,9 +1371,7 @@ def test_get_logs_but_not_for_base_container(
pod = self.run_pod(k)

# check that the base container is not included in the logs
mock_fetch_log.assert_called_once_with(
pod=pod, container_logs=["some_init_container"], follow_logs=True
)
mock_fetch_log.assert_called_once_with(pod=pod, containers=["some_init_container"], follow_logs=True)
# check that KPO waits for the base container to complete before proceeding to extract XCom
mock_await_container_completion.assert_called_once_with(pod=pod, container_name="base")
# check that we wait for the xcom sidecar to start before extracting XCom
Expand Down
2 changes: 1 addition & 1 deletion tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def test_fetch_requested_container_logs(self, container_is_running, container_lo
)

self.pod_manager.fetch_requested_container_logs(
pod=mock_pod, container_logs=container_logs, follow_logs=follow
pod=mock_pod, containers=container_logs, follow_logs=follow
)
calls = {tuple(x[1].values()) for x in container_is_running.call_args_list}
pod = self.pod_manager.read_pod.return_value
Expand Down