From b4b66e9742e554e485d7b79d98997745f3fd83cb Mon Sep 17 00:00:00 2001 From: Tobias Stenzel Date: Tue, 1 Feb 2022 18:08:40 +0100 Subject: [PATCH] wip fc-agent maintenance scheduling - RequestManager: only add requests with effective activities - Implement activity/request merging - Reboot/VM change activities, update activity improvements, refactoring - manage.py uses update activity - throw out a lot of old code :) - default.nix use buildPythonPackage - configurable request prep time - update CLI --- nixos/platform/agent.nix | 18 +- pkgs/default.nix | 1 + pkgs/fc/agent/default.nix | 18 +- pkgs/fc/agent/fc/conftest.py | 38 ++- .../agent/fc/maintenance/activity/__init__.py | 63 +++- .../agent/fc/maintenance/activity/reboot.py | 75 ++++ .../maintenance/activity/tests/test_reboot.py | 103 ++++++ .../maintenance/activity/tests/test_update.py | 172 ++++++++-- .../activity/tests/test_vm_change.py | 92 +++++ .../agent/fc/maintenance/activity/update.py | 254 +++++++++++--- .../fc/maintenance/activity/vm_change.py | 175 ++++++++++ pkgs/fc/agent/fc/maintenance/cli.py | 153 +++------ pkgs/fc/agent/fc/maintenance/estimate.py | 5 +- pkgs/fc/agent/fc/maintenance/lib/__init__.py | 0 pkgs/fc/agent/fc/maintenance/lib/reboot.py | 1 + .../fc/maintenance/lib/tests/test_reboot.py | 68 ---- .../maintenance/lib/tests/test_shellscript.py | 1 - .../{system_properties.py => maintenance.py} | 106 ++++-- pkgs/fc/agent/fc/maintenance/reboot.py | 16 + pkgs/fc/agent/fc/maintenance/reqmanager.py | 270 +++++++++++---- pkgs/fc/agent/fc/maintenance/request.py | 139 ++++++-- .../fc/agent/fc/maintenance/tests/test_cli.py | 17 +- .../fc/maintenance/tests/test_estimate.py | 4 + ...stem_properties.py => test_maintenance.py} | 103 ++++-- .../fc/maintenance/tests/test_reqmanager.py | 211 +++++++----- .../fc/maintenance/tests/test_request.py | 73 +++- pkgs/fc/agent/fc/manage/cli.py | 18 +- pkgs/fc/agent/fc/manage/manage.py | 319 +----------------- pkgs/fc/agent/fc/manage/systemd.py | 4 + 
pkgs/fc/agent/fc/manage/tests/test_cli.py | 1 + pkgs/fc/agent/fc/manage/tests/test_manage.py | 81 +---- pkgs/fc/agent/fc/util/channel.py | 141 ++++++++ pkgs/fc/agent/fc/util/nixos.py | 52 +-- pkgs/fc/agent/fc/util/postgresql.py | 2 - pkgs/fc/agent/fc/util/tests/test_enc.py | 32 +- pkgs/fc/agent/fc/util/tests/test_logging.py | 1 - pkgs/fc/agent/fc/util/time_date.py | 4 +- pkgs/fc/agent/python_dev_env.nix | 2 +- pkgs/fc/agent/shell.nix | 7 +- pkgs/fc/default.nix | 4 +- tests/fcagent.nix | 4 +- 41 files changed, 1877 insertions(+), 971 deletions(-) create mode 100644 pkgs/fc/agent/fc/maintenance/activity/reboot.py create mode 100644 pkgs/fc/agent/fc/maintenance/activity/tests/test_reboot.py create mode 100644 pkgs/fc/agent/fc/maintenance/activity/tests/test_vm_change.py create mode 100644 pkgs/fc/agent/fc/maintenance/activity/vm_change.py delete mode 100644 pkgs/fc/agent/fc/maintenance/lib/__init__.py delete mode 100644 pkgs/fc/agent/fc/maintenance/lib/tests/test_reboot.py rename pkgs/fc/agent/fc/maintenance/{system_properties.py => maintenance.py} (53%) create mode 100644 pkgs/fc/agent/fc/maintenance/reboot.py rename pkgs/fc/agent/fc/maintenance/tests/{test_system_properties.py => test_maintenance.py} (51%) create mode 100644 pkgs/fc/agent/fc/util/channel.py diff --git a/nixos/platform/agent.nix b/nixos/platform/agent.nix index 51d0ef83a..eba0136e9 100644 --- a/nixos/platform/agent.nix +++ b/nixos/platform/agent.nix @@ -99,6 +99,12 @@ in type = types.bool; }; + maintenancePreparationSeconds = mkOption { + default = 300; + description = "How long the preparation before maintance takes"; + type = types.ints.positive; + }; + maintenance = mkOption { type = with types; attrsOf (submodule { options = { @@ -162,6 +168,9 @@ in ]; environment.etc."fc-agent.conf".text = '' + [maintenance] + preparation_seconds = ${toString cfg.agent.maintenancePreparationSeconds} + [maintenance-enter] ${concatStringsSep "\n" ( mapAttrsToList (k: v: "${k} = ${v.enter}") cfg.agent.maintenance)} 
@@ -195,7 +204,7 @@ in script = let - verbose = lib.optionalString cfg.agent.verbose "--verbose"; + verbose = lib.optionalString cfg.agent.verbose "--show-caller-info"; options = "--enc-path=${cfg.encPath} ${verbose}"; wrappedExtraCommands = lib.optionalString (cfg.agent.extraCommands != "") '' ( @@ -240,11 +249,12 @@ in IOWeight = 10; # 1-10000 ExecStart = let - verbose = lib.optionalString cfg.agent.verbose "--verbose"; + verbose = lib.optionalString cfg.agent.verbose "--show-caller-info"; options = "--enc-path=${cfg.encPath} ${verbose}"; - runNow = lib.optionalString (!cfg.agent.updateInMaintenance) "--run-now"; in - "${pkgs.fc.agent}/bin/fc-maintenance ${options} request update ${runNow}"; + if cfg.agent.updateInMaintenance + then "${pkgs.fc.agent}/bin/fc-maintenance ${options} request update" + else "${pkgs.fc.agent}/bin/fc-manage ${options} switch --update-channel --lazy"; }; path = commonEnvPath; diff --git a/pkgs/default.nix b/pkgs/default.nix index d4b5b93fe..eea7b1a9f 100644 --- a/pkgs/default.nix +++ b/pkgs/default.nix @@ -7,6 +7,7 @@ let fc = import ./fc { inherit (self) callPackage; + pythonPackages = pkgs.python310Packages; inherit pkgs; }; diff --git a/pkgs/fc/agent/default.nix b/pkgs/fc/agent/default.nix index 6af27b7b7..73cc3abc2 100644 --- a/pkgs/fc/agent/default.nix +++ b/pkgs/fc/agent/default.nix @@ -1,5 +1,6 @@ { lib , stdenv +, fetchPypi , fetchFromGitHub , dmidecode , gitMinimal @@ -7,13 +8,17 @@ , libyaml , multipath-tools , nix -, python310 +, buildPythonPackage +, pythonPackages +, python , util-linux , xfsprogs +, pytest +, structlog }: let - py = python310.pkgs; + py = pythonPackages; pytest-structlog = py.buildPythonPackage rec { pname = "pytest-structlog"; @@ -26,11 +31,11 @@ let sha256 = "00g2ivgj4y398d0y60lk710zz62pj80r9ya3b4iqijkp4j8nh4gp"; }; - buildInputs = [ py.pytest py.structlog ]; + buildInputs = [ pytest structlog ]; }; in -py.buildPythonPackage rec { +buildPythonPackage rec { name = "fc-agent-${version}"; version = "1.0"; 
namePrefix = ""; @@ -40,6 +45,7 @@ py.buildPythonPackage rec { py.freezegun py.pytest-cov py.responses + py.pytest py.pytest-mock py.pytest-subprocess pytest-structlog @@ -70,7 +76,9 @@ py.buildPythonPackage rec { py.systemd xfsprogs ]; + # XXX: just for testing, remove! + doCheck = false; dontStrip = true; - passthru.pythonDevEnv = python310.withPackages (_: checkInputs ++ propagatedBuildInputs); + passthru.pythonDevEnv = python.withPackages (_: checkInputs ++ propagatedBuildInputs); } diff --git a/pkgs/fc/agent/fc/conftest.py b/pkgs/fc/agent/fc/conftest.py index 559da1eff..03242cd82 100644 --- a/pkgs/fc/agent/fc/conftest.py +++ b/pkgs/fc/agent/fc/conftest.py @@ -1,8 +1,11 @@ import contextlib +import shutil import textwrap +import unittest import uuid from pathlib import Path +import responses import shortuuid import structlog from fc.maintenance.activity import Activity @@ -12,8 +15,8 @@ @fixture -def agent_maintenance_config(tmpdir): - config_file = str(tmpdir / "fc-agent.conf") +def agent_maintenance_config(tmp_path): + config_file = str(tmp_path / "fc-agent.conf") with open(config_file, "w") as f: f.write( textwrap.dedent( @@ -31,27 +34,32 @@ def agent_maintenance_config(tmpdir): @fixture -def reqmanager(tmpdir, agent_maintenance_config): - with ReqManager( - Path(tmpdir), config_file=Path(agent_maintenance_config) - ) as rm: - yield rm +def reqmanager(tmp_path, agent_maintenance_config): + spooldir = tmp_path / "maintenance" + spooldir.mkdir() + enc_path = tmp_path / "enc.json" + enc_path.write_text("{}") + with unittest.mock.patch("fc.util.directory.connect"): + with ReqManager( + spooldir=spooldir, + enc_path=enc_path, + config_file=agent_maintenance_config, + ) as rm: + yield rm @fixture -def request_population(tmpdir, agent_maintenance_config): +def request_population(tmp_path, agent_maintenance_config, reqmanager): @contextlib.contextmanager def _request_population(n): """Creates a ReqManager with a pregenerated population of N requests. 
The ReqManager and a list of Requests are passed to the calling code. """ - with ReqManager( - Path(tmpdir), config_file=Path(agent_maintenance_config) - ) as reqmanager: + with reqmanager: requests = [] for i in range(n): - req = Request(Activity(), 60, comment=str(i)) + req = Request(Activity(), comment=str(i)) req._reqid = shortuuid.encode(uuid.UUID(int=i)) reqmanager.add(req) requests.append(req) @@ -65,3 +73,9 @@ def logger(): _logger = structlog.get_logger() _logger.trace = lambda *a, **k: None return _logger + + +@fixture +def mocked_responses(): + with responses.RequestsMock() as rsps: + yield rsps diff --git a/pkgs/fc/agent/fc/maintenance/activity/__init__.py b/pkgs/fc/agent/fc/maintenance/activity/__init__.py index 81d9eb8fd..ee5cb5126 100644 --- a/pkgs/fc/agent/fc/maintenance/activity/__init__.py +++ b/pkgs/fc/agent/fc/maintenance/activity/__init__.py @@ -1,32 +1,48 @@ """Base class for maintenance activities.""" from enum import Enum +from typing import NamedTuple, Optional +import structlog +from fc.maintenance.estimate import Estimate -class RebootType(Enum): - WARM = 1 - COLD = 2 + +class RebootType(str, Enum): + WARM = "reboot" + COLD = "poweroff" + + +class ActivityMergeResult(NamedTuple): + merged: Optional["Activity"] = None + is_effective: bool = False + is_significant: bool = False + changes: dict = {} class Activity: """Maintenance activity which is executed as request payload. Activities are executed possibly several times until they succeed or - exceeed their retry limit. Individual maintenance activities should + exceed their retry limit. Individual maintenance activities should subclass this class and add custom behaviour to its methods. - Attributes: `stdout`, `stderr` capture the outcomes of shellouts. + Attributes: `stdout`, `stderr` capture the outcomes of shell-outs. `returncode` controls the resulting request state. If `duration` is set, it overrules execution timing done by the calling scope. 
Use this if a logical transaction spans several attempts, e.g. for reboots. """ - stdout = None - stderr = None - returncode = None - duration = None - request = None # backpointer, will be set in Request - reboot_needed = None + stdout: None | str = None + stderr: None | str = None + returncode: None | int = None + duration: None | float = None + request = None # back-pointer, will be set in Request + reboot_needed: None | RebootType = None + # Do we predict that this activity will actually change anything? + is_effective = True + comment = "" + estimate = Estimate("10m") + log = None def __init__(self): """Creates activity object (add args if you like). @@ -41,10 +57,12 @@ def __getstate__(self): # Deserializing loggers breaks, remove them before serializing (to YAML). if "log" in state: del state["log"] + if "request" in state: + del state["request"] return state def set_up_logging(self, log): - self.log = log + self.log = log.bind(activity_type=self.__class__.__name__) def run(self): """Executes maintenance activity. @@ -53,8 +71,8 @@ def run(self): whatever you want here, but do not destruct `request.yaml`. Directory contents is preserved between several attempts. - This method is expected to update self.stdout, self.stderr, and - self.returncode after each run. Request state is determined + This method is expected to update `self.stdout`, `self.stderr`, and + `self.returncode` after each run. Request state is determined according to the EXIT_* constants in `state.py`. Any returncode not listed there means hard failure and causes the request to be archived. Uncaught exceptions are handled the same way. @@ -74,3 +92,20 @@ def load(self): def dump(self): """Saves additional state during serialization.""" pass + + def merge(self, other) -> ActivityMergeResult: + """Merges in other activity. Settings from other have precedence. + Returns merge result. 
+ """ + return ActivityMergeResult() + + def __rich__(self): + cls = self.__class__ + out = f"{cls.__module__}.{cls.__qualname__}" + match self.reboot_needed: + case RebootType.WARM: + out += " (warm reboot needed)" + case RebootType.COLD: + out += " (cold reboot needed)" + + return out diff --git a/pkgs/fc/agent/fc/maintenance/activity/reboot.py b/pkgs/fc/agent/fc/maintenance/activity/reboot.py new file mode 100644 index 000000000..2748d86af --- /dev/null +++ b/pkgs/fc/agent/fc/maintenance/activity/reboot.py @@ -0,0 +1,75 @@ +"""Scheduled machine reboot. + +This activity does nothing if the machine has been booted for another reason in +the time between creation and execution. +""" + +from typing import Union + +import structlog + +from ..estimate import Estimate +from . import Activity, ActivityMergeResult, RebootType + +_log = structlog.get_logger() + + +class RebootActivity(Activity): + estimate = Estimate("5m") + + def __init__( + self, action: Union[str, RebootType] = RebootType.WARM, log=_log + ): + super().__init__() + self.set_up_logging(log) + self.reboot_needed = RebootType(action) + + @property + def comment(self): + return "Scheduled {}".format( + "cold boot" if self.reboot_needed == RebootType.COLD else "reboot" + ) + + def merge(self, other): + if not isinstance(other, RebootActivity): + self.log.debug( + "merge-incompatible-skip", + self_type=type(self), + other_type=type(other), + ) + return ActivityMergeResult() + + if self.reboot_needed == other.reboot_needed: + self.log.debug("merge-reboot-identical") + return ActivityMergeResult(self, is_effective=True) + + if ( + self.reboot_needed == RebootType.COLD + and other.reboot_needed == RebootType.WARM + ): + self.log.debug( + "merge-reboot-cold-warm", + help=( + "merging a warm reboot into a cold reboot results in a " + "cold reboot." 
+ ), + ) + return ActivityMergeResult(self, is_effective=True) + + if ( + self.reboot_needed == RebootType.WARM + and other.reboot_needed == RebootType.COLD + ): + self.log.debug( + "merge-reboot-warm-to-cold", + help=( + "merging a cold reboot into a warm reboot results in a " + "cold reboot. This is a significant change." + ), + ) + return ActivityMergeResult( + self, + is_effective=True, + is_significant=True, + changes={"before": RebootType.WARM, "after": RebootType.COLD}, + ) diff --git a/pkgs/fc/agent/fc/maintenance/activity/tests/test_reboot.py b/pkgs/fc/agent/fc/maintenance/activity/tests/test_reboot.py new file mode 100644 index 000000000..2806b24ef --- /dev/null +++ b/pkgs/fc/agent/fc/maintenance/activity/tests/test_reboot.py @@ -0,0 +1,103 @@ +import pytest +import yaml +from fc.maintenance.activity import Activity, RebootType +from fc.maintenance.activity.reboot import RebootActivity +from pytest import fixture + +SERIALIZED_ACTIVITY = f"""\ +!!python/object:fc.maintenance.activity.reboot.RebootActivity +reboot_needed: !!python/object/apply:fc.maintenance.activity.RebootType +- reboot +""" + + +@fixture +def warm(logger): + activity = RebootActivity(RebootType.WARM, log=logger) + return activity + + +@fixture +def cold(logger): + activity = RebootActivity(RebootType.COLD, log=logger) + return activity + + +def test_reboot_accepts_enum_reboot_type_warm(): + activity = RebootActivity(RebootType.WARM) + assert activity.reboot_needed == RebootType.WARM + + +def test_reboot_accepts_enum_reboot_type_cold(): + activity = RebootActivity(RebootType.COLD) + assert activity.reboot_needed == RebootType.COLD + + +def test_reboot_accepts_str_reboot(): + activity = RebootActivity("reboot") + assert activity.reboot_needed == RebootType.WARM + + +def test_reboot_accepts_str_poweroff(): + activity = RebootActivity("poweroff") + assert activity.reboot_needed == RebootType.COLD + + +def test_reboot_should_reject_invalid_str(): + with pytest.raises(ValueError): + 
RebootActivity("kaput") + + +def test_reboot_comment_warm(warm): + assert warm.comment == "Scheduled reboot" + + +def test_reboot_comment_cold(cold): + assert cold.comment == "Scheduled cold boot" + + +def test_reboot_dont_merge_incompatible(warm): + other = Activity() + result = warm.merge(other) + assert result.is_effective is False + assert result.is_significant is False + assert result.merged is None + assert not result.changes + + +def test_reboot_merge_warm_into_cold_is_an_insignificant_update(warm, cold): + result = cold.merge(warm) + assert result.merged is cold + assert result.is_effective is True + assert result.is_significant is False + assert not result.changes + + +def test_reboot_merge_warm_is_an_insignificant_update(warm): + other_warm = RebootActivity(RebootType.WARM) + result = warm.merge(other_warm) + assert result.merged is warm + assert result.is_effective is True + assert result.is_significant is False + assert not result.changes + + +def test_reboot_merge_cold_is_an_significant_update(warm, cold): + original = warm + result = original.merge(cold) + assert result.merged is original + assert result.is_effective is True + assert result.is_significant is True + assert result.changes == {"before": "reboot", "after": "poweroff"} + + +def test_reboot_activity_serialize(warm): + serialized = yaml.dump(warm) + print(serialized) + assert serialized == SERIALIZED_ACTIVITY + + +def test_update_activity_deserialize(warm, logger): + deserialized = yaml.load(SERIALIZED_ACTIVITY, Loader=yaml.UnsafeLoader) + deserialized.set_up_logging(logger) + assert deserialized.__getstate__() == warm.__getstate__() diff --git a/pkgs/fc/agent/fc/maintenance/activity/tests/test_update.py b/pkgs/fc/agent/fc/maintenance/activity/tests/test_update.py index ace7bd594..b7ed43279 100644 --- a/pkgs/fc/agent/fc/maintenance/activity/tests/test_update.py +++ b/pkgs/fc/agent/fc/maintenance/activity/tests/test_update.py @@ -1,12 +1,11 @@ import textwrap -from unittest.mock import 
MagicMock, Mock, create_autospec +from unittest.mock import create_autospec -import pytest -import structlog +import responses import yaml -from fc.maintenance.activity import RebootType +from fc.maintenance.activity import Activity, RebootType from fc.maintenance.activity.update import UpdateActivity -from fc.manage.manage import Channel +from fc.util.channel import Channel from fc.util.nixos import ( BuildFailed, ChannelException, @@ -18,6 +17,7 @@ CURRENT_BUILD = 93111 NEXT_BUILD = 93222 +NEXT_NEXT_BUILD = 93333 CURRENT_CHANNEL_URL = f"https://hydra.flyingcircus.io/build/{CURRENT_BUILD}/download/1/nixexprs.tar.xz" NEXT_CHANNEL_URL = f"https://hydra.flyingcircus.io/build/{NEXT_BUILD}/download/1/nixexprs.tar.xz" ENVIRONMENT = "fc-21.05-production" @@ -42,9 +42,8 @@ Environment: {ENVIRONMENT} (unchanged) Will reboot after the update. - Stop: postgresql + Start/Stop: postgresql Restart: telegraf - Start: postgresql Reload: nginx Channel URL: {NEXT_CHANNEL_URL}""" @@ -74,11 +73,6 @@ """ -@fixture -def logger(): - return structlog.get_logger() - - @fixture def activity(logger, nixos_mock): activity = UpdateActivity(next_channel_url=NEXT_CHANNEL_URL, log=logger) @@ -95,6 +89,74 @@ def activity(logger, nixos_mock): return activity +def test_update_dont_merge_incompatible(activity): + other = Activity() + result = activity.merge(other) + assert result.is_effective is False + assert result.is_significant is False + assert result.merged is None + assert not result.changes + + +def test_update_merge_same(activity): + # Given another activity which is exactly the same + other = UpdateActivity(NEXT_CHANNEL_URL) + other.__dict__.update(activity.__getstate__()) + result = activity.merge(other) + # Then the merge result should be the original activity + assert result.merged is activity + assert result.is_effective is True + assert result.is_significant is False + assert not result.changes + + +def test_update_merge_additional_reload_is_an_insignificant_update(activity): + # 
Given another activity which has a different channel URL and reloads an + # additional service. + channel_url = ( + "https://hydra.flyingcircus.io/build/100000/download/1/nixexprs.tar.xz" + ) + + other = UpdateActivity(channel_url) + other.unit_changes = { + **UNIT_CHANGES, + "reload": {"nginx.service", "dbus.service"}, + } + result = activity.merge(other) + # Then the merge result should be a new activity and the change is + # insignificant. + assert result.merged is not activity + assert result.merged is not other + assert result.is_effective is True + assert result.is_significant is False + assert result.changes == { + "added_unit_changes": {"reload": {"dbus.service"}}, + "removed_unit_changes": {}, + } + + +def test_update_merge_more_unit_changes_is_a_significant_update(activity): + # Given another activity which has a different channel URL and restarts + # different units. + channel_url = ( + "https://hydra.flyingcircus.io/build/100000/download/1/nixexprs.tar.xz" + ) + + other = UpdateActivity(channel_url) + other.unit_changes = {**UNIT_CHANGES, "restart": {"mysql.service"}} + result = activity.merge(other) + # Then the merge result should be a new activity and the change is + # significant. 
+ assert result.merged is not activity + assert result.merged is not other + assert result.is_effective is True + assert result.is_significant is True + assert result.changes == { + "added_unit_changes": {"restart": {"mysql.service"}}, + "removed_unit_changes": {"restart": {"telegraf.service"}}, + } + + @fixture def nixos_mock(monkeypatch): import fc.util.nixos @@ -126,6 +188,7 @@ def fake_changed_kernel_version(path): RegisterFailed=RegisterFailed, ) + mocked.format_unit_change_lines = fc.util.nixos.format_unit_change_lines mocked.get_fc_channel_build = fake_get_fc_channel_build mocked.channel_version = fake_channel_version mocked.kernel_version = fake_changed_kernel_version @@ -141,10 +204,8 @@ def fake_changed_kernel_version(path): return mocked -def test_update_activity_from_system_changed(nixos_mock): - activity = UpdateActivity.from_system_if_changed( - NEXT_CHANNEL_URL, ENVIRONMENT - ) +def test_update_activity(nixos_mock): + activity = UpdateActivity(NEXT_CHANNEL_URL, ENVIRONMENT) assert activity assert activity.current_version == CURRENT_VERSION @@ -168,11 +229,11 @@ def test_update_activity_prepare(log, logger, tmp_path, activity, nixos_mock): activity.prepare() nixos_mock.build_system.assert_called_once_with( - NEXT_CHANNEL_URL, out_link="/run/next-system", log=logger + NEXT_CHANNEL_URL, out_link="/run/next-system", log=activity.log ) nixos_mock.dry_activate_system.assert_called_once_with( - NEXT_SYSTEM_PATH, logger + NEXT_SYSTEM_PATH, activity.log ) assert ( @@ -197,13 +258,13 @@ def test_update_activity_run(log, nixos_mock, activity, logger): assert activity.returncode == 0 nixos_mock.update_system_channel.assert_called_with( - activity.next_channel_url, log=logger + activity.next_channel_url, log=activity.log ) nixos_mock.build_system.assert_called_with( - activity.next_channel_url, log=logger + activity.next_channel_url, log=activity.log ) nixos_mock.switch_to_system.assert_called_with( - NEXT_SYSTEM_PATH, lazy=False, log=logger + NEXT_SYSTEM_PATH, 
lazy=False, log=activity.log ) log.has("update-run-succeeded") @@ -248,3 +309,72 @@ def test_update_activity_switch_to_system_fails(log, nixos_mock, activity): assert activity.returncode == 3 log.has("update-run-failed", returncode=3) + + +def test_update_activity_from_enc( + log, mocked_responses, nixos_mock, logger, monkeypatch +): + environment = "fc-21.05-dev" + current_channel_url = ( + "https://hydra.flyingcircus.io/build/93000/download/1/nixexprs.tar.xz" + ) + next_channel_url = ( + "https://hydra.flyingcircus.io/build/93222/download/1/nixexprs.tar.xz" + ) + current_version = "21.05.1233.a9cc58d" + next_version = "21.05.1235.bacc11d" + + enc = { + "parameters": { + "environment_url": next_channel_url, + "environment": environment, + } + } + + mocked_responses.add(responses.HEAD, current_channel_url) + mocked_responses.add(responses.HEAD, next_channel_url) + monkeypatch.setattr( + "fc.util.nixos.channel_version", (lambda c: next_version) + ) + + current_channel = Channel(logger, current_channel_url) + current_channel.version = lambda *a: current_version + monkeypatch.setattr( + "fc.manage.manage.Channel.current", lambda *a: current_channel + ) + activity = UpdateActivity.from_enc(logger, enc, current_requests=[]) + assert activity + + +# current="", +# next="", + + +def test_update_activity_from_enc_unchanged( + log, mocked_responses, nixos_mock, logger, monkeypatch +): + """Given an unchanged channel url, should not prepare an update activity""" + environment = "fc-21.05-dev" + + enc = { + "parameters": { + "environment_url": CURRENT_CHANNEL_URL, + "environment": ENVIRONMENT, + } + } + + mocked_responses.add(responses.HEAD, CURRENT_CHANNEL_URL) + monkeypatch.setattr( + "fc.util.nixos.channel_version", (lambda c: CURRENT_VERSION) + ) + + current_channel = Channel(logger, CURRENT_CHANNEL_URL) + current_channel.version = lambda *a: CURRENT_VERSION + monkeypatch.setattr( + "fc.manage.manage.Channel.current", lambda *a: current_channel + ) + + activity = 
UpdateActivity.from_enc(logger, enc, current_requests=[]) + assert activity is None diff --git a/pkgs/fc/agent/fc/maintenance/activity/tests/test_vm_change.py b/pkgs/fc/agent/fc/maintenance/activity/tests/test_vm_change.py new file mode 100644 index 000000000..e35bc459d --- /dev/null +++ b/pkgs/fc/agent/fc/maintenance/activity/tests/test_vm_change.py @@ -0,0 +1,92 @@ +import yaml +from fc.maintenance.activity import Activity +from fc.maintenance.activity.vm_change import VMChangeActivity +from pytest import fixture + +SERIALIZED_ACTIVITY = f"""\ +!!python/object:fc.maintenance.activity.vm_change.VMChangeActivity +current_cores: 1 +current_memory: 1024 +reboot_needed: false +wanted_cores: 2 +wanted_memory: 2048 +""" + + +@fixture +def activity(logger): + activity = VMChangeActivity(wanted_memory=2048, wanted_cores=2, log=logger) + activity.current_memory = 1024 + activity.current_cores = 1 + return activity + + +def test_vm_change_dont_merge_incompatible(activity): + other = Activity() + result = activity.merge(other) + assert result.is_effective is False + assert result.is_significant is False + assert result.merged is None + assert not result.changes + + +def test_vm_change_merge_same(activity): + result = activity.merge(activity) + assert result.is_effective is True + assert result.is_significant is False + assert result.merged is activity + assert not result.changes + + +def test_vm_change_merge_different_is_an_insignificant_update(activity): + other = VMChangeActivity(wanted_memory=4096, wanted_cores=4) + result = activity.merge(other) + assert result.merged is activity + assert result.merged is not other + assert result.is_effective is True + assert result.is_significant is False + assert result.changes == { + "cores": {"before": 2, "after": 4}, + "memory": {"before": 2048, "after": 4096}, + } + + +def test_reboot_merge_into_ineffective_is_an_significant_update(activity): + activity.current_memory = activity.wanted_memory + activity.current_cores = 
activity.wanted_cores + other = VMChangeActivity(wanted_memory=4096, wanted_cores=4) + result = activity.merge(other) + assert result.merged is activity + assert result.is_effective is True + assert result.is_significant is True + assert result.changes == { + "cores": {"before": 2, "after": 4}, + "memory": {"before": 2048, "after": 4096}, + } + + +def test_vm_change_merge_inverse_is_no_op(activity): + other = VMChangeActivity( + wanted_memory=activity.current_memory, + wanted_cores=activity.current_cores, + ) + result = activity.merge(other) + assert result.merged is activity + assert result.is_effective is False + assert result.is_significant is False + assert result.changes == { + "cores": {"before": 2, "after": 1}, + "memory": {"before": 2048, "after": 1024}, + } + + +def test_reboot_activity_serialize(activity): + serialized = yaml.dump(activity) + print(serialized) + assert serialized == SERIALIZED_ACTIVITY + + +def test_update_activity_deserialize(activity, logger): + deserialized = yaml.load(SERIALIZED_ACTIVITY, Loader=yaml.UnsafeLoader) + deserialized.set_up_logging(logger) + assert deserialized.__getstate__() == activity.__getstate__() diff --git a/pkgs/fc/agent/fc/maintenance/activity/update.py b/pkgs/fc/agent/fc/maintenance/activity/update.py index 1d51c6662..9acec7d8a 100644 --- a/pkgs/fc/agent/fc/maintenance/activity/update.py +++ b/pkgs/fc/agent/fc/maintenance/activity/update.py @@ -1,14 +1,21 @@ """Do a platform update. This activity does nothing if the machine already uses the new version. + +TODO: re-check if the update is actually needed by looking at the ENC channel +TODO: and comparing it with the current `nixos` channel URL. """ import os.path as p +from typing import Optional import structlog +from fc.maintenance.estimate import Estimate from fc.util import nixos +from fc.util.nixos import UnitChanges -from . import Activity, RebootType +from ...util.channel import Channel +from . 
import Activity, ActivityMergeResult, RebootType _log = structlog.get_logger() @@ -19,9 +26,17 @@ class UpdateActivity(Activity): + """ + Updates the NixOS system to a different channel URL. + The new system resulting from the channel URL is already pre-built + in `UpdateActivity.prepare` which means that a run of this activity usually + only has to set the new system link and switch to it. + """ + def __init__( self, next_channel_url: str, next_environment: str = None, log=_log ): + super().__init__() self.next_environment = next_environment self.next_channel_url = nixos.resolve_url_redirects(next_channel_url) self.current_system = None @@ -30,12 +45,17 @@ def __init__( self.current_version = None self.next_version = None self.current_environment = None - self.unit_changes = None + self.unit_changes: UnitChanges = {} self.current_kernel = None self.next_kernel = None self.set_up_logging(log) self._detect_current_state() self._detect_next_version() + self.log.debug( + "update-init", + next_channel_url=next_channel_url, + next_environment=next_environment, + ) def __eq__(self, other): return ( @@ -44,17 +64,68 @@ def __eq__(self, other): ) @classmethod - def from_system_if_changed( - cls, next_channel_url: str, next_environment: str = None - ): - activity = cls(next_channel_url, next_environment) + def from_enc(cls, log, enc, current_requests) -> Optional["UpdateActivity"]: + """ + Create a new UpdateActivity from ENC data or None, if nothing would + change. + + """ + if not enc or not enc.get("parameters"): + log.warning( + "enc-data-missing", msg="No ENC data, cannot update the system." + ) + return + + env_name = enc["parameters"]["environment"] + channel_url = enc["parameters"]["environment_url"] + + next_channel = Channel( + log, + channel_url, + name="next", + environment=env_name, + ) + + if not next_channel or next_channel.is_local: + log.warn( + "update-from-enc-local-channel", + _replace_msg=( + "UpdateActivity is incompatible with local checkouts." 
+ ), + ) + return + + equivalent_planned_requests = [ + req + for req in current_requests + if isinstance(req.activity, UpdateActivity) + and req.activity.next_channel_url == next_channel.resolved_url + ] + + if equivalent_planned_requests: + log.info( + "update-from-enc-found-existing", + _replace_msg=( + "Existing request {request} with same channel URL: {channel_url}" + ), + request=equivalent_planned_requests[0].id, + channel_url=next_channel.resolved_url, + ) + return + + activity = cls(channel_url, env_name) + if activity.is_effective: return activity @property def is_effective(self): """Does this actually change anything?""" - return self.next_channel_url != self.current_channel_url + if self.next_channel_url == self.current_channel_url: + return False + if self.current_system == self.next_system: + return False + return True def prepare(self, dry_run=False): self.log.debug( @@ -72,30 +143,66 @@ def prepare(self, dry_run=False): else: out_link = NEXT_SYSTEM - self.next_system = nixos.build_system( - self.next_channel_url, out_link=out_link, log=self.log - ) + try: + self.next_system = nixos.build_system( + self.next_channel_url, out_link=out_link, log=self.log + ) + except nixos.ChannelException: + self.log.error( + "update-prepare-build-failed", + current_version=self.current_version, + current_channel_url=self.current_channel_url, + current_environment=self.current_environment, + next_channel=self.next_channel_url, + next_version=self.next_version, + next_environment=self.next_environment, + ) + raise + self.unit_changes = nixos.dry_activate_system( self.next_system, self.log ) self._register_reboot_for_kernel() + if self.reboot_needed: + self.estimate = Estimate("10m") + elif ( + self.unit_changes["restart"] + or self.unit_changes["stop"] + or self.unit_changes["start"] + ): + self.estimate = Estimate("5m") + else: + # Only reloads or no unit changes, this should not take long + self.estimate = Estimate("2m") + + def update_system_channel(self): + 
nixos.update_system_channel(self.next_channel_url, self.log) + + @property + def is_identical_to_running_system(self): + if self.current_system == self.next_system: + self.log.info( + "update-identical", + current_version=self.next_version, + next_version=self.next_version, + system=self.next_system.removeprefix("/nix/store/"), + _replace_msg=( + "Running system {system} is already the wanted system." + ), + ) + return True + + return False + def run(self): """Do the update""" + step = 1 try: - step = 1 - nixos.update_system_channel(self.next_channel_url, self.log) - - if nixos.running_system_version(self.log) == self.next_version: - self.log.info( - "update-run-skip", - current_version=self.next_version, - _replace_msg=( - "Running version is already the wanted version " - "{current_version}, skip update." - ), - ) + self.update_system_channel() + + if self.is_identical_to_running_system: self.returncode = 0 return @@ -139,23 +246,12 @@ def run(self): @property def changelog(self): - def notify(category): - services = self.unit_changes.get(category, []) - if services: - return "{}: {}".format( - category.capitalize(), - ", ".join(s.replace(".service", "", 1) for s in services), - ) - else: - return "" - - unit_changes = list( - filter( - None, - (notify(cat) for cat in ["stop", "restart", "start", "reload"]), - ) - ) - + """ + A human-readable summary of what will be changed by this update. + Includes possible reboots, significant unit state changes (start, stop, + restart) as well as changes of build number, environment ( + fc-22.11-staging, for example) and channel URL. 
+ """ msg = [f"System update: {self.current_version} -> {self.next_version}"] current_build = nixos.get_fc_channel_build( @@ -180,15 +276,84 @@ def notify(category): if self.reboot_needed: msg.append("Will reboot after the update.") - if unit_changes: - msg.extend(unit_changes) + unit_change_lines = nixos.format_unit_change_lines(self.unit_changes) + + if unit_change_lines: + msg.extend(unit_change_lines) msg.append("") msg.append(f"Channel URL: {self.next_channel_url}") return "\n".join(msg) - def merge(self, activity): - pass + @property + def comment(self): + return self.changelog + + def merge(self, other: Activity) -> ActivityMergeResult: + if not isinstance(other, UpdateActivity): + self.log.debug( + "merge-incompatible-skip", + self_type=type(self), + other_type=type(other), + ) + return ActivityMergeResult() + + current_state = self.__getstate__() + other_state = other.__getstate__() + + if other_state == current_state: + self.log.debug("merge-update-identical") + return ActivityMergeResult(self, self.is_effective) + + if other.next_channel_url != self.next_channel_url: + self.log.debug( + "merge-update-channel-diff", + current=current_state, + new=other_state, + ) + else: + self.log.debug( + "merge-update-metadata-diff", + current=current_state, + new=other_state, + ) + + added_unit_changes = {} + removed_unit_changes = {} + + for category, changes in self.unit_changes.items(): + other_changes = other.unit_changes[category] + added = set(other_changes) - set(changes) + + if added: + added_unit_changes[category] = added + + removed = set(changes) - set(other_changes) + + if removed: + removed_unit_changes[category] = removed + + changes = { + "added_unit_changes": added_unit_changes, + "removed_unit_changes": removed_unit_changes, + } + + # Additional starts, stops and restart of units are considered a + # significant change of the activity. Reloads are harmless and can be + # ignored. 
+ + is_significant = bool( + added_unit_changes.get("start") + or added_unit_changes.get("stop") + or added_unit_changes.get("restart") + ) + + merged = UpdateActivity(other.next_channel_url) + merged.__dict__.update({**current_state, **other_state}) + + return ActivityMergeResult( + merged, merged.is_effective, is_significant, changes + ) def _register_reboot_for_kernel(self): current_kernel = nixos.kernel_version( @@ -198,6 +363,7 @@ def _register_reboot_for_kernel(self): if current_kernel == next_kernel: self.log.debug("update-kernel-unchanged") + self.reboot_needed = None else: self.log.info( "update-kernel-changed", @@ -215,7 +381,7 @@ def _detect_current_state(self): self.current_environment = nixos.current_fc_environment_name() self.current_system = nixos.current_system() self.log.debug( - "update-activity-update-current-state", + "update-activity-current-state", current_version=self.current_version, current_channel_url=self.current_channel_url, current_environment=self.current_environment, diff --git a/pkgs/fc/agent/fc/maintenance/activity/vm_change.py b/pkgs/fc/agent/fc/maintenance/activity/vm_change.py new file mode 100644 index 000000000..be98893eb --- /dev/null +++ b/pkgs/fc/agent/fc/maintenance/activity/vm_change.py @@ -0,0 +1,175 @@ +"""Handle VM changes that need a cold reboot. +""" +from typing import Optional + +import fc.util.dmi_memory +import fc.util.vm +import structlog + +from ..estimate import Estimate +from . 
import Activity, ActivityMergeResult, RebootType + +_log = structlog.get_logger() + + +class VMChangeActivity(Activity): + estimate = Estimate("5m") + + def __init__( + self, + wanted_memory: Optional[int] = None, + wanted_cores: Optional[int] = None, + log=_log, + ): + super().__init__() + self.set_up_logging(log) + self.current_memory = None + self.wanted_memory = wanted_memory + self.current_cores = None + self.wanted_cores = wanted_cores + self.reboot_needed = False + + @property + def comment(self): + msgs = [] + + if self.wanted_memory and self.current_memory != self.wanted_memory: + msgs.append( + f"Memory {self.current_memory} MiB -> " + f"{self.wanted_memory} MiB." + ) + + if self.wanted_cores and self.current_cores != self.wanted_cores: + msgs.append( + f"CPU cores {self.current_cores} -> {self.wanted_cores}." + ) + + if msgs: + return " ".join(["Reboot to activate VM changes:"] + msgs) + + @classmethod + def from_system_if_changed(cls, wanted_memory=None, wanted_cores=None): + activity = cls(wanted_memory, wanted_cores) + activity.update_from_system_state() + + if activity.is_effective: + return activity + + def update_from_system_state(self): + self.current_memory = fc.util.dmi_memory.main() + self.current_cores = fc.util.vm.count_cores() + self._update_reboot_needed() + + def merge(self, other) -> ActivityMergeResult: + if not isinstance(other, VMChangeActivity): + self.log.debug( + "merge-incompatible-skip", + other_type=type(other).__name__, + ) + return ActivityMergeResult() + + is_effective_before = self.is_effective + changes = {} + + if other.wanted_memory != self.wanted_memory: + self.log.debug( + "vm-change-merge-memory-diff", + this_memory=self.wanted_memory, + other_memory=other.wanted_memory, + ) + if other.wanted_memory: + changes["memory"] = { + "before": self.wanted_memory, + "after": other.wanted_memory, + } + self.wanted_memory = other.wanted_memory + + if other.wanted_cores != self.wanted_cores: + self.log.debug( + 
"vm-change-merge-cores-diff", + this_cores=self.wanted_cores, + other_cores=other.wanted_cores, + ) + if other.wanted_cores: + changes["cores"] = { + "before": self.wanted_cores, + "after": other.wanted_cores, + } + self.wanted_cores = other.wanted_cores + + return ActivityMergeResult( + self, + is_effective=self.is_effective, + is_significant=self.is_effective and not is_effective_before, + changes=changes, + ) + + @property + def is_effective(self): + """Does this actually change anything?""" + if self.wanted_memory and self.current_memory != self.wanted_memory: + return True + if self.wanted_cores and self.current_cores != self.wanted_cores: + return True + + return False + + def _need_poweroff_for_memory(self): + actual_memory = fc.util.dmi_memory.main() + if self.current_memory != actual_memory: + self.log.debug( + "poweroff-mem-changed", + msg="Memory changed after creation of this activity.", + expected_current_memory=self.current_memory, + actual_memory=actual_memory, + ) + + if actual_memory == self.wanted_memory: + self.log.debug( + "poweroff-mem-noop", + msg="Memory already at wanted value, no power-off needed.", + actual_memory=actual_memory, + ) + return False + else: + self.log.debug( + "poweroff-mem-needed", + msg="Power-off needed to activate new memory size.", + ) + return True + + def _need_poweroff_for_cores(self): + actual_cores = fc.util.vm.count_cores() + if self.current_cores != actual_cores: + self.log.debug( + "poweroff-cores-changed", + msg="Cores changed after creation of this activity.", + expected_current_cores=self.current_cores, + actual_cores=actual_cores, + ) + + if actual_cores == self.wanted_cores: + self.log.debug( + "poweroff-cores-noop", + msg="Cores already at wanted value, no power-off needed.", + actual_cores=actual_cores, + ) + return False + else: + self.log.debug( + "poweroff-cores-needed", + msg="Power-off needed to activate new cores count.", + ) + return True + + def _update_reboot_needed(self): + if 
self._need_poweroff_for_memory() or self._need_poweroff_for_cores(): + self.reboot_needed = RebootType.COLD + else: + self.reboot_needed = None + + def load(self): + self._update_reboot_needed() + + # Running a VMChangeActivity is not needed, so no run method. + # The request manager handles the reboot required by this activity. diff --git a/pkgs/fc/agent/fc/maintenance/cli.py b/pkgs/fc/agent/fc/maintenance/cli.py index 889d4ee43..3faf1c2f6 100644 --- a/pkgs/fc/agent/fc/maintenance/cli.py +++ b/pkgs/fc/agent/fc/maintenance/cli.py @@ -1,25 +1,23 @@ -import sys from pathlib import Path from typing import NamedTuple, Optional import structlog import typer -from fc.maintenance.activity.update import UpdateActivity -from fc.maintenance.lib.reboot import RebootActivity +from fc.maintenance.activity.reboot import RebootActivity from fc.maintenance.lib.shellscript import ShellScriptActivity +from fc.maintenance.maintenance import ( + request_reboot_for_cpu, + request_reboot_for_kernel, + request_reboot_for_memory, + request_reboot_for_qemu, + request_update, +) from fc.maintenance.reqmanager import ( DEFAULT_CONFIG_FILE, DEFAULT_SPOOLDIR, ReqManager, ) from fc.maintenance.request import Request -from fc.maintenance.system_properties import ( - request_reboot_for_cpu, - request_reboot_for_kernel, - request_reboot_for_memory, - request_reboot_for_qemu, -) -from fc.manage.manage import prepare_switch_in_maintenance, switch_with_update from fc.util import nixos from fc.util.enc import load_enc from fc.util.lock import locked @@ -41,6 +39,7 @@ class Context(NamedTuple): lock_dir: Path spooldir: Path verbose: bool + show_caller_info: bool context: Context @@ -49,7 +48,20 @@ class Context(NamedTuple): @app.callback(no_args_is_help=True) def main( - verbose: bool = False, + verbose: bool = Option( + False, + "--verbose", + "-v", + help=( + "Show debug and trace (Nix command) output. By default, only log " + "levels info and higher are shown." 
+ ), + ), + show_caller_info: bool = Option( + False, + "--show-caller-info", + help="Show where a logging function was called (file/function/line).", + ), spooldir: Path = Option( file_okay=False, writable=True, @@ -97,9 +109,12 @@ def main( lock_dir=lock_dir, spooldir=spooldir, verbose=verbose, + show_caller_info=show_caller_info, ) - init_logging(context.verbose, context.logdir) + init_logging( + context.verbose, context.logdir, show_caller_info=show_caller_info + ) rm = ReqManager( spooldir=spooldir, @@ -128,12 +143,11 @@ def run(run_all_now: bool = False): are postponed (they get a new execution time) and finished requests (successful or failed permanently) moved from the current request to the archive directory. - - Executing, postponing and archiving can be disabled using their respective - flags for testing and debugging purposes, for example. """ log.info("fc-maintenance-run-start") with rm: + if not run_all_now: + rm.set_postpone_state_for_overdue_requests() rm.execute(run_all_now) rm.postpone() rm.archive() @@ -192,9 +206,9 @@ def request_main(): @request_app.command(name="script") -def run_script(comment: str, script: str, estimate: str = "10m"): +def run_script(comment: str, script: str, estimate: Optional[str] = None): """Request to run a script.""" - request = Request(ShellScriptActivity(script), estimate, comment=comment) + request = Request(ShellScriptActivity(script), estimate, comment) with rm: rm.scan() rm.add(request) @@ -204,14 +218,7 @@ def run_script(comment: str, script: str, estimate: str = "10m"): def reboot(comment: Optional[str] = None, cold_reboot: bool = False): """Request a reboot.""" action = "poweroff" if cold_reboot else "reboot" - default_comment = "Scheduled {}".format( - "cold boot" if cold_reboot else "reboot" - ) - request = Request( - RebootActivity(action), - 900 if cold_reboot else 600, - comment if comment else default_comment, - ) + request = Request(RebootActivity(action), comment=comment) with rm: rm.scan() 
rm.add(request) @@ -219,7 +226,7 @@ def reboot(comment: Optional[str] = None, cold_reboot: bool = False): @request_app.command() def system_properties(): - """Request reboot for changed sys properties. + """Request reboot for changed system properties. Runs applicable checks for the machine type (virtual/physical). * Physical: kernel @@ -234,101 +241,47 @@ def system_properties(): rm.scan() if enc["parameters"]["machine"] == "virtual": - rm.add(request_reboot_for_memory(enc)) - rm.add(request_reboot_for_cpu(enc)) - rm.add(request_reboot_for_qemu()) + rm.add(request_reboot_for_memory(log, enc)) + rm.add(request_reboot_for_cpu(log, enc)) + rm.add(request_reboot_for_qemu(log)) - rm.add(request_reboot_for_kernel()) + rm.add(request_reboot_for_kernel(log)) log.info("fc-maintenance-system-properties-finished") @request_app.command() -def update( - run_now: bool = Option( - default=False, help="do update now instead of scheduling a request" - ) -): +def update(): """Request a system update. Builds the system and prepares the update to be run in a maintenance - window by default. To activate the update immediately, pass the - --run-now option. + window by default. Acquires an exclusive lock because this shouldn't be run concurrently with more invocations of the update command or other commands (from - fc-manage) that - potentially modify the system.""" - init_command_logging(log, context.logdir) + fc-manage) that potentially modify the system. 
+ """ log.info("fc-maintenance-update-start") enc = load_enc(log, context.enc_path) + init_command_logging(log, context.logdir) + + with rm: + rm.scan() + current_requests = rm.requests.values() with locked(log, context.lock_dir): try: - if run_now: - keep_cmd_output = switch_with_update(log, enc, lazy=True) - else: - keep_cmd_output = prepare_switch_in_maintenance(log, enc) + request = request_update(log, enc, current_requests) except nixos.ChannelException: raise Exit(2) - if not keep_cmd_output: + with rm: + request = rm.add(request) + + if request is None: drop_cmd_output_logfile(log) log.info("fc-maintenance-update-finished") -@request_app.command() -def update_with_update_activity( - channel_url: str = Argument(..., help="channel URL to update to"), - run_now: bool = Option( - default=False, help="do update now instead of scheduling a request" - ), - dry_run: bool = Option( - default=False, help="do nothing, just show activity" - ), -): - """(Experimental) Prepare an UpdateActivity or execute it now.""" - - activity = UpdateActivity.from_system_if_changed(channel_url) - - if activity is None: - log.warn( - "update-skip", - _replace_msg="Channel URL unchanged, skipped.", - activity=activity, - ) - sys.exit(1) - - activity.prepare(dry_run) - - # possible short-cut: built system is the same - # => we can skip requesting maintenance and set the new channel directly - - if run_now: - log.info( - "update-run-now", - _replace_msg="Run-now mode requested, running the update now.", - ) - activity.run() - - elif dry_run: - log.info( - "update-dry-run", - _replace_msg=( - "Update prediction was successful. This would be applied by " - "the update:" - ), - _output=activity.changelog, - ) - else: - with rm: - rm.scan() - rm.add(Request(activity, 600, activity.changelog)) - log.info( - "update-prepared", - _replace_msg=( - "Update preparation was successful. 
This will be applied in a " - "maintenance window:" - ), - _output=activity.changelog, - ) +if __name__ == "__main__": + app() diff --git a/pkgs/fc/agent/fc/maintenance/estimate.py b/pkgs/fc/agent/fc/maintenance/estimate.py index 1716f7b4c..a4330cfaa 100644 --- a/pkgs/fc/agent/fc/maintenance/estimate.py +++ b/pkgs/fc/agent/fc/maintenance/estimate.py @@ -27,7 +27,7 @@ def __init__(self, spec): def __str__(self): if self.value == 0: return "0s" - elif self.value > 0 and self.value < 1: + elif 0 < self.value < 1: return "1s" out = [] remainder = self.value @@ -51,3 +51,6 @@ def __int__(self): def __float__(self): return float(self.value) + + def __lt__(self, other): + return self.value < other.value diff --git a/pkgs/fc/agent/fc/maintenance/lib/__init__.py b/pkgs/fc/agent/fc/maintenance/lib/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/pkgs/fc/agent/fc/maintenance/lib/reboot.py b/pkgs/fc/agent/fc/maintenance/lib/reboot.py index d34dc5c6a..e6d37224c 100644 --- a/pkgs/fc/agent/fc/maintenance/lib/reboot.py +++ b/pkgs/fc/agent/fc/maintenance/lib/reboot.py @@ -12,6 +12,7 @@ class RebootActivity(Activity): def __init__(self, action="reboot"): + super().__init__() assert action in ["reboot", "poweroff"] self.action = action self.coldboot = action == "poweroff" diff --git a/pkgs/fc/agent/fc/maintenance/lib/tests/test_reboot.py b/pkgs/fc/agent/fc/maintenance/lib/tests/test_reboot.py deleted file mode 100644 index 71f72be9d..000000000 --- a/pkgs/fc/agent/fc/maintenance/lib/tests/test_reboot.py +++ /dev/null @@ -1,68 +0,0 @@ -import sys -from unittest.mock import Mock - -import pytest -from fc.maintenance import Request -from fc.maintenance.lib.reboot import RebootActivity -from fc.maintenance.reqmanager import ReqManager - - -@pytest.fixture -def defused_boom(monkeypatch): - mock = Mock(RebootActivity.boom) - monkeypatch.setattr(RebootActivity, "boom", mock) - return mock - - -@pytest.fixture -def reqdir(tmpdir, monkeypatch): - 
monkeypatch.chdir(tmpdir) - return tmpdir - - -@pytest.fixture -def boottime(monkeypatch): - """Simulate fixed boottime (i.e., no intermediary reboots).""" - boottime = Mock(RebootActivity.boottime, side_effect=[1] * 20) - monkeypatch.setattr(RebootActivity, "boottime", boottime) - return boottime - - -def test_reboot(reqdir, defused_boom, boottime): - r = RebootActivity() - r.run() - assert defused_boom.call_count == 1 - - -def test_skip_reboot_when_already_rebooted(reqdir, defused_boom, monkeypatch): - boottime = Mock(RebootActivity.boottime, side_effect=[1, 10]) - monkeypatch.setattr(RebootActivity, "boottime", boottime) - r = RebootActivity() - r.run() - assert defused_boom.call_count == 0 - - -def comments(spooldir): - """Returns dict of (reqid, comment).""" - with ReqManager(str(spooldir)) as rm: - rm.scan() - return [req.comment for req in rm.requests.values()] - - -def test_dont_perform_warm_reboot_if_cold_reboot_pending( - reqdir, defused_boom, boottime -): - with ReqManager(str(reqdir)) as rm: - rm.scan() - rm.add(Request(RebootActivity("reboot"), 60, "warm")) - rm.add(Request(RebootActivity("poweroff"), 60, "cold")) - # run soft reboot first - reqs = sorted( - rm.requests.values(), key=lambda r: r.activity.action, reverse=True - ) - reqs[0].execute() - reqs[0].save() - assert defused_boom.call_count == 0 - reqs[1].execute() - reqs[1].save() - assert defused_boom.call_count == 1 diff --git a/pkgs/fc/agent/fc/maintenance/lib/tests/test_shellscript.py b/pkgs/fc/agent/fc/maintenance/lib/tests/test_shellscript.py index 44a43ab88..6bd5771c8 100644 --- a/pkgs/fc/agent/fc/maintenance/lib/tests/test_shellscript.py +++ b/pkgs/fc/agent/fc/maintenance/lib/tests/test_shellscript.py @@ -1,4 +1,3 @@ -import io import os import pytest diff --git a/pkgs/fc/agent/fc/maintenance/system_properties.py b/pkgs/fc/agent/fc/maintenance/maintenance.py similarity index 53% rename from pkgs/fc/agent/fc/maintenance/system_properties.py rename to 
pkgs/fc/agent/fc/maintenance/maintenance.py index 7a51db7ab..2748a9053 100644 --- a/pkgs/fc/agent/fc/maintenance/system_properties.py +++ b/pkgs/fc/agent/fc/maintenance/maintenance.py @@ -1,59 +1,64 @@ +""" +Create maintenance activities and a wrapping request object based on current +system state. All maintenance activities start their life here in this module. + +TODO: better logging for created activities. + +""" import os.path import shutil +from typing import Optional import fc.util import fc.util.dmi_memory import fc.util.nixos import fc.util.vm -import structlog -from fc.maintenance.lib.reboot import RebootActivity +from fc.maintenance import Request +from fc.maintenance.activity.reboot import RebootActivity +from fc.maintenance.activity.update import UpdateActivity +from fc.maintenance.activity.vm_change import VMChangeActivity -log = structlog.get_logger() - -def request_reboot_for_memory(enc): +def request_reboot_for_memory(log, enc) -> Optional[Request]: """Schedules reboot if the memory size has changed.""" wanted_memory = int(enc["parameters"].get("memory", 0)) + log.debug("request-reboot-for-memory", enc_memory=wanted_memory) + if not wanted_memory: return - current_memory = fc.util.dmi_memory.main() - if current_memory == wanted_memory: - return - msg = f"Reboot to change memory from {current_memory} MiB to {wanted_memory} MiB" - log.info( - "memory-change", - _replace_msg=f"Scheduling reboot to activate memory change: {current_memory} -> {wanted_memory}", - current_memory=current_memory, - wanted_memory=wanted_memory, + + activity = VMChangeActivity.from_system_if_changed( + wanted_memory=wanted_memory ) - return fc.maintenance.Request(RebootActivity("poweroff"), 600, comment=msg) + if activity: + return Request(activity) -def request_reboot_for_cpu(enc): + +def request_reboot_for_cpu(log, enc) -> Optional[Request]: """Schedules reboot if the number of cores has changed.""" wanted_cores = int(enc["parameters"].get("cores", 0)) + 
log.debug("request-reboot-for-cpu", enc_cores=wanted_cores) + if not wanted_cores: return - current_cores = fc.util.vm.count_cores() - if current_cores == wanted_cores: - return - msg = f"Reboot to change CPU count from {current_cores} to {wanted_cores}" - log.info( - "cpu-change", - _replace_msg=f"Scheduling reboot to activate cpu change: {current_cores} -> {wanted_cores}", - current_cores=current_cores, - wanted_cores=wanted_cores, + + activity = VMChangeActivity.from_system_if_changed( + wanted_cores=wanted_cores ) - return fc.maintenance.Request(RebootActivity("poweroff"), 600, comment=msg) + + if activity: + return Request(activity) -def request_reboot_for_qemu(): +def request_reboot_for_qemu(log) -> Optional[Request]: """Schedules a reboot if the Qemu binary environment has changed.""" # Update the -booted marker if necessary. We need to store the marker # in a place where it does not get removed after _internal_ reboots # of the virtual machine. However, if we got rebooted with a fresh # Qemu instance, we need to update it from the marker on the tmp # partition. + log.debug("request-reboot-for-qemu-start") if not os.path.isdir("/var/lib/qemu"): os.makedirs("/var/lib/qemu") if os.path.exists("/tmp/fc-data/qemu-binary-generation-booted"): @@ -86,14 +91,14 @@ def request_reboot_for_qemu(): if booted_generation >= current_generation: # We do not automatically downgrade. If we ever want that then I - # want us to reconsider the side-effects. + # want us to reconsider the side effects. 
+ return msg = "Cold restart because the Qemu binary environment has changed" - return fc.maintenance.Request(RebootActivity("poweroff"), 600, comment=msg) + return Request(RebootActivity("poweroff"), comment=msg) -def request_reboot_for_kernel(): +def request_reboot_for_kernel(log) -> Optional[Request]: """Schedules a reboot if the kernel has changed.""" booted = fc.util.nixos.kernel_version("/run/booted-system/kernel") current = fc.util.nixos.kernel_version("/run/current-system/kernel") @@ -101,12 +106,47 @@ def request_reboot_for_kernel(): if booted != current: log.info( "kernel-changed", - _replace_msg="Scheduling reboot to activate new kernel {booted} -> {current}", + _replace_msg=( + "Scheduling reboot to activate new kernel {booted} -> {current}" + ), booted=booted, current=current, ) - return fc.maintenance.Request( + return Request( RebootActivity("reboot"), - 600, comment=f"Reboot to activate changed kernel ({booted} to {current})", ) + + +def request_update(log, enc, current_requests) -> Optional[Request]: + """Schedule a system update if the channel has changed and the + resulting system is different from the running system. + + As a shortcut, switches to the new channel if the new system is still the + same. + """ + activity = UpdateActivity.from_enc(log, enc, current_requests) + + if activity is None: + log.info("update-unchanged") + return + + activity.prepare() + + if activity.is_identical_to_running_system: + log.info("update-system-unchanged") + activity.update_system_channel() + return + + log.info( + "update-system-changed", + _replace_msg=( + "Update preparation was successful. This update will apply " + "changes to the system." 
+ ), + _output=activity.changelog, + current_channel=activity.current_channel_url, + next_channel=activity.next_channel_url, + ) + + return Request(activity) diff --git a/pkgs/fc/agent/fc/maintenance/reboot.py b/pkgs/fc/agent/fc/maintenance/reboot.py new file mode 100644 index 000000000..84e7d9929 --- /dev/null +++ b/pkgs/fc/agent/fc/maintenance/reboot.py @@ -0,0 +1,16 @@ +import subprocess +import time + + +def reboot(coldboot=False): + print( + "shutdown at {}".format( + time.strftime( + "%Y-%m-%d %H:%M:%S UTC", time.gmtime(time.time() + 60) + ) + ) + ) + if coldboot: + subprocess.check_call(["poweroff"]) + else: + subprocess.check_call(["reboot"]) diff --git a/pkgs/fc/agent/fc/maintenance/reqmanager.py b/pkgs/fc/agent/fc/maintenance/reqmanager.py index e4ca8dc63..f5570ee7a 100644 --- a/pkgs/fc/agent/fc/maintenance/reqmanager.py +++ b/pkgs/fc/agent/fc/maintenance/reqmanager.py @@ -8,6 +8,8 @@ import os.path as p import socket import subprocess +import sys +import time from datetime import datetime from pathlib import Path @@ -19,7 +21,7 @@ from fc.util.time_date import format_datetime, utcnow from rich.table import Table -from .request import Request +from .request import Request, RequestMergeResult from .state import ARCHIVE, State DEFAULT_SPOOLDIR = "/var/spool/maintenance" @@ -58,12 +60,13 @@ class ReqManager: directory = None lockfile = None + requests: dict[str, Request] def __init__( self, - spooldir=Path(DEFAULT_SPOOLDIR), - enc_path=None, - config_file=None, + spooldir, + enc_path, + config_file, log=_log, ): """Initialize ReqManager and create directories if necessary.""" @@ -80,8 +83,8 @@ def __init__( for d in (self.spooldir, self.requestsdir, self.archivedir): if not d.exists(): os.mkdir(d) - self.enc_path = Path(enc_path) if enc_path else None - self.config_file = Path(config_file) if config_file else None + self.enc_path = Path(enc_path) + self.config_file = Path(config_file) self.requests = {} def __enter__(self): @@ -100,6 +103,9 @@ def 
__enter__(self): self.config.read(self.config_file) else: self.log.warn("reqmanager-enter-config-not-found") + self.maintenance_preparation_seconds = int( + self.config.get("maintenance", "preparation_seconds", fallback=300) + ) return self def __exit__(self, exc_type, exc_value, exc_tb): @@ -120,22 +126,34 @@ def __rich__(self): return "[bold]No maintenance requests at the moment.[/bold]" table.add_column("State") - table.add_column("Request ID") + table.add_column("Req ID") table.add_column("Execution Time") table.add_column("Duration") table.add_column("Comment") table.add_column("Added") - table.add_column("Last Scheduled") + table.add_column("Updated") + table.add_column("Scheduled") for req in sorted(self.requests.values()): + if req.next_due: + exec_interval = ( + format_datetime(req.next_due) + + " -\n" + + format_datetime(req.not_after) + ) + + if req.overdue: + exec_interval += " (overdue)" + else: + exec_interval = "--- TBA ---" + table.add_row( str(req.state), - req.id, - format_datetime(req.next_due) - if req.next_due - else "--- TBA ---", + req.id[:6], + exec_interval, str(req.estimate), req.comment, format_datetime(req.added_at) if req.added_at else "-", + format_datetime(req.updated_at) if req.updated_at else "-", format_datetime(req.last_scheduled_at) if req.last_scheduled_at else "-", @@ -170,30 +188,7 @@ def scan(self): ) os.rename(d, p.join(self.archivedir, p.basename(d))) - def add(self, request, skip_same_comment=True): - """Adds a Request object to the local queue. - - If skip_same_comment is True, a request is not added if a - requests with the same comment already exists in the queue. - - Returns modified Request object or None if nothing was added. - """ - if request is None: - return - - if skip_same_comment and request.comment: - duplicate = self.find_by_comment(request.comment) - if duplicate: - self.log.info( - "request-skip-duplicate", - _replace_msg=( - "When adding {request}, found identical request " - "{duplicate}. Nothing added." 
- ), - request=request.id, - duplicate=duplicate.id, - ) - return None + def _add_request(self, request: Request): self.requests[request.id] = request request.dir = self.dir(request) request._reqmanager = self @@ -207,11 +202,119 @@ def add(self, request, skip_same_comment=True): ) return request - def find_by_comment(self, comment): - """Returns first request with `comment` or None.""" - for r in self.requests.values(): - if r.comment == comment: - return r + def _merge_request( + self, existing_request: Request, request: Request + ) -> RequestMergeResult: + merge_result = existing_request.merge(request) + match merge_result: + case RequestMergeResult.UPDATE: + self.log.info( + "requestmanager-merge-update", + _replace_msg=( + "New request {request} was merged with an " + "existing request {merged}." + ), + request=request.id, + merged=existing_request.id, + ) + # XXX: schedule is used here to update the comment as + # a side effect. + # Having a dedicated API call for updating would be + # helpful. + # Schedule all requests to keep the order. + self.schedule() + + case RequestMergeResult.SIGNIFICANT_UPDATE: + self.log.info( + "requestmanager-merge-significant", + _replace_msg=( + "New request {request} was merged with an " + "existing request {merged}. Change is " + "significant so execution will be postponed." + ), + request=request.id, + merged=existing_request.id, + ) + existing_request.updated_at = utcnow() + existing_request.save() + # XXX: We have to do it both because the directory + # doesn't send out mails when schedule updates the comment + # and postpone doesn't support updating the comment... + # Having an API call for updating would be helpful. + # Schedule all requests to keep the order. 
+ self.schedule() + self.directory.postpone_maintenance( + {existing_request.id: {"postpone_by": 3600}} + ) + + case RequestMergeResult.REMOVE: + self.log.info( + "requestmanager-merge-remove", + _replace_msg=( + "New request {request} was merged with an " + "existing request {merged} and produced a " + "no-op request. Removing the request." + ), + request=request.id, + merged=existing_request.id, + ) + self.delete(existing_request.id) + + case RequestMergeResult.NO_MERGE: + self.log.debug( + "requestmanager-merge-skip", + existing_request=existing_request.id, + new_request=request.id, + ) + + return merge_result + + def add(self, request: Request | None, add_always=False): + """Adds a Request object to the local queue if the activity would make changes to the system. + New request is merged with existing requests. If the merge results + in a no-op request, the existing request is deleted. + + Returns Request object, new or merged + None if not added/no-op. + """ + self.log.debug("request-add-start", request_object=request) + + if request is None: + return + + if not request.activity.is_effective: + if add_always: + self.log.info( + "request-force-add", + _replace_msg="Activity for {request} wouldn't apply any changes but still adding because add_always was given.", + request=request.id, + ) + else: + self.log.info( + "request-skip", + _replace_msg=( + "Activity for {request} wouldn't apply any changes. " + "Nothing added." 
+ ), + request=request.id, + ) + return + + if add_always: + self.log.debug("request-add-always", request=request.id) + return self._add_request(request) + + for existing_request in reversed(self.requests.values()): + # We can stop if request was merged or removed, continue otherwise + match self._merge_request(existing_request, request): + case RequestMergeResult.SIGNIFICANT_UPDATE | RequestMergeResult.UPDATE: + return existing_request + case RequestMergeResult.REMOVE: + return + case RequestMergeResult.NO_MERGE: + pass + + return self._add_request(request) @require_lock @require_directory @@ -220,12 +323,20 @@ def schedule(self): self.log.debug("schedule-start") schedule_maintenance = { - reqid: {"estimate": int(req.estimate), "comment": req.comment} + reqid: { + "estimate": max( + 600, + int(req.estimate) + self.maintenance_preparation_seconds, + ), + "comment": req.comment, + } for reqid, req in self.requests.items() } if schedule_maintenance: self.log.debug( - "schedule-maintenances", request_count=len(schedule_maintenance) + "schedule-maintenances", + request_count=len(schedule_maintenance), + requests=list(schedule_maintenance), ) result = self.directory.schedule_maintenance(schedule_maintenance) @@ -233,8 +344,14 @@ def schedule(self): for key, val in result.items(): try: req = self.requests[key] - self.log.debug("schedule-request", request=key, data=val) - if req.update_due(val["time"]): + due_changed = req.update_due(val["time"]) + self.log.debug( + "schedule-request-result", + request=key, + data=val, + due_changed=due_changed, + ) + if due_changed: self.log.info( "schedule-change-start-time", _replace_msg=( @@ -259,11 +376,36 @@ def schedule(self): {key: {"result": "deleted"} for key in disappeared} ) + def set_postpone_state_for_overdue_requests(self): + """ + Checks for requests that are too long overdue (past their due datetime + plus some extra time) and marks them for postponing. The directory will + find a new time slot for them. 
+ """ + self.log.debug("set-postpone-state-for-overdue-requests") + for req in self.requests.values(): + if req.overdue: + self.log.info( + "request-overdue-postpone", + _replace_msg="Request {request} is overdue, postponing.", + request=req.id, + ) + req.state = State.postpone + def runnable(self): """Generate due Requests in running order.""" requests = [] for request in self.requests.values(): + request.activity + current_state = request.state new_state = request.update_state() + if current_state != new_state: + self.log.debug( + "reqmanager-runnable-state-change", + request=request.id, + current_state=current_state, + new_state=new_state, + ) if new_state is State.running: yield request elif new_state in (State.due, State.tempfail): @@ -341,13 +483,12 @@ def execute(self, run_all_now=False): requested_reboots.add(req.activity.reboot_needed) # Execute any reboots while still in maintenance. - # Using the 'if' with the side effect has been a huge problem - # WRT to readability for me when trying to find out whether it - # is safe to call 'leave_maintenance' in the except: part a few - # lines below. - if not self.reboot(requested_reboots): - self.log.debug("no-reboot-requested") - self.leave_maintenance() + self.reboot_and_exit(requested_reboots) + + # When we are still here, no reboot has happened. We can leave + # maintenance now. + self.log.debug("no-reboot-requested") + self.leave_maintenance() @require_lock @require_directory @@ -369,8 +510,11 @@ def postpone(self): self.log.debug( "postpone-maintenance-directory", args=postpone_maintenance ) + # This directory call just returns an empty string. self.directory.postpone_maintenance(postpone_maintenance) for req in postponed: + # Resetting the due datetime also sets the state to pending. + # Request will be rescheduled on the next run. 
req.update_due(None) req.save() @@ -442,12 +586,12 @@ def show(self, request_id=None, dump_yaml=False): req = requests[-1] - rich.print(req) - if dump_yaml: rich.print("\n[bold]Raw YAML serialization:[/bold]") yaml = Path(req.filename).read_text() rich.print(rich.syntax.Syntax(yaml, "yaml")) + else: + rich.print(req) @require_lock def delete(self, reqid): @@ -472,23 +616,29 @@ def delete(self, reqid): request=req.id, ) - def reboot(self, requested_reboots): + def reboot_and_exit(self, requested_reboots): if RebootType.COLD in requested_reboots: self.log.info( "maintenance-poweroff", _replace_msg=( - "Doing a cold boot now to finish maintenance activities." + "Doing a cold boot in five seconds to finish maintenance " + "activities." ), ) + time.sleep(5) subprocess.run( "poweroff", check=True, capture_output=True, text=True ) - return True + sys.exit(0) + elif RebootType.WARM in requested_reboots: self.log.info( "maintenance-reboot", - _replace_msg="Rebooting now to finish maintenance activities.", + _replace_msg=( + "Rebooting in five seconds to finish maintenance " + "activities." 
+ ), ) + time.sleep(5) subprocess.run("reboot", check=True, capture_output=True, text=True) - return True - return False + sys.exit(0) diff --git a/pkgs/fc/agent/fc/maintenance/request.py b/pkgs/fc/agent/fc/maintenance/request.py index aa0342421..2ea489419 100644 --- a/pkgs/fc/agent/fc/maintenance/request.py +++ b/pkgs/fc/agent/fc/maintenance/request.py @@ -4,6 +4,8 @@ import os import os.path as p import tempfile +from enum import Enum +from typing import Optional import iso8601 import rich.table @@ -12,6 +14,7 @@ import yaml from fc.util.time_date import ensure_timezone_present, format_datetime, utcnow +from .activity import Activity, ActivityMergeResult from .estimate import Estimate from .state import State, evaluate_state @@ -28,27 +31,58 @@ def cd(newdir): os.chdir(oldcwd) +class RequestMergeResult(Enum): + NO_MERGE = 0 + REMOVE = 1 + UPDATE = 2 + SIGNIFICANT_UPDATE = 3 + + class Request: MAX_RETRIES = 48 - _reqid = None - activity = None - comment = None - estimate = None - next_due = None - last_scheduled_at = None - added_at = None - state = State.pending - _reqmanager = None # backpointer, will be set in ReqManager - - def __init__(self, activity, estimate, comment=None, dir=None, log=_log): + _comment: str | None + _estimate: Estimate | None + _reqid: str | None + activity: Activity + added_at: datetime.datetime | None + last_scheduled_at: datetime.datetime | None + next_due: datetime.datetime | None + state: State + updated_at: datetime.datetime | None + + def __init__( + self, activity, estimate=None, comment=None, dir=None, log=_log + ): activity.request = self + activity.set_up_logging(log) self.activity = activity - self.estimate = Estimate(estimate) - self.comment = comment + self._estimate = Estimate(estimate) if estimate else None + self._comment = comment self.dir = dir self.log = log self.attempts = [] + self._reqid = None # will be set on first access + self._reqmanager = None # will be set in ReqManager + self.added_at = None + 
self.last_scheduled_at = None + self.next_due = None + self.state = State.pending + self.updated_at = None + + @property + def comment(self): + if self._comment: + return self._comment + + return self.activity.comment + + @property + def estimate(self) -> Estimate: + if self._estimate: + return self._estimate + + return self.activity.estimate def __eq__(self, other): return self.__class__ == other.__class__ and self.id == other.id @@ -81,6 +115,8 @@ def __rich_repr__(self): yield "next_due", format_datetime(self.next_due) if self.added_at: yield "added_at", format_datetime(self.added_at) + if self.updated_at: + yield "updated_at", format_datetime(self.updated_at) if self.last_scheduled_at: yield "last_scheduled_at", format_datetime(self.last_scheduled_at) yield "estimate", str(self.estimate) @@ -114,10 +150,24 @@ def filename(self): """Full path to request.yaml.""" return p.join(self.dir, "request.yaml") + @property + def not_after(self) -> Optional[datetime.datetime]: + if not self.next_due: + return + return self.next_due + datetime.timedelta(seconds=1800) + + @property + def overdue(self) -> bool: + if not self.not_after: + return False + return utcnow() > self.not_after + @classmethod def load(cls, dir, log): # need imports because such objects may be loaded via YAML + import fc.maintenance.activity.reboot import fc.maintenance.activity.update + import fc.maintenance.activity.vm_change import fc.maintenance.lib.reboot import fc.maintenance.lib.shellscript @@ -125,13 +175,16 @@ def load(cls, dir, log): instance = yaml.load(f, Loader=yaml.UnsafeLoader) instance.next_due = ensure_timezone_present(instance.next_due) instance.added_at = ensure_timezone_present(instance.added_at) + instance.updated_at = ensure_timezone_present(instance.updated_at) instance.last_scheduled_at = ensure_timezone_present( instance.last_scheduled_at ) instance.dir = dir + instance.set_up_logging(log) + with cd(dir): instance.activity.load() - instance.set_up_logging(log) + 
instance.activity.request = instance return instance def save(self): @@ -158,8 +211,11 @@ def execute(self): """ self.log.info( "execute-request-start", - _replace_msg="Starting execution of request: {request}", + _replace_msg=( + "Starting execution of request: {request} ({activity_type})" + ), request=self.id, + activity_type=self.activity.__class__.__name__, ) attempt = Attempt() # sets start time try: @@ -235,16 +291,57 @@ def update_due(self, due): def update_state(self): """Updates time-dependent request state.""" - if ( - self.state in (State.pending, State.postpone) - and self.next_due - and utcnow() >= self.next_due - ): - self.state = State.due + if self.next_due: + if self.state == State.pending and utcnow() >= self.next_due: + self.state = State.due + + if self.state == State.postpone: + self.state = State.pending + if len(self.attempts) > self.MAX_RETRIES: self.state = State.retrylimit + return self.state + def merge(self, other): + if not isinstance(other, Request): + raise ValueError( + f"Can only be merged with other Request instances! Given: {other}" + ) + + activity_merge_result = self.activity.merge(other.activity) + assert isinstance( + activity_merge_result, ActivityMergeResult + ), f"{activity_merge_result} has wrong type, must be ActivityMergeResult" + + if not activity_merge_result.merged: + return RequestMergeResult.NO_MERGE + + self.activity = activity_merge_result.merged + + # XXX: get rid of request estimate? 
+ if self._estimate is not None and other._estimate is not None: + self._estimate = max(self._estimate, other._estimate) + if other._comment is not None and self._comment != other._comment: + self._comment = f"{self._comment or ''} {other._comment}".strip() + + if not activity_merge_result.is_effective: + return RequestMergeResult.REMOVE + + self.log.debug( + "request-merge-update", + is_significant=activity_merge_result.is_significant, + changes=activity_merge_result.changes, + activity=activity_merge_result.merged, + ) + self.updated_at = utcnow() + self.save() + + if activity_merge_result.is_significant: + return RequestMergeResult.SIGNIFICANT_UPDATE + else: + return RequestMergeResult.UPDATE + def other_requests(self): """Lists other requests currently active in the ReqManager.""" return [ diff --git a/pkgs/fc/agent/fc/maintenance/tests/test_cli.py b/pkgs/fc/agent/fc/maintenance/tests/test_cli.py index 6adc17395..689a6c7e6 100644 --- a/pkgs/fc/agent/fc/maintenance/tests/test_cli.py +++ b/pkgs/fc/agent/fc/maintenance/tests/test_cli.py @@ -1,5 +1,6 @@ import json import unittest.mock +from unittest.mock import MagicMock import fc.maintenance.cli import pytest @@ -25,6 +26,7 @@ def invoke_app(tmpdir, agent_maintenance_config): enc_file = tmpdir / "enc.json" main_args = ( "--verbose", + "--show-caller-info", "--spooldir", tmpdir, "--logdir", @@ -139,13 +141,10 @@ def test_invoke_request_system_properties_virtual( kernel.assert_called_once() -@unittest.mock.patch("fc.maintenance.cli.prepare_switch_in_maintenance") -def test_invoke_request_update(prepare_switch, invoke_app): +@unittest.mock.patch("fc.maintenance.cli.request_update") +@unittest.mock.patch("fc.maintenance.cli.load_enc") +def test_invoke_request_update(load_enc, request_update, invoke_app): invoke_app("request", "update") - prepare_switch.assert_called_once() - - -@unittest.mock.patch("fc.maintenance.cli.switch_with_update") -def test_invoke_request_update_run_now(switch, invoke_app): - invoke_app("request", "update", "--run-now")
- switch.assert_called_once() + load_enc.assert_called_once() + request_update.assert_called_once() + fc.maintenance.cli.rm.add.assert_called_once() diff --git a/pkgs/fc/agent/fc/maintenance/tests/test_estimate.py b/pkgs/fc/agent/fc/maintenance/tests/test_estimate.py index ee0ae8550..021be36a0 100644 --- a/pkgs/fc/agent/fc/maintenance/tests/test_estimate.py +++ b/pkgs/fc/agent/fc/maintenance/tests/test_estimate.py @@ -41,3 +41,7 @@ def test_zero_duration(): def test_repr(): assert repr(Estimate(42)) == "" + + +def test_comparison(): + assert Estimate("10m") < Estimate("1h") diff --git a/pkgs/fc/agent/fc/maintenance/tests/test_system_properties.py b/pkgs/fc/agent/fc/maintenance/tests/test_maintenance.py similarity index 51% rename from pkgs/fc/agent/fc/maintenance/tests/test_system_properties.py rename to pkgs/fc/agent/fc/maintenance/tests/test_maintenance.py index 2b5e8ce48..d245ed74e 100644 --- a/pkgs/fc/agent/fc/maintenance/tests/test_system_properties.py +++ b/pkgs/fc/agent/fc/maintenance/tests/test_maintenance.py @@ -1,13 +1,11 @@ import contextlib import unittest.mock +from unittest.mock import MagicMock -from fc.maintenance.lib.reboot import RebootActivity -from fc.maintenance.system_properties import ( - request_reboot_for_cpu, - request_reboot_for_kernel, - request_reboot_for_memory, - request_reboot_for_qemu, -) +import fc.maintenance.maintenance +from fc.maintenance.activity import RebootType +from fc.maintenance.activity.reboot import RebootActivity +from fc.maintenance.activity.vm_change import VMChangeActivity ENC = { "parameters": { @@ -17,43 +15,52 @@ } -@unittest.mock.patch("fc.maintenance.system_properties.RebootActivity.boottime") @unittest.mock.patch("fc.util.dmi_memory.main") -def test_request_reboot_for_memory(get_memory, boottime): +@unittest.mock.patch("fc.util.vm.count_cores") +def test_request_reboot_for_memory(count_cores, get_memory, logger): get_memory.return_value = 1024 - request = request_reboot_for_memory(ENC) - assert "from 1024 MiB 
to 2048" in request.comment + request = fc.maintenance.maintenance.request_reboot_for_memory(logger, ENC) + assert "1024 MiB -> 2048" in request.comment activity = request.activity - assert isinstance(activity, RebootActivity) - assert activity.action == "poweroff" + assert isinstance(activity, VMChangeActivity) + activity.set_up_logging(logger) + activity.run() + assert activity.reboot_needed == RebootType.COLD @unittest.mock.patch("fc.util.dmi_memory.main") -def test_do_not_request_reboot_for_unchanged_memory(get_memory): +@unittest.mock.patch("fc.util.vm.count_cores") +def test_do_not_request_reboot_for_unchanged_memory( + count_cores, get_memory, logger +): get_memory.return_value = 2048 - request = request_reboot_for_memory(ENC) + request = fc.maintenance.maintenance.request_reboot_for_memory(logger, ENC) assert request is None -@unittest.mock.patch("fc.maintenance.system_properties.RebootActivity.boottime") +@unittest.mock.patch("fc.util.dmi_memory.main") @unittest.mock.patch("fc.util.vm.count_cores") -def test_request_reboot_for_cpu(count_cores, boottime): +def test_request_reboot_for_cpu(count_cores, get_memory, logger): count_cores.return_value = 1 - request = request_reboot_for_cpu(ENC) - assert "from 1 to 2" in request.comment + request = fc.maintenance.maintenance.request_reboot_for_cpu(logger, ENC) + assert "1 -> 2" in request.comment activity = request.activity - assert isinstance(activity, RebootActivity) - assert activity.action == "poweroff" + assert isinstance(activity, VMChangeActivity) + activity.set_up_logging(logger) + activity.run() + assert activity.reboot_needed == RebootType.COLD +@unittest.mock.patch("fc.util.dmi_memory.main") @unittest.mock.patch("fc.util.vm.count_cores") -def test_do_not_request_reboot_for_unchanged_cpu(count_cores): +def test_do_not_request_reboot_for_unchanged_cpu( + count_cores, get_memory, logger +): count_cores.return_value = 2 - request = request_reboot_for_cpu(ENC) + request = 
fc.maintenance.maintenance.request_reboot_for_cpu(logger, ENC) assert request is None -@unittest.mock.patch("fc.maintenance.system_properties.RebootActivity.boottime") @unittest.mock.patch("os.path.isdir") @unittest.mock.patch("os.path.exists") @unittest.mock.patch("shutil.move") @@ -61,7 +68,7 @@ def test_request_reboot_for_qemu_change( shutil_move, path_exists, path_isdir, - boottime, + logger, ): path_isdir.return_value = True path_exists.return_value = True @@ -78,12 +85,12 @@ def fake_open_qemu_files(filename, encoding): with unittest.mock.patch( "builtins.open", contextlib.contextmanager(fake_open_qemu_files) ): - request = request_reboot_for_qemu() + request = fc.maintenance.maintenance.request_reboot_for_qemu(logger) assert "Qemu binary environment has changed" in request.comment activity = request.activity assert isinstance(activity, RebootActivity) - assert activity.action == "poweroff" + assert activity.reboot_needed == RebootType.COLD @unittest.mock.patch("os.path.isdir") @@ -93,6 +100,7 @@ def test_do_not_request_reboot_for_unchanged_qemu( shutil_move, path_exists, path_isdir, + logger, ): path_isdir.return_value = True path_exists.return_value = True @@ -109,13 +117,12 @@ def fake_open_qemu_files(filename, encoding): with unittest.mock.patch( "builtins.open", contextlib.contextmanager(fake_open_qemu_files) ): - request = request_reboot_for_qemu() + request = fc.maintenance.maintenance.request_reboot_for_qemu(logger) assert request is None -@unittest.mock.patch("fc.maintenance.system_properties.RebootActivity.boottime") -def test_request_reboot_for_kernel_change(boottime): +def test_request_reboot_for_kernel_change(logger): def fake_changed_kernel_version(path): if path == "/run/booted-system/kernel": return "5.10.45" @@ -125,15 +132,15 @@ def fake_changed_kernel_version(path): with unittest.mock.patch( "fc.util.nixos.kernel_version", fake_changed_kernel_version ): - request = request_reboot_for_kernel() + request = 
fc.maintenance.maintenance.request_reboot_for_kernel(logger) assert "kernel (5.10.45 to 5.10.50)" in request.comment activity = request.activity assert isinstance(activity, RebootActivity) - assert activity.action == "reboot" + assert activity.reboot_needed == RebootType.WARM -def test_do_not_request_reboot_for_unchanged_kernel(): +def test_do_not_request_reboot_for_unchanged_kernel(logger): def fake_changed_kernel_version(path): if path == "/run/booted-system/kernel": return "5.10.50" @@ -143,6 +150,34 @@ def fake_changed_kernel_version(path): with unittest.mock.patch( "fc.util.nixos.kernel_version", fake_changed_kernel_version ): - request = request_reboot_for_kernel() + request = fc.maintenance.maintenance.request_reboot_for_kernel(logger) + + assert request is None + + +def test_request_update(log, logger, monkeypatch): + enc = {} + from_enc_mock = MagicMock() + from_enc_mock.return_value.is_identical_to_running_system = False + monkeypatch.setattr( + "fc.maintenance.maintenance.UpdateActivity.from_enc", from_enc_mock + ) + + request = fc.maintenance.maintenance.request_update(logger, enc, []) + + assert request + log.has("update-system-changed") + + +def test_request_update_unchanged(log, logger, monkeypatch): + enc = {} + from_enc_mock = MagicMock() + from_enc_mock.return_value.is_identical_to_running_system = True + monkeypatch.setattr( + "fc.maintenance.maintenance.UpdateActivity.from_enc", from_enc_mock + ) + + request = fc.maintenance.maintenance.request_update(logger, enc, []) assert request is None + log.has("update-system-unchanged") diff --git a/pkgs/fc/agent/fc/maintenance/tests/test_reqmanager.py b/pkgs/fc/agent/fc/maintenance/tests/test_reqmanager.py index deefc9a76..9d790646f 100644 --- a/pkgs/fc/agent/fc/maintenance/tests/test_reqmanager.py +++ b/pkgs/fc/agent/fc/maintenance/tests/test_reqmanager.py @@ -12,22 +12,47 @@ import freezegun import pytest import pytz -import rich import shortuuid -from fc.maintenance.activity import Activity, 
RebootType -from fc.maintenance.reqmanager import ReqManager +from fc.maintenance.activity import Activity, ActivityMergeResult, RebootType +from fc.maintenance.estimate import Estimate from fc.maintenance.request import Attempt, Request from fc.maintenance.state import ARCHIVE, EXIT_POSTPONE, State from rich.console import Console +class MergeableActivity(Activity): + estimate = Estimate("10") + + def __init__(self, value): + super().__init__() + self.value = value + + @property + def comment(self): + return self.value + + def merge(self, other): + if not isinstance(other, MergeableActivity): + return ActivityMergeResult() + + # Simulate merging an activity that reverts this activity, resulting + # in a no-op situation. + if other.value == "inverse": + return ActivityMergeResult(self, is_effective=False) + + self.value = other.value + return ActivityMergeResult(self, is_effective=True, is_significant=True) + + @pytest.fixture -def agent_maintenance_config(tmpdir): - config_file = str(tmpdir / "fc-agent.conf") - with open(config_file, "w") as f: - f.write( - textwrap.dedent( - """\ +def agent_maintenance_config(tmp_path): + config_path = tmp_path / "fc-agent.conf" + config_path.write_text( + textwrap.dedent( + """\ + [maintenance] + preparation_seconds = 300 + [maintenance-enter] demo = echo "entering demo" @@ -35,50 +60,30 @@ def agent_maintenance_config(tmpdir): demo = echo "leaving demo" dummy = """ - ) ) - return config_file + ) + return config_path -@pytest.fixture -def reqmanager(tmpdir, agent_maintenance_config): - with ReqManager(str(tmpdir), config_file=agent_maintenance_config) as rm: - yield rm - - -@contextlib.contextmanager -def request_population(n, dir, config_file=None): - """Creates a ReqManager with a pregenerated population of N requests. - - The ReqManager and a list of Requests are passed to the calling code. 
- """ - with ReqManager(str(dir), config_file=config_file) as reqmanager: - requests = [] - for i in range(n): - req = Request(Activity(), 60, comment=str(i)) - req._reqid = shortuuid.encode(uuid.UUID(int=i)) - reqmanager.add(req) - requests.append(req) - yield (reqmanager, requests) - - -def test_init_should_create_directories(tmpdir): - spooldir = str(tmpdir / "maintenance") - ReqManager(spooldir) +def test_init_should_create_directories(reqmanager): + spooldir = reqmanager.spooldir assert p.isdir(spooldir) assert p.isdir(p.join(spooldir, "requests")) assert p.isdir(p.join(spooldir, "archive")) -def test_lockfile(tmpdir): - with ReqManager(str(tmpdir)): - with open(str(tmpdir / ".lock")) as f: - assert f.read().strip() == str(os.getpid()) - with open(str(tmpdir / ".lock")) as f: - assert f.read() == "" +def test_lockfile(reqmanager): + with reqmanager: + # ReqManager active, lock file contain PID + assert (reqmanager.spooldir / ".lock").read_text().strip() == str( + os.getpid() + ) + # ReqManager closed, lock file should be empty now. 
+ assert (reqmanager.spooldir / ".lock").read_text() == "" -def test_req_save(tmpdir, request_population): + +def test_req_save(request_population): with request_population(1) as (rm, requests): req = requests[0] assert p.isfile(p.join(req.dir, "request.yaml")) @@ -87,53 +92,92 @@ def test_req_save(tmpdir, request_population): class FunnyActivity(Activity): def __init__(self, mood): + super().__init__() self.mood = mood -def test_scan(tmpdir, request_population): +def test_scan(request_population): with request_population(3) as (rm, requests): - requests[0].comment = "foo" + requests[0]._comment = "foo" requests[0].save() requests[1].activity = FunnyActivity("good") requests[1].save() - with ReqManager(str(tmpdir)) as rm: + with rm: assert set(rm.requests.values()) == set(requests) -def test_scan_invalid(tmpdir): - os.makedirs(str(tmpdir / "requests" / "emptydir")) - open(str(tmpdir / "requests" / "foo"), "w").close() - ReqManager(str(tmpdir)).scan() # should not raise +def test_scan_invalid(reqmanager): + os.makedirs(str(reqmanager.requestsdir / "emptydir")) + open(str(reqmanager.requestsdir / "foo"), "w").close() + reqmanager.scan() # should not raise assert True -def test_find_by_comment(reqmanager): +def test_dont_add_ineffective_req(reqmanager): + with reqmanager as rm: + activity = Activity() + activity.is_effective = False + assert rm.add(Request(activity, 1, "comment 1")) is None + assert not rm.requests + + +def test_do_add_ineffective_req_with_add_always(reqmanager): with reqmanager as rm: - rm.add(Request(Activity(), 1, "comment 1")) - req2 = rm.add(Request(Activity(), 1, "comment 2")) + activity = Activity() + activity.is_effective = False + assert ( + rm.add(Request(activity, 1, "comment 1"), add_always=True) + is not None + ) + assert len(rm.requests) == 1 + + +def test_add_dont_add_none(log, reqmanager): with reqmanager as rm: - assert req2 == rm.find_by_comment("comment 2") + rm.add(None) -def 
test_find_by_comment_returns_none_on_mismatch(reqmanager): +@unittest.mock.patch("fc.util.directory.connect") +def test_add_do_merge_compatible_request(connect, log, reqmanager): with reqmanager as rm: - assert rm.find_by_comment("no such comment") is None + first_activity = MergeableActivity("first") + second_activity = MergeableActivity("second") + to_be_merged_activity = MergeableActivity("to be merged") + first_request = Request(first_activity) + second_request = Request(second_activity) + to_be_merged_request = Request(to_be_merged_activity) + assert rm.add(first_request) is first_request + # Should not be merged because of add_always + assert rm.add(second_request, add_always=True) is second_request + # Should be merged + assert rm.add(to_be_merged_request) is second_request + assert log.has( + "requestmanager-merge-significant", + request=to_be_merged_request.id, + merged=second_request.id, + ) + assert len(rm.requests) == 2 -def test_dont_add_two_reqs_with_identical_comments(reqmanager): +def test_add_should_remove_no_op_request(reqmanager): with reqmanager as rm: - assert rm.add(Request(Activity(), 1, "comment 1")) is not None - assert rm.add(Request(Activity(), 1, "comment 1")) is None + first_activity = MergeableActivity("first") + second_activity = MergeableActivity("inverse") + first_request = Request(first_activity) + second_request = Request(second_activity) + assert rm.add(first_request) is first_request + assert rm.add(second_request) is None assert len(rm.requests) == 1 -def test_do_add_two_reqs_with_identical_comments(reqmanager): +def test_add_do_not_merge_incompatible_request(reqmanager): with reqmanager as rm: - assert rm.add(Request(Activity(), 1, "comment 1")) is not None - assert ( - rm.add(Request(Activity(), 1, "comment 1"), skip_same_comment=False) - is not None - ) + first_activity = MergeableActivity("first") + second_activity = Activity() + first_request = Request(first_activity) + second_request = Request(second_activity) + assert rm.add(first_request) is
first_request + assert rm.add(second_request) is second_request assert len(rm.requests) == 2 @@ -146,12 +190,12 @@ def test_list_other_requests(reqmanager): @unittest.mock.patch("fc.util.directory.connect") def test_schedule_requests(connect, reqmanager): - req = reqmanager.add(Request(Activity(), 1, "comment")) + req = reqmanager.add(Request(Activity(), 320, "comment")) rpccall = connect().schedule_maintenance rpccall.return_value = {req.id: {"time": "2016-04-20T15:12:40.9+00:00"}} reqmanager.schedule() rpccall.assert_called_once_with( - {req.id: {"estimate": 1, "comment": "comment"}} + {req.id: {"estimate": 620, "comment": "comment"}} ) assert req.next_due == datetime.datetime( 2016, 4, 20, 15, 12, 40, 900000, tzinfo=pytz.UTC @@ -206,7 +250,11 @@ def test_execute_activity_with_reboot(connect, run, reqmanager, log): activity.reboot_needed = RebootType.WARM req = reqmanager.add(Request(activity, 1)) req.state = State.due - reqmanager.execute(run_all_now=True) + with pytest.raises(SystemExit) as e: + reqmanager.execute(run_all_now=True) + + assert e.value.code == 0 + run.assert_has_calls( [ call('echo "entering demo"', shell=True, check=True), @@ -220,7 +268,8 @@ def test_execute_activity_with_reboot(connect, run, reqmanager, log): @unittest.mock.patch("subprocess.run") def test_reboot_cold_reboot_has_precedence(run, reqmanager, log): - reqmanager.reboot({RebootType.COLD, RebootType.WARM}) + with pytest.raises(SystemExit): + reqmanager.reboot_and_exit({RebootType.COLD, RebootType.WARM}) assert log.has("maintenance-poweroff") @@ -293,7 +342,9 @@ def test_execute_all_due(connect, request_population): reqs[2].next_due = datetime.datetime(2016, 4, 20, 11, tzinfo=pytz.UTC) rm.execute() for r in reqs: - assert len(r.attempts) == 1 + assert ( + len(r.attempts) == 1 + ), f"Wrong number of attempts for request {r.id}, expected exactly one" @unittest.mock.patch("fc.util.directory.connect") @@ -323,8 +374,7 @@ def test_execute_logs_exception(connect, reqmanager, log): 
@unittest.mock.patch("fc.util.directory.connect") def test_execute_marks_service_status(connect, reqmanager): req = reqmanager.add(Request(Activity(), 1)) - req.state = State.due - reqmanager.execute() + reqmanager.execute(run_all_now=True) assert [ unittest.mock.call(unittest.mock.ANY, False), unittest.mock.call(unittest.mock.ANY, True), @@ -351,7 +401,7 @@ def test_postpone(connect, reqmanager): postp = connect().postpone_maintenance reqmanager.postpone() postp.assert_called_once_with({req.id: {"postpone_by": 180}}) - assert req.state == State.postpone + assert req.state == State.pending assert req.next_due is None @@ -418,6 +468,15 @@ def test_list(reqmanager): id1 = r1.id[:7] id2 = r2.id[:7] id3 = r3.id[:7] - assert id1 in str_output - assert id2 in str_output - assert id3 in str_output + assert id1[:6] in str_output + assert id2[:6] in str_output + assert id3[:6] in str_output + + +@freezegun.freeze_time("2016-04-20 11:00:00") +def test_overdue(reqmanager): + r1 = Request(Activity(), "14m", "pending request") + reqmanager.add(r1) + r2 = Request(Activity(), "2h", "due request") + r2.state = State.due + reqmanager.set_postpone_state_for_overdue_requests() diff --git a/pkgs/fc/agent/fc/maintenance/tests/test_request.py b/pkgs/fc/agent/fc/maintenance/tests/test_request.py index 74bd12293..2c603fa50 100644 --- a/pkgs/fc/agent/fc/maintenance/tests/test_request.py +++ b/pkgs/fc/agent/fc/maintenance/tests/test_request.py @@ -8,6 +8,41 @@ from fc.maintenance.state import State +def test_overdue_not_scheduled(): + r = Request(Activity(), 600) + assert not r.overdue + + +@unittest.mock.patch("fc.maintenance.request.utcnow") +def test_overdue_before_due(utcnow): + utcnow.side_effect = [ + datetime.datetime(2023, 1, 1, hour=0), + ] + r = Request(Activity(), 600) + r.next_due = datetime.datetime(2023, 1, 1, hour=2) + assert not r.overdue + + +@unittest.mock.patch("fc.maintenance.request.utcnow") +def test_overdue_after_but_ok(utcnow): + utcnow.side_effect = [ + 
datetime.datetime(2023, 1, 1, hour=2, minute=19), + ] + r = Request(Activity(), 600) + r.next_due = datetime.datetime(2023, 1, 1, hour=2) + assert not r.overdue + + +@unittest.mock.patch("fc.maintenance.request.utcnow") +def test_overdue_thats_too_late(utcnow): + utcnow.side_effect = [ + datetime.datetime(2023, 1, 1, hour=2, minute=31), + ] + r = Request(Activity(), 600) + r.next_due = datetime.datetime(2023, 1, 1, hour=2) + assert r.overdue + + def test_duration(): r = Request(Activity(), 1) a = Attempt() @@ -47,28 +82,30 @@ def test_duration_from_activity_duration(tmpdir): assert r.duration == 90 -def test_save_yaml(tmpdir): - r = Request(Activity(), 10, "my comment", dir=str(tmpdir)) +def test_save_yaml(tmp_path): + r = Request(Activity(), 10, "my comment", dir=str(tmp_path)) assert r.id is not None r.save() - with open(str(tmpdir / "request.yaml")) as f: - assert ( - f.read() - == """\ -&id001 !!python/object:fc.maintenance.request.Request -_reqid: {id} + saved_yaml = (tmp_path / "request.yaml").read_text() + expected = f"""\ +!!python/object:fc.maintenance.request.Request +_comment: my comment +_estimate: !!python/object:fc.maintenance.estimate.Estimate + value: 10.0 +_reqid: {r.id} _reqmanager: null -activity: !!python/object:fc.maintenance.activity.Activity - request: *id001 +activity: !!python/object:fc.maintenance.activity.Activity {{}} +added_at: null attempts: [] -comment: my comment -dir: {tmpdir} -estimate: !!python/object:fc.maintenance.estimate.Estimate - value: 10.0 -""".format( - id=r.id, tmpdir=str(tmpdir) - ) - ) +dir: {tmp_path} +last_scheduled_at: null +next_due: null +state: !!python/object/apply:fc.maintenance.state.State +- '-' +updated_at: null +""" + + assert saved_yaml == expected def test_execute_obeys_retrylimit(tmpdir): diff --git a/pkgs/fc/agent/fc/manage/cli.py b/pkgs/fc/agent/fc/manage/cli.py index 40b6508dd..411d4be9e 100644 --- a/pkgs/fc/agent/fc/manage/cli.py +++ b/pkgs/fc/agent/fc/manage/cli.py @@ -21,6 +21,7 @@ class 
Context(NamedTuple): enc_path: Path verbose: bool show_trace: bool + show_caller_info: bool context: Context @@ -183,7 +184,17 @@ def fc_manage( False, "--directory", "-e", help="(legacy flag) Update inventory data." ), verbose: bool = Option( - False, "--verbose", "-v", help="Show debug messages and code locations." + False, + "--verbose", + "-v", + help=( + "Show debug logging output. By default, only info and higher " + "levels are shown." + ), + ), + show_caller_info: bool = Option( + False, + help="Show where a logging function was called (file/function/line).", ), show_trace: bool = Option( False, @@ -239,6 +250,7 @@ def fc_manage( enc_path=enc_path, verbose=verbose, show_trace=show_trace, + show_caller_info=show_caller_info, ) return @@ -294,3 +306,7 @@ def main(): command = typer.main.get_command(app) result = command(standalone_mode=False) sys.exit(result) + + +if __name__ == "__main__": + main() diff --git a/pkgs/fc/agent/fc/manage/manage.py b/pkgs/fc/agent/fc/manage/manage.py index 98d086d28..9aaa3b141 100644 --- a/pkgs/fc/agent/fc/manage/manage.py +++ b/pkgs/fc/agent/fc/manage/manage.py @@ -1,42 +1,13 @@ """Update NixOS system configuration from infrastructure or local sources.""" import os -import os.path as p import re -import subprocess -from dataclasses import dataclass -from datetime import datetime from pathlib import Path -import fc.maintenance -import fc.util.logging -import requests -from fc.maintenance.lib.shellscript import ShellScriptActivity from fc.util import nixos +from fc.util.channel import Channel from fc.util.checks import CheckResult from fc.util.enc import STATE_VERSION_FILE -from fc.util.nixos import RE_FC_CHANNEL - -# nixos-rebuild doesn't support changing the result link name so we -# create a dir with a meaningful name (like /run/current-system) and -# let nixos-rebuild put it there. -# The link goes away after a reboot. 
It's possible that the new system -# will be garbage-collected before the switch in that case but the switch -# will still work. -NEXT_SYSTEM = "/run/next-system" - -ACTIVATE = f"""\ -set -e -nix-channel --add {{url}} nixos -nix-channel --update nixos -nix-channel --remove next -# Retry once in case nixos-build fails e.g. due to updates to Nix itself -nixos-rebuild switch || nixos-rebuild switch -rm -rf {NEXT_SYSTEM} -""" - -os.environ["NIX_REMOTE"] = "daemon" - # Other platform code can also check the presence of this marker file to # change behaviour before/during the first agent run. @@ -63,233 +34,6 @@ """ -class Channel: - PHRASES = re.compile(r"would (\w+) the following units: (.*)$") - - # global, to avoid re-connecting (with ssl handshake and all) - session = requests.session() - is_local = False - - def __init__(self, log, url, name="", environment=None, resolve_url=True): - self.url = url - self.name = name - self.environment = environment - self.system_path = None - - if url.startswith("file://"): - self.is_local = True - self.resolved_url = url.replace("file://", "") - elif resolve_url: - self.resolved_url = nixos.resolve_url_redirects(url) - else: - self.resolved_url = url - - self.log = log - - self.log_with_context = log.bind( - url=self.resolved_url, - name=name, - environment=environment, - is_local=self.is_local, - ) - - def version(self): - if self.is_local: - return "local-checkout" - label_comp = [ - "/root/.nix-defexpr/channels/{}/{}".format(self.name, c) - for c in [".version", ".version-suffix"] - ] - if all(p.exists(f) for f in label_comp): - return "".join(open(f).read() for f in label_comp) - - def __str__(self): - v = self.version() or "unknown" - return "".format( - self.name, v, self.resolved_url - ) - - def __eq__(self, other): - if isinstance(other, Channel): - return self.resolved_url == other.resolved_url - return NotImplemented - - @classmethod - def current(cls, log, channel_name): - """Looks up existing channel by name. 
- The URL found is usually already resolved (no redirects) - so we don't do it again here. It can still be enabled with - `resolve_url`, when needed. - """ - if not p.exists("/root/.nix-channels"): - log.debug("channel-current-no-nix-channels-dir") - return - with open("/root/.nix-channels") as f: - for line in f.readlines(): - url, name = line.strip().split(" ", 1) - if name == channel_name: - # We don't have to resolve the URL if it's a direct link - # to a Hydra build product. This is the normal case for - # running VMs because the nixos channel is set to an - # already resolved URL. - # Resolve all other URLs, for example initial URLs used - # during VM bootstrapping. - resolve_url = RE_FC_CHANNEL.match(url) is None - log.debug( - "channel-current", - url=url, - name=name, - resolve_url=resolve_url, - ) - return Channel(log, url, name, resolve_url=resolve_url) - - log.debug("channel-current-not-found", name=name) - - def load_nixos(self): - self.log_with_context.debug("channel-load-nixos") - - if self.is_local: - raise RuntimeError("`load` not applicable for local channels") - - nixos.update_system_channel(self.resolved_url, self.log) - - def load_next(self): - self.log_with_context.debug("channel-load-next") - - if self.is_local: - raise RuntimeError("`load` not applicable for local channels") - subprocess.run( - ["nix-channel", "--add", self.resolved_url, "next"], - check=True, - capture_output=True, - text=True, - ) - subprocess.run( - ["nix-channel", "--update", "next"], - check=True, - capture_output=True, - text=True, - ) - - def check_local_channel(self): - if not p.exists(p.join(self.resolved_url, "fc")): - self.log_with_context.error( - "local-channel-nix-path-invalid", - _replace_msg="Expected NIX_PATH element 'fc' not found. Did you " - "create a 'channels' directory via `dev-setup` and point " - "the channel URL towards that directory?", - ) - - def switch(self, lazy=True, show_trace=False): - """ - Build system with this channel and switch to it. 
- Replicates the behaviour of nixos-rebuild switch and adds - a "lazy mode" which only switches to the built system if it actually - changed. - """ - self.log_with_context.debug("channel-switch-start") - # Put a temporary result link in /run to avoid a race condition - # with the garbage collector which may remove the system we just built. - # If register fails, we still hold a GC root until the next reboot. - out_link = "/run/fc-agent-built-system" - self.build(out_link, show_trace) - nixos.register_system_profile(self.system_path, self.log) - # New system is registered, delete the temporary result link. - os.unlink(out_link) - return nixos.switch_to_system(self.system_path, lazy, self.log) - - def build(self, out_link=None, show_trace=False): - """ - Build system with this channel. Works like nixos-rebuild build. - Does not modify the running system. - """ - self.log_with_context.debug("channel-build-start") - - if show_trace: - build_options = ["--show-trace"] - else: - build_options = [] - - if self.is_local: - self.check_local_channel() - system_path = nixos.build_system( - self.resolved_url, build_options, out_link, self.log - ) - self.system_path = system_path - - def dry_activate(self): - return nixos.dry_activate_system(self.system_path, self.log) - - def prepare_maintenance(self): - self.log_with_context.debug("channel-prepare-maintenance-start") - - if not p.exists(NEXT_SYSTEM): - os.mkdir(NEXT_SYSTEM) - - out_link = Path(NEXT_SYSTEM) / "result" - self.system_path = nixos.build_system( - "/root/.nix-defexpr/channels/next", out_link=out_link, log=self.log - ) - changes = self.dry_activate() - self.register_maintenance(changes) - - def register_maintenance(self, changes): - self.log_with_context.debug("maintenance-register-start") - - def notify(category): - services = changes.get(category, []) - if services: - return "{}: {}".format( - category.capitalize(), - ", ".join(s.replace(".service", "", 1) for s in services), - ) - else: - return "" - - 
notifications = list( - filter( - None, - (notify(cat) for cat in ["stop", "restart", "start", "reload"]), - ) - ) - msg_parts = [ - f"System update to {self.version()}", - f"Environment: {self.environment}", - f"Channel URL: {self.resolved_url}", - ] + notifications - - current_kernel = nixos.kernel_version("/run/current-system/kernel") - next_kernel = nixos.kernel_version("/run/next-system/result/kernel") - - if current_kernel != next_kernel: - self.log.info( - "maintenance-register-kernel-change", - current_kernel=current_kernel, - next_kernel=next_kernel, - ) - msg_parts.append( - "Will schedule a reboot to activate the changed kernel." - ) - - if len(msg_parts) > 1: # add trailing newline if output is multi-line - msg_parts += [""] - - msg = "\n".join(msg_parts) - # XXX: We should use an fc-manage call (like --activate), instead of - # Dumping the script into the maintenance request. - script = ACTIVATE.format(url=self.resolved_url) - self.log_with_context.debug( - "maintenance-register-result", script=script, comment=msg - ) - with fc.maintenance.ReqManager() as rm: - rm.add( - fc.maintenance.Request( - ShellScriptActivity(script), 600, comment=msg - ) - ) - self.log.info("maintenance-register-succeeded") - - class SwitchFailed(Exception): pass @@ -332,7 +76,7 @@ def check(log, enc) -> CheckResult: errors.append("ENC: environment URL is missing.") uses_local_checkout = ( - environment_url.startswith("file://") if environment_url else None + environment_url.startswith("file:") if environment_url else None ) if production_flag and uses_local_checkout: @@ -380,65 +124,6 @@ def check(log, enc) -> CheckResult: return CheckResult(errors, warnings) -def prepare_switch_in_maintenance(log, enc): - if not enc or not enc.get("parameters"): - log.warning( - "enc-data-missing", msg="No ENC data. Not building channel." - ) - return False - # scheduled update already present? 
- if Channel.current(log, "next"): - rm = fc.maintenance.ReqManager() - rm.scan() - if rm.requests: - due = list(rm.requests.values())[0].next_due - log.info( - "maintenance-present", - scheduled=bool(due), - at=datetime.isoformat(due) if due else None, - ) - return False - - # scheduled update available? - next_channel = Channel( - log, - enc["parameters"]["environment_url"], - name="next", - environment=enc["parameters"]["environment"], - ) - - if not next_channel or next_channel.is_local: - log.warn( - "maintenance-error-local-channel", - _replace_msg="Switch-in-maintenance incompatible with local checkout, abort.", - ) - return False - - current_channel = Channel.current(log, "nixos") - if next_channel != current_channel: - next_channel.load_next() - log.info( - "maintenance-prepare-changed", - current=str(current_channel), - next=str(next_channel), - ) - try: - next_channel.prepare_maintenance() - except nixos.ChannelException: - subprocess.run( - ["nix-channel", "--remove", "next"], - check=True, - capture_output=True, - text=True, - ) - raise - - return True - else: - log.info("maintenance-prepare-unchanged") - return False - - def dry_activate(log, channel_url, show_trace=False): channel = Channel( log, diff --git a/pkgs/fc/agent/fc/manage/systemd.py b/pkgs/fc/agent/fc/manage/systemd.py index 88de78219..b2c8c1817 100644 --- a/pkgs/fc/agent/fc/manage/systemd.py +++ b/pkgs/fc/agent/fc/manage/systemd.py @@ -77,3 +77,7 @@ def check_units( print(result.format_output()) if result.exit_code: raise Exit(result.exit_code) + + +if __name__ == "__main__": + app() diff --git a/pkgs/fc/agent/fc/manage/tests/test_cli.py b/pkgs/fc/agent/fc/manage/tests/test_cli.py index cfdbe6e6b..afe398447 100644 --- a/pkgs/fc/agent/fc/manage/tests/test_cli.py +++ b/pkgs/fc/agent/fc/manage/tests/test_cli.py @@ -28,6 +28,7 @@ def invoke_app(log, tmpdir, agent_maintenance_config): enc_file = tmpdir / "enc.json" main_args = ( "--verbose", + "--show-caller-info", "--logdir", tmpdir, 
"--tmpdir", diff --git a/pkgs/fc/agent/fc/manage/tests/test_manage.py b/pkgs/fc/agent/fc/manage/tests/test_manage.py index 28c0d122e..901a3d82f 100644 --- a/pkgs/fc/agent/fc/manage/tests/test_manage.py +++ b/pkgs/fc/agent/fc/manage/tests/test_manage.py @@ -1,18 +1,12 @@ -import textwrap from unittest.mock import MagicMock, Mock +import fc.manage.manage import responses from fc.manage.manage import Channel from pytest import fixture, raises from requests import HTTPError -@fixture -def mocked_responses(): - with responses.RequestsMock() as rsps: - yield rsps - - def expr_url(url): return url + "nixexprs.tar.xz" @@ -66,76 +60,3 @@ def test_channel_wrong_url_should_raise(logger, mocked_responses): with raises(HTTPError): Channel(logger, url) - - -def test_channel_prepare_maintenance( - log, mocked_responses, logger, monkeypatch, tmp_path -): - channel_url = ( - "https://hydra.flyingcircus.io/build/93222/download/1/nixexprs.tar.xz" - ) - version = "21.05.1367.817a5b0" - environment = "fc-21.05-dev" - system_path = f"/nix/store/v49jzgwblcn9vkrmpz92kzw5pkbsn0vz-nixos-system-test-{version}" - changes = { - "reload": ["nginx.service"], - "restart": ["telegraf.service"], - "start": ["postgresql.service"], - "stop": ["postgresql.service"], - } - - expected_request_comment = textwrap.dedent( - f"""\ - System update to {version} - Environment: {environment} - Channel URL: {channel_url} - Stop: postgresql - Restart: telegraf - Start: postgresql - Reload: nginx - Will schedule a reboot to activate the changed kernel. 
- """ - ) - - mocked_responses.add(responses.HEAD, channel_url) - monkeypatch.setattr("fc.manage.manage.NEXT_SYSTEM", tmp_path) - build_system_mock = Mock(return_value=system_path) - monkeypatch.setattr("fc.util.nixos.build_system", build_system_mock) - dry_activate_system_mock = Mock(return_value=changes) - monkeypatch.setattr( - "fc.util.nixos.dry_activate_system", dry_activate_system_mock - ) - - def fake_changed_kernel_version(path): - if path == "/run/current-system/kernel": - return "5.10.45" - elif path == "/run/next-system/result/kernel": - return "5.10.50" - - monkeypatch.setattr( - "fc.util.nixos.kernel_version", fake_changed_kernel_version - ) - - req_manager_mock = MagicMock() - req_manager_mock.return_value.__enter__.return_value.add = ( - rm_add_mock - ) = Mock() - monkeypatch.setattr("fc.maintenance.ReqManager", req_manager_mock) - - channel = Channel(logger, channel_url, environment=environment) - channel.version = lambda: version - - channel.prepare_maintenance() - - build_system_mock.assert_called_once_with( - "/root/.nix-defexpr/channels/next", - out_link=tmp_path / "result", - log=channel.log, - ) - dry_activate_system_mock.assert_called_once_with(system_path, channel.log) - assert log.has("channel-prepare-maintenance-start") - # Check maintenance request. 
- rm_add_mock.assert_called_once() - req = rm_add_mock.call_args[0][0] - assert req.comment == expected_request_comment - assert channel_url in req.activity.script diff --git a/pkgs/fc/agent/fc/util/channel.py b/pkgs/fc/agent/fc/util/channel.py new file mode 100644 index 000000000..2a6811cc1 --- /dev/null +++ b/pkgs/fc/agent/fc/util/channel.py @@ -0,0 +1,141 @@ +import os +import os.path as p + +from fc.util import nixos +from fc.util.nixos import RE_FC_CHANNEL + + +class Channel: + is_local = False + + def __init__(self, log, url, name="", environment=None, resolve_url=True): + self.url = url + self.name = name + self.environment = environment + self.system_path = None + + if url.startswith("file://"): + self.is_local = True + self.resolved_url = url.replace("file://", "") + elif resolve_url: + self.resolved_url = nixos.resolve_url_redirects(url) + else: + self.resolved_url = url + + self.log = log + + self.log_with_context = log.bind( + url=self.resolved_url, + name=name, + environment=environment, + is_local=self.is_local, + ) + + def version(self): + if self.is_local: + return "local-checkout" + label_comp = [ + "/root/.nix-defexpr/channels/{}/{}".format(self.name, c) + for c in [".version", ".version-suffix"] + ] + if all(p.exists(f) for f in label_comp): + return "".join(open(f).read() for f in label_comp) + + def __str__(self): + v = self.version() or "unknown" + return "".format( + self.name, v, self.resolved_url + ) + + def __eq__(self, other): + if isinstance(other, Channel): + return self.resolved_url == other.resolved_url + return NotImplemented + + @classmethod + def current(cls, log, channel_name): + """Looks up existing channel by name. + The URL found is usually already resolved (no redirects) + so we don't do it again here. It can still be enabled with + `resolve_url`, when needed. 
+ """ + if not p.exists("/root/.nix-channels"): + log.debug("channel-current-no-nix-channels-dir") + return + with open("/root/.nix-channels") as f: + for line in f.readlines(): + url, name = line.strip().split(" ", 1) + if name == channel_name: + # We don't have to resolve the URL if it's a direct link + # to a Hydra build product. This is the normal case for + # running VMs because the nixos channel is set to an + # already resolved URL. + # Resolve all other URLs, for example initial URLs used + # during VM bootstrapping. + resolve_url = RE_FC_CHANNEL.match(url) is None + log.debug( + "channel-current", + url=url, + name=name, + resolve_url=resolve_url, + ) + return Channel(log, url, name, resolve_url=resolve_url) + + log.debug("channel-current-not-found", name=name) + + def load_nixos(self): + self.log_with_context.debug("channel-load-nixos") + + if self.is_local: + raise RuntimeError("`load` not applicable for local channels") + + nixos.update_system_channel(self.resolved_url, self.log) + + def check_local_channel(self): + if not p.exists(p.join(self.resolved_url, "fc")): + self.log_with_context.error( + "local-channel-nix-path-invalid", + _replace_msg="Expected NIX_PATH element 'fc' not found. Did you " + "create a 'channels' directory via `dev-setup` and point " + "the channel URL towards that directory?", + ) + + def switch(self, lazy=True, show_trace=False): + """ + Build system with this channel and switch to it. + Replicates the behaviour of nixos-rebuild switch and adds + a "lazy mode" which only switches to the built system if it actually + changed. + """ + self.log_with_context.debug("channel-switch-start") + # Put a temporary result link in /run to avoid a race condition + # with the garbage collector which may remove the system we just built. + # If register fails, we still hold a GC root until the next reboot. 
+ out_link = "/run/fc-agent-built-system" + self.build(out_link, show_trace) + nixos.register_system_profile(self.system_path, self.log) + # New system is registered, delete the temporary result link. + os.unlink(out_link) + return nixos.switch_to_system(self.system_path, lazy, self.log) + + def build(self, out_link=None, show_trace=False): + """ + Build system with this channel. Works like nixos-rebuild build. + Does not modify the running system. + """ + self.log_with_context.debug("channel-build-start") + + if show_trace: + build_options = ["--show-trace"] + else: + build_options = [] + + if self.is_local: + self.check_local_channel() + system_path = nixos.build_system( + self.resolved_url, build_options, out_link, self.log + ) + self.system_path = system_path + + def dry_activate(self): + return nixos.dry_activate_system(self.system_path, self.log) diff --git a/pkgs/fc/agent/fc/util/nixos.py b/pkgs/fc/agent/fc/util/nixos.py index c4e4209b2..4786740ce 100644 --- a/pkgs/fc/agent/fc/util/nixos.py +++ b/pkgs/fc/agent/fc/util/nixos.py @@ -20,15 +20,13 @@ requests_session = requests.session() PHRASES = re.compile(r"would (\w+) the following units: (.*)$") -FC_ENV_FILE = "/etc/fcio_environment" +FC_ENV_FILE = "/etc/fcio_environment_name" RE_FC_CHANNEL = re.compile( r"https://hydra.flyingcircus.io/build/(\d+)/download/1/nixexprs.tar.xz" ) -class Channel: - def __init__(self, url) -> None: - self.url = url +UnitChanges = dict[str, list[str]] class ChannelException(Exception): @@ -202,7 +200,12 @@ def resolve_url_redirects(url): def detect_systemd_unit_changes(dry_activate_lines): - changes = {} + changes: UnitChanges = { + "start": [], + "stop": [], + "restart": [], + "reload": [], + } for line in dry_activate_lines: m = PHRASES.match(line) if m is not None: @@ -213,25 +216,28 @@ def detect_systemd_unit_changes(dry_activate_lines): def format_unit_change_lines(unit_changes): - units_by_category = {} - start_units = set(unit_changes.get("start", [])) - stop_units = 
set(unit_changes.get("stop", [])) - reload_units = set(unit_changes.get("reload", [])) + # Clean up raw unit changes: usually, units are stopped and + # started shortly after for updates. They get their own category + # "Start/Stop" to separate them from permanent stops and starts. + pretty_unit_changes = {} + start_units = set(unit_changes["start"]) + stop_units = set(unit_changes["stop"]) + reload_units = set(unit_changes["reload"]) start_stop_units = start_units.intersection(stop_units) - units_by_category["Start/Stop"] = start_stop_units - units_by_category["Restart"] = unit_changes.get("restart", []) - units_by_category["Start"] = start_units - start_stop_units - units_by_category["Stop"] = stop_units - start_stop_units - units_by_category["Reload"] = reload_units - {"dbus.service"} + pretty_unit_changes["Start/Stop"] = start_stop_units + pretty_unit_changes["Restart"] = set(unit_changes["restart"]) + pretty_unit_changes["Start"] = start_units - start_stop_units + pretty_unit_changes["Stop"] = stop_units - start_stop_units + pretty_unit_changes["Reload"] = reload_units - {"dbus.service"} unit_change_lines = [] - for cat, units in units_by_category.items(): + for cat, units in pretty_unit_changes.items(): if units: - unit_change_lines.append(f"{cat}:") - unit_change_lines.extend( - [" - " + u.replace(".service", "") for u in sorted(units)] + unit_str = ", ".join( + u.replace(".service", "") for u in sorted(units) ) + unit_change_lines.append(f"{cat}: {unit_str}") return unit_change_lines @@ -483,7 +489,7 @@ def switch_to_system(system_path, lazy, log=_log): return True -def dry_activate_system(system_path, log=_log): +def dry_activate_system(system_path, log=_log) -> UnitChanges: cmd = [f"{system_path}/bin/switch-to-configuration", "dry-activate"] log.info( "system-dry-activate-start", @@ -507,10 +513,10 @@ def dry_activate_system(system_path, log=_log): unit_changes = detect_systemd_unit_changes(stdout_lines) log.debug( "system-dry-activate-unit-changes", - 
start=unit_changes.get("start"), - stop=unit_changes.get("stop"), - restart=unit_changes.get("restart"), - reload=unit_changes.get("reload"), + start=unit_changes["start"], + stop=unit_changes["stop"], + restart=unit_changes["restart"], + reload=unit_changes["reload"], ) return unit_changes diff --git a/pkgs/fc/agent/fc/util/postgresql.py b/pkgs/fc/agent/fc/util/postgresql.py index f5ae08cf0..c998c69dd 100644 --- a/pkgs/fc/agent/fc/util/postgresql.py +++ b/pkgs/fc/agent/fc/util/postgresql.py @@ -8,8 +8,6 @@ from pathlib import Path from subprocess import CalledProcessError, run -from typer import confirm - MIGRATED_TO_TEMPLATE = """\ WARNING: This data directory should not be used anymore! diff --git a/pkgs/fc/agent/fc/util/tests/test_enc.py b/pkgs/fc/agent/fc/util/tests/test_enc.py index 796a4ae22..68101e0a6 100644 --- a/pkgs/fc/agent/fc/util/tests/test_enc.py +++ b/pkgs/fc/agent/fc/util/tests/test_enc.py @@ -5,25 +5,25 @@ from fc.util.enc import initialize_enc, update_enc -def test_initialize_enc_should_do_nothing_when_enc_present(log, logger, tmpdir): - tmpdir_path = Path(tmpdir) - enc_path = Path(f"{tmpdir}/enc.json") +def test_initialize_enc_should_do_nothing_when_enc_present( + log, logger, tmp_path +): + enc_path = tmp_path / "enc.json" enc_path.write_text("") - initialize_enc(logger, tmpdir_path, enc_path) + initialize_enc(logger, tmp_path, enc_path) assert log.has("initialize-enc-present", enc_path=str(enc_path)) -def test_initialize_enc_should_populate_enc_initially(log, logger, tmpdir): - tmpdir_path = Path(tmpdir) - fc_data_path = tmpdir_path / "fc-data" +def test_initialize_enc_should_populate_enc_initially(log, logger, tmp_path): + fc_data_path = tmp_path / "fc-data" fc_data_path.mkdir() initial_enc_path = fc_data_path / "enc.json" initial_enc_path.write_text("") - enc_path = Path(f"{tmpdir}/enc.json") + enc_path = tmp_path / "enc.json" - initialize_enc(logger, tmpdir_path, enc_path) + initialize_enc(logger, tmp_path, enc_path) assert log.has( 
"initialize-enc-init", @@ -34,12 +34,11 @@ def test_initialize_enc_should_populate_enc_initially(log, logger, tmpdir): def test_initialize_enc_should_not_crash_when_initial_data_missing( - log, logger, tmpdir + log, logger, tmp_path ): - tmpdir_path = Path(tmpdir) - enc_path = Path(f"{tmpdir}/enc.json") + enc_path = tmp_path / "enc.json" - initialize_enc(logger, tmpdir_path, enc_path) + initialize_enc(logger, tmp_path, enc_path) assert log.has("initialize-enc-initial-data-not-found") @@ -55,15 +54,14 @@ def test_update_enc( write_system_state, log, logger, - tmpdir, + tmp_path, ): enc_data = {"parameters": {"test": 1}} - tmpdir_path = Path(tmpdir) - enc_path = Path(f"{tmpdir}/enc.json") + enc_path = tmp_path / "enc.json" with open(enc_path, "w") as wf: json.dump(enc_data, wf) - update_enc(logger, tmpdir_path, enc_path) + update_enc(logger, tmp_path, enc_path) initialize_state_version.assert_called_once() update_inventory.assert_called_with(logger, enc_data) diff --git a/pkgs/fc/agent/fc/util/tests/test_logging.py b/pkgs/fc/agent/fc/util/tests/test_logging.py index ca28234eb..8498883c0 100644 --- a/pkgs/fc/agent/fc/util/tests/test_logging.py +++ b/pkgs/fc/agent/fc/util/tests/test_logging.py @@ -1,4 +1,3 @@ -import datetime import syslog import pytest diff --git a/pkgs/fc/agent/fc/util/time_date.py b/pkgs/fc/agent/fc/util/time_date.py index 9bc3689ad..f01e4172c 100644 --- a/pkgs/fc/agent/fc/util/time_date.py +++ b/pkgs/fc/agent/fc/util/time_date.py @@ -3,7 +3,7 @@ import pytz -def ensure_timezone_present(dt): +def ensure_timezone_present(dt: datetime.datetime): if dt and not dt.tzinfo: return pytz.UTC.localize(dt) @@ -14,5 +14,5 @@ def utcnow(): return pytz.UTC.localize(datetime.datetime.utcnow()) -def format_datetime(dt): +def format_datetime(dt: datetime.datetime): return dt.strftime("%Y-%m-%d %H:%M UTC") diff --git a/pkgs/fc/agent/python_dev_env.nix b/pkgs/fc/agent/python_dev_env.nix index ae9439b41..2385bc5ba 100755 --- a/pkgs/fc/agent/python_dev_env.nix +++ 
b/pkgs/fc/agent/python_dev_env.nix @@ -6,6 +6,6 @@ let pkgs = import {}; - fcagent = pkgs.callPackage ./. {}; + fcagent = pkgs.python310Packages.callPackage ./. {}; in fcagent.pythonDevEnv diff --git a/pkgs/fc/agent/shell.nix b/pkgs/fc/agent/shell.nix index 4dfb6b11b..8634f37fe 100644 --- a/pkgs/fc/agent/shell.nix +++ b/pkgs/fc/agent/shell.nix @@ -1,4 +1,7 @@ let pkgs = import {}; - -in pkgs.callPackage ./. {} + fcagent = pkgs.python310Packages.callPackage ./. {}; +in +fcagent.overridePythonAttrs(_: { + doCheck = true; +}) diff --git a/pkgs/fc/default.nix b/pkgs/fc/default.nix index 7705388a1..a80c34a7d 100644 --- a/pkgs/fc/default.nix +++ b/pkgs/fc/default.nix @@ -1,9 +1,9 @@ -{ pkgs, callPackage }: +{ pkgs, pythonPackages, callPackage }: rec { recurseForDerivations = true; - agent = callPackage ./agent {}; + agent = pythonPackages.callPackage ./agent {}; check-age = callPackage ./check-age {}; # XXX: ceph is broken, needs integration of changes from 21.05 diff --git a/tests/fcagent.nix b/tests/fcagent.nix index de1fdbabf..4c0b12860 100644 --- a/tests/fcagent.nix +++ b/tests/fcagent.nix @@ -13,7 +13,7 @@ import ./make-test-python.nix ({ pkgs, testlib, ... }: }; testScript = '' machine.wait_for_unit('multi-user.target') - machine.succeed("systemctl show fc-update-channel.service --property ExecStart | grep -v 'run-now'") + machine.succeed("systemctl show fc-update-channel.service --property ExecStart | grep 'request update'") ''; }; nonprod = { @@ -27,7 +27,7 @@ import ./make-test-python.nix ({ pkgs, testlib, ... }: }; testScript = '' machine.wait_for_unit('multi-user.target') - machine.succeed("systemctl show fc-update-channel.service --property ExecStart | grep 'run-now'") + machine.succeed("systemctl show fc-update-channel.service --property ExecStart | grep 'switch --update-channel'") ''; }; };