Skip to content

Commit

Permalink
Immediately fail the task in case of worker pod having a fatal contai…
Browse files Browse the repository at this point in the history
…ner state (apache#37670)

* fail the task in case of worker pod having fatal container state

* version number updated
  • Loading branch information
dirrao authored and utkarsharma2 committed Apr 22, 2024
1 parent d6261de commit bbf9b6b
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,34 @@ def process_status(
self.watcher_queue.put(
(pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version)
)
elif (
self.kube_config.worker_pod_pending_fatal_container_state_reasons
and "status" in event["raw_object"]
):
self.log.info("Event: %s Pending, annotations: %s", pod_name, annotations_string)
# Init containers and base container statuses to check.
# Skipping the other containers statuses check.
container_statuses_to_check = []
if "initContainerStatuses" in event["raw_object"]["status"]:
container_statuses_to_check.extend(event["raw_object"]["status"]["initContainerStatuses"])
if "containerStatuses" in event["raw_object"]["status"]:
container_statuses_to_check.append(event["raw_object"]["status"]["containerStatuses"][0])
for container_status in container_statuses_to_check:
container_status_state = container_status["state"]
if "waiting" in container_status_state:
if (
container_status_state["waiting"]["reason"]
in self.kube_config.worker_pod_pending_fatal_container_state_reasons
):
if (
container_status_state["waiting"]["reason"] == "ErrImagePull"
and container_status_state["waiting"]["message"] == "pull QPS exceeded"
):
continue
self.watcher_queue.put(
(pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version)
)
break
else:
self.log.debug("Event: %s Pending, annotations: %s", pod_name, annotations_string)
elif status == "Failed":
Expand Down
6 changes: 6 additions & 0 deletions airflow/providers/cncf/kubernetes/kube_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ def __init__(self):
self.delete_worker_pods_on_failure = conf.getboolean(
self.kubernetes_section, "delete_worker_pods_on_failure"
)
self.worker_pod_pending_fatal_container_state_reasons = []
if conf.get(self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons", fallback=""):
self.worker_pod_pending_fatal_container_state_reasons = conf.get(
self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons"
).split(",")

self.worker_pods_creation_batch_size = conf.getint(
self.kubernetes_section, "worker_pods_creation_batch_size"
)
Expand Down
10 changes: 10 additions & 0 deletions airflow/providers/cncf/kubernetes/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,16 @@ config:
type: string
example: ~
default: "False"
worker_pod_pending_fatal_container_state_reasons:
description: |
If the worker pods are in a pending state due to a fatal container
state reasons, then fail the task and delete the worker pod
if delete_worker_pods is True and delete_worker_pods_on_failure is True.
version_added: 8.1.0
type: string
example: ~
default: 'CreateContainerConfigError,ErrImagePull,CreateContainerError,ImageInspectError,
InvalidImageName'
worker_pods_creation_batch_size:
description: |
Number of Kubernetes Worker Pod creation calls per scheduler loop.
Expand Down
189 changes: 186 additions & 3 deletions tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,14 @@ def setup_method(self):
scheduler_job_id="123",
kube_config=mock.MagicMock(),
)
self.watcher.kube_config.worker_pod_pending_fatal_container_state_reasons = [
"CreateContainerConfigError",
"CrashLoopBackOff",
"ErrImagePull",
"CreateContainerError",
"ImageInspectError",
"InvalidImageName",
]
self.kube_client = mock.MagicMock()
self.core_annotations = {
"dag_id": "dag",
Expand Down Expand Up @@ -1532,11 +1540,186 @@ def assert_watcher_queue_called_once_with_state(self, state):
)
)

def test_process_status_pending(self):
self.events.append({"type": "MODIFIED", "object": self.pod})
@pytest.mark.parametrize(
"raw_object, is_watcher_queue_called",
[
pytest.param(
{
"status": {
"startTime": "2020-05-12T03:49:57Z",
"containerStatuses": [
{
"name": "base",
"state": {
"waiting": {
"reason": "CreateContainerConfigError",
"message": 'secret "my-secret" not found',
}
},
"lastState": {},
"ready": False,
"restartCount": 0,
"image": "dockerhub.com/apache/airflow:latest",
"imageID": "",
}
],
}
},
True,
id="CreateContainerConfigError",
),
pytest.param(
{
"status": {
"startTime": "2020-05-12T03:49:57Z",
"containerStatuses": [
{
"name": "base",
"state": {
"waiting": {"reason": "ErrImagePull", "message": "pull QPS exceeded"}
},
"lastState": {},
"ready": False,
"restartCount": 0,
"image": "dockerhub.com/apache/airflow:latest",
"imageID": "",
}
],
}
},
False,
id="ErrImagePull Image QPS Exceeded",
),
pytest.param(
{
"status": {
"startTime": "2020-05-12T03:49:57Z",
"containerStatuses": [
{
"name": "base",
"state": {
"waiting": {
"reason": "ErrImagePull",
"message": "rpc error: code = Unknown desc = Error response from daemon: manifest for dockerhub.com/apache/airflow:xyz not found: manifest unknown: Requested image not found",
}
},
"lastState": {},
"ready": False,
"restartCount": 0,
"image": "dockerhub.com/apache/airflow:xyz",
"imageID": "",
}
],
}
},
True,
id="ErrImagePull Image Not Found",
),
pytest.param(
{
"status": {
"startTime": "2020-05-12T03:49:57Z",
"containerStatuses": [
{
"name": "base",
"state": {
"waiting": {
"reason": "CreateContainerError",
"message": 'Error: Error response from daemon: create \invalid\path: "\\invalid\path" includes invalid characters for a local volume name, only "[a-zA-Z0-9][a-zA-Z0-9_.-]" are allowed. If you intended to pass a host directory, use absolute path',
}
},
"lastState": {},
"ready": False,
"restartCount": 0,
"image": "dockerhub.com/apache/airflow:latest",
"imageID": "",
}
],
}
},
True,
id="CreateContainerError",
),
pytest.param(
{
"status": {
"startTime": "2020-05-12T03:49:57Z",
"containerStatuses": [
{
"name": "base",
"state": {
"waiting": {
"reason": "ImageInspectError",
"message": 'Failed to inspect image "dockerhub.com/apache/airflow:latest": rpc error: code = Unknown desc = Error response from daemon: readlink /var/lib/docker/overlay2: invalid argument',
}
},
"lastState": {},
"ready": False,
"restartCount": 0,
"image": "dockerhub.com/apache/airflow:latest",
"imageID": "",
}
],
}
},
True,
id="ImageInspectError",
),
pytest.param(
{
"status": {
"startTime": "2020-05-12T03:49:57Z",
"containerStatuses": [
{
"name": "base",
"state": {
"waiting": {
"reason": "InvalidImageName",
"message": 'Failed to apply default image tag "dockerhub.com/apache/airflow:latest+07": couldnot parse image reference "dockerhub.com/apache/airflow:latest+07": invalid reference format',
}
},
"lastState": {},
"ready": False,
"restartCount": 0,
"image": "dockerhub.com/apache/airflow:latest+07",
"imageID": "",
}
],
}
},
True,
id="InvalidImageName",
),
pytest.param(
{
"status": {
"startTime": "2020-05-12T03:49:57Z",
"containerStatuses": [
{
"name": "base",
"state": {"waiting": {"reason": "OtherReasons", "message": ""}},
"lastState": {},
"ready": False,
"restartCount": 0,
"image": "dockerhub.com/apache/airflow:latest",
"imageID": "",
}
],
}
},
False,
id="OtherReasons",
),
],
)
def test_process_status_pending(self, raw_object, is_watcher_queue_called):
self.events.append({"type": "MODIFIED", "object": self.pod, "raw_object": raw_object})

self._run()
self.watcher.watcher_queue.put.assert_not_called()
if is_watcher_queue_called:
self.assert_watcher_queue_called_once_with_state(State.FAILED)
else:
self.watcher.watcher_queue.put.assert_not_called()

def test_process_status_pending_deleted(self):
self.events.append({"type": "DELETED", "object": self.pod})
Expand Down

0 comments on commit bbf9b6b

Please sign in to comment.