Skip to content

Commit

Permalink
fix: Ensure no subp from logs.py import (#5268)
Browse files Browse the repository at this point in the history
  • Loading branch information
TheRealFalcon committed Jun 3, 2024
1 parent 06eaa7a commit 60766db
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 59 deletions.
57 changes: 31 additions & 26 deletions cloudinit/cmd/devel/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import sys
from datetime import datetime
from pathlib import Path
from typing import NamedTuple
from typing import NamedTuple, Optional

from cloudinit.cmd.devel import read_cfg_paths
from cloudinit.stages import Init
Expand All @@ -27,8 +27,23 @@
write_file,
)

PATHS = read_cfg_paths()
CLOUDINIT_RUN_DIR = PATHS.run_dir

class LogPaths(NamedTuple):
    """Filesystem locations consulted when collecting cloud-init logs."""

    # Path of the current instance's raw user-data file.
    userdata_raw: str
    # Path of the cloud's data directory (fallback source of previous data).
    cloud_data: str
    # The cloud-init run directory copied into the log tarball.
    run_dir: str
    # Sensitive instance-data JSON; skipped when copying run_dir as non-root.
    instance_data_sensitive: str


def get_log_paths(init: Optional[Init] = None) -> LogPaths:
    """Resolve log-collection paths from the system configuration on disk.

    :param init: optional pre-built Init stage whose paths are reused;
        when omitted, the configuration paths are read from disk.
    :return: a LogPaths tuple naming the user-data file, cloud data
        directory, run directory and sensitive instance-data file.
    """
    if init is not None:
        cfg_paths = init.paths
    else:
        cfg_paths = read_cfg_paths()
    return LogPaths(
        userdata_raw=cfg_paths.get_ipath_cur("userdata_raw"),
        cloud_data=cfg_paths.get_cpath("data"),
        run_dir=cfg_paths.run_dir,
        instance_data_sensitive=cfg_paths.lookups["instance_data_sensitive"],
    )


class ApportFile(NamedTuple):
Expand Down Expand Up @@ -81,16 +96,6 @@ class ApportFile(NamedTuple):
]


def _get_user_data_file() -> str:
    """Return the path of the current instance's raw user-data file."""
    return read_cfg_paths().get_ipath_cur("userdata_raw")


def _get_cloud_data_path() -> str:
    """Return the path of the cloud's data directory."""
    return read_cfg_paths().get_cpath("data")


