Skip to content

Commit

Permalink
Add log lookup exception for empty op subtypes (apache#35536)
Browse files Browse the repository at this point in the history
* Add log lookup exception for empty op subtypes

* Use exception catching approach instead to preserve tests
  • Loading branch information
vchiapaikeo authored and abhishekbhakat committed Mar 5, 2024
1 parent 95c9fab commit b27a36d
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from typing import TYPE_CHECKING, Any, Callable, Iterable
from urllib.parse import urljoin

import httpx
import pendulum

from airflow.configuration import conf
Expand Down Expand Up @@ -78,8 +79,6 @@ def _set_task_deferred_context_var():


def _fetch_logs_from_service(url, log_relative_path):
import httpx

from airflow.utils.jwt_signer import JWTSigner

timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
Expand Down Expand Up @@ -170,6 +169,9 @@ class FileTaskHandler(logging.Handler):
"""

trigger_should_wrap = True
inherits_from_empty_operator_log_message = (
"Operator inherits from empty operator and thus does not have logs"
)

def __init__(self, base_log_folder: str, filename_template: str | None = None):
super().__init__()
Expand Down Expand Up @@ -555,8 +557,11 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li
messages.append(f"Found logs served from host {url}")
logs.append(response.text)
except Exception as e:
messages.append(f"Could not read served logs: {e}")
logger.exception("Could not read served logs")
if isinstance(e, httpx.UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
messages.append(self.inherits_from_empty_operator_log_message)
else:
messages.append(f"Could not read served logs: {e}")
logger.exception("Could not read served logs")
return messages, logs

def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
Expand Down

0 comments on commit b27a36d

Please sign in to comment.