Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log lookup exception for empty op subtypes #35536

Merged
merged 2 commits into from
Jan 12, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,16 @@ def _executor_get_task_log(self) -> Callable[[TaskInstance, int], tuple[list[str
executor = ExecutorLoader.get_default_executor()
return executor.get_task_log

@cached_property
def _test_mode(self) -> bool:
"""Pulls the unit test mode flag from config.

This lives in a function here to be cached and only hit the config once.
"""
from airflow.configuration import conf

return conf.getboolean("core", "unit_test_mode")

def _read(
self,
ti: TaskInstance,
Expand Down Expand Up @@ -357,7 +367,9 @@ def _read(
worker_log_full_path = Path(self.local_base, worker_log_rel_path)
local_messages, local_logs = self._read_from_local(worker_log_full_path)
messages_list.extend(local_messages)
if is_running and not executor_messages:
if ti.task.inherits_from_empty_operator is True and self._test_mode is False:
vchiapaikeo marked this conversation as resolved.
Show resolved Hide resolved
executor_logs.append("Operator inherits from empty operator and thus does not have logs")
elif is_running and not executor_messages:
served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
messages_list.extend(served_messages)
elif ti.state not in State.unfinished and not (local_logs or remote_logs):
Expand Down