From f19dfa81127312c20d2cec855c4ee925e2803b2f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 12 Nov 2021 22:22:31 -0800 Subject: [PATCH 01/25] Refactor KubernetesPodOperator for clarity This refactor has the following goals: * a simpler, easier-to-follow execute method; less logic and more readable method calls * remove reliance on mutation of pod and namespace instance attributes - the `self.pod` attribute is no longer referenced or mutated anywhere outside of `execute` - the only reason we need the `pod` attribute at all is for `on_kill` * reduce code duplication * improve method names for greater transparency allow usage of kubernetes hook (waiting on Add config and context params to KubernetesHook #19695 before implementing) --- .../kubernetes/operators/kubernetes_pod.py | 341 +++++++++--------- .../cncf/kubernetes/utils/pod_launcher.py | 251 +++++++------ .../test_kubernetes_pod_operator.py | 123 ++++--- ...test_kubernetes_pod_operator_backcompat.py | 58 +-- .../operators/test_kubernetes_pod.py | 158 +++++--- .../kubernetes/utils/test_pod_launcher.py | 124 +++++-- 6 files changed, 590 insertions(+), 465 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 50e75a4565995..f0dcd2292c807 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -15,17 +15,27 @@ # specific language governing permissions and limitations # under the License. """Executes task in a Kubernetes POD""" +import json +import logging import re import warnings -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type +from contextlib import AbstractContextManager +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional from kubernetes.client import CoreV1Api, models as k8s +from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLaunchFailedException, PodStatus + try: import airflow.utils.yaml as yaml except ImportError: import yaml +try: + from functools import cached_property +except ImportError: + from cached_property import cached_property + from airflow.exceptions import AirflowException from airflow.kubernetes import kube_client, pod_generator from airflow.kubernetes.pod_generator import PodGenerator @@ -46,13 +56,16 @@ from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv from airflow.providers.cncf.kubernetes.utils import pod_launcher, xcom_sidecar from airflow.utils.helpers import validate_key -from airflow.utils.state import State from airflow.version import version as airflow_version if TYPE_CHECKING: import jinja2 +class PodReattachFailure(AirflowException): + """When we expect to be able to find a pod but cannot.""" + + class KubernetesPodOperator(BaseOperator): """ Execute a task in a Kubernetes Pod @@ -163,8 +176,12 @@ class KubernetesPodOperator(BaseOperator): :param termination_grace_period: Termination grace period if task killed in UI, defaults to kubernetes default :type termination_grace_period: int + """ + BASE_CONTAINER_NAME = 'base' + POD_CHECKED_KEY = 'already_checked' + template_fields: Iterable[str] = ( 'image', 'cmds', @@ -176,9 +193,7 @@ class KubernetesPodOperator(BaseOperator): 'namespace', ) - # fmt: off def __init__( - # fmt: on self, *, namespace: Optional[str] = None, @@ -269,8 +284,9 @@ def __init__( self.service_account_name = 
service_account_name self.is_delete_operator_pod = is_delete_operator_pod self.hostnetwork = hostnetwork - self.tolerations = [convert_toleration(toleration) for toleration in tolerations] \ - if tolerations else [] + self.tolerations = ( + [convert_toleration(toleration) for toleration in tolerations] if tolerations else [] + ) self.security_context = security_context or {} self.dnspolicy = dnspolicy self.schedulername = schedulername @@ -282,8 +298,8 @@ def __init__( self.name = self._set_name(name) self.random_name_suffix = random_name_suffix self.termination_grace_period = termination_grace_period - self.client: CoreV1Api = None - self.pod: k8s.V1Pod = None + self.pod_request_obj: Optional[k8s.V1Pod] = None + self.pod: Optional[k8s.V1Pod] = None def _render_nested_template_fields( self, @@ -297,15 +313,10 @@ def _render_nested_template_fields( self._do_render_template_fields(content, ('value', 'name'), context, jinja_env, seen_oids) return - super()._render_nested_template_fields( - content, - context, - jinja_env, - seen_oids - ) + super()._render_nested_template_fields(content, context, jinja_env, seen_oids) @staticmethod - def create_labels_for_pod(context) -> dict: + def _create_labels_for_pod(context) -> dict: """ Generate labels for the pod to track the pod in case of Operator crash @@ -328,101 +339,133 @@ def create_labels_for_pod(context) -> dict: labels[label_id] = safe_label return labels - def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]: - return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push) + @cached_property + def launcher(self) -> pod_launcher.PodLauncher: + return pod_launcher.PodLauncher(kube_client=self.client) - def execute(self, context) -> Optional[str]: + @cached_property + def client(self) -> CoreV1Api: + # todo: use airflow Connection / hook to authenticate to the cluster + kwargs: Dict[str, Any] = dict( + cluster_context=self.cluster_context, + config_file=self.config_file, + ) + if self.in_cluster is not None: + kwargs.update(in_cluster=self.in_cluster) + return kube_client.get_kube_client(**kwargs) + + def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]: + """Returns an already-running pod for this task instance if one exists.""" + labels = self._create_labels_for_pod(context) + label_selector = self._get_pod_identifying_label_string(labels) + pod_list = self.client.list_namespaced_pod( + namespace=namespace, + label_selector=label_selector, + ).items + + num_pods = len(pod_list) + if num_pods > 1: + raise AirflowException(f'More than one pod running with labels {label_selector}') + elif num_pods == 1: + pod = pod_list[0] + self.log.info("Found matching pod %s", pod.metadata.name) + self._compare_try_numbers(context, pod) + return pod + + def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context): + if self.reattach_on_restart: + pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context) + if pod: + return pod + self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict())) + self.launcher.create_pod(pod=pod_request_obj) + return pod_request_obj + + def await_pod_start(self, pod): try: - if self.in_cluster is not None: - client = kube_client.get_kube_client( - in_cluster=self.in_cluster, - cluster_context=self.cluster_context, - config_file=self.config_file, - ) - else: - client = kube_client.get_kube_client( - cluster_context=self.cluster_context, config_file=self.config_file - ) - - self.client = client - - self.pod = 
self.create_pod_request_obj() - self.namespace = self.pod.metadata.namespace - - # Add combination of labels to uniquely identify a running pod - labels = self.create_labels_for_pod(context) - - label_selector = self._get_pod_identifying_label_string(labels) - - pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector) + self.launcher.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds) + except PodLaunchFailedException: + if self.log_events_on_failure: + for event in self.launcher.read_pod_events(pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + raise - if len(pod_list.items) > 1 and self.reattach_on_restart: - raise AirflowException( - f'More than one pod running with labels: {label_selector}' - ) + def extract_xcom(self, pod): + """Retrieves xcom value and kills xcom sidecar container""" + result = self.launcher.extract_xcom(pod) + self.log.info(result) + return json.loads(result) - launcher = self.create_pod_launcher() + def execute(self, context): + remote_pod = None + try: + self.pod_request_obj = self.build_pod_request_obj(context) + self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill` + pod_request_obj=self.pod_request_obj, + context=context, + ) + self.await_pod_start(pod=self.pod) - if len(pod_list.items) == 1: - try_numbers_match = self._try_numbers_match(context, pod_list.items[0]) - final_state, remote_pod, result = self.handle_pod_overlap( - labels, try_numbers_match, launcher, pod_list.items[0] + if self.get_logs: + self.launcher.follow_container_logs( + pod=self.pod, + container_name=self.BASE_CONTAINER_NAME, ) else: - self.log.info("creating pod with labels %s and launcher %s", labels, launcher) - final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher) - if final_state != State.SUCCESS: - raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}') - context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name) - context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace) - return result - except AirflowException as ex: - raise AirflowException(f'Pod Launching failed: {ex}') + self.launcher.await_container_completion( + pod=self.pod, container_name=self.BASE_CONTAINER_NAME + ) - def handle_pod_overlap( - self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod - ) -> Tuple[State, k8s.V1Pod, Optional[str]]: - """ + if self.do_xcom_push: + result = self.extract_xcom(pod=self.pod) + remote_pod = self.launcher.await_pod_completion(self.pod) + finally: + self.cleanup( + pod=self.pod or self.pod_request_obj, + remote_pod=remote_pod, + ) + if self.do_xcom_push: + ti = context['ti'] + if remote_pod: + ti.xcom_push(key='pod_name', value=remote_pod.metadata.name) + ti.xcom_push(key='pod_namespace', value=remote_pod.metadata.namespace) + return result - In cases where the Scheduler restarts while a KubernetesPodOperator task is running, - this function will either continue to monitor the existing pod or launch a new pod - based on the `reattach_on_restart` parameter. + def cleanup(self, pod, remote_pod): + with _suppress(Exception): + self.process_pod_deletion(pod) - :param labels: labels used to determine if a pod is repeated - :type labels: dict - :param try_numbers_match: do the try numbers match? 
Only needed for logging purposes - :type try_numbers_match: bool - :param launcher: PodLauncher - :param pod: Pod found with matching labels - """ - if try_numbers_match: - log_line = f"found a running pod with labels {labels} and the same try_number." - else: - log_line = f"found a running pod with labels {labels} but a different try_number." - - # In case of failed pods, should reattach the first time, but only once - # as the task will have already failed. - if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"): - log_line += " Will attach to this pod and monitor instead of starting new one" - self.log.info(log_line) - self.pod = pod - final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod) + pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None + if pod_phase != PodStatus.SUCCEEDED: + if self.log_events_on_failure: + with _suppress(Exception): + for event in self.launcher.read_pod_events(pod).items: + self.log.error("Pod Event: %s - %s", event.reason, event.message) + if not self.is_delete_operator_pod: + with _suppress(Exception): + self.patch_already_checked(pod) + raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}') + + def process_pod_deletion(self, pod): + if self.is_delete_operator_pod: + self.log.info("deleting pod: %s", pod.metadata.name) + self.launcher.delete_pod(pod) else: - log_line += f"creating pod with labels {labels} and launcher {launcher}" - self.log.info(log_line) - final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher) - return final_state, remote_pod, result + self.log.info("skipping deleting pod: %s", pod.metadata.name) - @staticmethod - def _get_pod_identifying_label_string(labels) -> str: + def _get_pod_identifying_label_string(self, labels) -> str: label_strings = [ f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number' ] - return ','.join(label_strings) + ',already_checked!=True' - - @staticmethod - def _try_numbers_match(context, pod) -> bool: - return pod.metadata.labels['try_number'] == context['ti'].try_number + return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True' + + def _compare_try_numbers(self, context, pod): + tries_match = pod.metadata.labels['try_number'] == context['ti'].try_number + self.log.info( + "found a running pod with labels %s %s try_number.", + pod.metadata.labels, + "and the same" if tries_match else "but a different", + ) def _set_name(self, name): if name is None: @@ -433,7 +476,24 @@ def _set_name(self, name): validate_key(name, max_length=220) return re.sub(r'[^a-z0-9.-]+', '-', name.lower()) - def create_pod_request_obj(self) -> k8s.V1Pod: + def patch_already_checked(self, pod: k8s.V1Pod): + """Add an "already checked" annotation to ensure we don't reattach on retries""" + pod.metadata.labels[self.POD_CHECKED_KEY] = "True" + body = PodGenerator.serialize_pod(pod) + self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body) + + def on_kill(self) -> None: + if self.pod: + pod = self.pod + kwargs = dict( + name=pod.metadata.name, + namespace=pod.metadata.namespace, + ) + if self.termination_grace_period is not None: + kwargs.update(grace_period_seconds=self.termination_grace_period) + self.client.delete_namespaced_pod(**kwargs) + + def build_pod_request_obj(self, context): """ Creates a V1Pod based on user parameters. Note that a `pod` or `pod_template_file` will supersede all other values. 
@@ -467,7 +527,7 @@ def create_pod_request_obj(self) -> k8s.V1Pod: containers=[ k8s.V1Container( image=self.image, - name="base", + name=self.BASE_CONTAINER_NAME, command=self.cmds, ports=self.ports, image_pull_policy=self.image_pull_policy, @@ -501,83 +561,40 @@ def create_pod_request_obj(self) -> k8s.V1Pod: if self.do_xcom_push: self.log.debug("Adding xcom sidecar to task %s", self.task_id) pod = xcom_sidecar.add_xcom_sidecar(pod) - return pod - def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]: - """ - Creates a new pod and monitors for duration of task - - :param labels: labels used to track pod - :param launcher: pod launcher that will manage launching and monitoring pods - :return: - """ - self.log.debug( - "Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id - ) + labels = self._create_labels_for_pod(context) + self.log.info("creating pod %s with labels: %s", pod.metadata.name, labels) # Merge Pod Identifying labels with labels passed to operator - self.pod.metadata.labels.update(labels) + pod.metadata.labels.update(labels) # Add Airflow Version to the label # And a label to identify that pod is launched by KubernetesPodOperator - self.pod.metadata.labels.update( + pod.metadata.labels.update( { 'airflow_version': airflow_version.replace('+', '-'), 'kubernetes_pod_operator': 'True', } ) + return pod - self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict())) - final_state = None - try: - launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds) - final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs) - except AirflowException: - if self.log_events_on_failure: - for event in launcher.read_pod_events(self.pod).items: - self.log.error("Pod Event: %s - %s", event.reason, event.message) - raise - finally: - if self.is_delete_operator_pod: - self.log.debug("Deleting pod for task %s", self.task_id) - launcher.delete_pod(self.pod) - elif final_state != State.SUCCESS: - self.patch_already_checked(self.pod) - return final_state, remote_pod, result - def patch_already_checked(self, pod: k8s.V1Pod): - """Add an "already tried annotation to ensure we only retry once""" - pod.metadata.labels["already_checked"] = "True" - body = PodGenerator.serialize_pod(pod) - self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body) +class _suppress(AbstractContextManager): + """ + This behaves the same as contextlib.suppress but logs the suppressed + exceptions as errors with traceback. 
+ """ - def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]: - """ - Monitors a pod to completion that was created by a previous KubernetesPodOperator + def __init__(self, *exceptions): + self._exceptions = exceptions + self.exception = None - :param launcher: pod launcher that will manage launching and monitoring pods - :param pod: podspec used to find pod using k8s API - :return: - """ - try: - (final_state, remote_pod, result) = launcher.monitor_pod(pod, get_logs=self.get_logs) - finally: - if self.is_delete_operator_pod: - launcher.delete_pod(pod) - if final_state != State.SUCCESS: - if self.log_events_on_failure: - for event in launcher.read_pod_events(pod).items: - self.log.error("Pod Event: %s - %s", event.reason, event.message) - if not self.is_delete_operator_pod: - self.patch_already_checked(pod) - raise AirflowException(f'Pod returned a failure: {final_state}') - return final_state, remote_pod, result + def __enter__(self): + return self - def on_kill(self) -> None: - if self.pod: - pod: k8s.V1Pod = self.pod - namespace = pod.metadata.namespace - name = pod.metadata.name - kwargs = {} - if self.termination_grace_period is not None: - kwargs = {"grace_period_seconds": self.termination_grace_period} - self.client.delete_namespaced_pod(name=name, namespace=namespace, **kwargs) + def __exit__(self, exctype, excinst, exctb): + caught_error = exctype is not None and issubclass(exctype, self._exceptions) + if caught_error: + self.exception = excinst + logger = logging.getLogger() + logger.error(str(excinst), exc_info=True) + return caught_error diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py index e76ae40b7b1e3..ab56d25cdfaa9 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py @@ -18,13 +18,13 @@ import json import math import time +from contextlib import closing from datetime import datetime as dt from typing import Iterable, Optional, Tuple, Union import pendulum import tenacity from kubernetes import client, watch -from kubernetes.client.models.v1_event import V1Event from kubernetes.client.models.v1_event_list import V1EventList from kubernetes.client.models.v1_pod import V1Pod from kubernetes.client.rest import ApiException @@ -38,7 +38,10 @@ from airflow.kubernetes.pod_generator import PodDefaults from airflow.settings import pod_mutation_hook from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.state import State + + +class PodLaunchFailedException(AirflowException): + """When pod launching fails in KubernetesPodOperator.""" def should_retry_start_pod(exception: Exception) -> bool: @@ -51,10 +54,22 @@ def should_retry_start_pod(exception: Exception) -> bool: class PodStatus: """Status of the PODs""" - PENDING = 'pending' - RUNNING = 'running' - FAILED = 'failed' - SUCCEEDED = 'succeeded' + PENDING = 'Pending' + RUNNING = 'Running' + FAILED = 'Failed' + SUCCEEDED = 'Succeeded' + + terminal_states = {FAILED, SUCCEEDED} + + +def container_is_running(pod: V1Pod, container_name: str) -> bool: + container_statuses = pod.status.container_statuses if pod and pod.status else None + if not container_statuses: + return False + container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) + if not container_status: + return False + return container_status.state.running is not None class PodLauncher(LoggingMixin): @@ -65,7 +80,6 @@ def __init__( 
kube_client: client.CoreV1Api = None, in_cluster: bool = True, cluster_context: Optional[str] = None, - extract_xcom: bool = False, ): """ Creates the launcher. @@ -73,12 +87,10 @@ def __init__( :param kube_client: kubernetes client :param in_cluster: whether we are in cluster :param cluster_context: context of the cluster - :param extract_xcom: whether we should extract xcom """ super().__init__() self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context) self._watch = watch.Watch() - self.extract_xcom = extract_xcom def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod: """Runs POD asynchronously""" @@ -117,79 +129,99 @@ def delete_pod(self, pod: V1Pod) -> None: reraise=True, retry=tenacity.retry_if_exception(should_retry_start_pod), ) - def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None: + def create_pod(self, pod: V1Pod) -> V1Pod: + """Launches the pod asynchronously.""" + return self.run_pod_async(pod) + + def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None: """ - Launches the pod synchronously and waits for completion. + Waits for the pod to reach phase other than ``Pending`` :param pod: :param startup_timeout: Timeout (in seconds) for startup of the pod (if pod is pending for too long, fails task) :return: """ - resp = self.run_pod_async(pod) curr_time = dt.now() - if resp.status.start_time is None: - while self.pod_not_started(pod): - self.log.warning("Pod not yet started: %s", pod.metadata.name) - delta = dt.now() - curr_time - if delta.total_seconds() >= startup_timeout: - msg = ( - f"Pod took longer than {startup_timeout} seconds to start. " - "Check the pod events in kubernetes to determine why." - ) - raise AirflowException(msg) - time.sleep(1) - - def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]: + while True: + remote_pod = self.read_pod(pod) + if remote_pod.status.phase != PodStatus.PENDING: + break + self.log.warning("Pod not yet started: %s", pod.metadata.name) + delta = dt.now() - curr_time + if delta.total_seconds() >= startup_timeout: + msg = ( + f"Pod took longer than {startup_timeout} seconds to start. " + "Check the pod events in kubernetes to determine why." + ) + raise PodLaunchFailedException(msg) + time.sleep(1) + + def follow_container_logs(self, pod: V1Pod, container_name: str): + """ + Follows the logs of container and streams to airflow logging. + Returns when container exits. + """ + container_stopped = False + read_logs_since_sec = None + last_log_time = None + + # `read_pod_logs` follows the logs so we shouldn't necessarily _need_ to loop + # but in a long-running process we might lose connectivity and this way we + # can resume following the logs + while True: + try: + logs = self.read_pod_logs( + pod=pod, + container_name=container_name, + timestamps=True, + since_seconds=read_logs_since_sec, + ) + for line in logs: # type: bytes + timestamp, message = self.parse_log_line(line.decode('utf-8')) + self.log.info(message) + if timestamp: + last_log_time = timestamp + except BaseHTTPError: # Catches errors like ProtocolError(TimeoutError). 
+                self.log.warning(
+                    'Failed to read logs for pod %s',
+                    pod.metadata.name,
+                    exc_info=True,
+                )
+
+            if container_stopped is True:
+                break
+
+            if last_log_time:
+                delta = pendulum.now() - last_log_time
+                read_logs_since_sec = math.ceil(delta.total_seconds())
+
+            time.sleep(1)
+
+            if self.container_is_running(pod, container_name=container_name):
+                self.log.info('Container %s is running', container_name)
+                self.log.warning('Pod %s log read interrupted', pod.metadata.name)
+            else:
+                container_stopped = True  # fetch logs once more and exit
+
+    def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
+        while self.container_is_running(pod=pod, container_name=container_name):
+            time.sleep(1)
+
+    def await_pod_completion(self, pod: V1Pod) -> V1Pod:
         """
-        Monitors a pod and returns the final state, pod and xcom result
+        Monitors a pod and returns the final state
 
         :param pod: pod spec that will be monitored
-        :param get_logs: whether to read the logs locally
         :return: Tuple[State, Optional[str]]
         """
-        if get_logs:
-            read_logs_since_sec = None
-            last_log_time = None
-            while True:
-                try:
-                    logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
-                    for line in logs:
-                        timestamp, message = self.parse_log_line(line.decode('utf-8'))
-                        self.log.info(message)
-                        if timestamp:
-                            last_log_time = timestamp
-                except BaseHTTPError:
-                    # Catches errors like ProtocolError(TimeoutError).
-                    self.log.warning(
-                        'Failed to read logs for pod %s',
-                        pod.metadata.name,
-                        exc_info=True,
-                    )
-
-                time.sleep(1)
-
-                if not self.base_container_is_running(pod):
-                    break
-
-                self.log.warning('Pod %s log read interrupted', pod.metadata.name)
-                if last_log_time:
-                    delta = pendulum.now() - last_log_time
-                    # Prefer logs duplication rather than loss
-                    read_logs_since_sec = math.ceil(delta.total_seconds())
-        result = None
-        if self.extract_xcom:
-            while self.base_container_is_running(pod):
-                self.log.info('Container %s has state %s', pod.metadata.name, State.RUNNING)
-                time.sleep(2)
-            result = self._extract_xcom(pod)
-            self.log.info(result)
-            result = json.loads(result)
-        while self.pod_is_running(pod):
-            self.log.info('Pod %s has state %s', pod.metadata.name, State.RUNNING)
+        while True:
+            remote_pod = self.read_pod(pod)
+            if remote_pod.status.phase in PodStatus.terminal_states:
+                break
+            self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase)
             time.sleep(2)
-        remote_pod = self.read_pod(pod)
-        return self._task_status(remote_pod), remote_pod, result
 
     def parse_log_line(self, line: str) -> Tuple[Optional[Union[Date, Time, DateTime, Duration]], str]:
         """
@@ -212,35 +244,16 @@ def parse_log_line(self, line: str) -> Tuple[Optional[Union[Date, Time, DateTime
             return None, line
         return last_log_time, message
 
-    def _task_status(self, event: V1Event) -> str:
-        self.log.info('Event: %s had an event of type %s', event.metadata.name, event.status.phase)
-        status = self.process_status(event.metadata.name, event.status.phase)
-        return status
-
-    def pod_not_started(self, pod: V1Pod) -> bool:
-        """Tests if pod has not started"""
-        state = self._task_status(self.read_pod(pod))
-        return state == State.QUEUED
-
-    def pod_is_running(self, pod: V1Pod) -> bool:
-        """Tests if pod is running"""
-        state = self._task_status(self.read_pod(pod))
-        return state not in (State.SUCCESS, State.FAILED)
-
-    def base_container_is_running(self, pod: V1Pod) -> bool:
-        """Tests if base container is running"""
-        event = self.read_pod(pod)
-        if not (event and event.status and 
event.status.container_statuses):
-            return False
-        status = next(iter(filter(lambda s: s.name == 'base', event.status.container_statuses)), None)
-        if not status:
-            return False
-        return status.state.running is not None
+    def container_is_running(self, pod: V1Pod, container_name: str) -> bool:
+        """Reads pod and checks if container is running"""
+        remote_pod = self.read_pod(pod)
+        return container_is_running(pod=remote_pod, container_name=container_name)
 
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
     def read_pod_logs(
         self,
         pod: V1Pod,
+        container_name: str,
         tail_lines: Optional[int] = None,
         timestamps: bool = False,
         since_seconds: Optional[int] = None,
@@ -257,7 +270,7 @@ def read_pod_logs(
             return self._client.read_namespaced_pod_log(
                 name=pod.metadata.name,
                 namespace=pod.metadata.namespace,
-                container='base',
+                container=container_name,
                 follow=True,
                 timestamps=timestamps,
                 _preload_content=False,
@@ -265,7 +278,6 @@ def read_pod_logs(
             )
         except BaseHTTPError:
             self.log.exception('There was an error reading the kubernetes API.')
-            # Reraise to be caught by self.monitor_pod.
             raise
 
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
@@ -286,29 +298,29 @@ def read_pod(self, pod: V1Pod) -> V1Pod:
         except BaseHTTPError as e:
             raise AirflowException(f'There was an error reading the kubernetes API: {e}')
 
-    def _extract_xcom(self, pod: V1Pod) -> str:
-        resp = kubernetes_stream(
-            self._client.connect_get_namespaced_pod_exec,
-            pod.metadata.name,
-            pod.metadata.namespace,
-            container=PodDefaults.SIDECAR_CONTAINER_NAME,
-            command=['/bin/sh'],
-            stdin=True,
-            stdout=True,
-            stderr=True,
-            tty=False,
-            _preload_content=False,
-        )
-        try:
+    def extract_xcom(self, pod: V1Pod) -> str:
+        """Retrieves xcom value and kills xcom sidecar container"""
+        with closing(
+            kubernetes_stream(
+                self._client.connect_get_namespaced_pod_exec,
+                pod.metadata.name,
+                pod.metadata.namespace,
+                container=PodDefaults.SIDECAR_CONTAINER_NAME,
+                command=['/bin/sh'],
+                stdin=True,
+                stdout=True,
+                stderr=True,
+                tty=False,
+                _preload_content=False,
+            )
+        ) as resp:
             result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json')
             self._exec_pod_command(resp, 'kill -s SIGINT 1')
-        finally:
-            resp.close()
         if result is None:
            raise AirflowException(f'Failed to extract xcom from pod: {pod.metadata.name}')
        return result

-    def _exec_pod_command(self, resp, command: str) -> None:
+    def _exec_pod_command(self, resp, command: str) -> Optional[str]:
         if resp.is_open():
             self.log.info('Running command... 
%s\n', command) resp.write_stdin(command + '\n') @@ -320,20 +332,3 @@ def _exec_pod_command(self, resp, command: str) -> None: self.log.info(resp.read_stderr()) break return None - - def process_status(self, job_id: str, status: str) -> str: - """Process status information for the JOB""" - status = status.lower() - if status == PodStatus.PENDING: - return State.QUEUED - elif status == PodStatus.FAILED: - self.log.error('Event with job id %s Failed', job_id) - return State.FAILED - elif status == PodStatus.SUCCEEDED: - self.log.info('Event with job id %s Succeeded', job_id) - return State.SUCCESS - elif status == PodStatus.RUNNING: - return State.RUNNING - else: - self.log.error('Event: Invalid state %s on job %s', status, job_id) - return State.FAILED diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 3944dc7974db1..082d2ceb39b1a 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -23,7 +23,7 @@ import textwrap import unittest from unittest import mock -from unittest.mock import ANY +from unittest.mock import ANY, MagicMock import pendulum import pytest @@ -34,7 +34,7 @@ from airflow.exceptions import AirflowException from airflow.kubernetes import kube_client from airflow.kubernetes.secret import Secret -from airflow.models import DAG, DagRun, TaskInstance +from airflow.models import DAG, XCOM_RETURN_KEY, DagRun, TaskInstance from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults @@ -156,6 +156,7 @@ def test_config_path_move(self): task_id="task" + self.get_current_task_name(), in_cluster=False, do_xcom_push=False, + is_delete_operator_pod=False, config_file=new_config_path, ) context = create_context(k) @@ -516,12 +517,10 @@ def test_faulty_service_account(self): startup_timeout_seconds=5, service_account_name=bad_service_account_name, ) - with pytest.raises(ApiException): - context = create_context(k) - k.execute(context) - actual_pod = self.api_client.sanitize_for_serialization(k.pod) - self.expected_pod['spec']['serviceAccountName'] = bad_service_account_name - assert self.expected_pod == actual_pod + context = create_context(k) + pod = k.build_pod_request_obj(context) + with pytest.raises(ApiException, match="error looking up service account default/foobar"): + k.get_or_create_pod(pod, context) def test_pod_failure(self): """ @@ -546,7 +545,8 @@ def test_pod_failure(self): self.expected_pod['spec']['containers'][0]['args'] = bad_internal_command assert self.expected_pod == actual_pod - def test_xcom_push(self): + @mock.patch("airflow.models.taskinstance.TaskInstance.xcom_push") + def test_xcom_push(self, xcom_push): return_value = '{"foo": "bar"\n, "buzz": 2}' args = [f'echo \'{return_value}\' > /airflow/xcom/return.json'] k = KubernetesPodOperator( @@ -561,7 +561,8 @@ def test_xcom_push(self): do_xcom_push=True, ) context = create_context(k) - assert k.execute(context) == json.loads(return_value) + k.execute(context) + assert xcom_push.called_once_with(key=XCOM_RETURN_KEY, value=json.loads(return_value)) actual_pod = self.api_client.sanitize_for_serialization(k.pod) volume = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME) volume_mount = self.api_client.sanitize_for_serialization(PodDefaults.VOLUME_MOUNT) @@ -572,12 +573,11 @@ def 
test_xcom_push(self): self.expected_pod['spec']['containers'].append(container) assert self.expected_pod == actual_pod - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock): + def test_envs_from_secrets(self, mock_client, await_pod_completion_mock, create_pod): # GIVEN - from airflow.utils.state import State secret_ref = 'secret_name' secrets = [Secret('env', None, secret_ref)] @@ -595,10 +595,11 @@ def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock): do_xcom_push=False, ) # THEN - monitor_mock.return_value = (State.SUCCESS, None, None) + await_pod_completion_mock.return_value = None context = create_context(k) - k.execute(context) - assert start_mock.call_args[0][0].spec.containers[0].env_from == [ + with pytest.raises(AirflowException): + k.execute(context) + assert create_pod.call_args[1]['pod'].spec.containers[0].env_from == [ k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=secret_ref)) ] @@ -625,12 +626,9 @@ def test_env_vars(self): in_cluster=False, do_xcom_push=False, ) - - context = create_context(k) - k.execute(context) - # THEN - actual_pod = self.api_client.sanitize_for_serialization(k.pod) + context = create_context(k) + actual_pod = self.api_client.sanitize_for_serialization(k.build_pod_request_obj(context)) self.expected_pod['spec']['containers'][0]['env'] = [ {'name': 'ENV1', 'value': 'val1'}, {'name': 'ENV2', 'value': 'val2'}, @@ -741,6 +739,7 @@ def test_full_pod_spec(self): in_cluster=False, full_pod_spec=pod_spec, do_xcom_push=True, + is_delete_operator_pod=False, ) context = create_context(k) @@ -814,12 +813,12 @@ def test_init_container(self): ] assert self.expected_pod == actual_pod - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.extract_xcom") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_pod_template_file(self, mock_client, monitor_mock, start_mock): - from airflow.utils.state import State - + def test_pod_template_file(self, mock_client, await_pod_completion_mock, create_mock, extract_xcom_mock): + extract_xcom_mock.return_value = '{}' path = sys.path[0] + '/tests/kubernetes/pod.yaml' k = KubernetesPodOperator( task_id="task" + self.get_current_task_name(), @@ -827,8 +826,9 @@ def test_pod_template_file(self, mock_client, monitor_mock, start_mock): pod_template_file=path, do_xcom_push=True, ) - - monitor_mock.return_value = (State.SUCCESS, None, None) + pod_mock = MagicMock() + pod_mock.status.phase = 'Succeeded' + await_pod_completion_mock.return_value = pod_mock context = create_context(k) with self.assertLogs(k.log, level=logging.DEBUG) as cm: k.execute(context) @@ -899,12 +899,11 @@ def test_pod_template_file(self, mock_client, monitor_mock, 
start_mock): del actual_pod['metadata']['labels']['airflow_version'] assert expected_dict == actual_pod - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_pod_priority_class_name(self, mock_client, monitor_mock, start_mock): + def test_pod_priority_class_name(self, mock_client, await_pod_completion_mock, create_mock): """Test ability to assign priorityClassName to pod""" - from airflow.utils.state import State priority_class_name = "medium-test" k = KubernetesPodOperator( @@ -920,7 +919,9 @@ def test_pod_priority_class_name(self, mock_client, monitor_mock, start_mock): priority_class_name=priority_class_name, ) - monitor_mock.return_value = (State.SUCCESS, None, None) + pod_mock = MagicMock() + pod_mock.status.phase = 'Succeeded' + await_pod_completion_mock.return_value = pod_mock context = create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) @@ -942,9 +943,8 @@ def test_pod_name(self): do_xcom_push=False, ) - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod") - def test_on_kill(self, monitor_mock): - from airflow.utils.state import State + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") + def test_on_kill(self, await_pod_completion_mock): client = kube_client.get_kube_client(in_cluster=False) name = "test" @@ -959,21 +959,20 @@ def test_on_kill(self, monitor_mock): task_id=name, in_cluster=False, do_xcom_push=False, + get_logs=False, termination_grace_period=0, ) context = create_context(k) - monitor_mock.return_value = (State.SUCCESS, None, None) - k.execute(context) + with pytest.raises(AirflowException): + k.execute(context) name = k.pod.metadata.name pod = client.read_namespaced_pod(name=name, namespace=namespace) assert pod.status.phase == "Running" k.on_kill() - with pytest.raises(ApiException): - pod = client.read_namespaced_pod(name=name, namespace=namespace) + with pytest.raises(ApiException, match=r'pods \\"test.[a-z0-9]+\\" not found'): + client.read_namespaced_pod(name=name, namespace=namespace) def test_reattach_failing_pod_once(self): - from airflow.utils.state import State - client = kube_client.get_kube_client(in_cluster=False) name = "test" namespace = "default" @@ -993,24 +992,38 @@ def test_reattach_failing_pod_once(self): context = create_context(k) + # launch pod with mock.patch( - "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod" - ) as monitor_mock: - monitor_mock.return_value = (State.SUCCESS, None, None) + "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion" + ) as await_pod_completion_mock: + pod_mock = MagicMock() + + # we don't want failure because we don't want the pod to be patched as "already_checked" + pod_mock.status.phase = 'Succeeded' + await_pod_completion_mock.return_value = pod_mock k.execute(context) name = k.pod.metadata.name pod = client.read_namespaced_pod(name=name, namespace=namespace) while pod.status.phase != "Failed": pod = client.read_namespaced_pod(name=name, namespace=namespace) - with pytest.raises(AirflowException): - k.execute(context) - 
pod = client.read_namespaced_pod(name=name, namespace=namespace) - assert pod.metadata.labels["already_checked"] == "True" + assert 'already_checked' not in pod.metadata.labels + + # should not call `create_pod`, because there's a pod there it should find + # should use the found pod and patch as "already_checked" (in failure block) with mock.patch( - "airflow.providers.cncf.kubernetes" - ".operators.kubernetes_pod.KubernetesPodOperator" - ".create_new_pod_for_operator" + "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod" ) as create_mock: - create_mock.return_value = ("success", {}, {}) - k.execute(context) + with pytest.raises(AirflowException): + k.execute(context) + pod = client.read_namespaced_pod(name=name, namespace=namespace) + assert pod.metadata.labels["already_checked"] == "True" + create_mock.assert_not_called() + + # `create_pod` should be called because though there's still a pod to be found, + # it will be `already_checked` + with mock.patch( + "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod" + ) as create_mock: + with pytest.raises(AirflowException): + k.execute(context) create_mock.assert_called_once() diff --git a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py index 6cbfabd5e91ad..c11375579a67b 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py +++ b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py @@ -19,7 +19,7 @@ import sys import unittest from unittest import mock -from unittest.mock import patch +from unittest.mock import MagicMock, patch import kubernetes.client.models as k8s import pendulum @@ -39,7 +39,6 @@ from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults from airflow.utils import timezone -from airflow.utils.state import State from airflow.version import version as airflow_version # noinspection DuplicatedCode @@ -118,10 +117,10 @@ def tearDown(self): client = kube_client.get_kube_client(in_cluster=False) client.delete_collection_namespaced_pod(namespace="default") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_image_pull_secrets_correctly_set(self, mock_client, monitor_mock, start_mock): + def test_image_pull_secrets_correctly_set(self, mock_client, await_pod_completion_mock, create_mock): fake_pull_secrets = "fakeSecret" k = KubernetesPodOperator( namespace='default', @@ -136,10 +135,12 @@ def test_image_pull_secrets_correctly_set(self, mock_client, monitor_mock, start image_pull_secrets=fake_pull_secrets, cluster_context='default', ) - monitor_mock.return_value = (State.SUCCESS, None, None) + mock_pod = MagicMock() + mock_pod.status.phase = 'Succeeded' + await_pod_completion_mock.return_value = mock_pod context = create_context(k) k.execute(context=context) - assert start_mock.call_args[0][0].spec.image_pull_secrets == [ + assert create_mock.call_args[1]['pod'].spec.image_pull_secrets == [ k8s.V1LocalObjectReference(name=fake_pull_secrets) ] @@ -378,9 +379,11 @@ def test_fs_group(self): assert 
self.expected_pod == actual_pod def test_faulty_service_account(self): - bad_service_account_name = "foobar" + """pod creation should fail when service account does not exist""" + service_account = "foobar" + namespace = "default" k = KubernetesPodOperator( - namespace='default', + namespace=namespace, image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -390,14 +393,14 @@ def test_faulty_service_account(self): in_cluster=False, do_xcom_push=False, startup_timeout_seconds=5, - service_account_name=bad_service_account_name, + service_account_name=service_account, ) - with pytest.raises(ApiException): - context = create_context(k) - k.execute(context) - actual_pod = self.api_client.sanitize_for_serialization(k.pod) - self.expected_pod['spec']['serviceAccountName'] = bad_service_account_name - assert self.expected_pod == actual_pod + context = create_context(k) + pod = k.build_pod_request_obj(context) + with pytest.raises( + ApiException, match=f"error looking up service account {namespace}/{service_account}" + ): + k.get_or_create_pod(pod, context) def test_pod_failure(self): """ @@ -448,8 +451,8 @@ def test_xcom_push(self): self.expected_pod['spec']['containers'].append(container) assert self.expected_pod == actual_pod - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") def test_envs_from_configmaps(self, mock_client, mock_monitor, mock_start): # GIVEN @@ -468,17 +471,19 @@ def test_envs_from_configmaps(self, mock_client, mock_monitor, mock_start): configmaps=[configmap], ) # THEN - mock_monitor.return_value = (State.SUCCESS, None, None) + mock_pod = MagicMock() + mock_pod.status.phase = 'Succeeded' + mock_monitor.return_value = mock_pod context = create_context(k) k.execute(context) - assert mock_start.call_args[0][0].spec.containers[0].env_from == [ + assert mock_start.call_args[1]['pod'].spec.containers[0].env_from == [ k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap)) ] - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") - def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock): + def test_envs_from_secrets(self, mock_client, await_pod_completion_mock, create_mock): # GIVEN secret_ref = 'secret_name' secrets = [Secret('env', None, secret_ref)] @@ -496,10 +501,13 @@ def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock): do_xcom_push=False, ) # THEN - monitor_mock.return_value = (State.SUCCESS, None, None) + + mock_pod = MagicMock() + mock_pod.status.phase = 'Succeeded' + await_pod_completion_mock.return_value = mock_pod context = create_context(k) k.execute(context) - assert start_mock.call_args[0][0].spec.containers[0].env_from == [ + assert create_mock.call_args[1]['pod'].spec.containers[0].env_from == [ 
k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=secret_ref)) ] diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index 5caa50062c5b3..f2a7e7a7dc85e 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -17,6 +17,7 @@ import unittest from tempfile import NamedTemporaryFile from unittest import mock +from unittest.mock import MagicMock import pytest from kubernetes.client import ApiClient, models as k8s @@ -25,27 +26,44 @@ from airflow.exceptions import AirflowException from airflow.models import DAG, DagRun, TaskInstance from airflow.models.xcom import IN_MEMORY_DAGRUN_ID -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator, _suppress from airflow.utils import timezone -from airflow.utils.state import State DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0) +def create_context(task): + dag = DAG(dag_id="dag") + task_instance = TaskInstance(task=task, run_id="kub_pod_test") + task_instance.dag_run = DagRun(run_id="kub_pod_test", execution_date=DEFAULT_DATE) + return { + "dag": dag, + "ts": DEFAULT_DATE.isoformat(), + "task": task, + "ti": task_instance, + "task_instance": task_instance, + } + + class TestKubernetesPodOperator(unittest.TestCase): def setUp(self): - self.start_patch = mock.patch( - "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.start_pod" + self.create_pod_patch = mock.patch( + "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod" + ) + self.await_pod_patch = mock.patch( + "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_start" ) - self.monitor_patch = mock.patch( - "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod" + self.await_pod_completion_patch = mock.patch( + "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion" ) self.client_patch = mock.patch("airflow.kubernetes.kube_client.get_kube_client") - self.start_mock = self.start_patch.start() - self.monitor_mock = self.monitor_patch.start() + self.create_mock = self.create_pod_patch.start() + self.await_start_mock = self.await_pod_patch.start() + self.await_pod_mock = self.await_pod_completion_patch.start() self.client_mock = self.client_patch.start() - self.addCleanup(self.start_patch.stop) - self.addCleanup(self.monitor_patch.stop) + self.addCleanup(self.create_pod_patch.stop) + self.addCleanup(self.await_pod_patch.stop) + self.addCleanup(self.await_pod_completion_patch.stop) self.addCleanup(self.client_patch.stop) @staticmethod @@ -62,10 +80,15 @@ def create_context(task): } def run_pod(self, operator) -> k8s.V1Pod: - self.monitor_mock.return_value = (State.SUCCESS, None, None) - context = self.create_context(operator) + context = create_context(operator) + pod_request_obj = operator.build_pod_request_obj(context) + remote_pod_mock = MagicMock() + remote_pod_mock.status.phase = 'Succeeded' + remote_pod_mock.metadata.name = pod_request_obj.metadata.name + remote_pod_mock.metadata.namespace = pod_request_obj.metadata.namespace + self.await_pod_mock.return_value = remote_pod_mock operator.execute(context=context) - return self.start_mock.call_args[0][0] + return self.await_start_mock.call_args[1]['pod'] def sanitize_for_serialization(self, obj): return 
ApiClient().sanitize_for_serialization(obj) @@ -85,9 +108,11 @@ def test_config_path(self): config_file=file_path, cluster_context="default", ) - self.monitor_mock.return_value = (State.SUCCESS, None, None) + remote_pod_mock = MagicMock() + remote_pod_mock.status.phase = 'Succeeded' + self.await_pod_mock.return_value = remote_pod_mock self.client_mock.list_namespaced_pod.return_value = [] - context = self.create_context(k) + context = create_context(k) k.execute(context=context) self.client_mock.assert_called_once_with( in_cluster=False, @@ -170,7 +195,8 @@ def test_image_pull_secrets_correctly_set(self): image_pull_secrets=[k8s.V1LocalObjectReference(fake_pull_secrets)], cluster_context="default", ) - pod = k.create_pod_request_obj() + + pod = k.build_pod_request_obj(create_context(k)) assert pod.spec.image_pull_secrets == [k8s.V1LocalObjectReference(name=fake_pull_secrets)] def test_image_pull_policy_correctly_set(self): @@ -187,7 +213,7 @@ def test_image_pull_policy_correctly_set(self): image_pull_policy="Always", cluster_context="default", ) - pod = k.create_pod_request_obj() + pod = k.build_pod_request_obj(create_context(k)) assert pod.spec.containers[0].image_pull_policy == "Always" @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod") @@ -205,9 +231,9 @@ def test_pod_delete_even_on_launcher_error(self, delete_pod_mock): cluster_context="default", is_delete_operator_pod=True, ) - self.monitor_mock.side_effect = AirflowException("fake failure") + self.await_pod_mock.side_effect = AirflowException("fake failure") with pytest.raises(AirflowException): - context = self.create_context(k) + context = create_context(k) k.execute(context=context) assert delete_pod_mock.called @@ -225,7 +251,7 @@ def test_provided_pod_name(self, randomize_name): do_xcom_push=False, cluster_context="default", ) - pod = k.create_pod_request_obj() + pod = k.build_pod_request_obj(create_context(k)) if randomize_name: assert pod.metadata.name.startswith(name_base) @@ -462,7 +488,9 @@ def test_pod_template_file(self, randomize_name): "execution_date": mock.ANY, } - def test_describes_pod_on_failure(self): + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.follow_container_logs") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_container_completion") + def test_describes_pod_on_failure(self, await_container_mock, follow_container_mock): name_base = "test" k = KubernetesPodOperator( @@ -477,22 +505,20 @@ def test_describes_pod_on_failure(self): do_xcom_push=False, cluster_context="default", ) - failed_pod_status = "read_pod_namespaced_result" - self.monitor_mock.return_value = (State.FAILED, failed_pod_status, None) - read_namespaced_pod_mock = self.client_mock.return_value.read_namespaced_pod - read_namespaced_pod_mock.return_value = failed_pod_status + follow_container_mock.return_value = None + remote_pod_mock = MagicMock() + remote_pod_mock.status.phase = 'Failed' + self.await_pod_mock.return_value = remote_pod_mock - with pytest.raises(AirflowException) as ctx: - context = self.create_context(k) + with pytest.raises(AirflowException, match=f"Pod {name_base}.[a-z0-9]+ returned a failure: .*"): + context = create_context(k) k.execute(context=context) - assert ( - str(ctx.value) - == f"Pod Launching failed: Pod {k.pod.metadata.name} returned a failure: {failed_pod_status}" - ) assert not self.client_mock.return_value.read_namespaced_pod.called - def test_no_need_to_describe_pod_on_success(self): + 
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.follow_container_logs") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_container_completion") + def test_no_handle_failure_on_success(self, await_container_mock, follow_container_mock): name_base = "test" k = KubernetesPodOperator( @@ -507,12 +533,16 @@ def test_no_need_to_describe_pod_on_success(self): do_xcom_push=False, cluster_context="default", ) - self.monitor_mock.return_value = (State.SUCCESS, None, None) - context = self.create_context(k) - k.execute(context=context) + follow_container_mock.return_value = None + remote_pod_mock = MagicMock() + remote_pod_mock.status.phase = 'Succeeded' + self.await_pod_mock.return_value = remote_pod_mock - assert not self.client_mock.return_value.read_namespaced_pod.called + context = create_context(k) + + # assert does not raise + k.execute(context=context) def test_create_with_affinity(self): name_base = "test" @@ -544,7 +574,7 @@ def test_create_with_affinity(self): affinity=affinity, ) - pod = k.create_pod_request_obj() + pod = k.build_pod_request_obj(create_context(k)) sanitized_pod = self.sanitize_for_serialization(pod) assert isinstance(pod.spec.affinity, k8s.V1Affinity) assert sanitized_pod["spec"]["affinity"] == affinity @@ -578,7 +608,7 @@ def test_create_with_affinity(self): affinity=k8s_api_affinity, ) - pod = k.create_pod_request_obj() + pod = k.build_pod_request_obj(create_context(k)) sanitized_pod = self.sanitize_for_serialization(pod) assert isinstance(pod.spec.affinity, k8s.V1Affinity) assert sanitized_pod["spec"]["affinity"] == affinity @@ -602,7 +632,7 @@ def test_tolerations(self): tolerations=tolerations, ) - pod = k.create_pod_request_obj() + pod = k.build_pod_request_obj(create_context(k)) sanitized_pod = self.sanitize_for_serialization(pod) assert isinstance(pod.spec.tolerations[0], k8s.V1Toleration) assert sanitized_pod["spec"]["tolerations"] == tolerations @@ -621,7 +651,7 @@ def test_tolerations(self): tolerations=k8s_api_tolerations, ) - pod = k.create_pod_request_obj() + pod = k.build_pod_request_obj(create_context(k)) sanitized_pod = self.sanitize_for_serialization(pod) assert isinstance(pod.spec.tolerations[0], k8s.V1Toleration) assert sanitized_pod["spec"]["tolerations"] == tolerations @@ -643,7 +673,7 @@ def test_node_selector(self): node_selector=node_selector, ) - pod = k.create_pod_request_obj() + pod = k.build_pod_request_obj(create_context(k)) sanitized_pod = self.sanitize_for_serialization(pod) assert isinstance(pod.spec.node_selector, dict) assert sanitized_pod["spec"]["nodeSelector"] == node_selector @@ -666,12 +696,16 @@ def test_node_selector(self): node_selectors=node_selector, ) - pod = k.create_pod_request_obj() + pod = k.build_pod_request_obj(create_context(k)) sanitized_pod = self.sanitize_for_serialization(pod) assert isinstance(pod.spec.node_selector, dict) assert sanitized_pod["spec"]["nodeSelector"] == node_selector - def test_push_xcom_pod_info(self): + @mock.patch('airflow.kubernetes.pod_generator.PodGenerator.make_unique_pod_id') + @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.extract_xcom') + def test_push_xcom_pod_info(self, extract_xcom_mock, make_unique_pod_id): + extract_xcom_mock.return_value = '{}' + make_unique_pod_id.return_value = 'test-pod-a1b2c3' k = KubernetesPodOperator( namespace="default", image="ubuntu:16.04", @@ -679,7 +713,7 @@ def test_push_xcom_pod_info(self): name="test", task_id="task", in_cluster=False, - 
do_xcom_push=False, + do_xcom_push=True, ) pod = self.run_pod(k) ti = TaskInstance(task=k, run_id=IN_MEMORY_DAGRUN_ID) @@ -719,8 +753,10 @@ def test_mark_created_pod_if_not_deleted(self, mock_patch_already_checked, mock_ task_id="task", is_delete_operator_pod=False, ) - self.monitor_mock.return_value = (State.FAILED, None, None) - context = self.create_context(k) + remote_pod_mock = MagicMock() + remote_pod_mock.status.phase = 'Failed' + self.await_pod_mock.return_value = remote_pod_mock + context = create_context(k) with pytest.raises(AirflowException): k.execute(context=context) mock_patch_already_checked.assert_called_once() @@ -742,8 +778,8 @@ def test_mark_created_pod_if_not_deleted_during_exception( task_id="task", is_delete_operator_pod=False, ) - self.monitor_mock.side_effect = AirflowException("oops") - context = self.create_context(k) + self.await_pod_mock.side_effect = AirflowException("oops") + context = create_context(k) with pytest.raises(AirflowException): k.execute(context=context) mock_patch_already_checked.assert_called_once() @@ -751,8 +787,8 @@ def test_mark_created_pod_if_not_deleted_during_exception( @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod") @mock.patch( - "airflow.providers.cncf.kubernetes.operators.kubernetes_pod" - ".KubernetesPodOperator.patch_already_checked" + "airflow.providers.cncf.kubernetes.operators." + "kubernetes_pod.KubernetesPodOperator.patch_already_checked" ) def test_mark_reattached_pod_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod): """If we aren't deleting pods and have a failure, mark it so we don't reattach to it""" @@ -763,17 +799,21 @@ def test_mark_reattached_pod_if_not_deleted(self, mock_patch_already_checked, mo task_id="task", is_delete_operator_pod=False, ) - # Run it first to easily get the pod - pod = self.run_pod(k) - - # Now try and "reattach" - mock_patch_already_checked.reset_mock() - mock_delete_pod.reset_mock() - self.client_mock.return_value.list_namespaced_pod.return_value.items = [pod] - self.monitor_mock.return_value = (State.FAILED, None, None) + remote_pod_mock = MagicMock() + remote_pod_mock.status.phase = 'Failed' + self.await_pod_mock.return_value = remote_pod_mock - context = self.create_context(k) + context = create_context(k) with pytest.raises(AirflowException): k.execute(context=context) mock_patch_already_checked.assert_called_once() mock_delete_pod.assert_not_called() + + +def test__suppress(): + with mock.patch('logging.Logger.error') as mock_error: + + with _suppress(ValueError): + raise ValueError("failure") + + mock_error.assert_called_once_with("failure", exc_info=True) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py index eaef295d37e3e..67c7d4dc310d7 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py @@ -14,8 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-import unittest from unittest import mock +from unittest.mock import MagicMock import pendulum import pytest @@ -23,18 +23,18 @@ from urllib3.exceptions import HTTPError as BaseHTTPError from airflow.exceptions import AirflowException -from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher, PodStatus +from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher, PodStatus, container_is_running -class TestPodLauncher(unittest.TestCase): - def setUp(self): +class TestPodLauncher: + def setup_method(self): self.mock_kube_client = mock.Mock() self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client) def test_read_pod_logs_successfully_returns_logs(self): mock.sentinel.metadata = mock.MagicMock() self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs - logs = self.pod_launcher.read_pod_logs(mock.sentinel) + logs = self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base') assert mock.sentinel.logs == logs def test_read_pod_logs_retries_successfully(self): @@ -43,7 +43,7 @@ def test_read_pod_logs_retries_successfully(self): BaseHTTPError('Boom'), mock.sentinel.logs, ] - logs = self.pod_launcher.read_pod_logs(mock.sentinel) + logs = self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base') assert mock.sentinel.logs == logs self.mock_kube_client.read_namespaced_pod_log.assert_has_calls( [ @@ -74,12 +74,12 @@ def test_read_pod_logs_retries_fails(self): BaseHTTPError('Boom'), ] with pytest.raises(BaseHTTPError): - self.pod_launcher.read_pod_logs(mock.sentinel) + self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base') def test_read_pod_logs_successfully_with_tail_lines(self): mock.sentinel.metadata = mock.MagicMock() self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs] - logs = self.pod_launcher.read_pod_logs(mock.sentinel, tail_lines=100) + logs = self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base', tail_lines=100) assert mock.sentinel.logs == logs self.mock_kube_client.read_namespaced_pod_log.assert_has_calls( [ @@ -98,7 +98,7 @@ def test_read_pod_logs_successfully_with_tail_lines(self): def test_read_pod_logs_successfully_with_since_seconds(self): mock.sentinel.metadata = mock.MagicMock() self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs] - logs = self.pod_launcher.read_pod_logs(mock.sentinel, since_seconds=2) + logs = self.pod_launcher.read_pod_logs(mock.sentinel, 'base', since_seconds=2) assert mock.sentinel.logs == logs self.mock_kube_client.read_namespaced_pod_log.assert_has_calls( [ @@ -186,7 +186,7 @@ def pod_state_gen(): self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen() self.mock_kube_client.read_namespaced_pod_log.return_value = iter(()) - self.pod_launcher.monitor_pod(mock.sentinel, get_logs=True) + self.pod_launcher.follow_container_logs(mock.sentinel, 'base') def test_monitor_pod_logs_failures_non_fatal(self): mock.sentinel.metadata = mock.MagicMock() @@ -209,7 +209,7 @@ def pod_log_gen(): self.mock_kube_client.read_namespaced_pod_log.side_effect = pod_log_gen() - self.pod_launcher.monitor_pod(mock.sentinel, get_logs=True) + self.pod_launcher.follow_container_logs(mock.sentinel, 'base') def test_read_pod_retries_fails(self): mock.sentinel.metadata = mock.MagicMock() @@ -224,13 +224,13 @@ def test_read_pod_retries_fails(self): def test_parse_log_line(self): log_message = "This should return no timestamp" timestamp, line = self.pod_launcher.parse_log_line(log_message) - 
self.assertEqual(timestamp, None) - self.assertEqual(line, log_message) + assert timestamp is None + assert line == log_message real_timestamp = "2020-10-08T14:16:17.793417674Z" timestamp, line = self.pod_launcher.parse_log_line(" ".join([real_timestamp, log_message])) - self.assertEqual(timestamp, pendulum.parse(real_timestamp)) - self.assertEqual(line, log_message) + assert timestamp == pendulum.parse(real_timestamp) + assert line == log_message with pytest.raises(Exception): self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674ZInvalidmessage\n') @@ -241,14 +241,14 @@ def test_start_pod_retries_on_409_error(self, mock_run_pod_async): ApiException(status=409), mock.MagicMock(), ] - self.pod_launcher.start_pod(mock.sentinel) + self.pod_launcher.create_pod(mock.sentinel) assert mock_run_pod_async.call_count == 2 @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.run_pod_async") def test_start_pod_fails_on_other_exception(self, mock_run_pod_async): mock_run_pod_async.side_effect = [ApiException(status=504)] with pytest.raises(ApiException): - self.pod_launcher.start_pod(mock.sentinel) + self.pod_launcher.create_pod(mock.sentinel) @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.run_pod_async") def test_start_pod_retries_three_times(self, mock_run_pod_async): @@ -259,31 +259,83 @@ def test_start_pod_retries_three_times(self, mock_run_pod_async): ApiException(status=409), ] with pytest.raises(ApiException): - self.pod_launcher.start_pod(mock.sentinel) + self.pod_launcher.create_pod(mock.sentinel) assert mock_run_pod_async.call_count == 3 - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.pod_not_started") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.run_pod_async") - def test_start_pod_raises_informative_error_on_timeout(self, mock_run_pod_async, mock_pod_not_started): + def test_start_pod_raises_informative_error_on_timeout(self): pod_response = mock.MagicMock() - pod_response.status.start_time = None - mock_run_pod_async.return_value = pod_response - mock_pod_not_started.return_value = True + pod_response.status.phase = 'Pending' + self.mock_kube_client.read_namespaced_pod.return_value = pod_response expected_msg = "Check the pod events in kubernetes" + mock_pod = MagicMock() with pytest.raises(AirflowException, match=expected_msg): - self.pod_launcher.start_pod( - pod=mock.sentinel, + self.pod_launcher.await_pod_start( + pod=mock_pod, startup_timeout=0, ) - def test_base_container_is_running_none_event(self): - event = mock.MagicMock() - event_status = mock.MagicMock() - event_status.status = None - event_container_statuses = mock.MagicMock() - event_container_statuses.status = mock.MagicMock() - event_container_statuses.status.container_statuses = None - for e in [event, event_status, event_container_statuses]: - self.pod_launcher.read_pod = mock.MagicMock(return_value=e) - assert self.pod_launcher.base_container_is_running(None) is False + @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_launcher.container_is_running') + def test_container_is_running(self, container_is_running_mock): + mock_pod = MagicMock() + self.pod_launcher.read_pod = mock.MagicMock(return_value=mock_pod) + self.pod_launcher.container_is_running(None, 'base') + container_is_running_mock.assert_called_with(pod=mock_pod, container_name='base') + + +def params_for_test_container_is_running(): + """The `container_is_running` method is designed to handle an assortment of bad objects + 
returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This function + emits params used in `test_container_is_running` to verify this behavior. + + We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, + tests like `e.hello is not None` are always True. + """ + + class RemotePodMock: + pass + + class ContainerStatusMock: + def __init__(self, name): + self.name = name + + def remote_pod(running=None, not_running=None): + e = RemotePodMock() + e.status = RemotePodMock() + e.status.container_statuses = [] + for r in not_running or []: + e.status.container_statuses.append(container(r, False)) + for r in running or []: + e.status.container_statuses.append(container(r, True)) + return e + + def container(name, running): + c = ContainerStatusMock(name) + c.state = RemotePodMock() + c.state.running = {'a': 'b'} if running else None + return c + + pod_mock_list = [] + pod_mock_list.append(pytest.param(None, False, id='None remote_pod')) + p = RemotePodMock() + p.status = None + pod_mock_list.append(pytest.param(p, False, id='None remote_pod.status')) + p = RemotePodMock() + p.status = RemotePodMock() + p.status.container_statuses = [] + pod_mock_list.append(pytest.param(p, False, id='empty remote_pod.status.container_statuses')) + pod_mock_list.append(pytest.param(remote_pod(), False, id='filter empty')) + pod_mock_list.append(pytest.param(remote_pod(None, ['base']), False, id='filter 0 running')) + pod_mock_list.append(pytest.param(remote_pod(['hello'], ['base']), False, id='filter 1 not running')) + pod_mock_list.append(pytest.param(remote_pod(['base'], ['hello']), True, id='filter 1 running')) + return pod_mock_list + + +@pytest.mark.parametrize('remote_pod, result', params_for_test_container_is_running()) +def test_container_is_running(remote_pod, result): + """The `container_is_running` function is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This test + verifies the expected behavior.""" + assert container_is_running(remote_pod, 'base') is result From 54dab88333160327d18f76e71bcc89c64d8e4a14 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Sat, 18 Dec 2021 12:13:26 -0800 Subject: [PATCH 02/25] Update airflow/providers/cncf/kubernetes/utils/pod_launcher.py Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- airflow/providers/cncf/kubernetes/utils/pod_launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py index ab56d25cdfaa9..55a406dfe789d 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py @@ -329,6 +329,6 @@ def _exec_pod_command(self, resp, command: str) -> Optional[str]: if resp.peek_stdout(): return resp.read_stdout() if resp.peek_stderr(): - self.log.info(resp.read_stderr()) + self.log.info("stderr from command: %s", resp.read_stderr()) break return None From 74c0b94dd2847f31ce3f548f684494f4e67e1a20 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Sat, 18 Dec 2021 12:15:40 -0800 Subject: [PATCH 03/25] fixup! 
Refactor KubernetesPodOperator for clarity --- airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index f0dcd2292c807..f8cd7f76c2c5a 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -393,7 +393,7 @@ def await_pod_start(self, pod): def extract_xcom(self, pod): """Retrieves xcom value and kills xcom sidecar container""" result = self.launcher.extract_xcom(pod) - self.log.info(result) + self.log.info("xcom result: \n%s", result) return json.loads(result) def execute(self, context): From 215e51ae88c6ac59c08639fe1e064156cf62adc7 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 20 Dec 2021 09:32:37 -0800 Subject: [PATCH 04/25] Apply suggestions from code review Co-authored-by: Bas Harenslak --- .../providers/cncf/kubernetes/operators/kubernetes_pod.py | 4 ++-- airflow/providers/cncf/kubernetes/utils/pod_launcher.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index f8cd7f76c2c5a..36b652dda6aac 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -448,7 +448,7 @@ def cleanup(self, pod, remote_pod): def process_pod_deletion(self, pod): if self.is_delete_operator_pod: - self.log.info("deleting pod: %s", pod.metadata.name) + self.log.info("Deleting pod: %s", pod.metadata.name) self.launcher.delete_pod(pod) else: self.log.info("skipping deleting pod: %s", pod.metadata.name) @@ -563,7 +563,7 @@ def build_pod_request_obj(self, context): pod = xcom_sidecar.add_xcom_sidecar(pod) labels = self._create_labels_for_pod(context) - self.log.info("creating pod %s with labels: %s", pod.metadata.name, labels) + self.log.info("Creating pod %s with labels: %s", pod.metadata.name, labels) # Merge Pod Identifying labels with labels passed to operator pod.metadata.labels.update(labels) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py index 55a406dfe789d..a9387b02d1027 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py @@ -189,7 +189,7 @@ def follow_container_logs(self, pod: V1Pod, container_name: str): exc_info=True, ) - if container_stopped is True: + if container_stopped: break if last_log_time: @@ -244,7 +244,7 @@ def parse_log_line(self, line: str) -> Tuple[Optional[Union[Date, Time, DateTime return None, line return last_log_time, message - def container_is_running(self, pod: V1Pod, container_name) -> bool: + def container_is_running(self, pod: V1Pod, container_name: str) -> bool: """Reads pod and checks if container is running""" remote_pod = self.read_pod(pod) return container_is_running(pod=remote_pod, container_name=container_name) @@ -299,7 +299,7 @@ def read_pod(self, pod: V1Pod) -> V1Pod: raise AirflowException(f'There was an error reading the kubernetes API: {e}') def extract_xcom(self, pod: V1Pod) -> str: - """Retrieves xcom value using xcom value and kills xcom sidecar container""" + """Retrieves XCom value and kills xcom sidecar container""" with closing( 
kubernetes_stream( self._client.connect_get_namespaced_pod_exec, From 9508b0074c8025e47b49ebcd650da87438141118 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 20 Dec 2021 09:41:46 -0800 Subject: [PATCH 05/25] simplify try_number comparison logging --- .../cncf/kubernetes/operators/kubernetes_pod.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 36b652dda6aac..38087acc4cc55 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -368,8 +368,13 @@ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]: raise AirflowException(f'More than one pod running with labels {label_selector}') elif num_pods == 1: pod = pod_list[0] - self.log.info("Found matching pod %s", pod.metadata.name) - self._compare_try_numbers(context, pod) + self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels) + if not pod.metadata.labels['try_number'] == context['ti'].try_number: + self.log.info( + "`try_number` of current task instance is %s but pod has `try_number` %s", + context['ti'].try_number, + pod.metadata.labels['try_number'], + ) return pod def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context): @@ -459,14 +464,6 @@ def _get_pod_identifying_label_string(self, labels) -> str: ] return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True' - def _compare_try_numbers(self, context, pod): - tries_match = pod.metadata.labels['try_number'] == context['ti'].try_number - self.log.info( - "found a running pod with labels %s %s try_number.", - pod.metadata.labels, - "and the same" if tries_match else "but a different", - ) - def _set_name(self, name): if name is None: if self.pod_template_file or self.full_pod_spec: From 0e99629eb120dfa07c62e37dec715bec25657349 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 20 Dec 2021 09:42:23 -0800 Subject: [PATCH 06/25] add docstring --- airflow/providers/cncf/kubernetes/utils/pod_launcher.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py index a9387b02d1027..d174c6bf704b0 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py @@ -63,6 +63,10 @@ class PodStatus: def container_is_running(pod: V1Pod, container_name: str) -> bool: + """ + Examines V1Pod ``pod`` to determine whether ``container_name`` is running. + If that container is present and running, returns True. Returns False otherwise. 
+    """
     container_statuses = pod.status.container_statuses if pod and pod.status else None
     if not container_statuses:
         return False

From 730cd48757f01fc3639bd343b654b9bcd2b07273 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Mon, 20 Dec 2021 09:44:05 -0800
Subject: [PATCH 07/25] refactor follow_container_logs method

---
 .../cncf/kubernetes/utils/pod_launcher.py     | 51 +++++++++++--------
 1 file changed, 31 insertions(+), 20 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
index d174c6bf704b0..607792b267222 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """Launches PODs"""
+import datetime
 import json
 import math
 import time
@@ -165,27 +166,33 @@ def follow_container_logs(self, pod: V1Pod, container_name: str):
         """
         Follows the logs of container and streams to airflow logging.
         Returns when container exits.
+
+        .. note:: ``read_pod_logs`` follows the logs, so we shouldn't necessarily *need* to loop
+            as we do here. But in a long-running process we might temporarily lose connectivity.
+            So the looping logic is there to let us resume following the logs.
         """
-        container_stopped = False
-        read_logs_since_sec = None
-        last_log_time = None
-        # `read_pod_logs` follows the logs so we shouldn't necessarily _need_ to loop
-        # but in a long-running process we might lose connectivity and this way we
-        # can resume following the logs
-        while True:
+        def follow_logs(since_seconds: int = None) -> Optional[datetime.datetime]:
+            """
+            Tries to follow container logs until container completes.
+            For a long-running container, sometimes the log read may be interrupted;
+            such errors are suppressed.
+
+            Returns the last timestamp observed in logs.
+            """
             try:
                 logs = self.read_pod_logs(
                     pod=pod,
                     container_name=container_name,
                     timestamps=True,
-                    since_seconds=read_logs_since_sec,
+                    since_seconds=since_seconds,
                 )
+                timestamp = None
                 for line in logs:  # type: bytes
                     timestamp, message = self.parse_log_line(line.decode('utf-8'))
                     self.log.info(message)
-                    if timestamp:
-                        last_log_time = timestamp
+                if timestamp:
+                    return timestamp
             except BaseHTTPError:  # Catches errors like ProtocolError(TimeoutError).
self.log.warning( 'Failed to read logs for pod %s', @@ -193,20 +200,24 @@ def follow_container_logs(self, pod: V1Pod, container_name: str): exc_info=True, ) - if container_stopped: - break - + def get_since_seconds(last_log_time: datetime.datetime) -> int: + """Calculates number of seconds since ``last_log_time``""" if last_log_time: delta = pendulum.now() - last_log_time - read_logs_since_sec = math.ceil(delta.total_seconds()) + return math.ceil(delta.total_seconds()) - time.sleep(1) - - if self.container_is_running(pod, container_name=container_name): - self.log.info('Container %s is running', pod.metadata.name) - self.log.warning('Pod %s log read interrupted', pod.metadata.name) + last_log_time = None + while True: + last_log_time = follow_logs(since_seconds=get_since_seconds(last_log_time)) + if not self.container_is_running(pod, container_name=container_name): + return else: - container_stopped = True # fetch logs once more and exit + self.log.warning( + 'Pod %s log read interrupted but container %s still running', + pod.metadata.name, + container_name, + ) + time.sleep(1) def await_container_completion(self, pod: V1Pod, container_name: str) -> None: while not self.container_is_running(pod=pod, container_name=container_name): From 9815cbfa22a7648a9567bab4e6a0719bd4bdf706 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 20 Dec 2021 09:46:06 -0800 Subject: [PATCH 08/25] type hint v1pod --- airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 38087acc4cc55..a993d9c4692d7 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -436,7 +436,7 @@ def execute(self, context): ti.xcom_push(key='pod_namespace', value=remote_pod.metadata.namespace) return result - def cleanup(self, pod, remote_pod): + def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): with _suppress(Exception): self.process_pod_deletion(pod) From efb5320015e28f268420f4266fcdd122edd50e59 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 20 Dec 2021 09:52:04 -0800 Subject: [PATCH 09/25] simplify try_number logging --- .../providers/cncf/kubernetes/operators/kubernetes_pod.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index a993d9c4692d7..4ab275037248f 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -369,12 +369,8 @@ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]: elif num_pods == 1: pod = pod_list[0] self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels) - if not pod.metadata.labels['try_number'] == context['ti'].try_number: - self.log.info( - "`try_number` of current task instance is %s but pod has `try_number` %s", - context['ti'].try_number, - pod.metadata.labels['try_number'], - ) + self.log.info("`try_number` of task_instance: %s", context['ti'].try_number) + self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number']) return pod def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context): From 
b1b801705c2b5974315f964d6636566f9315bcd7 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 20 Dec 2021 16:01:23 -0800 Subject: [PATCH 10/25] push pod name and namespace always --- .../providers/cncf/kubernetes/operators/kubernetes_pod.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 4ab275037248f..db8e34f23c5fc 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -425,11 +425,10 @@ def execute(self, context): pod=self.pod or self.pod_request_obj, remote_pod=remote_pod, ) + ti = context['ti'] + ti.xcom_push(key='pod_name', value=self.pod.metadata.name) + ti.xcom_push(key='pod_namespace', value=self.pod.metadata.namespace) if self.do_xcom_push: - ti = context['ti'] - if remote_pod: - ti.xcom_push(key='pod_name', value=remote_pod.metadata.name) - ti.xcom_push(key='pod_namespace', value=remote_pod.metadata.namespace) return result def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): From 041e477e8f0cbadd3a2b25695b742c34af6d0301 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 20 Dec 2021 16:12:40 -0800 Subject: [PATCH 11/25] fix xcom push test --- .../operators/test_kubernetes_pod.py | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index f2a7e7a7dc85e..8559811d77cf4 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-import unittest +import uuid from tempfile import NamedTemporaryFile from unittest import mock from unittest.mock import MagicMock @@ -26,6 +26,7 @@ from airflow.exceptions import AirflowException from airflow.models import DAG, DagRun, TaskInstance from airflow.models.xcom import IN_MEMORY_DAGRUN_ID +from airflow.operators.dummy import DummyOperator from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator, _suppress from airflow.utils import timezone @@ -34,8 +35,8 @@ def create_context(task): dag = DAG(dag_id="dag") - task_instance = TaskInstance(task=task, run_id="kub_pod_test") - task_instance.dag_run = DagRun(run_id="kub_pod_test", execution_date=DEFAULT_DATE) + task_instance = TaskInstance(task=task, run_id=IN_MEMORY_DAGRUN_ID) + task_instance.dag_run = DagRun(run_id=IN_MEMORY_DAGRUN_ID) return { "dag": dag, "ts": DEFAULT_DATE.isoformat(), @@ -45,32 +46,35 @@ def create_context(task): } -class TestKubernetesPodOperator(unittest.TestCase): - def setUp(self): - self.create_pod_patch = mock.patch( - "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod" - ) - self.await_pod_patch = mock.patch( - "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_start" - ) - self.await_pod_completion_patch = mock.patch( - "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion" - ) +POD_LAUNCHER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher" +POD_GENERATOR_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_generator.PodGenerator" + + +class TestKubernetesPodOperator: + def setup_method(self): + self.create_pod_patch = mock.patch(f"{POD_LAUNCHER_CLASS}.create_pod") + self.await_pod_patch = mock.patch(f"{POD_LAUNCHER_CLASS}.await_pod_start") + self.await_pod_completion_patch = mock.patch(f"{POD_LAUNCHER_CLASS}.await_pod_completion") + self.unique_pod_id_patch = mock.patch(f"{POD_GENERATOR_CLASS}.make_unique_pod_id") + self.unique_pod_id_patch.return_value = str(uuid.uuid4()) self.client_patch = mock.patch("airflow.kubernetes.kube_client.get_kube_client") self.create_mock = self.create_pod_patch.start() self.await_start_mock = self.await_pod_patch.start() self.await_pod_mock = self.await_pod_completion_patch.start() self.client_mock = self.client_patch.start() - self.addCleanup(self.create_pod_patch.stop) - self.addCleanup(self.await_pod_patch.stop) - self.addCleanup(self.await_pod_completion_patch.stop) - self.addCleanup(self.client_patch.stop) + + def teardown_method(self): + self.create_pod_patch.stop() + self.await_pod_patch.stop() + self.await_pod_completion_patch.stop() + self.client_patch.stop() + self.unique_pod_id_patch.stop() @staticmethod def create_context(task): dag = DAG(dag_id="dag") task_instance = TaskInstance(task=task, run_id=IN_MEMORY_DAGRUN_ID) - task_instance.dag_run = DagRun(run_id=IN_MEMORY_DAGRUN_ID, execution_date=DEFAULT_DATE) + task_instance.dag_run = DagRun(run_id=IN_MEMORY_DAGRUN_ID) return { "dag": dag, "ts": DEFAULT_DATE.isoformat(), @@ -81,11 +85,8 @@ def create_context(task): def run_pod(self, operator) -> k8s.V1Pod: context = create_context(operator) - pod_request_obj = operator.build_pod_request_obj(context) remote_pod_mock = MagicMock() remote_pod_mock.status.phase = 'Succeeded' - remote_pod_mock.metadata.name = pod_request_obj.metadata.name - remote_pod_mock.metadata.namespace = pod_request_obj.metadata.namespace self.await_pod_mock.return_value = remote_pod_mock operator.execute(context=context) return 
self.await_start_mock.call_args[1]['pod'] @@ -701,11 +702,11 @@ def test_node_selector(self): assert isinstance(pod.spec.node_selector, dict) assert sanitized_pod["spec"]["nodeSelector"] == node_selector - @mock.patch('airflow.kubernetes.pod_generator.PodGenerator.make_unique_pod_id') - @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.extract_xcom') - def test_push_xcom_pod_info(self, extract_xcom_mock, make_unique_pod_id): - extract_xcom_mock.return_value = '{}' - make_unique_pod_id.return_value = 'test-pod-a1b2c3' + @pytest.mark.parametrize('do_xcom_push', [True, False]) + @mock.patch(f"{POD_LAUNCHER_CLASS}.extract_xcom") + def test_push_xcom_pod_info(self, extract_xcom, do_xcom_push): + """pod name and namespace are *always* pushed; do_xcom_push only controls xcom sidecar""" + extract_xcom.return_value = '{}' k = KubernetesPodOperator( namespace="default", image="ubuntu:16.04", @@ -713,11 +714,12 @@ def test_push_xcom_pod_info(self, extract_xcom_mock, make_unique_pod_id): name="test", task_id="task", in_cluster=False, - do_xcom_push=True, + do_xcom_push=do_xcom_push, ) pod = self.run_pod(k) - ti = TaskInstance(task=k, run_id=IN_MEMORY_DAGRUN_ID) - ti.dag_run = DagRun(run_id=IN_MEMORY_DAGRUN_ID, execution_date=DEFAULT_DATE) + other_task = DummyOperator(task_id='task_to_pull_xcom') + ti = TaskInstance(task=other_task, run_id=IN_MEMORY_DAGRUN_ID) + ti.dag_run = DagRun(run_id=IN_MEMORY_DAGRUN_ID) pod_name = ti.xcom_pull(task_ids=k.task_id, key='pod_name') pod_namespace = ti.xcom_pull(task_ids=k.task_id, key='pod_namespace') assert pod_name and pod_name == pod.metadata.name From 072332267616ff3140eed97a96ab41adb0fc50b9 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 21 Dec 2021 11:24:38 -0800 Subject: [PATCH 12/25] ensure timestamp is returned when read logs exits with exc --- .../cncf/kubernetes/utils/pod_launcher.py | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py index 607792b267222..23896dc562a2f 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py @@ -15,12 +15,11 @@ # specific language governing permissions and limitations # under the License. """Launches PODs""" -import datetime import json import math import time from contextlib import closing -from datetime import datetime as dt +from datetime import datetime from typing import Iterable, Optional, Tuple, Union import pendulum @@ -147,13 +146,13 @@ def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None: (if pod is pending for too long, fails task) :return: """ - curr_time = dt.now() + curr_time = datetime.now() while True: remote_pod = self.read_pod(pod) if remote_pod.status.phase != PodStatus.PENDING: break self.log.warning("Pod not yet started: %s", pod.metadata.name) - delta = dt.now() - curr_time + delta = datetime.now() - curr_time if delta.total_seconds() >= startup_timeout: msg = ( f"Pod took longer than {startup_timeout} seconds to start. " @@ -172,7 +171,13 @@ def follow_container_logs(self, pod: V1Pod, container_name: str): So the looping logic is there to let us resume following the logs. 
""" - def follow_logs(since_seconds: int = None) -> Optional[datetime.datetime]: + def get_since_seconds(since_time: datetime) -> int: + """Calculates number of seconds since ``last_log_time``""" + if since_time: + delta = pendulum.now() - since_time + return math.ceil(delta.total_seconds()) + + def follow_logs(since_time: Optional[datetime] = None) -> Optional[datetime]: """ Tries to follow container logs until container completes. For a long-running container, sometimes the log read may be interrupted @@ -180,35 +185,28 @@ def follow_logs(since_seconds: int = None) -> Optional[datetime.datetime]: Returns the last timestamp observed in logs. """ + timestamp = None try: logs = self.read_pod_logs( pod=pod, container_name=container_name, timestamps=True, - since_seconds=since_seconds, + since_seconds=get_since_seconds(since_time), ) - timestamp = None for line in logs: # type: bytes timestamp, message = self.parse_log_line(line.decode('utf-8')) self.log.info(message) - if timestamp: - return timestamp except BaseHTTPError: # Catches errors like ProtocolError(TimeoutError). self.log.warning( 'Failed to read logs for pod %s', pod.metadata.name, exc_info=True, ) - - def get_since_seconds(last_log_time: datetime.datetime) -> int: - """Calculates number of seconds since ``last_log_time``""" - if last_log_time: - delta = pendulum.now() - last_log_time - return math.ceil(delta.total_seconds()) + return timestamp or since_time last_log_time = None while True: - last_log_time = follow_logs(since_seconds=get_since_seconds(last_log_time)) + last_log_time = follow_logs(since_time=last_log_time) if not self.container_is_running(pod, container_name=container_name): return else: From d803460c046862dd5364755568f7b014e7cc7cfe Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 21 Dec 2021 17:30:22 -0800 Subject: [PATCH 13/25] fix tests --- .../kubernetes/operators/test_kubernetes_pod.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index 8559811d77cf4..c152cb5b92e4d 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -14,7 +14,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-import uuid from tempfile import NamedTemporaryFile from unittest import mock from unittest.mock import MagicMock @@ -47,7 +46,6 @@ def create_context(task): POD_LAUNCHER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher" -POD_GENERATOR_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_generator.PodGenerator" class TestKubernetesPodOperator: @@ -55,8 +53,6 @@ def setup_method(self): self.create_pod_patch = mock.patch(f"{POD_LAUNCHER_CLASS}.create_pod") self.await_pod_patch = mock.patch(f"{POD_LAUNCHER_CLASS}.await_pod_start") self.await_pod_completion_patch = mock.patch(f"{POD_LAUNCHER_CLASS}.await_pod_completion") - self.unique_pod_id_patch = mock.patch(f"{POD_GENERATOR_CLASS}.make_unique_pod_id") - self.unique_pod_id_patch.return_value = str(uuid.uuid4()) self.client_patch = mock.patch("airflow.kubernetes.kube_client.get_kube_client") self.create_mock = self.create_pod_patch.start() self.await_start_mock = self.await_pod_patch.start() @@ -68,7 +64,6 @@ def teardown_method(self): self.await_pod_patch.stop() self.await_pod_completion_patch.stop() self.client_patch.stop() - self.unique_pod_id_patch.stop() @staticmethod def create_context(task): @@ -238,23 +233,24 @@ def test_pod_delete_even_on_launcher_error(self, delete_pod_mock): k.execute(context=context) assert delete_pod_mock.called - @parameterized.expand([[True], [False]]) - def test_provided_pod_name(self, randomize_name): + @pytest.mark.parametrize('randomize', [True, False]) + def test_provided_pod_name(self, randomize): name_base = "test" k = KubernetesPodOperator( namespace="default", image="ubuntu:16.04", name=name_base, - random_name_suffix=randomize_name, + random_name_suffix=randomize, task_id="task", in_cluster=False, do_xcom_push=False, cluster_context="default", ) - pod = k.build_pod_request_obj(create_context(k)) + context = create_context(k) + pod = k.build_pod_request_obj(context) - if randomize_name: + if randomize: assert pod.metadata.name.startswith(name_base) assert pod.metadata.name != name_base else: From ed568a5b8cbc5a295d0c9f2b55c7fbe89bda7183 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 22 Dec 2021 05:40:16 -0800 Subject: [PATCH 14/25] PodStatus -> PodPhase --- .../cncf/kubernetes/operators/kubernetes_pod.py | 4 ++-- .../providers/cncf/kubernetes/utils/pod_launcher.py | 11 +++++++---- .../cncf/kubernetes/utils/test_pod_launcher.py | 6 +++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index db8e34f23c5fc..32a808193bcb2 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -24,7 +24,7 @@ from kubernetes.client import CoreV1Api, models as k8s -from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLaunchFailedException, PodStatus +from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLaunchFailedException, PodPhase try: import airflow.utils.yaml as yaml @@ -436,7 +436,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): self.process_pod_deletion(pod) pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None - if pod_phase != PodStatus.SUCCEEDED: + if pod_phase != PodPhase.SUCCEEDED: if self.log_events_on_failure: with _suppress(Exception): for event in self.launcher.read_pod_events(pod).items: diff --git 
a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py index 23896dc562a2f..9b6dad2392bf7 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py @@ -51,8 +51,11 @@ def should_retry_start_pod(exception: Exception) -> bool: return False -class PodStatus: - """Status of the PODs""" +class PodPhase: + """ + Possible pod phases + See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase. + """ PENDING = 'Pending' RUNNING = 'Running' @@ -149,7 +152,7 @@ def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None: curr_time = datetime.now() while True: remote_pod = self.read_pod(pod) - if remote_pod.status.phase != PodStatus.PENDING: + if remote_pod.status.phase != PodPhase.PENDING: break self.log.warning("Pod not yet started: %s", pod.metadata.name) delta = datetime.now() - curr_time @@ -230,7 +233,7 @@ def await_pod_completion(self, pod: V1Pod) -> V1Pod: """ while True: remote_pod = self.read_pod(pod) - if remote_pod.status.phase in PodStatus.terminal_states: + if remote_pod.status.phase in PodPhase.terminal_states: break self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase) time.sleep(2) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py index 67c7d4dc310d7..c02c025fcd870 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py @@ -23,7 +23,7 @@ from urllib3.exceptions import HTTPError as BaseHTTPError from airflow.exceptions import AirflowException -from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher, PodStatus, container_is_running +from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher, PodPhase, container_is_running class TestPodLauncher: @@ -177,7 +177,7 @@ def test_monitor_pod_empty_logs(self): running_status = mock.MagicMock() running_status.configure_mock(**{'name': 'base', 'state.running': True}) pod_info_running = mock.MagicMock(**{'status.container_statuses': [running_status]}) - pod_info_succeeded = mock.MagicMock(**{'status.phase': PodStatus.SUCCEEDED}) + pod_info_succeeded = mock.MagicMock(**{'status.phase': PodPhase.SUCCEEDED}) def pod_state_gen(): yield pod_info_running @@ -193,7 +193,7 @@ def test_monitor_pod_logs_failures_non_fatal(self): running_status = mock.MagicMock() running_status.configure_mock(**{'name': 'base', 'state.running': True}) pod_info_running = mock.MagicMock(**{'status.container_statuses': [running_status]}) - pod_info_succeeded = mock.MagicMock(**{'status.phase': PodStatus.SUCCEEDED}) + pod_info_succeeded = mock.MagicMock(**{'status.phase': PodPhase.SUCCEEDED}) def pod_state_gen(): yield pod_info_running From 7a1de3a76d41f94afce4c4757cf7d558eacf31ba Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 22 Dec 2021 05:56:07 -0800 Subject: [PATCH 15/25] remove get_since_seconds inner function --- .../cncf/kubernetes/utils/pod_launcher.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py index 9b6dad2392bf7..43c4ebe597efd 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py @@ -164,22 
+164,16 @@ def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
                 raise PodLaunchFailedException(msg)
             time.sleep(1)

-    def follow_container_logs(self, pod: V1Pod, container_name: str):
+    def follow_container_logs(self, pod: V1Pod, container_name: str) -> None:
         """
         Follows the logs of container and streams to airflow logging.
         Returns when container exits.

-        .. note:: ``read_pod_logs`` follows the logs, so we shouldn't necessarily *need* to loop
-            as we do here. But in a long-running process we might temporarily lose connectivity.
+        .. note:: :meth:`read_pod_logs` follows the logs, so we shouldn't necessarily *need* to
+            loop as we do here. But in a long-running process we might temporarily lose connectivity.
             So the looping logic is there to let us resume following the logs.
         """

-        def get_since_seconds(since_time: datetime) -> int:
-            """Calculates number of seconds since ``last_log_time``"""
-            if since_time:
-                delta = pendulum.now() - since_time
-                return math.ceil(delta.total_seconds())
-
         def follow_logs(since_time: Optional[datetime] = None) -> Optional[datetime]:
             """
             Tries to follow container logs until container completes.
@@ -194,7 +188,9 @@ def follow_logs(since_time: Optional[datetime] = None) -> Optional[datetime]:
                     pod=pod,
                     container_name=container_name,
                     timestamps=True,
-                    since_seconds=get_since_seconds(since_time),
+                    since_seconds=(
+                        math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
+                    ),
                 )
                 for line in logs:  # type: bytes
                     timestamp, message = self.parse_log_line(line.decode('utf-8'))

From 608e3584a36725137d44af8bd9acd756c49b9e26 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Wed, 22 Dec 2021 07:08:54 -0800
Subject: [PATCH 16/25] add note in CHANGELOG

---
 airflow/providers/cncf/kubernetes/CHANGELOG.rst | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index d3cd9d28173a2..0daed7ceb819b 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -19,6 +19,19 @@
 Changelog
 ---------

+3.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+* ``Simplify KubernetesPodOperator (#19572)``
+
+.. warning:: Many methods in :class:`~.KubernetesPodOperator` and :class:`~.PodLauncher` have been renamed.
+    If you have subclassed :class:`~.KubernetesPodOperator` you will need to update your subclass to reflect
+    the new structure. Additionally, the ``PodStatus`` enum has been renamed to ``PodPhase``.
+
+
 2.2.0
 .....
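The warning above is easiest to absorb with a concrete example. What follows is a minimal sketch (not part of the patch series) of how a subclass that previously overrode ``create_pod_request_obj`` might be updated for the rename to ``build_pod_request_obj``; the subclass name and the ``team`` label are illustrative assumptions only:

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


    class MyPodOperator(KubernetesPodOperator):
        """Hypothetical subclass used only to illustrate the rename."""

        def build_pod_request_obj(self, context=None):
            # Formerly `create_pod_request_obj(self)`; the renamed method now accepts
            # the task `context` so TI-identifying labels can be added during the build.
            pod = super().build_pod_request_obj(context)
            # Illustrative customization: tag every pod produced by this subclass.
            pod.metadata.labels['team'] = 'data-platform'
            return pod

Subclasses that previously overrode ``create_labels_for_pod`` or called ``start_pod`` on the launcher would be updated along the same lines (to ``_get_ti_pod_labels`` and ``create_pod`` respectively).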
From 176bb5e9cc1d0ed34a55741cef06c0b586390ae2 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 22 Dec 2021 08:49:38 -0800 Subject: [PATCH 17/25] fix spelling; make build_pod_request_obj dry-runnable --- .../providers/cncf/kubernetes/operators/kubernetes_pod.py | 7 +++++-- docs/spelling_wordlist.txt | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 32a808193bcb2..253988e81acf5 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -316,13 +316,16 @@ def _render_nested_template_fields( super()._render_nested_template_fields(content, context, jinja_env, seen_oids) @staticmethod - def _create_labels_for_pod(context) -> dict: + def _create_labels_for_pod(context=None) -> dict: """ Generate labels for the pod to track the pod in case of Operator crash :param context: task context provided by airflow DAG :return: dict """ + if not context: + return {} + labels = { 'dag_id': context['dag'].dag_id, 'task_id': context['task'].task_id, @@ -485,7 +488,7 @@ def on_kill(self) -> None: kwargs.update(grace_period_seconds=self.termination_grace_period) self.client.delete_namespaced_pod(**kwargs) - def build_pod_request_obj(self, context): + def build_pod_request_obj(self, context=None): """ Creates a V1Pod based on user parameters. Note that a `pod` or `pod_template_file` will supersede all other values. diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index b196744b5b241..5f630b52b5b72 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1297,6 +1297,7 @@ storedInfoType str stringified subchart +subclassed subclasses subclassing subcluster From f1f0ecc648ada873b498f3052f0ea3267c635453 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 27 Dec 2021 10:53:02 -0800 Subject: [PATCH 18/25] add test for pod label selector & clarify names --- .../kubernetes/operators/kubernetes_pod.py | 17 ++++++++--------- .../operators/test_kubernetes_pod.py | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 253988e81acf5..f44f402532b77 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -316,7 +316,7 @@ def _render_nested_template_fields( super()._render_nested_template_fields(content, context, jinja_env, seen_oids) @staticmethod - def _create_labels_for_pod(context=None) -> dict: + def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool = True) -> dict: """ Generate labels for the pod to track the pod in case of Operator crash @@ -330,8 +330,9 @@ def _create_labels_for_pod(context=None) -> dict: 'dag_id': context['dag'].dag_id, 'task_id': context['task'].task_id, 'execution_date': context['ts'], - 'try_number': context['ti'].try_number, } + if include_try_number: + labels.update(try_number=context['ti'].try_number) # In the case of sub dags this is just useful if context['dag'].is_subdag: labels['parent_dag_id'] = context['dag'].parent_dag.dag_id @@ -359,8 +360,7 @@ def client(self) -> CoreV1Api: def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]: 
"""Returns an already-running pod for this task instance if one exists.""" - labels = self._create_labels_for_pod(context) - label_selector = self._get_pod_identifying_label_string(labels) + label_selector = self._build_find_pod_label_selector(context) pod_list = self.client.list_namespaced_pod( namespace=namespace, label_selector=label_selector, @@ -456,10 +456,9 @@ def process_pod_deletion(self, pod): else: self.log.info("skipping deleting pod: %s", pod.metadata.name) - def _get_pod_identifying_label_string(self, labels) -> str: - label_strings = [ - f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number' - ] + def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str: + labels = self._get_ti_pod_labels(context, include_try_number=False) + label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())] return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True' def _set_name(self, name): @@ -557,7 +556,7 @@ def build_pod_request_obj(self, context=None): self.log.debug("Adding xcom sidecar to task %s", self.task_id) pod = xcom_sidecar.add_xcom_sidecar(pod) - labels = self._create_labels_for_pod(context) + labels = self._get_ti_pod_labels(context) self.log.info("Creating pod %s with labels: %s", pod.metadata.name, labels) # Merge Pod Identifying labels with labels passed to operator diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index c152cb5b92e4d..6b897592ca6dd 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -176,6 +176,25 @@ def test_labels(self): "execution_date": mock.ANY, } + def test_find_pod_labels(self): + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + cmds=["bash", "-cx"], + labels={"foo": "bar"}, + name="test", + task_id="task", + in_cluster=False, + do_xcom_push=False, + ) + self.run_pod(k) + self.client_mock.return_value.list_namespaced_pod.assert_called_once() + _, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args + assert ( + kwargs['label_selector'] + == 'dag_id=dag,execution_date=2016-01-01T0100000000-26816529d,task_id=task,already_checked!=True' + ) + def test_image_pull_secrets_correctly_set(self): fake_pull_secrets = "fakeSecret" k = KubernetesPodOperator( From 3fb12e547f62a7dc6c3975e6d35a02e2b71e1605 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 27 Dec 2021 11:33:53 -0800 Subject: [PATCH 19/25] docstring for _suppress --- .../providers/cncf/kubernetes/operators/kubernetes_pod.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index f44f402532b77..4b9e383efcd92 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -574,8 +574,11 @@ def build_pod_request_obj(self, context=None): class _suppress(AbstractContextManager): """ - This behaves the same as contextlib.suppress but logs the suppressed + This behaves the same as ``contextlib.suppress`` but logs the suppressed exceptions as errors with traceback. + + The caught exception is also stored on the context manager instance under + attribute ``exception``. 
""" def __init__(self, *exceptions): From 1d2c51e3608319c6cc47f4c09e4cbd8301d84808 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 28 Dec 2021 10:26:30 -0800 Subject: [PATCH 20/25] we should not delete the pod prior to trying to read events --- .../providers/cncf/kubernetes/operators/kubernetes_pod.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 4b9e383efcd92..7fd93951a1a9b 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -435,9 +435,6 @@ def execute(self, context): return result def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): - with _suppress(Exception): - self.process_pod_deletion(pod) - pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None if pod_phase != PodPhase.SUCCEEDED: if self.log_events_on_failure: @@ -447,7 +444,12 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): if not self.is_delete_operator_pod: with _suppress(Exception): self.patch_already_checked(pod) + with _suppress(Exception): + self.process_pod_deletion(pod) raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}') + else: + with _suppress(Exception): + self.process_pod_deletion(pod) def process_pod_deletion(self, pod): if self.is_delete_operator_pod: From b3dd258c7ea79fddb2b6ee8a3adadefc6056f77f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 28 Dec 2021 10:28:38 -0800 Subject: [PATCH 21/25] add notes to changelog --- .../providers/cncf/kubernetes/CHANGELOG.rst | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst index 0daed7ceb819b..27abd678545cd 100644 --- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst +++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst @@ -31,6 +31,57 @@ Breaking changes If you have subclassed :class:`~.KubernetesPodOperator` will need to update your subclass to reflect the new structure. Additionally ``PodStatus`` enum has been renamed to ``PodPhase``. +Notes on changes KubernetesPodOperator and PodLauncher +`````````````````````````````````````````````````````` + +Overview +======== + +Generally speaking if you did not subclass ``KubernetesPodOperator`` and you didn't use the ``PodLauncher`` class directly, +then you don't need to worry about this change. If however you have subclassed ``KubernetesPodOperator, what follows are some notes on the changes in this release. + +One of the principal goals of the refactor is to clearly separate the "get or create pod" and +"wait for pod completion" phases. Previously the "wait for pod completion" logic would be invoked +differently depending on whether the operator were to "attach to an existing pod" (e.g. after a +worker failure) or "create a new pod". With this refactor we encapsulate the "get or create" step +into method :meth:`~.KubernetesPodOperator.get_or_create_pod`. This method tries first to find an +existing pod using labels specific to the task instance (see :meth:`~.KubernetesPodOperator.find_pod`). +If one does not exist it :meth:`creates a pod <~.PodLauncher.create_pod>`. + +The "waiting" part of execution has three components. 
The first step is to wait for the pod to leave the
+``Pending`` phase (:meth:`~.KubernetesPodOperator.await_pod_start`). Next, if configured to do so,
+the operator will :meth:`follow the base container logs <~.KubernetesPodOperator.follow_container_logs>`
+and forward these logs to the task logger until the ``base`` container is done. If not configured to harvest the
+logs, the operator will instead :meth:`poll for container completion until done <~.KubernetesPodOperator.await_container_completion>`;
+either way, we must await container completion before harvesting xcom. After (optionally) extracting the xcom
+value from the base container, we :meth:`await pod completion <~.PodLauncher.await_pod_completion>`.
+
+Previously, depending on whether the pod was "reattached to" (e.g. after a worker failure) or
+created anew, the waiting logic may have occurred in either ``handle_pod_overlap`` or ``create_new_pod_for_operator``.
+
+After the pod terminates, we execute different cleanup tasks depending on whether the pod terminated successfully.
+
+If the pod terminates *unsuccessfully*, we attempt to :meth:`log the pod events <~.PodLauncher.read_pod_events>`. If
+additionally the task is configured *not* to delete the pod after termination, :meth:`we apply a label <~.KubernetesPodOperator.patch_already_checked>`
+indicating that the pod failed and should not be "reattached to" in a retry. If the task is configured
+to delete its pod, we :meth:`delete it <~.KubernetesPodOperator.process_pod_deletion>`. Finally,
+we raise an AirflowException to fail the task instance.
+
+If the pod terminates successfully, we :meth:`delete the pod <~.KubernetesPodOperator.process_pod_deletion>`
+(if configured to delete the pod) and push XCom (if configured to push XCom).
+
+Details on method renames, refactors, and deletions
+===================================================
+
+* Method ``create_pod_launcher`` is converted to cached property ``launcher``
+* Construction of k8s ``CoreV1Api`` client is now encapsulated within cached property ``client``
+* Logic to search for an existing pod (e.g. after an airflow worker failure) is moved out of ``execute`` and into method ``find_pod``.
+* Method ``handle_pod_overlap`` is removed. Previously it monitored a "found" pod until completion. With this change the pod monitoring (and log following) is orchestrated directly from ``execute`` and it is the same whether it's a "found" pod or a "new" pod. See methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``.
+* Method ``create_pod_request_obj`` is renamed ``build_pod_request_obj``. It now takes argument ``context`` in order to add TI-specific pod labels; previously they were added after return.
+* Method ``create_labels_for_pod`` is renamed ``_get_ti_pod_labels``. This method doesn't return *all* labels, but only those specific to the TI. We also add parameter ``include_try_number`` to control the inclusion of this label instead of possibly filtering it out later.
+* Method ``_get_pod_identifying_label_string`` is renamed ``_build_find_pod_label_selector``
+* Method ``_try_numbers_match`` is removed.
+* Method ``create_new_pod_for_operator`` is removed. Previously it would mutate the labels on ``self.pod``, launch the pod, monitor the pod to completion etc. Now this logic is in part handled by ``get_or_create_pod``, where a new pod will be created if necessary. The monitoring etc is now orchestrated directly from ``execute``.
Again, see the calls to methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``. 2.2.0 ..... From ee65fc6b13f5515cc31c7fc9a23a6ed658d1fc7b Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 28 Dec 2021 10:37:55 -0800 Subject: [PATCH 22/25] docs fixup --- airflow/providers/cncf/kubernetes/CHANGELOG.rst | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst index 27abd678545cd..daecc592bfe23 100644 --- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst +++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst @@ -43,9 +43,13 @@ then you don't need to worry about this change. If however you have subclassed One of the principal goals of the refactor is to clearly separate the "get or create pod" and "wait for pod completion" phases. Previously the "wait for pod completion" logic would be invoked differently depending on whether the operator were to "attach to an existing pod" (e.g. after a -worker failure) or "create a new pod". With this refactor we encapsulate the "get or create" step -into method :meth:`~.KubernetesPodOperator.get_or_create_pod`. This method tries first to find an -existing pod using labels specific to the task instance (see :meth:`~.KubernetesPodOperator.find_pod`). +worker failure) or "create a new pod" and this resulted in some code duplication and a bit more +nesting of logic. With this refactor we encapsulate the "get or create" step +into method :meth:`~.KubernetesPodOperator.get_or_create_pod`, and pull the monitoring and XCom logic up +into the top level of ``execute`` because it can be the same for "attached" pods and "new" pods. + +:meth:`~.KubernetesPodOperator.get_or_create_pod` tries first to find an existing pod using labels +specific to the task instance (see :meth:`~.KubernetesPodOperator.find_pod`). If one does not exist it :meth:`creates a pod <~.PodLauncher.create_pod>`. The "waiting" part of execution has three components. The first step is to wait for the pod to leave the From 6eeb09484b98fa30cbb5bc35b6a5310c8d87d132 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 28 Dec 2021 12:23:04 -0800 Subject: [PATCH 23/25] fix docs --- .../providers/cncf/kubernetes/CHANGELOG.rst | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst index daecc592bfe23..978a293ab60ce 100644 --- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst +++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst @@ -35,7 +35,7 @@ Notes on changes KubernetesPodOperator and PodLauncher `````````````````````````````````````````````````````` Overview -======== +'''''''' Generally speaking if you did not subclass ``KubernetesPodOperator`` and you didn't use the ``PodLauncher`` class directly, then you don't need to worry about this change. If however you have subclassed ``KubernetesPodOperator, what follows are some notes on the changes in this release. @@ -75,7 +75,9 @@ If the pod terminates successfully, we :meth:`delete the pod <~.KubernetesPodOpe (if configured to delete the pod) and push XCom (if configured to push XCom). 
Details on method renames, refactors, and deletions -=================================================== +''''''''''''''''''''''''''''''''''''''''''''''''''' + +In ``KubernetesPodOperator``: * Method ``create_pod_launcher`` is converted to cached property ``launcher`` * Construction of k8s ``CoreV1Api`` client is now encapsulated within cached property ``client`` @@ -87,6 +89,19 @@ Details on method renames, refactors, and deletions * Method ``_try_numbers_match`` is removed. * Method ``create_new_pod_for_operator`` is removed. Previously it would mutate the labels on ``self.pod``, launch the pod, monitor the pod to completion etc. Now this logic is in part handled by ``get_or_create_pod``, where a new pod will be created if necessary. The monitoring etc is now orchestrated directly from ``execute``. Again, see the calls to methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``. +In ``pod_launcher.py``, in class ``PodLauncher``: + +* Method ``start_pod`` is removed and split into two methods: ``create_pod`` and ``await_pod_start``. +* Method ``monitor_pod`` is removed and split into methods ``follow_container_logs``, ``await_container_completion``, ``await_pod_completion`` +* Methods ``pod_not_started``, ``pod_is_running``, ``process_status``, and ``_task_status`` are removed. These were needed due to the way in which pod ``phase`` was mapped to task instance states; but we no longer do such a mapping and instead deal with pod phases directly and untransformed. +* Method ``_extract_xcom`` is renamed ``extract_xcom``. +* Method ``read_pod_logs`` now takes kwarg ``container_name`` + + +Other changes in ``pod_launcher.py``: + +* Enum-like class ``PodStatus`` is renamed ``PodPhase``, and the values are no longer lower-cased. + 2.2.0 ..... From 0cb521d1b0af69e459ecce80d2f546d9866d15d4 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 28 Dec 2021 13:17:45 -0800 Subject: [PATCH 24/25] docs fix --- airflow/providers/cncf/kubernetes/CHANGELOG.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst index 978a293ab60ce..26f185246bd6d 100644 --- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst +++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst @@ -38,7 +38,8 @@ Overview '''''''' Generally speaking if you did not subclass ``KubernetesPodOperator`` and you didn't use the ``PodLauncher`` class directly, -then you don't need to worry about this change. If however you have subclassed ``KubernetesPodOperator, what follows are some notes on the changes in this release. +then you don't need to worry about this change. If however you have subclassed ``KubernetesPodOperator``, what +follows are some notes on the changes in this release. One of the principal goals of the refactor is to clearly separate the "get or create pod" and "wait for pod completion" phases. 
Previously the "wait for pod completion" logic would be invoked From f48af4457a98d5728c248a40819654a921652ce2 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 28 Dec 2021 15:53:38 -0800 Subject: [PATCH 25/25] spelling list update --- docs/spelling_wordlist.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 5f630b52b5b72..d652742c87496 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1410,6 +1410,7 @@ unpausing unpredicted unqueued unterminated +untransformed unutilized updateMask updateonly