Skip to content

Commit 60eced9

Browse files
authored
Add logging support for init containers in KubernetesPodOperator (#42498) (#43853)
This change adds an option to print logs for init containers. The new init_container_logs operator parameter, together with the fetch_requested_init_container_logs pod-manager method, allows displaying the logs of init containers. Fixes: #42498
1 parent 39cdc9c commit 60eced9

File tree

4 files changed

+210
-13
lines changed

4 files changed

+210
-13
lines changed

kubernetes_tests/test_kubernetes_pod_operator.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,6 +1306,98 @@ class MyK8SPodOperator(KubernetesPodOperator):
13061306
)
13071307
assert MyK8SPodOperator(task_id=str(uuid4())).base_container_name == "tomato-sauce"
13081308

1309+
def test_init_container_logs(self, mock_get_connection):
1310+
marker_from_init_container = f"{uuid4()}"
1311+
marker_from_main_container = f"{uuid4()}"
1312+
callback = MagicMock()
1313+
init_container = k8s.V1Container(
1314+
name="init-container",
1315+
image="busybox",
1316+
command=["sh", "-cx"],
1317+
args=[f"echo {marker_from_init_container}"],
1318+
)
1319+
k = KubernetesPodOperator(
1320+
namespace="default",
1321+
image="busybox",
1322+
cmds=["sh", "-cx"],
1323+
arguments=[f"echo {marker_from_main_container}"],
1324+
labels=self.labels,
1325+
task_id=str(uuid4()),
1326+
in_cluster=False,
1327+
do_xcom_push=False,
1328+
startup_timeout_seconds=60,
1329+
init_containers=[init_container],
1330+
init_container_logs=True,
1331+
callbacks=callback,
1332+
)
1333+
context = create_context(k)
1334+
k.execute(context)
1335+
1336+
calls_args = "\n".join(["".join(c.kwargs["line"]) for c in callback.progress_callback.call_args_list])
1337+
assert marker_from_init_container in calls_args
1338+
assert marker_from_main_container in calls_args
1339+
1340+
def test_init_container_logs_filtered(self, mock_get_connection):
1341+
marker_from_init_container_to_log_1 = f"{uuid4()}"
1342+
marker_from_init_container_to_log_2 = f"{uuid4()}"
1343+
marker_from_init_container_to_ignore = f"{uuid4()}"
1344+
marker_from_main_container = f"{uuid4()}"
1345+
callback = MagicMock()
1346+
init_container_to_log_1 = k8s.V1Container(
1347+
name="init-container-to-log-1",
1348+
image="busybox",
1349+
command=["sh", "-cx"],
1350+
args=[f"echo {marker_from_init_container_to_log_1}"],
1351+
)
1352+
init_container_to_log_2 = k8s.V1Container(
1353+
name="init-container-to-log-2",
1354+
image="busybox",
1355+
command=["sh", "-cx"],
1356+
args=[f"echo {marker_from_init_container_to_log_2}"],
1357+
)
1358+
init_container_to_ignore = k8s.V1Container(
1359+
name="init-container-to-ignore",
1360+
image="busybox",
1361+
command=["sh", "-cx"],
1362+
args=[f"echo {marker_from_init_container_to_ignore}"],
1363+
)
1364+
k = KubernetesPodOperator(
1365+
namespace="default",
1366+
image="busybox",
1367+
cmds=["sh", "-cx"],
1368+
arguments=[f"echo {marker_from_main_container}"],
1369+
labels=self.labels,
1370+
task_id=str(uuid4()),
1371+
in_cluster=False,
1372+
do_xcom_push=False,
1373+
startup_timeout_seconds=60,
1374+
init_containers=[
1375+
init_container_to_log_1,
1376+
init_container_to_log_2,
1377+
init_container_to_ignore,
1378+
],
1379+
init_container_logs=[
1380+
# not same order as defined in init_containers
1381+
"init-container-to-log-2",
1382+
"init-container-to-log-1",
1383+
],
1384+
callbacks=callback,
1385+
)
1386+
context = create_context(k)
1387+
k.execute(context)
1388+
1389+
calls_args = "\n".join(["".join(c.kwargs["line"]) for c in callback.progress_callback.call_args_list])
1390+
assert marker_from_init_container_to_log_1 in calls_args
1391+
assert marker_from_init_container_to_log_2 in calls_args
1392+
assert marker_from_init_container_to_ignore not in calls_args
1393+
assert marker_from_main_container in calls_args
1394+
1395+
assert (
1396+
calls_args.find(marker_from_init_container_to_log_1)
1397+
< calls_args.find(marker_from_init_container_to_log_2)
1398+
< calls_args.find(marker_from_main_container)
1399+
)
1400+
13091401

