Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a generic export for containerized executor logging #34903

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from airflow.models.operator import needs_expansion
from airflow.models.param import ParamsDict
from airflow.models.taskinstance import TaskReturnCode
from airflow.settings import IS_K8S_EXECUTOR_POD
from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.typing_compat import Literal
Expand Down Expand Up @@ -326,7 +326,7 @@ def _move_task_handlers_to_root(ti: TaskInstance) -> Generator[None, None, None]
console_handler = next((h for h in root_logger.handlers if h.name == "console"), None)
with LoggerMutationHelper(root_logger), LoggerMutationHelper(ti.log) as task_helper:
task_helper.move(root_logger)
if IS_K8S_EXECUTOR_POD:
if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nittiest of nitpicks, non-blocking thought: in two places you use if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:, I wonder if a calculated constant somewhere like IS_CONTAINERIZED = any(IS_K8S_EXECUTOR_POD, IS_EXECUTOR_CONTAINER)... but that's likely pointless overkill right now, but if/when other providers add their own containerized executors, that may be needed...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From another direction, it seems that IS_EXECUTOR_CONTAINER is never used on its own, which makes me feel perhaps only the IS_CONTAINERIZED constant is needed, and IS_EXECUTOR_CONTAINER can be removed entirely. The same applies for the test parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry folks (@ferruzzi @uranusjr), I thought I put enough details in the description but perhaps I should have added more!

This new constant will be a generalized replacement for the K8s constant we have now. Any new containerized executor should use this one. Which our new ECS executor does use (link in description). I did not remove the K8s environment variable in this PR to keep the scope small. This change is just the core pieces. The K8s executor will be migrated over to this env var in a follow up PR.

if console_handler and console_handler not in root_logger.handlers:
root_logger.addHandler(console_handler)
yield
Expand Down
4 changes: 4 additions & 0 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ def initialize():
executor_constants.CELERY_KUBERNETES_EXECUTOR,
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}

# Executors can set this to true to configure logging correctly for
# containerized executors.
IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in kubernetes executor pod."""

Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/log/logging_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ def __init__(self, stream):
@property
def stream(self):
"""Returns current stream."""
from airflow.settings import IS_K8S_EXECUTOR_POD
from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD

if IS_K8S_EXECUTOR_POD:
if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
return self._orig_stream
if self._use_stderr:
return sys.stderr
Expand Down
13 changes: 9 additions & 4 deletions tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,10 @@ def test_external_executor_id_present_for_process_run_task(self, mock_local_job)
external_executor_id="ABCD12345",
)

@pytest.mark.parametrize("is_k8s", ["true", ""])
def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s):
@pytest.mark.parametrize(
"is_k8s, is_container_exec", [("true", "true"), ("true", ""), ("", "true"), ("", "")]
)
def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_container_exec):
"""
When running task --local as k8s executor pod, all logging should make it to stdout.
Otherwise, all logging after "running TI" is redirected to logs (and the actual log
Expand All @@ -748,7 +750,10 @@ def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s):
import subprocess

with mock.patch.dict(
"os.environ", AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s, PYTHONPATH=os.fspath(AIRFLOW_SOURCES_ROOT)
"os.environ",
AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s,
AIRFLOW_IS_EXECUTOR_CONTAINER=is_container_exec,
PYTHONPATH=os.fspath(AIRFLOW_SOURCES_ROOT),
):
with subprocess.Popen(
args=[sys.executable, "-m", "airflow", *self.task_args, "-S", self.dag_path],
Expand All @@ -764,7 +769,7 @@ def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s):
found_start = True
if found_start:
lines.append(line)
if is_k8s:
if any((is_k8s, is_container_exec)):
# 10 is arbitrary, but, with enough padding to hopefully not be flakey
assert len(lines) > 10
self.assert_log_line("Starting attempt 1 of 1", lines)
Expand Down