Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix excessive permission changing for log task handler #38164

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 9 additions & 32 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,31 +159,6 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
raise AirflowException(f"Could not find TaskInstance for {ti}")


def _change_directory_permissions_up(directory: Path, folder_permissions: int):
"""
Change permissions of the given directory and its parents.

Only attempt to change permissions for directories owned by the current user.

:param directory: directory to change permissions of (including parents)
:param folder_permissions: permissions to set
"""
if directory.stat().st_uid == os.getuid():
if directory.stat().st_mode % 0o1000 != folder_permissions % 0o1000:
print(f"Changing {directory} permission to {folder_permissions}")
try:
directory.chmod(folder_permissions)
except PermissionError as e:
# In some circumstances (depends on user and filesystem) we might not be able to
# change the permission for the folder (when the folder was created by another user
# before or when the filesystem does not allow to change permission). We should not
# fail in this case but rather ignore it.
print(f"Failed to change {directory} permission to {folder_permissions}: {e}")
return
if directory.parent != directory:
_change_directory_permissions_up(directory.parent, folder_permissions)


class FileTaskHandler(logging.Handler):
"""
FileTaskHandler is a python log handler that handles and reads task instance logs.
Expand Down Expand Up @@ -481,7 +456,8 @@ def read(self, task_instance, try_number=None, metadata=None):

return logs, metadata_array

def _prepare_log_folder(self, directory: Path):
@staticmethod
def _prepare_log_folder(directory: Path, new_folder_permissions: int):
"""
Prepare the log folder and ensure its mode is as configured.

Expand All @@ -505,11 +481,9 @@ def _prepare_log_folder(self, directory: Path):
sure that the same group is set as default group for both - impersonated user and main airflow
user.
"""
new_folder_permissions = int(
conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8
)
directory.mkdir(mode=new_folder_permissions, parents=True, exist_ok=True)
_change_directory_permissions_up(directory, new_folder_permissions)
for parent in reversed(directory.parents):
parent.mkdir(mode=new_folder_permissions, exist_ok=True)
directory.mkdir(mode=new_folder_permissions, exist_ok=True)

def _init_file(self, ti, *, identifier: str | None = None):
"""
Expand All @@ -531,7 +505,10 @@ def _init_file(self, ti, *, identifier: str | None = None):
# if this is true, we're invoked via set_context in the context of
# setting up individual trigger logging. return trigger log path.
full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id)
self._prepare_log_folder(Path(full_path).parent)
new_folder_permissions = int(
conf.get("logging", "file_task_handler_new_folder_permissions", fallback="0o775"), 8
)
self._prepare_log_folder(Path(full_path).parent, new_folder_permissions)

if not os.path.exists(full_path):
open(full_path, "a").close()
Expand Down
28 changes: 12 additions & 16 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import os
import re
import shutil
import tempfile
from importlib import reload
from pathlib import Path
from unittest import mock
Expand All @@ -44,7 +43,6 @@
from airflow.utils.log.file_task_handler import (
FileTaskHandler,
LogType,
_change_directory_permissions_up,
_interleave_logs,
_parse_timestamps_in_log_file,
)
Expand Down Expand Up @@ -729,28 +727,26 @@ def test_interleave_logs_correct_ordering():
assert sample_with_dupe == "\n".join(_interleave_logs(sample_with_dupe, "", sample_with_dupe))


def test_permissions_for_new_directories():
tmp_path = Path(tempfile.mkdtemp())
def test_permissions_for_new_directories(tmpdir):
tmpdir_path = Path(tmpdir)
try:
# Set umask to 0o027: owner rwx, group rx-w, other -rwx
old_umask = os.umask(0o027)
try:
subdir = tmp_path / "subdir1" / "subdir2"
base_dir = tmpdir_path / "base"
base_dir.mkdir()
log_dir = base_dir / "subdir1" / "subdir2"
# force permissions for the new folder to be owner rwx, group -rxw, other -rwx
new_folder_permissions = 0o700
# default permissions are owner rwx, group rx-w, other -rwx (umask bit negative)
default_permissions = 0o750
subdir.mkdir(mode=new_folder_permissions, parents=True, exist_ok=True)
assert subdir.exists()
assert subdir.is_dir()
assert subdir.stat().st_mode % 0o1000 == new_folder_permissions
# initially parent permissions are as per umask
assert subdir.parent.stat().st_mode % 0o1000 == default_permissions
_change_directory_permissions_up(subdir, new_folder_permissions)
assert subdir.stat().st_mode % 0o1000 == new_folder_permissions
# now parent permissions are as per new_folder_permissions
assert subdir.parent.stat().st_mode % 0o1000 == new_folder_permissions
FileTaskHandler._prepare_log_folder(log_dir, new_folder_permissions)
assert log_dir.exists()
assert log_dir.is_dir()
assert log_dir.stat().st_mode % 0o1000 == new_folder_permissions
assert log_dir.parent.stat().st_mode % 0o1000 == new_folder_permissions
assert base_dir.stat().st_mode % 0o1000 == default_permissions
finally:
os.umask(old_umask)
finally:
shutil.rmtree(tmp_path)
shutil.rmtree(tmpdir_path)