Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: File writer enhancements #1818

Merged
merged 4 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions subiquity/models/subiquity.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def _cloud_init_files(self):
("etc/cloud/ds-identify.cfg", "policy: enabled\n", 0o644),
]
# Add cloud-init clean hooks to support golden-image creation.
cfg_files = ["/" + path for (path, _content, _cmode) in files]
cfg_files = ["/" + path for (path, _content, _mode) in files]
cfg_files.extend(self.network.rendered_config_paths())
if lsb_release()["release"] not in ("20.04", "22.04"):
cfg_files.append("/etc/cloud/cloud-init.disabled")
Expand Down Expand Up @@ -467,10 +467,10 @@ def configure_cloud_init(self):
if self.source.current.variant == "core":
# can probably be supported but requires changes
return
for path, content, cmode in self._cloud_init_files():
for path, content, mode in self._cloud_init_files():
path = os.path.join(self.target, path)
os.makedirs(os.path.dirname(path), exist_ok=True)
write_file(path, content, cmode=cmode)
write_file(path, content, mode=mode)

def _media_info(self):
if os.path.exists("/cdrom/.disk/info"):
Expand Down
2 changes: 1 addition & 1 deletion subiquity/server/controllers/shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async def copy_logs_to_target(self, context):
["cp", "-aT", "/var/log/installer", target_logs]
)
# Close the permissions from group writes on the target.
set_log_perms(target_logs, isdir=True, group_write=False)
set_log_perms(target_logs, group_write=False)

journal_txt = os.path.join(target_logs, "installer-journal.txt")
try:
Expand Down
13 changes: 5 additions & 8 deletions subiquitycore/file_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
log = logging.getLogger("subiquitycore.file_util")


def set_log_perms(target, *, isdir=True, group_write=False, mode=None):
def set_log_perms(target, *, group_write=False, mode=None, group=_DEF_GROUP):
if os.getuid() != 0:
log.warning(
"set_log_perms: running as non-root - not adjusting"
Expand All @@ -39,27 +39,24 @@ def set_log_perms(target, *, isdir=True, group_write=False, mode=None):
return
if mode is None:
mode = _DEF_PERMS_FILE
if isdir:
if os.path.isdir(target):
mode |= 0o110
if group_write:
mode |= 0o020
os.chmod(target, mode)
os.chown(target, -1, grp.getgrnam(_DEF_GROUP).gr_gid)
os.chown(target, -1, grp.getgrnam(group).gr_gid)


@contextlib.contextmanager
def open_perms(filename, *, cmode=None):
if cmode is None:
cmode = _DEF_PERMS_FILE

def open_perms(filename, **kwargs):
tf = None
try:
dirname = os.path.dirname(filename)
os.makedirs(dirname, exist_ok=True)
tf = tempfile.NamedTemporaryFile(dir=dirname, delete=False, mode="w")
yield tf
tf.close()
set_log_perms(tf.name, mode=cmode)
set_log_perms(tf.name, **kwargs)
os.rename(tf.name, filename)
except OSError as e:
if tf is not None:
Expand Down
4 changes: 2 additions & 2 deletions subiquitycore/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def setup_logger(dir, base="subiquity"):
os.makedirs(dir, exist_ok=True)
# Create the log directory in such a way that users in the group may
# write to this directory in the installation environment.
set_log_perms(dir, isdir=True, group_write=True)
set_log_perms(dir, group_write=True)

logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
Expand All @@ -34,7 +34,7 @@ def setup_logger(dir, base="subiquity"):
nopid_file = os.path.join(dir, "{}-{}.log".format(base, level))
logfile = "{}.{}".format(nopid_file, os.getpid())
handler = logging.FileHandler(logfile)
set_log_perms(logfile, isdir=False, group_write=False)
set_log_perms(logfile, group_write=False)
# os.symlink cannot replace an existing file or symlink so create
# it and then rename it over.
tmplink = logfile + ".link"
Expand Down
98 changes: 97 additions & 1 deletion subiquitycore/tests/test_file_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from subiquitycore.file_util import copy_file_if_exists
from pathlib import Path
from unittest.mock import Mock, patch

from subiquitycore.file_util import (
_DEF_GROUP,
_DEF_PERMS_FILE,
copy_file_if_exists,
set_log_perms,
)
from subiquitycore.tests import SubiTestCase


Expand All @@ -29,3 +37,91 @@ def test_copied_to_non_exist_dir(self):

def test_copied_non_exist_src(self):
copy_file_if_exists("/does/not/exist", "/ditto")


@patch("subiquitycore.file_util.os.getuid", new=Mock(return_value=0))
class TestLogPerms(SubiTestCase):
def setUp(self):
chmod = patch("subiquitycore.file_util.os.chmod")
self.chmod = chmod.start()
self.addCleanup(chmod.stop)
chown = patch("subiquitycore.file_util.os.chown")
self.chown = chown.start()
self.addCleanup(chown.stop)
getgrnam = patch("subiquitycore.file_util.grp.getgrnam")
self.getgrnam = getgrnam.start()
self.addCleanup(getgrnam.stop)
self.mock_gid = 10
self.getgrnam.return_value = Mock(gr_gid=self.mock_gid)

def test_defaults_group(self):
target = self.tmp_dir()
set_log_perms(target)
self.getgrnam.assert_called_once_with(_DEF_GROUP)

def test_defaults_file(self):
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE)
self.chown.assert_called_once_with(target, -1, self.mock_gid)

def test_defaults_dir(self):
target = self.tmp_dir()
set_log_perms(target)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110)
self.chown.assert_called_once_with(target, -1, self.mock_gid)

def test_group_write_file(self):
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target, group_write=True)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o020)
self.chown.assert_called_once_with(target, -1, self.mock_gid)

def test_group_write_dir(self):
target = self.tmp_dir()
set_log_perms(target, group_write=True)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o130)
self.chown.assert_called_once_with(target, -1, self.mock_gid)

def test_nogroup_write_file(self):
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target, group_write=False)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE)
self.chown.assert_called_once_with(target, -1, self.mock_gid)

def test_nogroup_write_dir(self):
target = self.tmp_dir()
set_log_perms(target, group_write=False)
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110)
self.chown.assert_called_once_with(target, -1, self.mock_gid)

def test_mode_file(self):
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target, mode=0o510)
self.chmod.assert_called_once_with(target, 0o510)
self.chown.assert_called_once_with(target, -1, self.mock_gid)

def test_mode_dir(self):
target = self.tmp_dir()
set_log_perms(target, mode=0o510)
self.chmod.assert_called_once_with(target, 0o510)
self.chown.assert_called_once_with(target, -1, self.mock_gid)

def test_group_file(self):
self.getgrnam.return_value = Mock(gr_gid=11)
target = self.tmp_path("file")
Path(target).touch()
set_log_perms(target, group="group1")
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE)
self.chown.assert_called_once_with(target, -1, 11)

def test_group_dir(self):
self.getgrnam.return_value = Mock(gr_gid=11)
target = self.tmp_dir()
set_log_perms(target, group="group1")
self.chmod.assert_called_once_with(target, _DEF_PERMS_FILE | 0o110)
self.chown.assert_called_once_with(target, -1, 11)