13101402
def test_hide_sensitive_field_in_templated_fields_on_error(caplog, monkeypatch):
13111403
logger = logging.getLogger("airflow.task")

providers/src/airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ class KubernetesPodOperator(BaseOperator):
155155
:param startup_timeout_seconds: timeout in seconds to startup the pod.
156156
:param startup_check_interval_seconds: interval in seconds to check if the pod has already started
157157
:param get_logs: get the stdout of the base container as logs of the tasks.
158+
:param init_container_logs: list of init containers whose logs will be published to stdout
159+
Takes a sequence of containers, a single container name or True. If True,
160+
all the init containers' logs are published.
158161
:param container_logs: list of containers whose logs will be published to stdout
159162
Takes a sequence of containers, a single container name or True. If True,
160163
all the containers logs are published. Works in conjunction with get_logs param.
@@ -278,6 +281,7 @@ def __init__(
278281
startup_check_interval_seconds: int = 5,
279282
get_logs: bool = True,
280283
base_container_name: str | None = None,
284+
init_container_logs: Iterable[str] | str | Literal[True] | None = None,
281285
container_logs: Iterable[str] | str | Literal[True] | None = None,
282286
image_pull_policy: str | None = None,
283287
annotations: dict | None = None,
@@ -352,6 +356,7 @@ def __init__(
352356
# Fallback to the class variable BASE_CONTAINER_NAME here instead of via default argument value
353357
# in the init method signature, to be compatible with subclasses overloading the class variable value.
354358
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
359+
self.init_container_logs = init_container_logs
355360
self.container_logs = container_logs or self.base_container_name
356361
self.image_pull_policy = image_pull_policy
357362
self.node_selector = node_selector or {}
@@ -600,6 +605,9 @@ def execute_sync(self, context: Context):
600605
self.callbacks.on_pod_creation(
601606
pod=self.remote_pod, client=self.client, mode=ExecutionMode.SYNC
602607
)
608+
609+
self.await_init_containers_completion(pod=self.pod)
610+
603611
self.await_pod_start(pod=self.pod)
604612
if self.callbacks:
605613
self.callbacks.on_pod_starting(
@@ -635,6 +643,22 @@ def execute_sync(self, context: Context):
635643
if self.do_xcom_push:
636644
return result
637645

646+
@tenacity.retry(
647+
wait=tenacity.wait_exponential(max=15),
648+
retry=tenacity.retry_if_exception_type(PodCredentialsExpiredFailure),
649+
reraise=True,
650+
)
651+
def await_init_containers_completion(self, pod: k8s.V1Pod):
652+
try:
653+
if self.init_container_logs:
654+
self.pod_manager.fetch_requested_init_container_logs(
655+
pod=pod,
656+
init_containers=self.init_container_logs,
657+
follow_logs=True,
658+
)
659+
except kubernetes.client.exceptions.ApiException as exc:
660+
self._handle_api_exception(exc, pod)
661+
638662
@tenacity.retry(
639663
wait=tenacity.wait_exponential(max=15),
640664
retry=tenacity.retry_if_exception_type(PodCredentialsExpiredFailure),
@@ -653,16 +677,21 @@ def await_pod_completion(self, pod: k8s.V1Pod):
653677
):
654678
self.pod_manager.await_container_completion(pod=pod, container_name=self.base_container_name)
655679
except kubernetes.client.exceptions.ApiException as exc:
656-
if exc.status and str(exc.status) == "401":
657-
self.log.warning(
658-
"Failed to check container status due to permission error. Refreshing credentials and retrying."
659-
)
660-
self._refresh_cached_properties()
661-
self.pod_manager.read_pod(
662-
pod=pod
663-
) # attempt using refreshed credentials, raises if still invalid
664-
raise PodCredentialsExpiredFailure("Kubernetes credentials expired, retrying after refresh.")
665-
raise exc
680+
self._handle_api_exception(exc, pod)
681+
682+
def _handle_api_exception(
683+
self,
684+
exc: kubernetes.client.exceptions.ApiException,
685+
pod: k8s.V1Pod,
686+
):
687+
if exc.status and str(exc.status) == "401":
688+
self.log.warning(
689+
"Failed to check container status due to permission error. Refreshing credentials and retrying."
690+
)
691+
self._refresh_cached_properties()
692+
self.pod_manager.read_pod(pod=pod) # attempt using refreshed credentials, raises if still invalid
693+
raise PodCredentialsExpiredFailure("Kubernetes credentials expired, retrying after refresh.")
694+
raise exc
666695

667696
def _refresh_cached_properties(self):
668697
del self.hook

providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from __future__ import annotations
2020

2121
import enum
22+
import itertools
2223
import json
2324
import math
2425
import time
@@ -117,7 +118,13 @@ def get_xcom_sidecar_container_resources(self) -> str | None:
117118

118119
def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
119120
"""Retrieve container status."""
120-
container_statuses = pod.status.container_statuses if pod and pod.status else None
121+
if pod and pod.status:
122+
container_statuses = itertools.chain(
123+
pod.status.container_statuses, pod.status.init_container_statuses
124+
)
125+
else:
126+
container_statuses = None
127+
121128
if container_statuses:
122129
# In general the variable container_statuses can store multiple items matching different containers.
123130
# The following generator expression yields all items that have name equal to the container_name.
@@ -166,6 +173,19 @@ def container_is_succeeded(pod: V1Pod, container_name: str) -> bool:
166173
return container_status.state.terminated.exit_code == 0
167174

168175

176+
def container_is_wait(pod: V1Pod, container_name: str) -> bool:
177+
"""
178+
Examine V1Pod ``pod`` to determine whether ``container_name`` is waiting.
179+
180+
If that container is present and waiting, returns True. Returns False otherwise.
181+
"""
182+
container_status = get_container_status(pod, container_name)
183+
if not container_status:
184+
return False
185+
186+
return container_status.state.waiting is not None
187+
188+
169189
def container_is_terminated(pod: V1Pod, container_name: str) -> bool:
170190
"""
171191
Examine V1Pod ``pod`` to determine whether ``container_name`` is terminated.
@@ -509,7 +529,7 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
509529
time.sleep(1)
510530

511531
def _reconcile_requested_log_containers(
512-
self, requested: Iterable[str] | str | bool, actual: list[str], pod_name
532+
self, requested: Iterable[str] | str | bool | None, actual: list[str], pod_name
513533
) -> list[str]:
514534
"""Return actual containers based on requested."""
515535
containers_to_log = []
@@ -552,6 +572,31 @@ def _reconcile_requested_log_containers(
552572
self.log.error("Could not retrieve containers for the pod: %s", pod_name)
553573
return containers_to_log
554574

575+
def fetch_requested_init_container_logs(
576+
self, pod: V1Pod, init_containers: Iterable[str] | str | Literal[True] | None, follow_logs=False
577+
) -> list[PodLoggingStatus]:
578+
"""
579+
Follow the logs of the requested init containers in the specified pod and publish them to airflow logging.
580+
581+
Returns when all the containers exit.
582+
583+
:meta private:
584+
"""
585+
pod_logging_statuses = []
586+
all_containers = self.get_init_container_names(pod)
587+
containers_to_log = self._reconcile_requested_log_containers(
588+
requested=init_containers,
589+
actual=all_containers,
590+
pod_name=pod.metadata.name,
591+
)
592+
# sort by spec.initContainers because containers runs sequentially
593+
containers_to_log = sorted(containers_to_log, key=lambda cn: all_containers.index(cn))
594+
for c in containers_to_log:
595+
self._await_init_container_start(pod=pod, container_name=c)
596+
status = self.fetch_container_logs(pod=pod, container_name=c, follow=follow_logs)
597+
pod_logging_statuses.append(status)
598+
return pod_logging_statuses
599+
555600
def fetch_requested_container_logs(
556601
self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], follow_logs=False
557602
) -> list[PodLoggingStatus]:
@@ -679,9 +724,22 @@ def read_pod_logs(
679724
post_termination_timeout=post_termination_timeout,
680725
)
681726

727+
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
728+
def get_init_container_names(self, pod: V1Pod) -> list[str]:
729+
"""
730+
Return init container names from the POD spec.
731+
732+
:meta private:
733+
"""
734+
return [container_spec.name for container_spec in pod.spec.init_containers]
735+
682736
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
683737
def get_container_names(self, pod: V1Pod) -> list[str]:
684-
"""Return container names from the POD except for the airflow-xcom-sidecar container."""
738+
"""
739+
Return container names from the POD except for the airflow-xcom-sidecar container.
740+
741+
:meta private:
742+
"""
685743
pod_info = self.read_pod(pod)
686744
return [
687745
container_spec.name
@@ -819,6 +877,20 @@ def _exec_pod_command(self, resp, command: str) -> str | None:
819877
return res
820878
return None
821879

880+
def _await_init_container_start(self, pod: V1Pod, container_name: str):
881+
while True:
882+
remote_pod = self.read_pod(pod)
883+
884+
if (
885+
remote_pod.status is not None
886+
and remote_pod.status.phase != PodPhase.PENDING
887+
and get_container_status(remote_pod, container_name) is not None
888+
and not container_is_wait(remote_pod, container_name)
889+
):
890+
return
891+
892+
time.sleep(1)
893+
822894

823895
class OnFinishAction(str, enum.Enum):
824896
"""Action to take when the pod finishes."""

providers/tests/cncf/kubernetes/utils/test_pod_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ def remote_pod(running=None, not_running=None):
623623
e = RemotePodMock()
624624
e.status = RemotePodMock()
625625
e.status.container_statuses = []
626+
e.status.init_container_statuses = []
626627
for r in not_running or []:
627628
e.status.container_statuses.append(container(r, False))
628629
for r in running or []:
@@ -643,6 +644,7 @@ def container(name, running):
643644
p = RemotePodMock()
644645
p.status = RemotePodMock()
645646
p.status.container_statuses = []
647+
p.status.init_container_statuses = []
646648
pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses"))
647649
pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty"))
648650
pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 running"))
@@ -858,6 +860,7 @@ def remote_pod(succeeded=None, not_succeeded=None):
858860
e = RemotePodMock()
859861
e.status = RemotePodMock()
860862
e.status.container_statuses = []
863+
e.status.init_container_statuses = []
861864
for r in not_succeeded or []:
862865
e.status.container_statuses.append(container(r, False))
863866
for r in succeeded or []:
@@ -878,6 +881,7 @@ def container(name, succeeded):
878881
p = RemotePodMock()
879882
p.status = RemotePodMock()
880883
p.status.container_statuses = []
884+
p.status.init_container_statuses = []
881885
pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses"))
882886
pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty"))
883887
pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 succeeded"))

0 commit comments

Comments
 (0)