Skip to content

Commit

Permalink
Deprecate skip_exit_code in DockerOperator and `KubernetesPodOper…
Browse files Browse the repository at this point in the history
…ator` (#30733)

* Deprecate `skip_exit_code` in `DockerOperator` and `KubernetesPodOperator`

* satisfy mypy
  • Loading branch information
eladkal committed Apr 19, 2023
1 parent 719e5a9 commit 99a3bf2
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 24 deletions.
18 changes: 13 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Expand Up @@ -213,7 +213,7 @@ class KubernetesPodOperator(BaseOperator):
to populate the environment variables with. The contents of the target
ConfigMap's Data field will represent the key-value pairs as environment variables.
Extends env_from.
:param skip_exit_code: If task exits with this exit code, leave the task
:param skip_on_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: None). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param base_container_name: The name of the base container in the pod. This container's logs
Expand Down Expand Up @@ -292,6 +292,7 @@ def __init__(
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
skip_exit_code: int | None = None,
skip_on_exit_code: int | None = None,
base_container_name: str | None = None,
deferrable: bool = False,
poll_interval: float = 2,
Expand Down Expand Up @@ -361,7 +362,13 @@ def __init__(
self.termination_grace_period = termination_grace_period
self.pod_request_obj: k8s.V1Pod | None = None
self.pod: k8s.V1Pod | None = None
self.skip_exit_code = skip_exit_code
if skip_exit_code is not None:
warnings.warn(
"skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2
)
self.skip_on_exit_code: int | None = skip_exit_code
else:
self.skip_on_exit_code = skip_on_exit_code
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.deferrable = deferrable
self.poll_interval = poll_interval
Expand Down Expand Up @@ -675,7 +682,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):

error_message = get_container_termination_message(remote_pod, self.base_container_name)
error_message = "\n" + error_message if error_message else ""
if self.skip_exit_code is not None:
if self.skip_on_exit_code is not None:
container_statuses = (
remote_pod.status.container_statuses if remote_pod and remote_pod.status else None
) or []
Expand All @@ -689,9 +696,10 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
and base_container_status.last_state.terminated
else None
)
if exit_code == self.skip_exit_code:
if exit_code == self.skip_on_exit_code:
raise AirflowSkipException(
f"Pod {pod and pod.metadata.name} returned exit code {self.skip_exit_code}. Skipping."
f"Pod {pod and pod.metadata.name} returned exit code "
f"{self.skip_on_exit_code}. Skipping."
)
raise AirflowException(
f"Pod {pod and pod.metadata.name} returned a failure:\n{error_message}\n"
Expand Down
15 changes: 11 additions & 4 deletions airflow/providers/docker/operators/docker.py
Expand Up @@ -155,7 +155,7 @@ class DockerOperator(BaseOperator):
If rolling the logs creates excess files, the oldest file is removed.
Only effective when max-size is also set. A positive integer. Defaults to 1.
:param ipc_mode: Set the IPC mode for the container.
:param skip_exit_code: If task exits with this exit code, leave the task
:param skip_on_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: None). If set to ``None``, any non-zero
exit code will be treated as a failure.
"""
Expand Down Expand Up @@ -215,6 +215,7 @@ def __init__(
log_opts_max_file: str | None = None,
ipc_mode: str | None = None,
skip_exit_code: int | None = None,
skip_on_exit_code: int | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand Down Expand Up @@ -276,7 +277,13 @@ def __init__(
self.log_opts_max_size = log_opts_max_size
self.log_opts_max_file = log_opts_max_file
self.ipc_mode = ipc_mode
self.skip_exit_code = skip_exit_code
if skip_exit_code is not None:
warnings.warn(
"skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2
)
self.skip_on_exit_code: int | None = skip_exit_code
else:
self.skip_on_exit_code = skip_on_exit_code

@cached_property
def hook(self) -> DockerHook:
Expand Down Expand Up @@ -377,9 +384,9 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[
self.log.info("%s", log_chunk)

result = self.cli.wait(self.container["Id"])
if result["StatusCode"] == self.skip_exit_code:
if result["StatusCode"] == self.skip_on_exit_code:
raise AirflowSkipException(
f"Docker container returned exit code {self.skip_exit_code}. Skipping."
f"Docker container returned exit code {self.skip_on_exit_code}. Skipping."
)
elif result["StatusCode"] != 0:
joined_log_lines = "\n".join(log_lines)
Expand Down
18 changes: 9 additions & 9 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Expand Up @@ -1089,16 +1089,16 @@ def test_task_id_as_name_dag_id_is_ignored(self):
"extra_kwargs, actual_exit_code, expected_exc",
[
(None, 99, AirflowException),
({"skip_exit_code": 100}, 100, AirflowSkipException),
({"skip_exit_code": 100}, 101, AirflowException),
({"skip_exit_code": None}, 100, AirflowException),
({"skip_on_exit_code": 100}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": None}, 100, AirflowException),
],
)
@patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
def test_task_skip_when_pod_exit_with_certain_code(
self, remote_pod, extra_kwargs, actual_exit_code, expected_exc
):
"""Tests that an AirflowSkipException is raised when the container exits with the skip_exit_code"""
"""Tests that an AirflowSkipException is raised when the container exits with the skip_on_exit_code"""
k = KubernetesPodOperator(
task_id="task", is_delete_operator_pod=True, **(extra_kwargs if extra_kwargs else {})
)
Expand Down Expand Up @@ -1284,13 +1284,13 @@ def test_async_create_pod_should_throw_exception(self, mocked_hook, mocked_clean
[
(None, 0, None, "Succeeded", "success"),
(None, 99, AirflowException, "Failed", "error"),
({"skip_exit_code": 100}, 100, AirflowSkipException, "Failed", "error"),
({"skip_exit_code": 100}, 101, AirflowException, "Failed", "error"),
({"skip_exit_code": None}, 100, AirflowException, "Failed", "error"),
({"skip_on_exit_code": 100}, 100, AirflowSkipException, "Failed", "error"),
({"skip_on_exit_code": 100}, 101, AirflowException, "Failed", "error"),
({"skip_on_exit_code": None}, 100, AirflowException, "Failed", "error"),
],
)
@patch(HOOK_CLASS)
def test_async_create_pod_with_skip_exit_code_should_skip(
def test_async_create_pod_with_skip_on_exit_code_should_skip(
self,
mocked_hook,
extra_kwargs,
Expand All @@ -1299,7 +1299,7 @@ def test_async_create_pod_with_skip_exit_code_should_skip(
pod_status,
event_status,
):
"""Tests that an AirflowSkipException is raised when the container exits with the skip_exit_code"""
"""Tests that an AirflowSkipException is raised when the container exits with the skip_on_exit_code"""

k = KubernetesPodOperator(
task_id=TEST_TASK_ID,
Expand Down
6 changes: 3 additions & 3 deletions tests/providers/docker/decorators/test_docker.py
Expand Up @@ -124,9 +124,9 @@ def do_run():
"extra_kwargs, actual_exit_code, expected_state",
[
(None, 99, TaskInstanceState.FAILED),
({"skip_exit_code": 100}, 100, TaskInstanceState.SKIPPED),
({"skip_exit_code": 100}, 101, TaskInstanceState.FAILED),
({"skip_exit_code": None}, 0, TaskInstanceState.SUCCESS),
({"skip_on_exit_code": 100}, 100, TaskInstanceState.SKIPPED),
({"skip_on_exit_code": 100}, 101, TaskInstanceState.FAILED),
({"skip_on_exit_code": None}, 0, TaskInstanceState.SUCCESS),
],
)
def test_skip_docker_operator(self, extra_kwargs, actual_exit_code, expected_state, dag_maker):
Expand Down
6 changes: 3 additions & 3 deletions tests/providers/docker/operators/test_docker.py
Expand Up @@ -516,9 +516,9 @@ def test_execute_unicode_logs(self):
"extra_kwargs, actual_exit_code, expected_exc",
[
(None, 99, AirflowException),
({"skip_exit_code": 100}, 100, AirflowSkipException),
({"skip_exit_code": 100}, 101, AirflowException),
({"skip_exit_code": None}, 100, AirflowException),
({"skip_on_exit_code": 100}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": None}, 100, AirflowException),
],
)
def test_skip(self, extra_kwargs, actual_exit_code, expected_exc):
Expand Down

0 comments on commit 99a3bf2

Please sign in to comment.