Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 45 additions & 15 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@
"""File logging handler for tasks."""
import logging
import os
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Tuple

from itsdangerous import TimedJSONWebSignatureSerializer

from airflow.configuration import AirflowConfigException, conf
from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session

if TYPE_CHECKING:
from airflow.models import TaskInstance


# Look-back window compared against TaskInstance.end_date when collecting the
# distinct, non-empty worker hostnames that are probed for a task's log file.
LOG_SEARCH_INTERVAL = timedelta(days=2)


class FileTaskHandler(logging.Handler):
"""
FileTaskHandler is a python log handler that handles and reads
Expand Down Expand Up @@ -179,26 +184,51 @@ def _read(self, ti, try_number, metadata=None):
else:
import httpx

# Inner import to avoid circular import
from airflow.models import TaskInstance

try:
timeout: Optional[int] = conf.getint('webserver', 'log_fetch_timeout_sec')
except (AirflowConfigException, ValueError):
timeout = None # No timeout

signer = TimedJSONWebSignatureSerializer(
secret_key=conf.get('webserver', 'secret_key'),
algorithm_name='HS512',
expires_in=conf.getint('webserver', 'log_request_clock_grace', fallback=30),
# This isn't really a "salt", more of a signing context
salt='task-instance-logs',
)

url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path).format(
ti=ti, worker_log_server_port=conf.get('logging', 'WORKER_LOG_SERVER_PORT')
)

with create_session() as session:
hosts = (
session.query(TaskInstance.hostname)
.filter(
TaskInstance.hostname != '',
TaskInstance.end_date > (timezone.utcnow() - LOG_SEARCH_INTERVAL),
)
.distinct()
)

for host in hosts:
url = os.path.join("http://{host}:{worker_log_server_port}/log", log_relative_path).format(
host=host[0], worker_log_server_port=conf.get('logging', 'WORKER_LOG_SERVER_PORT')
)

response = httpx.head(
url, timeout=timeout, headers={'Authorization': signer.dumps(log_relative_path)}
)

if response.status_code == 200:
break

log += f"*** Log file does not exist: {location}\n"
log += f"*** Fetching from: {url}\n"
try:
timeout = None # No timeout
try:
timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
except (AirflowConfigException, ValueError):
pass

signer = TimedJSONWebSignatureSerializer(
secret_key=conf.get('webserver', 'secret_key'),
algorithm_name='HS512',
expires_in=conf.getint('webserver', 'log_request_clock_grace', fallback=30),
# This isn't really a "salt", more of a signing context
salt='task-instance-logs',
)

response = httpx.get(
url, timeout=timeout, headers={'Authorization': signer.dumps(log_relative_path)}
)
Expand Down