Skip to content
6 changes: 5 additions & 1 deletion src/core/src/bootstrap/Constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,13 @@ class AutoAssessmentStates(EnumBackport):
DISABLED = "Disabled"
ENABLED = "Enabled"

# To separately preserve assessment + auto-assessment state information
ASSESSMENT_STATE_FILE = "AssessmentState.json"
AUTO_ASSESSMENT_MAXIMUM_DURATION = "PT1H"
MIN_AUTO_ASSESSMENT_INTERVAL = "PT6H" # do not perform auto-assessment if the last assessment happened less than this time interval ago

# wait time after status updates
WAIT_TIME_AFTER_HEALTHSTORE_STATUS_UPDATE_IN_SECS = 20
AUTO_ASSESSMENT_MAXIMUM_DURATION = "PT1H"

# Status file states
STATUS_TRANSITIONING = "Transitioning"
Expand Down
131 changes: 130 additions & 1 deletion src/core/src/core_logic/PatchAssessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
# Requires Python 2.7+

""" A patch assessment """
import datetime
import json
import os
import shutil
import time
from core.src.bootstrap.Constants import Constants

Expand All @@ -30,13 +34,20 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ
self.status_handler = status_handler
self.lifecycle_manager = lifecycle_manager
self.package_manager = package_manager
self.assessment_state_file_path = os.path.join(self.execution_config.config_folder, Constants.ASSESSMENT_STATE_FILE)

def start_assessment(self):
""" Start a patch assessment """
self.status_handler.set_current_operation(Constants.ASSESSMENT)
self.raise_if_telemetry_unsupported()

if self.execution_config.exec_auto_assess_only and not self.should_auto_assessment_run():
self.composite_logger.log("\nAutomatic patch assessment not required at this time.\n")
self.lifecycle_manager.lifecycle_status_check()
return True

self.composite_logger.log('\nStarting patch assessment...')
self.write_assessment_state() # success / failure does not matter, only that an attempt started

self.status_handler.set_assessment_substatus_json(status=Constants.STATUS_TRANSITIONING)
self.composite_logger.log("\nMachine Id: " + self.env_layer.platform.node())
Expand All @@ -63,7 +74,7 @@ def start_assessment(self):
break
except Exception as error:
if i < Constants.MAX_ASSESSMENT_RETRY_COUNT - 1:
error_msg = 'Retryable error retrieving available patches: ' + repr(error)
error_msg = 'Retriable error retrieving available patches: ' + repr(error)
self.composite_logger.log_warning(error_msg)
self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.DEFAULT_ERROR)
time.sleep(2*(i + 1))
Expand Down Expand Up @@ -91,3 +102,121 @@ def raise_if_telemetry_unsupported(self):

self.composite_logger.log("{0}".format(Constants.TELEMETRY_COMPATIBLE_MSG))

# region - Auto-assessment extensions
def should_auto_assessment_run(self):
# get last start time
try:
assessment_state = self.read_assessment_state()
last_start_in_seconds_since_epoch = assessment_state['lastStartInSecondsSinceEpoch']
except Exception as error:
self.composite_logger.log_warning("No valid last start information available for auto-assessment.")
return True

# get minimum elapsed time required
min_elapsed_seconds_required = self.convert_iso8601_duration_to_total_seconds(Constants.MIN_AUTO_ASSESSMENT_INTERVAL)

# check if required duration has passed
elapsed_time_in_seconds = self.__get_seconds_since_epoch() - last_start_in_seconds_since_epoch
if elapsed_time_in_seconds < 0:
self.composite_logger.log_warning("Anomaly detected in system time now or during the last assessment run. Assessment will run anyway.")
return True
else:
return elapsed_time_in_seconds >= min_elapsed_seconds_required

def read_assessment_state(self):
""" Reads the assessment state file. """
self.composite_logger.log_debug("Reading assessment state...")
if not os.path.exists(self.assessment_state_file_path) or not os.path.isfile(self.assessment_state_file_path):
# Neutralizes directories
if os.path.isdir(self.assessment_state_file_path):
self.composite_logger.log_error("Assessment state file path returned a directory. Attempting to reset.")
shutil.rmtree(self.assessment_state_file_path)
# Writes a vanilla assessment statefile
self.write_assessment_state(first_write=True)

# Read (with retries for only IO Errors)
for i in range(0, Constants.MAX_FILE_OPERATION_RETRY_COUNT):
try:
with self.env_layer.file_system.open(self.assessment_state_file_path, mode="r") as file_handle:
return json.load(file_handle)['assessmentState']
except Exception as error:
if i < Constants.MAX_FILE_OPERATION_RETRY_COUNT - 1:
self.composite_logger.log_warning("Exception on assessment state read. [Exception={0}] [RetryCount={1}]".format(repr(error), str(i)))
time.sleep(i + 1)
else:
self.composite_logger.log_error("Unable to read assessment state file (retries exhausted). [Exception={0}]".format(repr(error)))
raise

