diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index 51da02c5..344d67fb 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -58,6 +58,7 @@ class SystemPaths(EnumBackport): class AzGPSPaths(EnumBackport): EULA_SETTINGS = "/var/lib/azure/linuxpatchextension/patch.eula.settings" + LIVEPATCH_CUSTOMER_SETTINGS = "/var/lib/azure/linuxpatchextension/livepatching.settings" class EnvSettings(EnumBackport): LOG_FOLDER = "logFolder" @@ -87,6 +88,12 @@ class EulaSettings(EnumBackport): ACCEPTED_BY = 'AcceptedBy' LAST_MODIFIED = 'LastModified' + class LivePatchSettings(EnumBackport): + ENABLE_LIVEPATCH = 'EnableLivePatch' # boolean config that if set to true indicates customer's ask on AzGPS to apply livepatches on their VM + LIVEPATCH_ONLY = 'LivePatchOnly' # boolean config that indicates if the customer is requesting only livepatches i.e. no cold patch or regular patching with reboots + ENABLED_BY = 'EnabledBy' + LAST_MODIFIED = 'LastModified' + TEMP_FOLDER_DIR_NAME = "tmp" TEMP_FOLDER_CLEANUP_ARTIFACT_LIST = ["*.list", "azgps*"] @@ -312,6 +319,7 @@ class PatchOperationErrorCodes(EnumBackport): NEWER_OPERATION_SUPERSEDED = "NEWER_OPERATION_SUPERSEDED" UA_ESM_REQUIRED = "UA_ESM_REQUIRED" TRUNCATION = "PACKAGE_LIST_TRUNCATED" + LIVEPATCH_ERROR = "LIVEPATCH_ERROR" ERROR_ADDED_TO_STATUS = "Error_added_to_status" diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index f79f7b2c..6847f74c 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -421,4 +421,10 @@ def datetime_string_to_posix_time(datetime_string, format_string): epoch = datetime.datetime(1970, 1, 1) return int((dt - epoch).total_seconds()) + @staticmethod + def datetime_iso_basic_string_to_extended_string(datetime_string): + # type: (str) -> str + """Converts ISO 8601 basic date format (20240401T000000Z) to extended format (2024-04-01T00:00:00Z).""" + return datetime.datetime.strptime(datetime_string, "%Y%m%dT%H%M%SZ").strftime("%Y-%m-%dT%H:%M:%SZ") + # endregion - DateTime emulator and extensions diff --git a/src/core/src/core_logic/ExecutionConfig.py b/src/core/src/core_logic/ExecutionConfig.py index 04cec562..f0122af4 100644 --- a/src/core/src/core_logic/ExecutionConfig.py +++ b/src/core/src/core_logic/ExecutionConfig.py @@ -92,6 +92,11 @@ def __init__(self, env_layer, composite_logger, execution_parameters): # EULA config self.accept_package_eula = self.__is_eula_accepted_for_all_patches() + # LivePatch config + self.livepatch_customer_config_settings = self.__get_livepatch_customer_config_in_json() + self.is_livepatch_requested = self.__is_livepatch_requested(self.livepatch_customer_config_settings) + self.is_livepatch_only_requested = self.__is_livepatch_only_requested(self.livepatch_customer_config_settings) + def __transform_execution_config_for_auto_assessment(self): self.activity_id = str(uuid.uuid4()) self.included_classifications_list = self.included_package_name_mask_list = self.excluded_package_name_mask_list = [] @@ -246,10 +251,10 @@ def __is_eula_accepted_for_all_patches(self): try: if os.path.exists(Constants.AzGPSPaths.EULA_SETTINGS): eula_settings = json.loads(self.env_layer.file_system.read_with_retry(Constants.AzGPSPaths.EULA_SETTINGS) or 'null') - accept_eula_for_all_patches = self.__fetch_specific_eula_setting(eula_settings, Constants.EulaSettings.ACCEPT_EULA_FOR_ALL_PATCHES) - accepted_by = self.__fetch_specific_eula_setting(eula_settings, Constants.EulaSettings.ACCEPTED_BY) - last_modified = self.__fetch_specific_eula_setting(eula_settings, Constants.EulaSettings.LAST_MODIFIED) - if accept_eula_for_all_patches is not None and accept_eula_for_all_patches in [True, 'True', 'true', '1', 1]: + accept_eula_for_all_patches = self.__fetch_specific_setting(eula_settings, Constants.EulaSettings.ACCEPT_EULA_FOR_ALL_PATCHES) + accepted_by = self.__fetch_specific_setting(eula_settings, Constants.EulaSettings.ACCEPTED_BY) + last_modified = self.__fetch_specific_setting(eula_settings, Constants.EulaSettings.LAST_MODIFIED) + if self.__is_truthy(accept_eula_for_all_patches): is_eula_accepted = True self.composite_logger.log_debug("EULA config values from disk: [AcceptEULAForAllPatches={0}] [AcceptedBy={1}] [LastModified={2}]. Computed value of [IsEULAAccepted={3}]" .format(str(accept_eula_for_all_patches), str(accepted_by), str(last_modified), str(is_eula_accepted))) @@ -260,10 +265,72 @@ def __is_eula_accepted_for_all_patches(self): return is_eula_accepted + def __get_livepatch_customer_config_in_json(self): + # type: () -> dict + """ Reads customer provided config on livepatch from disk and returns a dict with the config values. + NOTE: This is a temporary solution and will be deprecated soon """ + livepatch_customer_config = dict() + try: + if os.path.exists(Constants.AzGPSPaths.LIVEPATCH_CUSTOMER_SETTINGS): + livepatch_customer_config = json.loads(self.env_layer.file_system.read_with_retry(Constants.AzGPSPaths.LIVEPATCH_CUSTOMER_SETTINGS) or 'null') or dict() + self.composite_logger.log_debug("Livepatch customer config values from disk: [Config={0}]".format(str(livepatch_customer_config))) + else: + self.composite_logger.log_debug("No livepatch customer config found on the VM. Returning empty config.") + except Exception as error: + self.composite_logger.log_debug("Error occurred while reading and parsing livepatch customer config. Returning empty config. Error=[{0}]".format(repr(error))) + + return livepatch_customer_config + + def __is_livepatch_requested(self, livepatch_settings): + # type: (dict) -> bool + """ Determines if livepatch is requested in config settings. Returns a boolean.""" + livepatch_requested = False + + enable_livepatch = self.__fetch_specific_setting(livepatch_settings, Constants.LivePatchSettings.ENABLE_LIVEPATCH) + enabled_by = self.__fetch_specific_setting(livepatch_settings, Constants.LivePatchSettings.ENABLED_BY) + last_modified = self.__fetch_specific_setting(livepatch_settings, Constants.LivePatchSettings.LAST_MODIFIED) + if self.__is_truthy(enable_livepatch): + livepatch_requested = True + self.composite_logger.log_debug("Livepatch config values read from disk: [EnableLivePatch={0}] [EnabledBy={1}] [LastModified={2}]. Computed value of [LivePatchRequested={3}]" + .format(str(enable_livepatch), str(enabled_by), str(last_modified), str(livepatch_requested))) + else: + self.composite_logger.log_debug("LivePatch is not requested for the VM. [EnableLivePatchValueFromConfig={0}]. Computed value of [LivePatchRequested={1}]" + .format(str(enable_livepatch),str(livepatch_requested))) + + return livepatch_requested + + def __is_livepatch_only_requested(self, livepatch_settings): + # type: (dict) -> bool + """ Determines if livepatch only, i.e. no cold patch, is requested in config settings. Returns a boolean.""" + """ NOTE: This is not in use currently but can be added in MVP if needed""" + livepatch_only_config = self.__fetch_specific_setting(livepatch_settings, Constants.LivePatchSettings.LIVEPATCH_ONLY) + livepatch_only_requested = self.__is_truthy(livepatch_only_config) + self.composite_logger.log_debug("Livepatch only config values from disk: [LivePatchOnly={0}]. Computed value of [LivePatchOnlyRequested={1}]" + .format(str(livepatch_only_config), str(livepatch_only_requested))) + return livepatch_only_requested + @staticmethod - def __fetch_specific_eula_setting(settings_source, setting_to_fetch): - """ Returns the specific setting value from eula_settings_source or None if not found """ + def __fetch_specific_setting(settings_source, setting_to_fetch): + # type: (dict, str) -> str or None + """ Returns the specific setting value from the given settings_source or None if not found """ if settings_source is not None and setting_to_fetch is not None and setting_to_fetch in settings_source: return settings_source[setting_to_fetch] return None + @staticmethod + def __is_truthy(value): + # type: (any) -> bool + """Case-insensitive truthy evaluator for config values.""" + if isinstance(value, bool): + return value + if isinstance(value, int): + return value == 1 + + # Cross-version text types: + # py2 -> (str, unicode) + # py3 -> (str, str) + text_types = (str, type(u"")) + if isinstance(value, text_types): + return value.strip().lower() in ("true", "1") + return False + diff --git a/src/core/src/core_logic/PatchInstaller.py b/src/core/src/core_logic/PatchInstaller.py index 16a26adc..bead2f30 100644 --- a/src/core/src/core_logic/PatchInstaller.py +++ b/src/core/src/core_logic/PatchInstaller.py @@ -58,13 +58,16 @@ def start_installation(self, simulate=False): self.raise_if_telemetry_unsupported() self.raise_if_min_python_version_not_met() + maintenance_window = self.maintenance_window + package_manager = self.package_manager + reboot_manager = self.reboot_manager + self.composite_logger.log("\nStarting patch installation... [MachineId: " + self.env_layer.platform.vm_name() + "][ActivityId: " + self.execution_config.activity_id + "][StartTime: " + self.execution_config.start_time + "][MaintenanceWindowDuration: " + self.execution_config.duration + "]") self.stopwatch.start() - maintenance_window = self.maintenance_window - package_manager = self.package_manager - reboot_manager = self.reboot_manager + if self.execution_config.is_livepatch_requested: + package_manager.start_livepatch() # Early reboot if reboot is allowed by settings and required by the machine reboot_pending = self.package_manager.is_reboot_pending() diff --git a/src/core/src/package_managers/AptitudePackageManager.py b/src/core/src/package_managers/AptitudePackageManager.py index cdc1b2ec..8e413e77 100644 --- a/src/core/src/package_managers/AptitudePackageManager.py +++ b/src/core/src/package_managers/AptitudePackageManager.py @@ -20,6 +20,7 @@ import re import shutil import sys +import datetime from core.src.package_managers.PackageManager import PackageManager from core.src.bootstrap.Constants import Constants @@ -91,6 +92,11 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ self.package_install_expected_avg_time_in_seconds = 90 # As per telemetry data, the average time to install package is around 81 seconds for apt. + # Livepatch service + self.set_cutoff_date_in_livepatch_config_cmd = "sudo canonical-livepatch config cutoff-date=" + self.__try_reformat_date_for_livepatch(execution_config.max_patch_publish_date) + self.launch_livepatch_client_cmd = "sudo systemctl restart snap.canonical-livepatch.canonical-livepatchd" + self.get_livepatch_status_cmd = "sudo canonical-livepatch status --verbose --format json" + # region Sources Management def __get_custom_sources_to_spec(self, max_patch_published_date=str(), base_classification=str()): # type: (str, str) -> (str, str) @@ -839,6 +845,163 @@ def do_processes_require_restart(self): return False # endregion Reboot Management + #region Livepatch + def start_livepatch(self): + # type: () -> () + """ Applies livepatches on the machine, if its pre-req are met""" + if self.are_livepatch_prereq_met(): + self.start_livepatching_on_machine() + else: + self.composite_logger.log_warning("[APM] Livepatches are not applied since the pre-requisites were not met") + + def are_livepatch_prereq_met(self): + # type: () -> bool + """ Validates whether livepatch prereqs are met. + These pre-reqs are: Machine should be attached to a pro subscription and livepatch service should be enabled on the VM. """ + self.composite_logger.log_debug("[APM] Checking if all the pre-reqs to receive livepatches are met. NOTE: Livepatch service is only available on Ubuntu LTS paid pro VMs and has to be in enabled state") + if not self.ubuntu_pro_client.pro_client_attached_for_livepatching(): + error_message = "[APM] Livepatches will NOT be applied since the VM is not attached to a pro subscription." + self.composite_logger.log_error(error_message) + self.status_handler.add_error_to_status(error_message, Constants.PatchOperationErrorCodes.LIVEPATCH_ERROR) + return False + + if not self.ubuntu_pro_client.livepatch_service_enabled_on_machine(): + error_message = ("[APM] The Ubuntu Pro client reported that the Livepatch service is not enabled. Please enable it for Livepatching to succeed.") + self.composite_logger.log_error(error_message) + self.status_handler.add_error_to_status(error_message, Constants.PatchOperationErrorCodes.LIVEPATCH_ERROR) + return False + + self.composite_logger.log_debug("[APM] All Livepatch pre-reqs are met. VM is eligible to receive livepatches") + return True + + def start_livepatching_on_machine(self): + # type: () -> () + """Starts livepatching on the machine according to the configurations set in AzGPS and updates livepatch status in status blob""" + self.composite_logger.log_debug("[APM] Starting livepatching on the machine...") + if self.try_set_livepatch_cutoff_date_in_config(): + is_livepatch_client_launch_successful = self.launch_livepatch_client() + if not is_livepatch_client_launch_successful: + self.composite_logger.log_warning("[APM] A stale livepatch status may be reported since a manual launch/restart of the livepatch client failed") + self.fetch_and_update_livepatch_status_in_status_blob() + else: + self.composite_logger.log_warning("[APM] AzGPS's attempt to set livepatch cutoff config date failed. Any livepatch applied by the livepatch client will not honor a cutoff date. " + "Please check previous logs for more details on why it failed and fix the issue.") + + def try_set_livepatch_cutoff_date_in_config(self): + # type: () -> bool + cmd = self.set_cutoff_date_in_livepatch_config_cmd + self.composite_logger.log_debug("[APM] Attempting to set livepatch cutoff date in livepatch config using [cmd={0}]".format(str(cmd))) + try: + code, output = self.env_layer.run_command_output(cmd, False, False) + if code == 0: + self.composite_logger.log_debug("[APM] Successfully set cutoff date in livepatch config. [ConfigSet={0}]".format(str(output))) + return True + else: + error_msg = "[APM] Command to set cutoff date in livepatch config failed. [Cmd={0}][Code={1}][Output={2}]".format(str(cmd), str(code), str(output)) + self.composite_logger.log_error(error_msg) + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.LIVEPATCH_ERROR) + # Q: should we disable livepatching if we fail to set cutoff date in config since it's a critical config for livepatching to work properly? + except Exception as error: + error_msg = "[APM] Livepatch config update Exception: [Exception={0}]".format(repr(error)) + self.composite_logger.log_error(error_msg) + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.LIVEPATCH_ERROR) + # Q: should we disable livepatching if we fail to set cutoff date in config since it's a critical config for livepatching to work properly? + return False + + def launch_livepatch_client(self): + # type: () -> bool + """ Launch livepatch client manually as best case effort to ensure livepatches are applied in a timely manner. + If this fails, livepatches will still be applied but it will be up to the machine's cron to trigger it""" + launch_successful = False + self.composite_logger.log("[APM] Launching livepatch client...") + try: + code, output = self.env_layer.run_command_output(self.launch_livepatch_client_cmd, False, False) + if code == 0: + self.composite_logger.log_debug("[APM] Successfully launched livepatch client. [Output={0}]".format(str(output))) + launch_successful = True + else: + self.composite_logger.log_warning("[APM] Failed to launch livepatch client. [Output={0}]".format(str(output))) + except Exception as error: + livepatch_launch_exception = repr(error) + self.composite_logger.log_warning("[APM] Exception while launching livepatch client. [Exception={0}]".format(livepatch_launch_exception)) + return launch_successful + + def fetch_and_update_livepatch_status_in_status_blob(self): + # type: () -> () + """Fetches livepatch status and if a livepatch/es is/are applied, updates it as a new patch entry in PatchInstallationSummary""" + livepatch_status = self.try_get_livepatch_status() + if livepatch_status: + self.update_livepatch_status_in_patch_installation_summary(livepatch_status) + + def try_get_livepatch_status(self): + # type: () -> dict + """ Attempts to fetch livepatch status and return it in json format. If it fails, returns an empty json""" + self.composite_logger.log_debug("[APM] Fetching livepatch status...") + livepatch_status = {} + cmd = self.get_livepatch_status_cmd + try: + code, output = self.env_layer.run_command_output(cmd, False, False) + if code == 0: + livepatch_status = json.loads(output) + self.composite_logger.log_debug("[APM] Successfully fetched livepatch status. [Status={0}]".format(str(livepatch_status))) + else: + error_msg = "[APM] Failed to fetch livepatch status. [Cmd={0}][Code={1}][Output={2}]".format(str(cmd), str(code), str(output)) + raise Exception(error_msg) + except Exception as error: + error_msg = "[APM] Exception while fetching livepatch status. [Cmd={0}][Exception={1}]".format(str(cmd), repr(error)) + self.composite_logger.log_error(error_msg) + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.LIVEPATCH_ERROR) + return livepatch_status + + def update_livepatch_status_in_patch_installation_summary(self, livepatch_status): + # type: (dict) -> () + """ Updates livepatch status in patch installation summary as a new patch record """ + self.composite_logger.log_debug("[APM] Updating patch installation summary with livepatch status. [LivepatchStatus={0}]".format(str(livepatch_status))) + extracted_livepatch_fields = self.extract_livepatch_fields(livepatch_status) + if len(extracted_livepatch_fields) == 0: + self.composite_logger.log_warning("[APM] No supported running livepatch entry found in livepatch status. Nothing to update in patch installation summary") + return + + livepatch_fields = extracted_livepatch_fields[0] + check_state = str(livepatch_fields["CheckState"]) + state = str(livepatch_fields["State"]) + patch_name = "livepatch_" + check_state + "_" + state + patch_version = str(livepatch_fields["Version"]) + + patch_status = Constants.NOT_SELECTED + if state.lower() == "applied": + patch_status = Constants.INSTALLED + self.status_handler.set_package_install_status(patch_name, patch_version, patch_status) + + def extract_livepatch_fields(self, livepatch_status): + # type: (dict) -> list + """ Extracts and returns following fields from livepatch status: Status.Livepatch.CheckState, Status.Livepatch.State, Status.Livepatch.Version. + Refer the expected output format at: \src\tools\references\canonical-livepatch_status_cmd_expected_output_format.txt""" + extracted = [] + + for status_item in livepatch_status.get("Status", []): + if status_item.get("Running", False) == True and status_item.get("Supported").lower() != "unsupported": + livepatch = status_item.get("Livepatch", {}) + extracted.append({ + "CheckState": livepatch.get("CheckState", ""), + "State": livepatch.get("State", ""), + "Version": livepatch.get("Version", "") + }) + break + + return extracted + + def __try_reformat_date_for_livepatch(self, date_str): + # type: (str) -> str + """Converts ISO 8601 date format from basic (20240401T000000Z) to extended (2024-04-01T00:00:00Z).""" + try: + return str(self.env_layer.datetime.datetime_iso_basic_string_to_extended_string(date_str)) + except Exception as error: + self.composite_logger.log_error("Invalid date string received, could not format it to a livepatch config acceptable format. [DateStr={0}][Exception={1}]" + .format(str(date_str), repr(error))) + return "" + # endregion Livepatch + def is_reboot_pending(self): """ Checks if there is a pending reboot on the machine. """ ubuntu_pro_client_check_success = False diff --git a/src/core/src/package_managers/PackageManager.py b/src/core/src/package_managers/PackageManager.py index ee177c25..c7fa04ee 100644 --- a/src/core/src/package_managers/PackageManager.py +++ b/src/core/src/package_managers/PackageManager.py @@ -450,6 +450,12 @@ def check_known_issues_and_attempt_fix(self, output): return True # endregion + # region Livepatch + def start_livepatch(self): + """ Applies livepatches on the machine, if supported by the distro and all pre-reqs are met""" + pass + # endregion + @abstractmethod def is_reboot_pending(self): """ Checks if there is a pending reboot on the machine. """ diff --git a/src/core/src/package_managers/TdnfPackageManager.py b/src/core/src/package_managers/TdnfPackageManager.py index f19c43d2..b06d09e3 100644 --- a/src/core/src/package_managers/TdnfPackageManager.py +++ b/src/core/src/package_managers/TdnfPackageManager.py @@ -734,6 +734,12 @@ def do_processes_require_restart(self): return False # endregion + # region Livepatch + def start_livepatch(self): + """ Applies livepatches on the machine, if supported by the distro and all pre-reqs are met""" + pass + # endregion + def get_tdnf_version(self): # type: () -> any """Get the version of TDNF installed on the system""" diff --git a/src/core/src/package_managers/UbuntuProClient.py b/src/core/src/package_managers/UbuntuProClient.py index 1bb4f114..899cc496 100644 --- a/src/core/src/package_managers/UbuntuProClient.py +++ b/src/core/src/package_managers/UbuntuProClient.py @@ -27,6 +27,7 @@ def __init__(self, env_layer, composite_logger): self.composite_logger = composite_logger self.ubuntu_pro_client_install_cmd = 'sudo apt-get install ubuntu-advantage-tools -y' self.ubuntu_pro_client_security_status_cmd = 'pro security-status --format=json' + self.ubuntu_pro_client_status_cmd = 'pro status --all --format=json' self.security_esm_criteria_strings = ["esm-infra", "esm-apps"] self.is_ubuntu_pro_client_attached = False self.version_comparator = VersionComparator() @@ -172,3 +173,37 @@ def is_reboot_pending(self): self.composite_logger.log_debug("[APM][Pro] Ubuntu Pro Client Reboot Required: [UbuntuProClientSuccess={0}][RebootRequiredFlag={1}][Error={2}]".format(ubuntu_pro_client_api_success, ubuntu_pro_client_reboot_required, ubuntu_pro_client_exception)) return ubuntu_pro_client_api_success, ubuntu_pro_client_reboot_required + + # region Livepatch + def pro_client_attached_for_livepatching(self): + # type: () -> bool + """ Checks if the machine is an Ubuntu LTS Pro VM since livepatching is only supported on those""" + if not self.is_ubuntu_pro_client_attached: + self.composite_logger.log_warning("[APM][Pro] Ubuntu Pro Client is not attached") + return False + + return True + + def livepatch_service_enabled_on_machine(self): + # type: () -> bool + """ Verifies if livepatch service is enabled on the machine """ + livepatch_service_enabled = False + try: + code, output = self.env_layer.run_command_output(self.ubuntu_pro_client_status_cmd, False, False) + if code == 0: + json_output = json.loads(output) + livepatch_service = next((service for service in json_output.get("services", []) if service.get('name').lower() == 'livepatch'), None) + self.composite_logger.log_debug("[APM][Pro] Livepatch service status from Ubuntu Pro Client: [LivepatchService={0}]".format(str(livepatch_service))) + livepatch_status = livepatch_service.get("status", "unknown") if livepatch_service is not None else "unknown" + if livepatch_status.lower() == 'enabled' or livepatch_status.lower() == 'warning': + self.composite_logger.log_debug("[APM][Pro] Livepatch service is enabled on the machine.") + livepatch_service_enabled = True + else: + self.composite_logger.log_warning("[APM][Pro] Livepatch service is NOT enabled on the machine.") + except Exception as error: + ubuntu_pro_client_exception = repr(error) + self.composite_logger.log_debug("[APM][Pro] Ubuntu Pro Client status Exception: [Exception={0}]".format(ubuntu_pro_client_exception)) + self.composite_logger.log_warning("[APM][Pro] Failed to determine if livepatch service is enabled on the machine due to error while querying Ubuntu Pro Client status. AzGPS will consider the service to be disabled.") + return livepatch_service_enabled + # endregion Livepatch + diff --git a/src/core/src/package_managers/YumPackageManager.py b/src/core/src/package_managers/YumPackageManager.py index 9e99cd41..d6398f42 100644 --- a/src/core/src/package_managers/YumPackageManager.py +++ b/src/core/src/package_managers/YumPackageManager.py @@ -1082,6 +1082,12 @@ def do_processes_require_restart(self): return process_count != 0 # True if there were any # endregion Reboot Management + # region Livepatch + def start_livepatch(self): + """ Applies livepatches on the machine, if supported by the distro and all pre-reqs are met""" + pass + # endregion + def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): """ Add the packages with same name as that of input parameter package but with different architectures from packages list to the list package_and_dependencies. diff --git a/src/core/src/package_managers/ZypperPackageManager.py b/src/core/src/package_managers/ZypperPackageManager.py index 365eb1da..0682774e 100644 --- a/src/core/src/package_managers/ZypperPackageManager.py +++ b/src/core/src/package_managers/ZypperPackageManager.py @@ -846,6 +846,12 @@ def do_processes_require_restart(self): return process_count != 0 # True if there were any # endregion Reboot Management + # region Livepatch + def start_livepatch(self): + """ Applies livepatches on the machine, if supported by the distro and all pre-reqs are met""" + pass + # endregion + def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): """ Add the packages with same name as that of input parameter package but with different architectures from packages list to the list package_and_dependencies. diff --git a/src/core/tests/Test_AptitudePackageManager.py b/src/core/tests/Test_AptitudePackageManager.py index afc48095..9fcb5b7f 100644 --- a/src/core/tests/Test_AptitudePackageManager.py +++ b/src/core/tests/Test_AptitudePackageManager.py @@ -69,9 +69,17 @@ def mock_is_reboot_pending_returns_False(self): def mock_os_path_isfile_raise_exception(self, file): raise Exception + def mock_run_command_output_raise_exception(self, cmd="", output=False, chk_err=False): + raise Exception + def mock_get_security_updates_return_empty_list(self): return [], [] + def mock_launch_livepatch_client_failed(self): + return False + + def mock_ubuntu_pro_client_livepatch_service_enabled_on_machine_returns_true(self): + return True # endregion Mocks # region Utility Functions @@ -1068,6 +1076,249 @@ def test_eula_acceptance_file_read_when_no_data_found(self): self.assertEqual(exec_config.accept_package_eula, False) runtime.stop() + def test_are_livepatch_prereq_met_success(self): + package_manager = self.__setup_package_manager() + self.assertIsNotNone(package_manager) + package_manager.ubuntu_pro_client.is_ubuntu_pro_client_attached = True + self.assertTrue(package_manager.are_livepatch_prereq_met()) + + def test_are_livepatch_prereq_met_failure(self): + package_manager = self.__setup_package_manager() + self.assertIsNotNone(package_manager) + self.runtime.status_handler.set_current_operation(Constants.INSTALLATION) + + # VM is not attached + package_manager.ubuntu_pro_client.is_ubuntu_pro_client_attached = False + self.assertFalse(package_manager.are_livepatch_prereq_met()) + substatus_file_data = self.__get_substatus_from_status_file()[0] + errors = json.loads(substatus_file_data["formattedMessage"]["message"])["errors"] + self.assertNotEqual(errors, None) + self.assertTrue("Livepatches will NOT be applied since the VM is not attached to a pro subscription." in str(errors)) + + #VM is attached but livepatch service not enabled + package_manager = self.__setup_package_manager(legacy_type='SadPath') + package_manager.ubuntu_pro_client.is_ubuntu_pro_client_attached = True + self.assertFalse(package_manager.are_livepatch_prereq_met()) + substatus_file_data = self.__get_substatus_from_status_file()[0] + updated_errors = json.loads(substatus_file_data["formattedMessage"]["message"])["errors"] + self.assertNotEqual(updated_errors, None) + self.assertTrue("The Ubuntu Pro client reported that the Livepatch service is not enabled." in str(updated_errors)) + + def test_try_set_livepatch_cutoff_date_in_config_success(self): + package_manager = self.__setup_package_manager() + self.assertTrue(package_manager.try_set_livepatch_cutoff_date_in_config()) + + def test_try_set_livepatch_cutoff_date_in_config_failure(self): + package_manager = self.__setup_package_manager(legacy_type='SadPath') + self.assertFalse(package_manager.try_set_livepatch_cutoff_date_in_config()) + + def test_try_set_livepatch_cutoff_date_in_config_exception_path(self): + package_manager = self.container.get('package_manager') + backup_run_command_output = package_manager.env_layer.run_command_output + package_manager.env_layer.run_command_output = self.mock_run_command_output_raise_exception + self.assertFalse(package_manager.try_set_livepatch_cutoff_date_in_config()) + package_manager.env_layer.run_command_output = backup_run_command_output + + def test_launch_livepatch_client_success(self): + package_manager = self.__setup_package_manager() + self.assertTrue(package_manager.launch_livepatch_client()) + + def test_launch_livepatch_client_failure(self): + package_manager = self.__setup_package_manager(legacy_type='SadPath') + self.assertFalse(package_manager.launch_livepatch_client()) + + def test_launch_livepatch_client_exception_path(self): + package_manager = self.__setup_package_manager() + backup_run_command_output = package_manager.env_layer.run_command_output + package_manager.env_layer.run_command_output = self.mock_run_command_output_raise_exception + self.assertFalse(package_manager.launch_livepatch_client()) + package_manager.env_layer.run_command_output = backup_run_command_output + + def test_try_get_livepatch_status_success(self): + expected_livepatch_status = { + "Status":[{ + "Kernel":"123", + "Running":True, + "Livepatch": {"CheckState": "checked","State": "nothing-to-apply","Version": ""}, + "Supported": "supported", + "UpgradeRequiredDate": "2027-04-13"}], + "tier":"stable", + "Using-Cutoff-Date":True + } + package_manager = self.__setup_package_manager() + self.assertEqual(package_manager.try_get_livepatch_status(), expected_livepatch_status) + + def test_try_get_livepatch_status_failure(self): + package_manager = self.__setup_package_manager(legacy_type='SadPath') + self.runtime.status_handler.set_current_operation(Constants.INSTALLATION) + self.assertEqual(package_manager.try_get_livepatch_status(), {}) + substatus_file_data = self.__get_substatus_from_status_file()[0] + errors = json.loads(substatus_file_data["formattedMessage"]["message"])["errors"] + self.assertNotEqual(errors, None) + self.assertTrue("Exception while fetching livepatch status." in str(errors)) + + def test_try_get_livepatch_status_exception_path(self): + package_manager = self.__setup_package_manager() + self.runtime.status_handler.set_current_operation(Constants.INSTALLATION) + backup_run_command_output = package_manager.env_layer.run_command_output + package_manager.env_layer.run_command_output = self.mock_run_command_output_raise_exception + self.assertEqual(package_manager.try_get_livepatch_status(), {}) + substatus_file_data = self.__get_substatus_from_status_file()[0] + errors = json.loads(substatus_file_data["formattedMessage"]["message"])["errors"] + self.assertNotEqual(errors, None) + self.assertTrue("Exception while fetching livepatch status" in str(errors)) + package_manager.env_layer.run_command_output = backup_run_command_output + + def test_update_livepatch_status_in_patch_installation_summary_success(self): + livepatch_status = { + "Status": [{ + "Kernel": "123", + "Running": True, + "Livepatch": {"CheckState": "checked", "State": "applied", "Version": "1.0"}, + "Supported": "supported", + "UpgradeRequiredDate": "2027-04-13"}], + "tier": "stable", + "Using-Cutoff-Date": True + } + substatus_file_data = self.__invoke_update_livepatch_status_and_fetch_status(livepatch_status)[0] + + self.assertNotEqual(json.loads(substatus_file_data["formattedMessage"]["message"])["errors"], None) + self.assertEqual(substatus_file_data["name"], Constants.PATCH_INSTALLATION_SUMMARY) + patch = json.loads(substatus_file_data["formattedMessage"]["message"])["patches"][0] + self.assertEqual(patch["name"],"livepatch_checked_applied") + self.assertTrue("Other" in str(patch["classifications"])) + self.assertEqual(patch["patchInstallationState"], Constants.INSTALLED) + self.assertEqual(patch["version"],"1.0") + + def test_update_livepatch_status_in_patch_installation_summary_success_with_no_livepatch_applied(self): + livepatch_status = { + "Status": [{ + "Kernel": "123", + "Running": True, + "Livepatch": {"CheckState": "checked", "State": "nothing-to-apply", "Version": ""}, + "Supported": "supported", + "UpgradeRequiredDate": "2027-04-13"}], + "tier": "stable", + "Using-Cutoff-Date": True + } + substatus_file_data = self.__invoke_update_livepatch_status_and_fetch_status(livepatch_status)[0] + + self.assertNotEqual(json.loads(substatus_file_data["formattedMessage"]["message"])["errors"], None) + self.assertEqual(substatus_file_data["name"], Constants.PATCH_INSTALLATION_SUMMARY) + patch = json.loads(substatus_file_data["formattedMessage"]["message"])["patches"][0] + self.assertEqual(patch["name"], "livepatch_checked_nothing-to-apply") + self.assertTrue("Other" in str(patch["classifications"])) + self.assertEqual(patch["patchInstallationState"], Constants.NOT_SELECTED) + self.assertEqual(patch["version"], "") + + def test_update_livepatch_status_in_patch_installation_summary_empty_livepatch_status(self): + livepatch_status = { + "Status": [{ + "Kernel": "123", + "Running": False, + "Livepatch": {"CheckState": "checked", "State": "nothing-to-apply", "Version": ""}, + "Supported": "supported", + "UpgradeRequiredDate": "2027-04-13"}], + "tier": "stable", + "Using-Cutoff-Date": True + } + substatus_file_data = self.__invoke_update_livepatch_status_and_fetch_status(livepatch_status) + self.assertEqual(len(substatus_file_data), 0) + + def test_set_config_date_in_livepatch_cmd_success(self): + self.runtime.execution_config.max_patch_publish_date = "20250324T000000Z" + package_manager_for_test = AptitudePackageManager.AptitudePackageManager(self.runtime.env_layer, self.runtime.execution_config, self.runtime.composite_logger, + self.runtime.telemetry_writer, self.runtime.status_handler) + self.assertEqual(package_manager_for_test.set_cutoff_date_in_livepatch_config_cmd, "sudo canonical-livepatch config cutoff-date=2025-03-24T00:00:00Z") + + def test_set_config_date_in_livepatch_cmd_failure(self): + self.runtime.execution_config.max_patch_publish_date = "2025-0324T000000Z" + package_manager_for_test = AptitudePackageManager.AptitudePackageManager(self.runtime.env_layer, self.runtime.execution_config, self.runtime.composite_logger, + self.runtime.telemetry_writer, self.runtime.status_handler) + self.assertEqual(package_manager_for_test.set_cutoff_date_in_livepatch_config_cmd, "sudo canonical-livepatch config cutoff-date=") + + def test_start_livepatch_success(self): + package_manager = self.__setup_package_manager() + package_manager.ubuntu_pro_client.is_ubuntu_pro_client_attached = True + self.runtime.status_handler.set_current_operation(Constants.INSTALLATION) + package_manager.start_livepatch() + + substatus_file_data = self.__get_substatus_from_status_file()[0] + self.assertNotEqual(json.loads(substatus_file_data["formattedMessage"]["message"])["errors"], None) + self.assertEqual(substatus_file_data["name"], Constants.PATCH_INSTALLATION_SUMMARY) + patch = json.loads(substatus_file_data["formattedMessage"]["message"])["patches"][0] + self.assertEqual(patch["name"], "livepatch_checked_nothing-to-apply") + self.assertTrue("Other" in str(patch["classifications"])) + self.assertEqual(patch["patchInstallationState"], Constants.NOT_SELECTED) + self.assertEqual(patch["version"], "") + + def test_start_livepatch_when_pre_req_not_met(self): + package_manager = self.__setup_package_manager(legacy_type='SadPath') + package_manager.ubuntu_pro_client.is_ubuntu_pro_client_attached = True + self.runtime.status_handler.set_current_operation(Constants.INSTALLATION) + + package_manager.start_livepatch() + substatus_file_data = self.__get_substatus_from_status_file() + self.assertEqual(len(substatus_file_data), 1) + errors = json.loads(substatus_file_data[0]["formattedMessage"]["message"])["errors"] + self.assertNotEqual(errors, None) + self.assertTrue("The Ubuntu Pro client reported that the Livepatch service is not enabled" in str(errors)) + + def test_start_livepatch_when_livepatch_config_date_not_set(self): + # cmd to set config date in livepatch service failed. So livepatch client is not launched and status not updated with livepatch data + package_manager = self.__setup_package_manager(legacy_type='SadPath') + package_manager.ubuntu_pro_client.is_ubuntu_pro_client_attached = True + self.runtime.status_handler.set_current_operation(Constants.INSTALLATION) + backup_ubuntu_pro_client_livepatch_service_enabled_on_machine = package_manager.ubuntu_pro_client.livepatch_service_enabled_on_machine + package_manager.ubuntu_pro_client.livepatch_service_enabled_on_machine = self.mock_ubuntu_pro_client_livepatch_service_enabled_on_machine_returns_true + + package_manager.start_livepatch() + substatus_file_data = self.__get_substatus_from_status_file()[0] + self.assertNotEqual(json.loads(substatus_file_data["formattedMessage"]["message"])["errors"], None) + self.assertEqual(substatus_file_data["name"], Constants.PATCH_INSTALLATION_SUMMARY) + patches = json.loads(substatus_file_data["formattedMessage"]["message"])["patches"] + self.assertEqual(len(patches), 0) + package_manager.ubuntu_pro_client.livepatch_service_enabled_on_machine = backup_ubuntu_pro_client_livepatch_service_enabled_on_machine + + def test_start_livepatch_when_launch_livepatch_client_failed(self): + # livepatch client is not launched, status is still updated with stale livepatch config + package_manager = self.__setup_package_manager() + package_manager.ubuntu_pro_client.is_ubuntu_pro_client_attached = True + self.runtime.status_handler.set_current_operation(Constants.INSTALLATION) + backup_launch_livepatch_client = package_manager.launch_livepatch_client + package_manager.launch_livepatch_client = self.mock_launch_livepatch_client_failed + + package_manager.start_livepatch() + substatus_file_data = self.__get_substatus_from_status_file()[0] + self.assertNotEqual(json.loads(substatus_file_data["formattedMessage"]["message"])["errors"], None) + self.assertEqual(substatus_file_data["name"], Constants.PATCH_INSTALLATION_SUMMARY) + patch = json.loads(substatus_file_data["formattedMessage"]["message"])["patches"][0] + self.assertEqual(patch["name"], "livepatch_checked_nothing-to-apply") + self.assertTrue("Other" in str(patch["classifications"])) + self.assertEqual(patch["patchInstallationState"], Constants.NOT_SELECTED) + self.assertEqual(patch["version"], "") + package_manager.launch_livepatch_client = backup_launch_livepatch_client + + def __setup_package_manager(self, legacy_type="HappyPath"): + self.runtime.set_legacy_test_type(legacy_type) + package_manager = self.container.get('package_manager') + return package_manager + + def __get_substatus_from_status_file(self): + substatus_file_data = [] + with self.runtime.env_layer.file_system.open(self.runtime.execution_config.status_file_path, 'r') as file_handle: + status = json.load(file_handle) + self.assertEqual(status[0]["status"]["status"].lower(), Constants.STATUS_SUCCESS.lower()) + substatus_file_data = status[0]["status"]["substatus"] + return substatus_file_data + + def __invoke_update_livepatch_status_and_fetch_status(self, livepatch_status): + package_manager = self.__setup_package_manager() + self.runtime.status_handler.set_current_operation(Constants.INSTALLATION) + package_manager.update_livepatch_status_in_patch_installation_summary(livepatch_status) + self.assertEqual(os.path.exists(self.runtime.execution_config.status_file_path), True) + return self.__get_substatus_from_status_file() + if __name__ == '__main__': unittest.main() diff --git a/src/core/tests/Test_ExecutionConfig.py b/src/core/tests/Test_ExecutionConfig.py index 16756e2c..21a6a4e6 100644 --- a/src/core/tests/Test_ExecutionConfig.py +++ b/src/core/tests/Test_ExecutionConfig.py @@ -13,9 +13,12 @@ # limitations under the License. # # Requires Python 2.7+ +import json +import os import unittest from core.src.bootstrap.Constants import Constants +from core.src.core_logic.ExecutionConfig import ExecutionConfig from core.tests.library.ArgumentComposer import ArgumentComposer from core.tests.library.RuntimeCompositor import RuntimeCompositor @@ -27,6 +30,11 @@ def setUp(self): def tearDown(self): pass + # region Mocks + def mock_read_with_retry_raise_exception(self): + raise Exception + # endregion + def test_get_max_patch_publish_date(self): test_input_output_table = [ ["pub_off_sku_2020.09.29", "20200929T000000Z"], @@ -42,6 +50,179 @@ def test_get_max_patch_publish_date(self): runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.YUM) for row in test_input_output_table: self.assertEqual(runtime.execution_config._ExecutionConfig__get_max_patch_publish_date(row[0]), row[1]) + self.__teardown(runtime) + + def test_livepatch_config_when_file_does_not_exist(self): + # livepatch in-VM customer config does not exist + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=False) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=dict(), + expected_livepatch_requested=False, expected_livepatch_only_requested=False, expected_file_exists=False) + self.__teardown(runtime) + + def test_livepatch_config_when_no_data_found_in_file(self): + # livepatch in-VM customer config file exists but the file has no data + livepatch_settings = None + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=dict(), + expected_livepatch_requested=False, expected_livepatch_only_requested=False, expected_file_exists=True) + self.__teardown(runtime) + + def test_livepatch_config_when_enable_livepatch_not_in_config(self): + # EnableLivePatch not set in config + livepatch_settings = { + "EnabledBy": "TestSetup", + "LastModified": "2026-04-21" + } + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=False, expected_livepatch_only_requested=False, expected_file_exists=True) + self.__teardown(runtime) + + def test_livepatch_config_when_enable_livepatch_not_set_as_boolean(self): + livepatch_settings = { + "EnabledBy": "TestSetup", + "LastModified": "2026-04-21", + "EnableLivePatch": "test", + "LivePatchOnly": "test" + } + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=False, expected_livepatch_only_requested=False, expected_file_exists=True) + self.__teardown(runtime) + + def test_livepatch_config_with_illformed_config(self): + livepatch_settings = ["test unexpected config value"] + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=False, expected_livepatch_only_requested=False, expected_file_exists=True) + self.__teardown(runtime) + + def test_livepatch_config_when_file_read_raises_exception(self): + livepatch_settings = { + "EnabledBy": "TestSetup", + "LastModified": "2026-04-21", + "EnableLivePatch": "test", + "LivePatchOnly": "test" + } + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.backup_read_with_retry = runtime.env_layer.file_system.read_with_retry + runtime.env_layer.file_system.read_with_retry = self.mock_read_with_retry_raise_exception + exec_config = ExecutionConfig(runtime.env_layer, runtime.composite_logger, str(runtime.argv)) + self.__assert_livepatch_configs(execution_config=exec_config, expected_livepatch_config_settings=dict(), + expected_livepatch_requested=False, expected_livepatch_only_requested=False, expected_file_exists=True) + runtime.env_layer.file_system.read_with_retry = self.backup_read_with_retry + self.__teardown(runtime) + + def test_livepatch_config_file_with_livepatch_enabled_set_to_false(self): + # Tests livepatch enable set with different non-true values + + # Value set to boolean False + livepatch_settings = { + "EnabledBy": "TestSetup", + "LastModified": "2026-04-21", + "EnableLivePatch": False, + "LivePatchOnly": False + } + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=False, expected_livepatch_only_requested=False, expected_file_exists=True) + self.__teardown(runtime) + + # Value set to random string + livepatch_settings = { + "EnabledBy": "TestSetup", + "LastModified": "2026-04-21", + "EnableLivePatch": "3", + "LivePatchOnly": "test" + } + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=False, expected_livepatch_only_requested=False, expected_file_exists=True) + self.__teardown(runtime) + + # LivepatchOnly set to true + livepatch_settings = { + "EnabledBy": "TestSetup", + "LastModified": "2026-04-21", + "EnableLivePatch": False, + "LivePatchOnly": True + } + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=False, expected_livepatch_only_requested=True, expected_file_exists=True) + self.__teardown(runtime) + + def test_livepatch_config_file_with_livepatch_enable_set_to_true(self): + # Tests EnableLivePatch with all acceptable values of true + + # Value set to boolean True + livepatch_settings = { + "EnabledBy": "TestSetup", + "LastModified": "2026-04-21", + "EnableLivePatch": True, + "LivePatchOnly": False + } + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=True, expected_livepatch_only_requested=False, expected_file_exists=True) + self.__teardown(runtime) + + # Value set to string "True" + livepatch_settings["EnableLivePatch"] = "True" + livepatch_settings["LivePatchOnly"] = "True" + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=True, expected_livepatch_only_requested=True, expected_file_exists=True) + self.__teardown(runtime) + + # Value set to string "true" + livepatch_settings["EnableLivePatch"] = "true" + livepatch_settings["LivePatchOnly"] = "true" + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=True, expected_livepatch_only_requested=True, expected_file_exists=True) + self.__teardown(runtime) + + # Value set to string "1" + livepatch_settings["EnableLivePatch"] = "1" + livepatch_settings["LivePatchOnly"] = "1" + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=True, expected_livepatch_only_requested=True, expected_file_exists=True) + self.__teardown(runtime) + + # Value set to 1 + livepatch_settings["EnableLivePatch"] = 1 + livepatch_settings["LivePatchOnly"] = 1 + runtime, execution_config = self.__setup_and_init_execution_config(write_to_file=True, livepatch_settings=livepatch_settings) + self.__assert_livepatch_configs(execution_config=execution_config, expected_livepatch_config_settings=livepatch_settings, + expected_livepatch_requested=True, expected_livepatch_only_requested=True, expected_file_exists=True) + self.__teardown(runtime) + + def __write_livepatch_settings_to_file(self, livepatch_settings): + f = open(Constants.AzGPSPaths.LIVEPATCH_CUSTOMER_SETTINGS, "w+") + f.write(json.dumps(livepatch_settings)) + f.close() + + def __setup_and_init_execution_config(self, write_to_file=False, livepatch_settings= None): + argument_composer = ArgumentComposer() + if write_to_file: + self.__write_livepatch_settings_to_file(livepatch_settings) + runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, package_manager_name=Constants.APT) + container = runtime.container + execution_config = container.get('execution_config') + return runtime, execution_config + + def __assert_livepatch_configs(self, execution_config, expected_livepatch_config_settings, expected_livepatch_requested, expected_livepatch_only_requested, expected_file_exists): + self.assertEqual(execution_config.livepatch_customer_config_settings, expected_livepatch_config_settings) + self.assertEqual(execution_config.is_livepatch_requested, expected_livepatch_requested) + self.assertEqual(execution_config.is_livepatch_only_requested, expected_livepatch_only_requested) + self.assertEqual(os.path.exists(Constants.AzGPSPaths.LIVEPATCH_CUSTOMER_SETTINGS), expected_file_exists) + + def __teardown(self, runtime): + # remove the livepatch settings file if it exists after the test + if os.path.exists(Constants.AzGPSPaths.LIVEPATCH_CUSTOMER_SETTINGS): + os.remove(Constants.AzGPSPaths.LIVEPATCH_CUSTOMER_SETTINGS) runtime.stop() diff --git a/src/core/tests/Test_UbuntuProClient.py b/src/core/tests/Test_UbuntuProClient.py index 9865df00..a1d07ba1 100644 --- a/src/core/tests/Test_UbuntuProClient.py +++ b/src/core/tests/Test_UbuntuProClient.py @@ -405,5 +405,43 @@ def test_get_other_updates_exception(self): package_manager.ubuntu_pro_client.get_ubuntu_pro_client_updates = backup_get_ubuntu_pro_client_updates obj.mock_unimport_uaclient_update_module() + def test_pro_client_attached_for_livepatching_returns_true(self): + package_manager = self.container.get('package_manager') + package_manager.ubuntu_pro_client.is_ubuntu_pro_client_attached = True + self.assertTrue(package_manager.ubuntu_pro_client.pro_client_attached_for_livepatching()) + + def test_pro_client_attached_for_livepatching_returns_false(self): + package_manager = self.container.get('package_manager') + package_manager.ubuntu_pro_client.is_ubuntu_pro_client_attached = False + self.assertFalse(package_manager.ubuntu_pro_client.pro_client_attached_for_livepatching()) + + def test_livepatch_service_enabled_on_machine_returns_true(self): + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager.ubuntu_pro_client.livepatch_service_enabled_on_machine()) + + def test_livepatch_service_enabled_on_machine_returns_false(self): + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + self.assertFalse(package_manager.ubuntu_pro_client.livepatch_service_enabled_on_machine()) + + def test_livepatch_service_enabled_on_machine_exception_path(self): + package_manager = self.container.get('package_manager') + backup_run_command_output = package_manager.env_layer.run_command_output + package_manager.env_layer.run_command_output = self.mock_run_command_output_raise_exception + self.assertFalse(package_manager.ubuntu_pro_client.livepatch_service_enabled_on_machine()) + package_manager.env_layer.run_command_output = backup_run_command_output + + def test_livepatch_service_enabled_on_machine_returns_false_when_no_data_on_livepatch_found(self): + # Test scenario: No data for livepatch service returned + self.runtime.set_legacy_test_type('AnotherSadPath') + package_manager = self.container.get('package_manager') + self.assertFalse(package_manager.ubuntu_pro_client.livepatch_service_enabled_on_machine()) + + # Test scenario: No service data returned + self.runtime.set_legacy_test_type('UnalignedPath') + package_manager = self.container.get('package_manager') + self.assertFalse(package_manager.ubuntu_pro_client.livepatch_service_enabled_on_machine()) + + if __name__ == '__main__': unittest.main() diff --git a/src/core/tests/library/ArgumentComposer.py b/src/core/tests/library/ArgumentComposer.py index 550f225b..6e3a0e15 100644 --- a/src/core/tests/library/ArgumentComposer.py +++ b/src/core/tests/library/ArgumentComposer.py @@ -48,6 +48,7 @@ def __init__(self): self.events_folder = self.__get_custom_folder(self.__log_folder, self.__EVENTS_FOLDER) self.temp_folder = self.__get_custom_folder(scratch_folder, self.__TEMP_FOLDER) Constants.AzGPSPaths.EULA_SETTINGS = os.path.join(scratch_folder, "patch.eula.settings") + Constants.AzGPSPaths.LIVEPATCH_CUSTOMER_SETTINGS = os.path.join(scratch_folder, "patch.livepatching.settings") # config settings self.operation = Constants.INSTALLATION diff --git a/src/core/tests/library/LegacyEnvLayerExtensions.py b/src/core/tests/library/LegacyEnvLayerExtensions.py index 967ab32e..2e45284b 100644 --- a/src/core/tests/library/LegacyEnvLayerExtensions.py +++ b/src/core/tests/library/LegacyEnvLayerExtensions.py @@ -539,6 +539,28 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): elif cmd.find('pro security-status --format=json') > -1: code = 0 output = "{\"summary\":{\"ua\":{\"attached\":true}}}" + elif cmd.find('pro status --all --format=json') > -1: + code = 0 + output = ("{\"services\":[" + "{\"available\":\"no\", \"blocked_by\":[],\"description\":\"Management and administration tool for Ubuntu\",\"description_override\": null," + "\"entitled\":\"yes\", \"name\":\"landscape\", \"status\":\"n/a\",\"status_details\":\"\",\"warning\": null}," + "{\"available\":\"yes\",\"blocked_by\":[],\"description\":\"Canonical Livepatch service\",\"description_override\":null," + "\"entitled\":\"yes\",\"name\":\"livepatch\", \"status\":\"enabled\", \"status_details\": \"\",\"warning\":null}]}") + elif cmd.find('canonical-livepatch config cutoff-date') > -1: + code = 0 + output = 'cutoff-date: "2025-10-01T12:00:00Z"' + elif cmd.find('sudo systemctl restart snap.canonical-livepatch.canonical-livepatchd') > -1: + code = 0 + output = '' + elif cmd.find('sudo canonical-livepatch status --verbose --format json') > -1: + code = 0 + output = ("{\"Status\":[" + "{\"Kernel\":\"123\"," + "\"Running\":true," + "\"Livepatch\": {\"CheckState\": \"checked\",\"State\": \"nothing-to-apply\",\"Version\": \"\"}," + "\"Supported\": \"supported\"," + "\"UpgradeRequiredDate\": \"2027-04-13\"}]," + "\"tier\":\"stable\",\"Using-Cutoff-Date\":true}") elif self.legacy_package_manager_name is Constants.TDNF: if cmd.find("--security list updates") > -1: code = 0 @@ -657,6 +679,22 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): elif cmd.find('pro security-status --format=json') > -1: code = 0 output = "{\"summary\":{\"ua\":{\"attached\":false}}}" + elif cmd.find('pro status --all --format=json') > -1: + code = 0 + output = ("{\"services\":[" + "{\"available\":\"no\", \"blocked_by\":[],\"description\":\"Management and administration tool for Ubuntu\",\"description_override\": null," + "\"entitled\":\"yes\", \"name\":\"landscape\", \"status\":\"n/a\",\"status_details\":\"\",\"warning\": null}," + "{\"available\":\"yes\",\"blocked_by\":[],\"description\":\"Canonical Livepatch service\",\"description_override\":null," + "\"entitled\":\"yes\",\"name\":\"livepatch\", \"status\":\"disabled\", \"status_details\": \"\",\"warning\":null}]}") + elif cmd.find('canonical-livepatch config cutoff-date') > -1: + code = -1 + output = 'config update aborted: cutoff-date: this configuration option is only available to paid Ubuntu Pro users.' + elif cmd.find('sudo systemctl restart snap.canonical-livepatch.canonical-livepatchd') > -1: + code = -1 + output = 'Failure to restart snap.canonical-livepatch.canonical-livepatchd' + elif cmd.find('sudo canonical-livepatch status --verbose --format json') > -1: + code = -1 + output = "Failure to get status of canonical-livepatch" elif self.legacy_package_manager_name is Constants.YUM: if cmd.find("microcode_ctl") > -1: code = 1 @@ -706,6 +744,9 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): elif self.legacy_package_manager_name is Constants.APT: code = 100 output = '' + if cmd.find('pro status --all --format=json') > -1: + code = 0 + output = "{\"attached\":true}" elif self.legacy_package_manager_name is Constants.YUM: code = 100 output = 'NetworkManager-config-server.x86_64 1:1.4.0-20.el7_3 \n' + \ @@ -762,6 +803,12 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): elif cmd.find("systemctl is-enabled ") > -1: code = 0 output = 'enabled' + if self.legacy_package_manager_name is Constants.APT: + if cmd.find('pro status --all --format=json') > -1: + code = 0 + output = ("{\"services\":[" + "{\"available\":\"yes\", \"blocked_by\":[],\"description\":\"Management and administration tool for Ubuntu\",\"description_override\": null," + "\"entitled\":\"yes\", \"name\":\"landscape\", \"status\":\"active\",\"status_details\":\"\",\"warning\": null}]}") elif self.legacy_test_type == 'ExceptionPath': code = -1 output = '' diff --git a/src/tools/references/canonical-livepatch_status_cmd_expected_output_format.txt b/src/tools/references/canonical-livepatch_status_cmd_expected_output_format.txt new file mode 100644 index 00000000..8a18b8e9 --- /dev/null +++ b/src/tools/references/canonical-livepatch_status_cmd_expected_output_format.txt @@ -0,0 +1,42 @@ +This is a sample of livepatch status output for reference: + { + "Client-Version": "<>", + "Machine-Id": "<>", + "Architecture": "<>", + "CPU-Model": "<>", + "Last-Check": "<>", + "Boot-Time": "<>", + "Uptime": "<>", + "Status": [ + { + "Kernel": "<>", + "Running": true, + "Livepatch": { + "CheckState": "checked", + "State": "<>", // "nothing-to-apply" or "applied" + "Version": "" // "" or a version such as "1.0", + "Fixes": // empty if no livepatches available or a list of CVEs installed + [{ + "Name": "<>", //cve identifier such as CVE-000-0000 + "Description": "<>", // description of the livepatch fix + "Bug": "", + "Patched": // boolean value indicating status + }] + }, + "Supported": "<>", // "supported" or a quick text on what is needed such as "kernel-upgrade-required" + "UpgradeRequiredDate": "<>" // date + }], + "tier": "updates", + "Excluded-LSNs": [], // List of excluded LSNs + "Fixed-CVEs": { + "Timestamp": "", + "Kernel-Package-Fixes": [], // list of all kernel packages fixed + "Installed-Kernels": [], + "Patched-CVEs": [], // list of patched CVEs identifiers + "Digest": "" + }, + "Blocking-Options": [ // List of configs blocking livepatch, if any. For eg: cutoff-date set for livepatch client + "cutoff-date" + ], + "Using-Cutoff-Date": // boolean value indicating whether livepatch client is using cutoff-date config or not + } \ No newline at end of file