Skip to content

Commit

Permalink
Fix timeout when using XCom with KubernetesPodOperator (#15388)
Browse files Browse the repository at this point in the history
* Fix timeout when using XCom with KubernetesPodOperator

Currently, the xcom sidecar container for the KubernetesPodOperator will
sleep for 30 seconds before checking if the xcom has completed. This is
far too long of a wait, as a 1 second wait will ensure that the process
is not consistently blocked.

* get rid of unused variable

* remove unnecessary poddefaults

* update docs

* Add UPDATING

* more description

* fix tests

* fix tests

* Finally found it
  • Loading branch information
dimberman committed Apr 24, 2021
1 parent a3b0a27 commit d3cc67a
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 27 deletions.
8 changes: 8 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ https://developers.google.com/style/inclusive-documentation
-->

### Deprecated PodDefaults and add_xcom_sidecar in airflow.kubernetes.pod_generator

We have moved PodDefaults from `airflow.kubernetes.pod_generator.PodDefaults` to
`airflow.providers.cncf.kubernetes.utils.xcom_sidecar.PodDefaults` and moved add_xcom_sidecar
from `airflow.kubernetes.pod_generator.PodGenerator.add_xcom_sidecar` to
`airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar`.
This change will allow us to modify the KubernetesPodOperator XCom functionality without requiring airflow upgrades.

### Removed pod_launcher from core airflow

Moved the pod launcher from `airflow.kubernetes.pod_launcher` to `airflow.providers.cncf.kubernetes.utils.pod_launcher`
Expand Down
27 changes: 5 additions & 22 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,12 @@

import airflow.utils.yaml as yaml
from airflow.exceptions import AirflowConfigException
from airflow.kubernetes.pod_generator_deprecated import PodGenerator as PodGeneratorDeprecated
from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated
from airflow.version import version as airflow_version

MAX_LABEL_LEN = 63


class PodDefaults:
    """Constant Kubernetes objects and names used for the xcom sidecar."""

    # Name of the sidecar container and the path where the xcom volume is mounted.
    SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar"
    XCOM_MOUNT_PATH = "/airflow/xcom"

    # Shell loop run by the sidecar: sleep in 30 second steps forever,
    # exiting with status 0 when an INT signal is trapped.
    XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 30; done;'

    # Empty-dir volume named "xcom" and the mount that exposes it at XCOM_MOUNT_PATH.
    VOLUME = k8s.V1Volume(
        name="xcom",
        empty_dir=k8s.V1EmptyDirVolumeSource(),
    )
    VOLUME_MOUNT = k8s.V1VolumeMount(
        name="xcom",
        mount_path=XCOM_MOUNT_PATH,
    )

    # Container definition for the sidecar; it mounts the xcom volume and
    # requests a CPU share of "1m".
    SIDECAR_CONTAINER = k8s.V1Container(
        name=SIDECAR_CONTAINER_NAME,
        image="alpine",
        command=["sh", "-c", XCOM_CMD],
        volume_mounts=[VOLUME_MOUNT],
        resources=k8s.V1ResourceRequirements(requests=dict(cpu="1m")),
    )


def make_safe_label_value(string):
"""
Valid label values must be 63 characters or less and must be empty or begin and
Expand Down Expand Up @@ -157,6 +136,10 @@ def gen_pod(self) -> k8s.V1Pod:
@staticmethod
def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
"""Adds sidecar"""
warnings.warn(
"This function is deprecated. "
"Please use airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar instead"
)
pod_cp = copy.deepcopy(pod)
pod_cp.spec.volumes = pod.spec.volumes or []
pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
convert_volume_mount,
)
from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.providers.cncf.kubernetes.utils import pod_launcher
from airflow.providers.cncf.kubernetes.utils import pod_launcher, xcom_sidecar
from airflow.utils.decorators import apply_defaults
from airflow.utils.helpers import validate_key
from airflow.utils.state import State
Expand Down Expand Up @@ -487,7 +487,7 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
pod = secret.attach_to_pod(pod)
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = PodGenerator.add_xcom_sidecar(pod)
pod = xcom_sidecar.add_xcom_sidecar(pod)
return pod

def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]:
Expand Down
57 changes: 57 additions & 0 deletions airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module handles all xcom functionality for the KubernetesPodOperator
by attaching a sidecar container that blocks the pod from completing until
Airflow has pulled result data into the worker for xcom serialization.
"""
import copy

from kubernetes.client import models as k8s


class PodDefaults:
    """Constant Kubernetes objects and names used for the xcom sidecar."""

    # Name of the sidecar container and the path where the xcom volume is mounted.
    SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar"
    XCOM_MOUNT_PATH = "/airflow/xcom"

    # Shell loop run by the sidecar: sleep in 1 second steps forever,
    # exiting with status 0 when an INT signal is trapped.
    XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;'

    # Empty-dir volume named "xcom" and the mount that exposes it at XCOM_MOUNT_PATH.
    VOLUME = k8s.V1Volume(
        name="xcom",
        empty_dir=k8s.V1EmptyDirVolumeSource(),
    )
    VOLUME_MOUNT = k8s.V1VolumeMount(
        name="xcom",
        mount_path=XCOM_MOUNT_PATH,
    )

    # Container definition for the sidecar; it mounts the xcom volume and
    # requests a CPU share of "1m".
    SIDECAR_CONTAINER = k8s.V1Container(
        name=SIDECAR_CONTAINER_NAME,
        image="alpine",
        command=["sh", "-c", XCOM_CMD],
        volume_mounts=[VOLUME_MOUNT],
        resources=k8s.V1ResourceRequirements(requests=dict(cpu="1m")),
    )


def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
    """
    Return a copy of ``pod`` with the xcom volume, volume mount and sidecar attached.

    The xcom volume is inserted first in the pod's volumes, the xcom volume
    mount is inserted first in the mounts of the pod's first container, and
    the sidecar container is appended to the pod's containers.

    :param pod: the pod to extend; it is deep-copied and not modified
    :return: the modified copy of ``pod``
    """
    pod_cp = copy.deepcopy(pod)

    # Read the volumes from the copy, not from ``pod``: taking the list from
    # the input would make the insert below mutate the caller's pod and leave
    # both pods sharing one list.
    pod_cp.spec.volumes = pod_cp.spec.volumes or []
    pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)

    # The first container is the one that gets the xcom mount.
    first_container = pod_cp.spec.containers[0]
    first_container.volume_mounts = first_container.volume_mounts or []
    first_container.volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)

    pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)

    return pod_cp
4 changes: 2 additions & 2 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@

from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.kubernetes.secret import Secret
from airflow.models import DAG, TaskInstance
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils import timezone
from airflow.version import version as airflow_version

Expand Down Expand Up @@ -904,7 +904,7 @@ def test_pod_template_file(
'volumeMounts': [{'mountPath': '/airflow/xcom', 'name': 'xcom'}],
},
{
'command': ['sh', '-c', 'trap "exit 0" INT; while true; do sleep 30; done;'],
'command': ['sh', '-c', 'trap "exit 0" INT; while true; do sleep 1; done;'],
'image': 'alpine',
'name': 'airflow-xcom-sidecar',
'resources': {'requests': {'cpu': '1m'}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client
from airflow.kubernetes.pod import Port
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.models import DAG, TaskInstance
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.version import version as airflow_version
Expand Down

0 comments on commit d3cc67a

Please sign in to comment.