def write_assessment_state(self, first_write=False):
"""
AssessmentState.json sample structure:
{
"number": "<sequence number>",
"lastStartInSecondsSinceEpoch": "<number>",
"lastHeartbeat": "<timestamp>",
"processIds": ["", ...],
"autoAssessment": "<true/false>"
}
"""
self.composite_logger.log_debug("Updating assessment state... ")

# lastHeartbeat below is redundant, but is present for ease of debuggability
assessment_state = {'number': self.execution_config.sequence_number,
# Set lastStartInSecondsSinceEpoch to 0 if file did not exist before (first write) to ensure it can run assessment when first created
'lastStartInSecondsSinceEpoch': self.__get_seconds_since_epoch() if not first_write else 0,
'lastHeartbeat': str(self.env_layer.datetime.timestamp()),
'processIds': [os.getpid()],
'autoAssessment': str(self.execution_config.exec_auto_assess_only)}
assessment_state_payload = json.dumps({"assessmentState": assessment_state})

if os.path.isdir(self.assessment_state_file_path):
self.composite_logger.log_error("Assessment state file path returned a directory. Attempting to reset.")
shutil.rmtree(self.assessment_state_file_path)

for i in range(0, Constants.MAX_FILE_OPERATION_RETRY_COUNT):
try:
with self.env_layer.file_system.open(self.assessment_state_file_path, 'w+') as file_handle:
file_handle.write(assessment_state_payload)
break
except Exception as error:
if i < Constants.MAX_FILE_OPERATION_RETRY_COUNT - 1:
self.composite_logger.log_warning("Exception on assessment state update. [Exception={0}] [RetryCount={1}]".format(repr(error), str(i)))
time.sleep(i + 1)
else:
self.composite_logger.log_error("Unable to write to assessment state file (retries exhausted). [Exception={0}]".format(repr(error)))
raise

self.composite_logger.log_debug("Completed updating assessment state.")

@staticmethod
def __get_seconds_since_epoch():
return int((datetime.datetime.now() - datetime.datetime(1970, 1, 1)).total_seconds())

def convert_iso8601_duration_to_total_seconds(self, duration):
"""
No non-default period (Y,M,W,D) is supported. Time is supported (H,M,S).
"""
remaining = str(duration)
if 'PT' not in remaining:
raise Exception("Unexpected duration format. [Duration={0}]".format(duration))

discard, remaining = self.__extract_most_significant_unit_from_duration(remaining, 'PT')
hours, remaining = self.__extract_most_significant_unit_from_duration(remaining, 'H')
minutes, remaining = self.__extract_most_significant_unit_from_duration(remaining, 'M')
seconds, remaining = self.__extract_most_significant_unit_from_duration(remaining, 'S')

return datetime.timedelta(hours=int(hours), minutes=int(minutes), seconds=int(seconds)).total_seconds()

@staticmethod
def __extract_most_significant_unit_from_duration(duration_portion, unit_delimiter):
""" Internal helper function """
duration_split = duration_portion.split(unit_delimiter)
most_significant_unit = 0
remaining_duration_portion = ''
if len(duration_split) == 2: # found and extracted
most_significant_unit = duration_split[0]
remaining_duration_portion = duration_split[1]
elif len(duration_split) == 1: # not found
remaining_duration_portion = duration_split[0]

return most_significant_unit, remaining_duration_portion
86 changes: 85 additions & 1 deletion src/core/tests/Test_PatchAssessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
# limitations under the License.
#
# Requires Python 2.7+

import datetime
import json
import os
import unittest

from core.src.bootstrap.Constants import Constants
from core.src.service_interfaces.TelemetryWriter import TelemetryWriter
from core.tests.library.ArgumentComposer import ArgumentComposer
from core.tests.library.RuntimeCompositor import RuntimeCompositor
Expand Down Expand Up @@ -66,6 +68,88 @@ def test_assessment_telemetry_fail(self):
self.assertRaises(Exception, self.runtime.patch_assessor.start_assessment)
self.runtime.patch_assessor.telemetry_writer = backup_telemetry_writer

def test_assessment_state_file(self):
# read_assessment_state creates a vanilla assessment state file if none exists
assessment_state = self.runtime.patch_assessor.read_assessment_state()
with open(self.runtime.patch_assessor.assessment_state_file_path, 'r') as file_handle:
assessment_state_from_file = json.loads(file_handle.read())["assessmentState"]
self.assessment_state_equals(assessment_state, assessment_state_from_file)