def get_parser(parser=None):
"""Build or extend and arg parser for collect-logs utility.
Expand Down Expand Up @@ -122,7 +127,6 @@ def get_parser(parser=None):
" Default: cloud-init.tar.gz"
),
)
user_data_file = _get_user_data_file()
parser.add_argument(
"--include-userdata",
"-u",
Expand All @@ -131,20 +135,20 @@ def get_parser(parser=None):
dest="userdata",
help=(
"Optionally include user-data from {0} which could contain"
" sensitive information.".format(user_data_file)
" sensitive information.".format(get_log_paths().userdata_raw)
),
)
return parser


def _copytree_rundir_ignore_files(curdir, files):
def _get_copytree_ignore_files(paths: LogPaths):
    """Return a list of files to ignore for /run/cloud-init directory"""
    # The hotplug named pipe must never be copied: reading it would block.
    skipped = ["hook-hotplug-cmd"]
    if os.getuid() != 0:
        # Non-root callers cannot read the root-permissioned sensitive
        # instance-data file, so exclude it from the copy.
        skipped.append(paths.instance_data_sensitive)
    return skipped


Expand Down Expand Up @@ -216,7 +220,6 @@ def collect_logs(tarfile, include_userdata: bool, verbosity=0):
)
return 1

init = Init(ds_deps=[])
tarfile = os.path.abspath(tarfile)
log_dir = datetime.utcnow().date().strftime("cloud-init-logs-%Y-%m-%d")
with tempdir(dir="/tmp") as tmp_dir:
Expand Down Expand Up @@ -250,36 +253,38 @@ def collect_logs(tarfile, include_userdata: bool, verbosity=0):
verbosity=verbosity,
)

init = Init(ds_deps=[])
init.read_cfg()
paths = get_log_paths(init)
for log in get_config_logfiles(init.cfg):
_collect_file(log, log_dir, verbosity)
if include_userdata:
user_data_file = _get_user_data_file()
user_data_file = paths.userdata_raw
_collect_file(user_data_file, log_dir, verbosity)
collect_installer_logs(log_dir, include_userdata, verbosity)

run_dir = os.path.join(log_dir, "run")
ensure_dir(run_dir)
if os.path.exists(CLOUDINIT_RUN_DIR):
if os.path.exists(paths.run_dir):
try:
shutil.copytree(
CLOUDINIT_RUN_DIR,
paths.run_dir,
os.path.join(run_dir, "cloud-init"),
ignore=_copytree_rundir_ignore_files,
ignore=lambda _, __: _get_copytree_ignore_files(paths),
)
except shutil.Error as e:
sys.stderr.write("Failed collecting file(s) due to error:\n")
sys.stderr.write(str(e) + "\n")
_debug("collected dir %s\n" % CLOUDINIT_RUN_DIR, 1, verbosity)
_debug("collected dir %s\n" % paths.run_dir, 1, verbosity)
else:
_debug(
"directory '%s' did not exist\n" % CLOUDINIT_RUN_DIR,
"directory '%s' did not exist\n" % paths.run_dir,
1,
verbosity,
)
if os.path.exists(os.path.join(CLOUDINIT_RUN_DIR, "disabled")):
if os.path.exists(os.path.join(paths.run_dir, "disabled")):
# Fallback to grab previous cloud/data
cloud_data_dir = Path(_get_cloud_data_path())
cloud_data_dir = Path(paths.cloud_data)
if cloud_data_dir.exists():
shutil.copytree(
str(cloud_data_dir),
Expand Down
41 changes: 20 additions & 21 deletions tests/unittests/cmd/devel/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ def test_collect_logs_with_userdata_requires_root_user(
" Try sudo cloud-init collect-logs\n" == m_stderr.getvalue()
)

def test_collect_logs_creates_tarfile(self, m_getuid, mocker, tmpdir):
def test_collect_logs_creates_tarfile(
self, m_getuid, m_log_paths, mocker, tmpdir
):
"""collect-logs creates a tarfile with all related cloud-init info."""
m_getuid.return_value = 100
log1 = tmpdir.join("cloud-init.log")
Expand All @@ -46,12 +48,10 @@ def test_collect_logs_creates_tarfile(self, m_getuid, mocker, tmpdir):
write_file(log2, "cloud-init-output-log")
log2_rotated = tmpdir.join("cloud-init-output.log.1.gz")
write_file(log2_rotated, "cloud-init-output-log-rotated")
run_dir = tmpdir.join("run")
write_file(run_dir.join("results.json"), "results")
run_dir = m_log_paths.run_dir
write_file(str(run_dir / "results.json"), "results")
write_file(
run_dir.join(
INSTANCE_JSON_SENSITIVE_FILE,
),
str(m_log_paths.instance_data_sensitive),
"sensitive",
)
output_tarfile = str(tmpdir.join("logs.tgz"))
Expand Down Expand Up @@ -108,7 +108,6 @@ def fake_subprocess_call(cmd, stdout=None, stderr=None):
M_PATH + "subprocess.call", side_effect=fake_subprocess_call
)
mocker.patch(M_PATH + "sys.stderr", fake_stderr)
mocker.patch(M_PATH + "CLOUDINIT_RUN_DIR", run_dir)
mocker.patch(M_PATH + "INSTALLER_APPORT_FILES", [])
mocker.patch(M_PATH + "INSTALLER_APPORT_SENSITIVE_FILES", [])
logs.collect_logs(output_tarfile, include_userdata=False)
Expand Down Expand Up @@ -155,20 +154,20 @@ def fake_subprocess_call(cmd, stdout=None, stderr=None):
fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile)

def test_collect_logs_includes_optional_userdata(
self, m_getuid, mocker, tmpdir
self, m_getuid, mocker, tmpdir, m_log_paths
):
"""collect-logs include userdata when --include-userdata is set."""
m_getuid.return_value = 0
log1 = tmpdir.join("cloud-init.log")
write_file(log1, "cloud-init-log")
log2 = tmpdir.join("cloud-init-output.log")
write_file(log2, "cloud-init-output-log")
userdata = tmpdir.join("user-data.txt")
write_file(userdata, "user-data")
run_dir = tmpdir.join("run")
write_file(run_dir.join("results.json"), "results")
userdata = m_log_paths.userdata_raw
write_file(str(userdata), "user-data")
run_dir = m_log_paths.run_dir
write_file(str(run_dir / "results.json"), "results")
write_file(
run_dir.join(INSTANCE_JSON_SENSITIVE_FILE),
str(m_log_paths.instance_data_sensitive),
"sensitive",
)
output_tarfile = str(tmpdir.join("logs.tgz"))
Expand Down Expand Up @@ -223,23 +222,21 @@ def fake_subprocess_call(cmd, stdout=None, stderr=None):
M_PATH + "subprocess.call", side_effect=fake_subprocess_call
)
mocker.patch(M_PATH + "sys.stderr", fake_stderr)
mocker.patch(M_PATH + "CLOUDINIT_RUN_DIR", run_dir)
mocker.patch(M_PATH + "INSTALLER_APPORT_FILES", [])
mocker.patch(M_PATH + "INSTALLER_APPORT_SENSITIVE_FILES", [])
mocker.patch(M_PATH + "_get_user_data_file", return_value=userdata)
logs.collect_logs(output_tarfile, include_userdata=True)
# unpack the tarfile and check file contents
subp(["tar", "zxvf", output_tarfile, "-C", str(tmpdir)])
out_logdir = tmpdir.join(date_logdir)
assert "user-data" == load_text_file(
os.path.join(out_logdir, "user-data.txt")
os.path.join(out_logdir, userdata.name)
)
assert "sensitive" == load_text_file(
os.path.join(
out_logdir,
"run",
"cloud-init",
INSTANCE_JSON_SENSITIVE_FILE,
m_log_paths.instance_data_sensitive.name,
)
)
fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile)
Expand Down Expand Up @@ -400,7 +397,9 @@ def test_include_installer_logs_when_present(


class TestParser:
    def test_parser_help_has_userdata_file(self, m_log_paths, mocker, tmpdir):
        """get_parser help text advertises the resolved user-data path."""
        # m_log_paths patches logs.get_log_paths, so the parser help must
        # mention the fixture's userdata_raw path. Whitespace is stripped
        # because argparse wraps long help strings across lines.
        userdata = m_log_paths.userdata_raw
        assert str(userdata) in re.sub(
            r"\s+", "", logs.get_parser().format_help()
        )
18 changes: 18 additions & 0 deletions tests/unittests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import builtins
import glob
import os
import pathlib
import shutil
from pathlib import Path
from unittest import mock

import pytest

from cloudinit import atomic_helper, log, util
from cloudinit.cmd.devel import logs
from cloudinit.gpg import GPG
from tests.hypothesis import HAS_HYPOTHESIS
from tests.unittests.helpers import example_netdev, retarget_many_wrapper
Expand Down Expand Up @@ -167,3 +169,19 @@ def tmp_path(tmpdir):

settings.register_profile("ci", max_examples=1000)
settings.load_profile(os.getenv("HYPOTHESIS_PROFILE", "default"))


@pytest.fixture
def m_log_paths(mocker, tmp_path):
    """Define logs.LogPaths for testing and mock get_log_paths with it."""
    # All paths live under pytest's per-test tmp_path so tests never touch
    # real system locations.
    paths = logs.LogPaths(
        userdata_raw=tmp_path / "userdata_raw",
        cloud_data=tmp_path / "cloud_data",
        run_dir=tmp_path / "run_dir",
        instance_data_sensitive=tmp_path
        / "run_dir"
        / "instance_data_sensitive",
    )
    # Create run_dir up front: tests write files beneath it before the
    # code under test copies the directory.
    pathlib.Path(paths.run_dir).mkdir()
    # Patch the module-level accessor so code under test resolves these
    # temporary paths instead of reading on-disk configuration.
    mocker.patch.object(logs, "get_log_paths", return_value=paths)
    yield paths
14 changes: 2 additions & 12 deletions tests/unittests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@
FakeArgs = namedtuple("FakeArgs", ["action", "local", "mode"])


@pytest.fixture()
def mock_get_user_data_file(mocker, tmpdir):
    """Patch logs._get_user_data_file to a per-test temporary path."""
    # Yields the mock so tests can assert on calls if needed.
    yield mocker.patch(
        "cloudinit.cmd.devel.logs._get_user_data_file",
        return_value=tmpdir.join("cloud"),
    )


@pytest.fixture(autouse=True, scope="module")
def disable_setup_logging():
# setup_basic_logging can change the logging level to WARNING, so
Expand Down Expand Up @@ -255,13 +247,11 @@ def test_modules_subcommand_parser(self, m_status_wrapper, subcommand):
"schema",
],
)
@mock.patch("cloudinit.stages.Init._read_cfg", return_value={})
def test_conditional_subcommands_from_entry_point_sys_argv(
self,
m_read_cfg,
subcommand,
capsys,
mock_get_user_data_file,
m_log_paths,
mock_status_wrapper,
):
"""Subcommands from entry-point are properly parsed from sys.argv."""
Expand All @@ -284,7 +274,7 @@ def test_conditional_subcommands_from_entry_point_sys_argv(
],
)
def test_subcommand_parser(
self, subcommand, mock_get_user_data_file, mock_status_wrapper
self, subcommand, m_log_paths, mock_status_wrapper
):
"""cloud-init `subcommand` calls its subparser."""
# Provide -h param to `subcommand` to avoid having to mock behavior.
Expand Down

0 comments on commit 60766db

Please sign in to comment.