
Commit

The task is stuck in a queued state forever in case of pod launch errors (#36882)
dirrao committed Feb 10, 2024
1 parent 70fd6ad commit e994879
Showing 4 changed files with 199 additions and 28 deletions.
11 changes: 11 additions & 0 deletions airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -52,6 +52,17 @@ Misc
* ``Changing wording in docstring for CNCF provider (#36547)``
* ``Add support of Pendulum 3 (#36281)``

Breaking changes
~~~~~~~~~~~~~~~~

For Kube API "exceeded quota" errors, the new ``task_publish_max_retries`` option controls
how often a task is re-queued. The default behavior has changed from unlimited retries to
no retries (``task_publish_max_retries=0``). Set ``task_publish_max_retries=-1`` for
unlimited retries, or any positive integer for a fixed number of retries.

* ``Fix: The task is stuck in a queued state forever in case of pod launch errors (#36882)``
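
For illustration, a minimal sketch (not provider code; the function name is made up) of how the three value ranges map to the re-queue decision for an "exceeded quota" failure, mirroring the condition added to ``kubernetes_executor.py`` below:

def should_requeue(retries_so_far: int, task_publish_max_retries: int) -> bool:
    """Return True if an 'exceeded quota' pod launch failure should be re-queued."""
    # -1: retry forever; 0: never re-queue; N > 0: re-queue at most N times.
    return task_publish_max_retries == -1 or retries_so_far < task_publish_max_retries

assert should_requeue(0, 0) is False   # default: fail immediately
assert should_requeue(5, -1) is True   # unlimited retries
assert should_requeue(2, 3) is True    # fixed budget not yet exhausted
assert should_requeue(3, 3) is False   # budget exhausted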

.. Below changes are excluded from the changelog. Move them to
appropriate section above if needed. Do not delete the lines(!):
* ``Prepare docs 1st wave of Providers January 2024 (#36640)``
34 changes: 24 additions & 10 deletions airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -28,7 +28,7 @@
import logging
import multiprocessing
import time
from collections import defaultdict
from collections import Counter, defaultdict
from contextlib import suppress
from datetime import datetime
from queue import Empty, Queue
@@ -161,6 +161,8 @@ def __init__(self):
self.event_scheduler: EventScheduler | None = None
self.last_handled: dict[TaskInstanceKey, float] = {}
self.kubernetes_queue: str | None = None
self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
self.task_publish_max_retries = conf.getint("kubernetes", "task_publish_max_retries", fallback=0)
super().__init__(parallelism=self.kube_config.parallelism)

def _list_pods(self, query_kwargs):
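
A hedged aside (not part of the diff): the retry bookkeeping below relies on ``Counter`` returning 0 for keys it has never seen, so the first publish failure for a task starts from zero recorded retries:

from collections import Counter

task_publish_retries: Counter = Counter()
key = ("dag_id", "task_id", "run_id", 1)  # illustrative stand-in for a TaskInstanceKey
print(task_publish_retries[key])  # 0 -- unseen keys default to zero, no KeyError
task_publish_retries[key] += 1
print(task_publish_retries[key])  # 1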
@@ -425,7 +427,9 @@ def sync(self) -> None:
task = self.task_queue.get_nowait()

try:
key, command, kube_executor_config, pod_template_file = task
self.kube_scheduler.run_next(task)
self.task_publish_retries.pop(key, None)
except PodReconciliationError as e:
self.log.error(
"Pod reconciliation failed, likely due to kubernetes library upgrade. "
@@ -434,19 +438,29 @@
)
self.fail(task[0], e)
except ApiException as e:
# These codes indicate something is wrong with pod definition; otherwise we assume pod
# definition is ok, and that retrying may work
if e.status in (400, 422):
self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
key, _, _, _ = task
self.fail(key, e)
else:
body = json.loads(e.body)
retries = self.task_publish_retries[key]
# In case of exceeded quota errors, requeue the task as per the task_publish_max_retries
if (
e.status == 403
and "exceeded quota" in body["message"]
and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries)
):
self.log.warning(
"ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s",
"[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
self.task_publish_retries[key] + 1,
self.task_publish_max_retries,
key,
e.reason,
json.loads(e.body)["message"],
body["message"],
)
self.task_queue.put(task)
self.task_publish_retries[key] = retries + 1
else:
self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
key, _, _, _ = task
self.fail(key, e)
self.task_publish_retries.pop(key, None)
except PodMutationHookException as e:
key, _, _, _ = task
self.log.error(
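
To make the bookkeeping in ``sync()`` concrete, here is a self-contained walk-through (illustrative values and names, not provider code) of how the retry counter and the max-retries check interact for a single task key across executor sync cycles:

from collections import Counter

task_publish_retries: Counter = Counter()
task_publish_max_retries = 2  # assumed budget for this sketch
key = ("example_dag", "example_task", "run_1", 1)  # stand-in for a TaskInstanceKey

for sync_round in range(4):
    retries = task_publish_retries[key]
    if task_publish_max_retries == -1 or retries < task_publish_max_retries:
        task_publish_retries[key] = retries + 1  # re-queue and remember the attempt
        print(f"sync {sync_round}: re-queued (attempt {retries + 1} of {task_publish_max_retries})")
    else:
        task_publish_retries.pop(key, None)  # give up: the task is failed
        print(f"sync {sync_round}: failed after {retries} re-queue attempts")
        break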
9 changes: 9 additions & 0 deletions airflow/providers/cncf/kubernetes/provider.yaml
@@ -350,6 +350,15 @@ config:
type: string
example: ~
default: ""
task_publish_max_retries:
description: |
The maximum number of retries when queuing a task to the Kubernetes scheduler fails
due to Kube API "exceeded quota" errors, before giving up and marking the task as failed.
Set to -1 for unlimited retries.
version_added: ~
type: integer
example: ~
default: "0"

executors:
- airflow.providers.cncf.kubernetes.kubernetes_executor.KubernetesExecutor
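
As a hedged usage note: like any Airflow config option, the new key can be set through the standard ``AIRFLOW__{SECTION}__{KEY}`` environment-variable convention. The section name below follows the ``conf.getint("kubernetes", ...)`` call in the executor diff above and may differ if the provider remaps the section:

import os

# Assumed example value: -1 re-queues the task indefinitely on "exceeded quota" errors.
os.environ["AIRFLOW__KUBERNETES__TASK_PUBLISH_MAX_RETRIES"] = "-1"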
173 changes: 155 additions & 18 deletions tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -263,37 +263,170 @@ def setup_method(self) -> None:
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
@pytest.mark.parametrize(
"status, should_requeue",
"response, task_publish_max_retries, should_requeue, task_expected_state",
[
pytest.param(403, True, id="403 Forbidden"),
pytest.param(12345, True, id="12345 fake-unhandled-reason"),
pytest.param(422, False, id="422 Unprocessable Entity"),
pytest.param(400, False, id="400 BadRequest"),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=400),
0,
False,
State.FAILED,
id="400 BadRequest",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=400),
1,
False,
State.FAILED,
id="400 BadRequest (task_publish_max_retries=1)",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=403),
0,
False,
State.FAILED,
id="403 Forbidden (permission denied)",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=403),
1,
False,
State.FAILED,
id="403 Forbidden (permission denied) (task_publish_max_retries=1)",
),
pytest.param(
HTTPResponse(
body='{"message": "pods pod1 is forbidden: exceeded quota: '
"resouece-quota, requested: pods=1, used: pods=10, "
'limited: pods=10"}',
status=403,
),
0,
False,
State.FAILED,
id="403 Forbidden (exceeded quota)",
),
pytest.param(
HTTPResponse(
body='{"message": "pods pod1 is forbidden: exceeded quota: '
"resouece-quota, requested: pods=1, used: pods=10, "
'limited: pods=10"}',
status=403,
),
1,
True,
State.SUCCESS,
id="403 Forbidden (exceeded quota) (task_publish_max_retries=1) (retry succeeded)",
),
pytest.param(
HTTPResponse(
body='{"message": "pods pod1 is forbidden: exceeded quota: '
"resouece-quota, requested: pods=1, used: pods=10, "
'limited: pods=10"}',
status=403,
),
1,
True,
State.FAILED,
id="403 Forbidden (exceeded quota) (task_publish_max_retries=1) (retry failed)",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=404),
0,
False,
State.FAILED,
id="404 Not Found",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=404),
1,
False,
State.FAILED,
id="404 Not Found (task_publish_max_retries=1)",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=422),
0,
False,
State.FAILED,
id="422 Unprocessable Entity",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=422),
1,
False,
State.FAILED,
id="422 Unprocessable Entity (task_publish_max_retries=1)",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=12345),
0,
False,
State.FAILED,
id="12345 fake-unhandled-reason",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=12345),
1,
False,
State.FAILED,
id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry succeeded)",
),
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=12345),
1,
False,
State.FAILED,
id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry failed)",
),
],
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_run_next_exception_requeue(
self, mock_get_kube_client, mock_kubernetes_job_watcher, status, should_requeue
self,
mock_get_kube_client,
mock_kubernetes_job_watcher,
response,
task_publish_max_retries,
should_requeue,
task_expected_state,
):
"""
When pod scheduling fails with either reason 'Forbidden', or any reason not yet
handled in the relevant try-except block, the task should stay in the ``task_queue``
and be attempted on a subsequent executor sync. When reason is 'Unprocessable Entity'
or 'BadRequest', the task should be failed without being re-queued.
When pod creation fails with a 403 'exceeded quota' error and the task publish retries are
not yet exhausted, the task should stay in the ``task_queue`` and be attempted on a
subsequent executor sync. For any other failure reason, or once the task publish retries
are exhausted, the task should be failed without being re-queued.
Note on error scenarios:
- 403 Forbidden will be returned when your request exceeds namespace quota.
- 422 Unprocessable Entity is returned when your parameters are valid but unsupported
e.g. limits lower than requests.
- 400 BadRequest is returned when your parameters are invalid e.g. asking for cpu=100ABC123.
- 400 BadRequest is returned in scenarios like
  - your request parameters are invalid, e.g. asking for cpu=100ABC123.
- 403 Forbidden is returned in scenarios like
  - your request exceeds the namespace quota
  - the scheduler role doesn't have permission to launch the pod
- 404 Not Found is returned in scenarios like
  - the requested namespace doesn't exist
- 422 Unprocessable Entity is returned in scenarios like
  - your request parameters are valid but unsupported, e.g. limits lower than requests.
"""
path = sys.path[0] + "/tests/providers/cncf/kubernetes/pod_generator_base_with_secrets.yaml"

response = HTTPResponse(body='{"message": "any message"}', status=status)

# A mock kube_client that throws errors when making a pod
mock_kube_client = mock.patch("kubernetes.client.CoreV1Api", autospec=True)
mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=ApiException(http_resp=response))
@@ -306,6 +439,7 @@ def test_run_next_exception_requeue(
}
with conf_vars(config):
kubernetes_executor = self.kubernetes_executor
kubernetes_executor.task_publish_max_retries = task_publish_max_retries
kubernetes_executor.start()
try:
# Execute a task while the Api Throws errors
@@ -324,16 +458,19 @@ def test_run_next_exception_requeue(
assert not kubernetes_executor.task_queue.empty()

# Disable the ApiException
mock_kube_client.create_namespaced_pod.side_effect = None
if task_expected_state == State.SUCCESS:
mock_kube_client.create_namespaced_pod.side_effect = None

# Execute the task without errors should empty the queue
mock_kube_client.create_namespaced_pod.reset_mock()
kubernetes_executor.sync()
assert mock_kube_client.create_namespaced_pod.called
assert kubernetes_executor.task_queue.empty()
if task_expected_state != State.SUCCESS:
assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
else:
assert kubernetes_executor.task_queue.empty()
assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
finally:
kubernetes_executor.end()

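A short hedged sketch of what the mocked client in the test raises and how the executor's quota check sees it (same fixture body as above; assumes the urllib3 and kubernetes client versions used by the test suite):

import json

from kubernetes.client.rest import ApiException
from urllib3 import HTTPResponse

resp = HTTPResponse(
    body='{"message": "pods pod1 is forbidden: exceeded quota: resource-quota, '
    'requested: pods=1, used: pods=10, limited: pods=10"}',
    status=403,
)
exc = ApiException(http_resp=resp)  # .status, .reason and .body come from the response
body = json.loads(exc.body)
# The same condition the executor uses to decide whether the failure is retryable.
print(exc.status == 403 and "exceeded quota" in body["message"])  # True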