# write and test again
self.runtime.patch_assessor.write_assessment_state()
assessment_state = self.runtime.patch_assessor.read_assessment_state()
with open(self.runtime.patch_assessor.assessment_state_file_path, 'r') as file_handle:
assessment_state_from_file = json.loads(file_handle.read())["assessmentState"]
self.assessment_state_equals(assessment_state, assessment_state_from_file)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way to validate write_assessment_state(), updated an existing AssessmentState.json will be to compare the lastHeartbeat before and after write_assessment_state(), i.e:

  • Read AssessmentState before line 79 i.e. before write_assessment_state()
  • Call write_assessment_state()
  • Read the file
  • compare lastHeartbeat between step 1 and 3, should not be the same

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ben is aware of what to do here. Ack


# Assessment state file is a directory
if os.path.exists(self.runtime.patch_assessor.assessment_state_file_path):
os.remove(self.runtime.patch_assessor.assessment_state_file_path)

# Attempt to read when it does not exist - should create default assessment state file
os.mkdir(self.runtime.patch_assessor.assessment_state_file_path)
self.assertTrue(self.runtime.patch_assessor.read_assessment_state() is not None)

if os.path.exists(self.runtime.patch_assessor.assessment_state_file_path):
os.remove(self.runtime.patch_assessor.assessment_state_file_path)

os.mkdir(self.runtime.patch_assessor.assessment_state_file_path)
# Attempt to write when it does not exist - should also create default assessment state file
self.runtime.patch_assessor.write_assessment_state()
self.assertTrue(self.runtime.patch_assessor.read_assessment_state() is not None)

# Opening file throws exception
backup_open = self.runtime.patch_assessor.env_layer.file_system.open
self.runtime.patch_assessor.env_layer.file_system.open = lambda: self.raise_ex()
self.assertRaises(Exception, self.runtime.patch_assessor.read_assessment_state)
self.assertRaises(Exception, self.runtime.patch_assessor.write_assessment_state)
self.runtime.patch_assessor.env_layer.file_system.open = backup_open

def assessment_state_equals(self, state1, state2):
self.assertEqual(state1["processIds"][0], state2["processIds"][0])
self.assertEqual(state1["lastHeartbeat"], state2["lastHeartbeat"])
self.assertEqual(state1["number"], state2["number"])
self.assertEqual(state1["autoAssessment"], state2["autoAssessment"])
self.assertEqual(state1["lastStartInSecondsSinceEpoch"], state2["lastStartInSecondsSinceEpoch"])

def test_should_auto_assessment_run(self):
# First file write (since it does not exist on read) so it should succeed since last assessment time is 0
self.runtime.patch_assessor.read_assessment_state()
self.assertTrue(self.runtime.patch_assessor.should_auto_assessment_run())

# Second file write, should fail now since minimum delay between assessments hasn't been met
self.runtime.patch_assessor.write_assessment_state()
self.assertFalse(self.runtime.patch_assessor.should_auto_assessment_run())

# It has been minimum delay time since last run
assessment_state = self.runtime.patch_assessor.read_assessment_state()
min_auto_assess_interval_in_seconds = self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds(Constants.MIN_AUTO_ASSESSMENT_INTERVAL)
assessment_state["lastStartInSecondsSinceEpoch"] -= min_auto_assess_interval_in_seconds
with open(self.runtime.patch_assessor.assessment_state_file_path, 'w+') as file_handle:
file_handle.write(json.dumps({"assessmentState": assessment_state}))
self.assertTrue(self.runtime.patch_assessor.should_auto_assessment_run())

# Time is in future, so run assessment and correct anomaly
self.runtime.patch_assessor.write_assessment_state()
assessment_state["lastStartInSecondsSinceEpoch"] += 5000000
with open(self.runtime.patch_assessor.assessment_state_file_path, 'w+') as file_handle:
file_handle.write(json.dumps({"assessmentState": assessment_state}))
self.assertTrue(self.runtime.patch_assessor.should_auto_assessment_run())

# Test exception case: exception is caught and assessment should run
self.runtime.patch_assessor.read_assessment_state = lambda: self.raise_ex()
self.assertTrue(self.runtime.patch_assessor.should_auto_assessment_run())

def test_convert_iso8601_duration_to_total_seconds(self):
self.assertEqual(self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds('PT6H'), 21600)
self.assertEqual(self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds('PT6H5M'), 21900)
self.assertEqual(self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds('PT6H5M14S'), 21914)
self.assertRaises(Exception, lambda: self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds('6H5M14S'))
self.assertRaises(Exception, lambda: self.runtime.patch_assessor.convert_iso8601_duration_to_total_seconds(''))

def raise_ex(self):
raise Exception()

def mock_refresh_repo(self):
pass

Expand Down