diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index 67143f49..f5750a6d 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -127,9 +127,13 @@ class AutoAssessmentStates(EnumBackport): DISABLED = "Disabled" ENABLED = "Enabled" + # To separately preserve assessment + auto-assessment state information + ASSESSMENT_STATE_FILE = "AssessmentState.json" + AUTO_ASSESSMENT_MAXIMUM_DURATION = "PT1H" + MIN_AUTO_ASSESSMENT_INTERVAL = "PT6H" # do not perform auto-assessment if the last assessment happened less than this time interval ago + # wait time after status updates WAIT_TIME_AFTER_HEALTHSTORE_STATUS_UPDATE_IN_SECS = 20 - AUTO_ASSESSMENT_MAXIMUM_DURATION = "PT1H" # Status file states STATUS_TRANSITIONING = "Transitioning" diff --git a/src/core/src/core_logic/PatchAssessor.py b/src/core/src/core_logic/PatchAssessor.py index de6b9d78..1ae8e545 100644 --- a/src/core/src/core_logic/PatchAssessor.py +++ b/src/core/src/core_logic/PatchAssessor.py @@ -15,6 +15,10 @@ # Requires Python 2.7+ """ A patch assessment """ +import datetime +import json +import os +import shutil import time from core.src.bootstrap.Constants import Constants @@ -30,13 +34,20 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ self.status_handler = status_handler self.lifecycle_manager = lifecycle_manager self.package_manager = package_manager + self.assessment_state_file_path = os.path.join(self.execution_config.config_folder, Constants.ASSESSMENT_STATE_FILE) def start_assessment(self): """ Start a patch assessment """ self.status_handler.set_current_operation(Constants.ASSESSMENT) self.raise_if_telemetry_unsupported() + if self.execution_config.exec_auto_assess_only and not self.should_auto_assessment_run(): + self.composite_logger.log("\nAutomatic patch assessment not required at this time.\n") + self.lifecycle_manager.lifecycle_status_check() + return True + self.composite_logger.log('\nStarting patch assessment...') + self.write_assessment_state() # success / failure does not matter, only that an attempt started self.status_handler.set_assessment_substatus_json(status=Constants.STATUS_TRANSITIONING) self.composite_logger.log("\nMachine Id: " + self.env_layer.platform.node()) @@ -63,7 +74,7 @@ def start_assessment(self): break except Exception as error: if i < Constants.MAX_ASSESSMENT_RETRY_COUNT - 1: - error_msg = 'Retryable error retrieving available patches: ' + repr(error) + error_msg = 'Retriable error retrieving available patches: ' + repr(error) self.composite_logger.log_warning(error_msg) self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.DEFAULT_ERROR) time.sleep(2*(i + 1)) @@ -91,3 +102,121 @@ def raise_if_telemetry_unsupported(self): self.composite_logger.log("{0}".format(Constants.TELEMETRY_COMPATIBLE_MSG)) + # region - Auto-assessment extensions + def should_auto_assessment_run(self): + # get last start time + try: + assessment_state = self.read_assessment_state() + last_start_in_seconds_since_epoch = assessment_state['lastStartInSecondsSinceEpoch'] + except Exception as error: + self.composite_logger.log_warning("No valid last start information available for auto-assessment.") + return True + + # get minimum elapsed time required + min_elapsed_seconds_required = self.convert_iso8601_duration_to_total_seconds(Constants.MIN_AUTO_ASSESSMENT_INTERVAL) + + # check if required duration has passed + elapsed_time_in_seconds = self.__get_seconds_since_epoch() - last_start_in_seconds_since_epoch + if elapsed_time_in_seconds < 0: + self.composite_logger.log_warning("Anomaly detected in system time now or during the last assessment run. Assessment will run anyway.") + return True + else: + return elapsed_time_in_seconds >= min_elapsed_seconds_required + + def read_assessment_state(self): + """ Reads the assessment state file. """ + self.composite_logger.log_debug("Reading assessment state...") + if not os.path.exists(self.assessment_state_file_path) or not os.path.isfile(self.assessment_state_file_path): + # Neutralizes directories + if os.path.isdir(self.assessment_state_file_path): + self.composite_logger.log_error("Assessment state file path returned a directory. Attempting to reset.") + shutil.rmtree(self.assessment_state_file_path) + # Writes a vanilla assessment statefile + self.write_assessment_state(first_write=True) + + # Read (with retries for only IO Errors) + for i in range(0, Constants.MAX_FILE_OPERATION_RETRY_COUNT): + try: + with self.env_layer.file_system.open(self.assessment_state_file_path, mode="r") as file_handle: + return json.load(file_handle)['assessmentState'] + except Exception as error: + if i < Constants.MAX_FILE_OPERATION_RETRY_COUNT - 1: + self.composite_logger.log_warning("Exception on assessment state read. [Exception={0}] [RetryCount={1}]".format(repr(error), str(i))) + time.sleep(i + 1) + else: + self.composite_logger.log_error("Unable to read assessment state file (retries exhausted). [Exception={0}]".format(repr(error))) + raise + + def write_assessment_state(self, first_write=False): + """ + AssessmentState.json sample structure: + { + "number": "", + "lastStartInSecondsSinceEpoch": "", + "lastHeartbeat": "", + "processIds": ["", ...], + "autoAssessment": "" + } + """ + self.composite_logger.log_debug("Updating assessment state... ") + + # lastHeartbeat below is redundant, but is present for ease of debuggability + assessment_state = {'number': self.execution_config.sequence_number, + # Set lastStartInSecondsSinceEpoch to 0 if file did not exist before (first write) to ensure it can run assessment when first created + 'lastStartInSecondsSinceEpoch': self.__get_seconds_since_epoch() if not first_write else 0, + 'lastHeartbeat': str(self.env_layer.datetime.timestamp()), + 'processIds': [os.getpid()], + 'autoAssessment': str(self.execution_config.exec_auto_assess_only)} + assessment_state_payload = json.dumps({"assessmentState": assessment_state}) + + if os.path.isdir(self.assessment_state_file_path): + self.composite_logger.log_error("Assessment state file path returned a directory. Attempting to reset.") + shutil.rmtree(self.assessment_state_file_path) + + for i in range(0, Constants.MAX_FILE_OPERATION_RETRY_COUNT): + try: + with self.env_layer.file_system.open(self.assessment_state_file_path, 'w+') as file_handle: + file_handle.write(assessment_state_payload) + break + except Exception as error: + if i < Constants.MAX_FILE_OPERATION_RETRY_COUNT - 1: + self.composite_logger.log_warning("Exception on assessment state update. [Exception={0}] [RetryCount={1}]".format(repr(error), str(i))) + time.sleep(i + 1) + else: + self.composite_logger.log_error("Unable to write to assessment state file (retries exhausted). [Exception={0}]".format(repr(error))) + raise + + self.composite_logger.log_debug("Completed updating assessment state.") + + @staticmethod + def __get_seconds_since_epoch(): + return int((datetime.datetime.now() - datetime.datetime(1970, 1, 1)).total_seconds()) + + def convert_iso8601_duration_to_total_seconds(self, duration): + """ + No non-default period (Y,M,W,D) is supported. Time is supported (H,M,S). + """ + remaining = str(duration) + if 'PT' not in remaining: + raise Exception("Unexpected duration format. [Duration={0}]".format(duration)) + + discard, remaining = self.__extract_most_significant_unit_from_duration(remaining, 'PT') + hours, remaining = self.__extract_most_significant_unit_from_duration(remaining, 'H') + minutes, remaining = self.__extract_most_significant_unit_from_duration(remaining, 'M') + seconds, remaining = self.__extract_most_significant_unit_from_duration(remaining, 'S') + + return datetime.timedelta(hours=int(hours), minutes=int(minutes), seconds=int(seconds)).total_seconds() + + @staticmethod + def __extract_most_significant_unit_from_duration(duration_portion, unit_delimiter): + """ Internal helper function """ + duration_split = duration_portion.split(unit_delimiter) + most_significant_unit = 0 + remaining_duration_portion = '' + if len(duration_split) == 2: # found and extracted + most_significant_unit = duration_split[0] + remaining_duration_portion = duration_split[1] + elif len(duration_split) == 1: # not found + remaining_duration_portion = duration_split[0] + + return most_significant_unit, remaining_duration_portion diff --git a/src/core/tests/Test_PatchAssessor.py b/src/core/tests/Test_PatchAssessor.py index 843035d9..9b2daa94 100644 --- a/src/core/tests/Test_PatchAssessor.py +++ b/src/core/tests/Test_PatchAssessor.py @@ -13,10 +13,12 @@ # limitations under the License. # # Requires Python 2.7+ - +import datetime import json +import os import unittest +from core.src.bootstrap.Constants import Constants from core.src.service_interfaces.TelemetryWriter import TelemetryWriter from core.tests.library.ArgumentComposer import ArgumentComposer from core.tests.library.RuntimeCompositor import RuntimeCompositor @@ -66,6 +68,88 @@ def test_assessment_telemetry_fail(self): self.assertRaises(Exception, self.runtime.patch_assessor.start_assessment) self.runtime.patch_assessor.telemetry_writer = backup_telemetry_writer + def test_assessment_state_file(self): + # read_assessment_state creates a vanilla assessment state file if none exists + assessment_state = self.runtime.patch_assessor.read_assessment_state() + with open(self.runtime.patch_assessor.assessment_state_file_path, 'r') as file_handle: + assessment_state_from_file = json.loads(file_handle.read())["assessmentState"] + self.assessment_state_equals(assessment_state, assessment_state_from_file) + + # write and test again + self.runtime.patch_assessor.write_assessment_state() + assessment_state = self.runtime.patch_assessor.read_assessment_state() + with open(self.runtime.patch_assessor.assessment_state_file_path, 'r') as file_handle: + assessment_state_from_file = json.loads(file_handle.read())["assessmentState"] + self.assessment_state_equals(assessment_state, assessment_state_from_file) + + # Assessment state file is a directory + if os.path.exists(self.runtime.patch_assessor.assessment_state_file_path): + os.remove(self.runtime.patch_assessor.assessment_state_file_path) + + # Attempt to read when it does not exist - should create default assessment state file + os.mkdir(self.runtime.patch_assessor.assessment_state_file_path) + self.assertTrue(self.runtime.patch_assessor.read_assessment_state() is not None) + + if os.path.exists(self.runtime.patch_assessor.assessment_state_file_path): + os.remove(self.runtime.patch_assessor.assessment_state_file_path) + + os.mkdir(self.runtime.patch_assessor.assessment_state_file_path) + # Attempt to write when it does not exist - should also create default assessment state file + self.runtime.patch_assessor.write_assessment_state() + self.assertTrue(self.runtime.patch_assessor.read_assessment_state() is not None) + + # Opening file throws exception + backup_open = self.runtime.patch_assessor.env_layer.file_system.open + self.runtime.patch_assessor.env_layer.file_system.open = lambda: self.raise_ex() + self.assertRaises(Exception, self.runtime.patch_assessor.read_assessment_state) + self.assertRaises(Exception, self.runtime.patch_assessor.write_assessment_state) + self.runtime.patch_assessor.env_layer.file_system.open = backup_open + + def assessment_state_equals(self, state1, state2): + self.assertEqual(state1["processIds"][0], state2["processIds"][0]) + self.assertEqual(state1["lastHeartbeat"], state2["lastHeartbeat"]) + self.assertEqual(state1["number"], state2["number"]) + self.assertEqual(state1["autoAssessment"], state2["autoAssessment"]) + self.assertEqual(state1["lastStartInSecondsSinceEpoch"], state2["lastStartInSecondsSinceEpoch"]) + + def test_should_auto_assessment_run(self): + # First file write (since it does not exist on read) so it should succeed since last assessment time is 0 + self.runtime.patch_assessor.read_assessment_state() + self.assertTrue(self.runtime.patch_assessor.should_auto_assessment_run()) + + # Second file write, should fail now since minimum delay between assessments hasn't been met + self.runtime.patch_assessor.write_assessment_state() + self.assertFalse(self.runtime.patch_assessor.should_auto_assessment_run()) + + # It has been minimum delay time since last run + assessment_state = self.runtime.patch_assessor.read_assessment_state() + min_auto_assess_interval_in_seconds = self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds(Constants.MIN_AUTO_ASSESSMENT_INTERVAL) + assessment_state["lastStartInSecondsSinceEpoch"] -= min_auto_assess_interval_in_seconds + with open(self.runtime.patch_assessor.assessment_state_file_path, 'w+') as file_handle: + file_handle.write(json.dumps({"assessmentState": assessment_state})) + self.assertTrue(self.runtime.patch_assessor.should_auto_assessment_run()) + + # Time is in future, so run assessment and correct anomaly + self.runtime.patch_assessor.write_assessment_state() + assessment_state["lastStartInSecondsSinceEpoch"] += 5000000 + with open(self.runtime.patch_assessor.assessment_state_file_path, 'w+') as file_handle: + file_handle.write(json.dumps({"assessmentState": assessment_state})) + self.assertTrue(self.runtime.patch_assessor.should_auto_assessment_run()) + + # Test exception case: exception is caught and assessment should run + self.runtime.patch_assessor.read_assessment_state = lambda: self.raise_ex() + self.assertTrue(self.runtime.patch_assessor.should_auto_assessment_run()) + + def test_convert_iso8601_duration_to_total_seconds(self): + self.assertEqual(self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds('PT6H'), 21600) + self.assertEqual(self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds('PT6H5M'), 21900) + self.assertEqual(self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds('PT6H5M14S'), 21914) + self.assertRaises(Exception, lambda: self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds('6H5M14S')) + self.assertRaises(Exception, lambda: self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds('')) + + def raise_ex(self): + raise Exception() + def mock_refresh_repo(self): pass