Add HdfsTaskHandler to store task instance logs on HDFS (#31512)
---------

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Khrol and uranusjr committed Jun 8, 2023
1 parent 16d9f96 commit d3c8881
Showing 10 changed files with 252 additions and 2 deletions.
12 changes: 12 additions & 0 deletions airflow/config_templates/airflow_local_settings.py
@@ -201,6 +201,7 @@
# Cloudwatch log groups should start with "cloudwatch://"
# GCS buckets should start with "gs://"
# WASB buckets should start with "wasb"
# HDFS path should start with "hdfs://"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})
@@ -282,6 +283,17 @@
},
}
DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"):
HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
"task": {
"class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
"formatter": "airflow",
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
"hdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
"filename_template": FILENAME_TEMPLATE,
},
}
DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
elif ELASTICSEARCH_HOST:
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
8 changes: 8 additions & 0 deletions airflow/providers/apache/hdfs/CHANGELOG.rst
@@ -24,6 +24,14 @@
Changelog
---------

4.1.0
-----

Features
~~~~~~~~

* Add ability to read and write task instance logs on HDFS (#31512)

4.0.0
-----

2 changes: 1 addition & 1 deletion airflow/providers/apache/hdfs/__init__.py
@@ -28,7 +28,7 @@

__all__ = ["__version__"]

__version__ = "4.0.0"
__version__ = "4.1.0"

try:
from airflow import __version__ as airflow_version
12 changes: 12 additions & 0 deletions airflow/providers/apache/hdfs/hooks/webhdfs.py
@@ -153,3 +153,15 @@ def load_file(
hdfs_path=destination, local_path=source, overwrite=overwrite, n_threads=parallelism, **kwargs
)
self.log.debug("Uploaded file %s to %s", source, destination)

def read_file(self, filename: str) -> bytes:
"""Read a file from HDFS.
:param filename: The path of the file to read.
:return: File content as a raw string
"""
conn = self.get_conn()

with conn.read(filename) as reader:
content = reader.read()
return content
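
A minimal usage sketch for the new hook method; the connection id and log path below are illustrative, not part of this change:

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

# Read back a log file previously uploaded to HDFS; decode since read_file returns bytes.
hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default")
content = hook.read_file("/airflow/logs/example_dag/task_1/attempt=1.log")
print(content.decode("utf-8"))
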
16 changes: 16 additions & 0 deletions airflow/providers/apache/hdfs/log/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
133 changes: 133 additions & 0 deletions airflow/providers/apache/hdfs/log/hdfs_task_handler.py
@@ -0,0 +1,133 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
import pathlib
import shutil
from functools import cached_property
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin


class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
"""Logging handler to upload and read from HDFS."""

def __init__(
self, base_log_folder: str, hdfs_log_folder: str, filename_template: str | None = None, **kwargs
):
super().__init__(base_log_folder, filename_template)
self.remote_base = urlsplit(hdfs_log_folder).path
self.log_relative_path = ""
self._hook = None
self.closed = False
self.upload_on_close = True
self.delete_local_copy = (
kwargs["delete_local_copy"]
if "delete_local_copy" in kwargs
else conf.getboolean("logging", "delete_local_logs", fallback=False)
)

@cached_property
def hook(self):
"""Returns WebHDFSHook."""
return WebHDFSHook(webhdfs_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"))

def set_context(self, ti):
super().set_context(ti)
# Both the local and remote locations are needed to open and
# upload the local log file to HDFS storage.
full_path = self.handler.baseFilename
self.log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
self.upload_on_close = is_trigger_log_context or not ti.raw
# Clear the file first so that duplicate data is not uploaded
# when re-using the same path (e.g. with rescheduled sensors)
if self.upload_on_close:
with open(self.handler.baseFilename, "w"):
pass

def close(self):
"""Close and upload local log file to HDFS."""
# When the application exits, the system shuts down all handlers by
# calling their close method. Here we check if the logger is already
# closed to prevent uploading the log to remote storage multiple
# times when `logging.shutdown` is called.
if self.closed:
return

super().close()

if not self.upload_on_close:
return

local_loc = os.path.join(self.local_base, self.log_relative_path)
remote_loc = os.path.join(self.remote_base, self.log_relative_path)
if os.path.exists(local_loc) and os.path.isfile(local_loc):
self.hook.load_file(local_loc, remote_loc)
if self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))

# Mark closed so we don't double write if close is called twice
self.closed = True

def _read_remote_logs(self, ti, try_number, metadata=None):
# Explicitly getting the log relative path is necessary because the given
# task instance might be different from the task instance passed
# to the set_context method.
worker_log_rel_path = self._render_filename(ti, try_number)

logs = []
messages = []
file_path = os.path.join(self.remote_base, worker_log_rel_path)
if self.hook.check_for_path(file_path):
logs.append(self.hook.read_file(file_path).decode("utf-8"))
else:
messages.append(f"No logs found on hdfs for ti={ti}")

return messages, logs

def _read(self, ti, try_number, metadata=None):
"""Read logs of given task instance and try_number from HDFS.
If failed, read the log from task instance host machine.
todo: when min airflow version >= 2.6 then remove this method (``_read``)
:param ti: task instance object
:param try_number: task instance try_number to read logs from
:param metadata: log metadata,
can be used for steaming log reading and auto-tailing.
"""
# from airflow 2.6 we no longer implement the _read method
if hasattr(super(), "_read_remote_logs"):
return super()._read(ti, try_number, metadata)
# if we get here, we're on airflow < 2.6 and we use this backcompat logic
messages, logs = self._read_remote_logs(ti, try_number, metadata)
if logs:
return "".join(f"*** {x}\n" for x in messages) + "\n".join(logs), {"end_of_log": True}
else:
if metadata and metadata.get("log_pos", 0) > 0:
log_prefix = ""
else:
log_prefix = "*** Falling back to local log\n"
local_log, metadata = super()._read(ti, try_number, metadata)
return f"{log_prefix}{local_log}", metadata
1 change: 1 addition & 0 deletions airflow/providers/apache/hdfs/provider.yaml
@@ -24,6 +24,7 @@ description: |
suspended: false
versions:
- 4.1.0
- 4.0.0
- 3.2.1
- 3.2.0
3 changes: 2 additions & 1 deletion docs/apache-airflow-providers-apache-hdfs/index.rst
@@ -28,6 +28,7 @@ Content

Connection types <connections>
Operators <operators/index>
Logging for Tasks <logging/index>

.. toctree::
:maxdepth: 1
@@ -54,7 +55,7 @@ Package apache-airflow-providers-apache-hdfs
and `WebHDFS <https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html>`__.


Release: 4.0.0
Release: 4.1.0

Provider package
----------------
@@ -0,0 +1,42 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _write-logs-hdfs:

Writing logs to HDFS
---------------------------

Remote logging to HDFS uses an existing Airflow connection to read or write logs. If you
don't have a connection properly set up, this process will fail.


Enabling remote logging
'''''''''''''''''''''''

To enable this feature, ``airflow.cfg`` must be configured as follows:

.. code-block:: ini

    [logging]
    # Airflow can store logs remotely in HDFS. Users must supply a remote
    # location URL (starting with 'hdfs://') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = hdfs://some/path/to/logs
    remote_log_conn_id = webhdfs_default

In the above example, Airflow will try to use ``WebHDFSHook('webhdfs_default')``.
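
For completeness, a minimal sketch (using Airflow's standard ``conf`` API; not part of this commit) to confirm that the settings above are picked up at runtime:

from airflow.configuration import conf

# Mirrors the ini example above; values will differ per deployment.
assert conf.getboolean("logging", "remote_logging")
assert conf.get("logging", "remote_base_log_folder").startswith("hdfs://")
assert conf.get("logging", "remote_log_conn_id") == "webhdfs_default"
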
25 changes: 25 additions & 0 deletions docs/apache-airflow-providers-apache-hdfs/logging/index.rst
@@ -0,0 +1,25 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Logging for Tasks
=================

.. toctree::
:maxdepth: 1
:glob:

*
