diff --git a/jobwatcher/jobwatcher.py b/jobwatcher/jobwatcher.py index 2144250b2..39f10c812 100644 --- a/jobwatcher/jobwatcher.py +++ b/jobwatcher/jobwatcher.py @@ -21,8 +21,12 @@ from configparser import ConfigParser from retrying import retry +from common.time_utils import seconds from common.utils import get_asg_name, get_asg_settings, get_compute_instance_type, get_instance_properties, load_module +LOOP_TIME = 60 +UPDATE_INSTANCE_PROPERTIES_INTERVAL = 180 + log = logging.getLogger(__name__) @@ -76,14 +80,20 @@ def _poll_scheduler_status(config, asg_name, scheduler_module): :param scheduler_module: scheduler module """ instance_type = None + instance_properties = None + update_instance_properties_timer = 0 while True: # Get instance properties - new_instance_type = get_compute_instance_type( - config.region, config.proxy_config, config.stack_name, fallback=instance_type - ) - if new_instance_type != instance_type: - instance_type = new_instance_type - instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type) + if not instance_properties or update_instance_properties_timer >= UPDATE_INSTANCE_PROPERTIES_INTERVAL: + log.info("Refreshing compute instance properties") + update_instance_properties_timer = 0 + new_instance_type = get_compute_instance_type( + config.region, config.proxy_config, config.stack_name, fallback=instance_type + ) + if new_instance_type != instance_type: + instance_type = new_instance_type + instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type) + update_instance_properties_timer += LOOP_TIME # get current limits _, current_desired, max_size = get_asg_settings(config.region, config.proxy_config, asg_name) @@ -123,10 +133,10 @@ def _poll_scheduler_status(config, asg_name, scheduler_module): asg_client = boto3.client("autoscaling", region_name=config.region, config=config.proxy_config) asg_client.update_auto_scaling_group(AutoScalingGroupName=asg_name, 
DesiredCapacity=requested) - time.sleep(60) + time.sleep(LOOP_TIME) -@retry(wait_fixed=60000) +@retry(wait_fixed=seconds(LOOP_TIME)) def main(): logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s") log.info("jobwatcher startup") diff --git a/nodewatcher/nodewatcher.py b/nodewatcher/nodewatcher.py index d01f0e651..6b7c54ba6 100755 --- a/nodewatcher/nodewatcher.py +++ b/nodewatcher/nodewatcher.py @@ -29,14 +29,7 @@ from retrying import RetryError, retry from common.time_utils import minutes, seconds -from common.utils import ( - CriticalError, - get_asg_name, - get_asg_settings, - get_compute_instance_type, - get_instance_properties, - load_module, -) +from common.utils import CriticalError, get_asg_name, get_asg_settings, get_instance_properties, load_module log = logging.getLogger(__name__) @@ -281,7 +274,7 @@ def _init_idletime(): return idletime -def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id): +def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id, instance_type): """ Verify instance/scheduler status and self-terminate the instance. 
@@ -291,12 +284,12 @@ def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance :param asg_name: ASG name :param hostname: current hostname :param instance_id: current instance id + :param instance_type: current instance type """ _wait_for_stack_ready(config.stack_name, config.region, config.proxy_config) _terminate_if_down(scheduler_module, config, asg_name, instance_id, INITIAL_TERMINATE_TIMEOUT) idletime = _init_idletime() - instance_type = get_compute_instance_type(config.region, config.proxy_config, config.stack_name) instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type) while True: time.sleep(60) @@ -356,10 +349,11 @@ def main(): instance_id = _get_metadata("instance-id") hostname = _get_metadata("local-hostname") - log.info("Instance id is %s, hostname is %s", instance_id, hostname) + instance_type = _get_metadata("instance-type") + log.info("Instance id is %s, hostname is %s, instance type is %s", instance_id, hostname, instance_type) asg_name = get_asg_name(config.stack_name, config.region, config.proxy_config) - _poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id) + _poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id, instance_type) except Exception as e: log.critical("An unexpected error occurred: %s", e) raise diff --git a/sqswatcher/sqswatcher.py b/sqswatcher/sqswatcher.py index e66654e2f..af8f037a2 100755 --- a/sqswatcher/sqswatcher.py +++ b/sqswatcher/sqswatcher.py @@ -26,6 +26,7 @@ from configparser import ConfigParser from retrying import retry +from common.time_utils import seconds from common.utils import ( CriticalError, get_asg_name, @@ -35,6 +36,9 @@ load_module, ) +LOOP_TIME = 30 +CLUSTER_PROPERTIES_REFRESH_INTERVAL = 180 + class QueryConfigError(Exception): pass @@ -336,17 +340,27 @@ def _poll_queue(sqs_config, queue, table, asg_name): max_cluster_size = None instance_type = None + cluster_properties_refresh_timer = 0 
while True: + force_cluster_update = False # dynamically retrieve max_cluster_size and compute_instance_type - new_max_cluster_size = _retrieve_max_cluster_size(sqs_config, asg_name, fallback=max_cluster_size) - new_instance_type = get_compute_instance_type( - sqs_config.region, sqs_config.proxy_config, sqs_config.stack_name, fallback=instance_type - ) - force_cluster_update = new_max_cluster_size != max_cluster_size or new_instance_type != instance_type - if new_instance_type != instance_type: - instance_type = new_instance_type - instance_properties = get_instance_properties(sqs_config.region, sqs_config.proxy_config, instance_type) - max_cluster_size = new_max_cluster_size + if ( + not max_cluster_size + or not instance_type + or cluster_properties_refresh_timer >= CLUSTER_PROPERTIES_REFRESH_INTERVAL + ): + cluster_properties_refresh_timer = 0 + log.info("Refreshing cluster properties") + new_max_cluster_size = _retrieve_max_cluster_size(sqs_config, asg_name, fallback=max_cluster_size) + new_instance_type = get_compute_instance_type( + sqs_config.region, sqs_config.proxy_config, sqs_config.stack_name, fallback=instance_type + ) + force_cluster_update = new_max_cluster_size != max_cluster_size or new_instance_type != instance_type + if new_instance_type != instance_type: + instance_type = new_instance_type + instance_properties = get_instance_properties(sqs_config.region, sqs_config.proxy_config, instance_type) + max_cluster_size = new_max_cluster_size + cluster_properties_refresh_timer += LOOP_TIME messages = _retrieve_all_sqs_messages(queue) update_events = _parse_sqs_messages(messages, table) @@ -360,10 +374,10 @@ def _poll_queue(sqs_config, queue, table, asg_name): instance_properties, force_cluster_update, ) - time.sleep(30) + time.sleep(LOOP_TIME) -@retry(wait_fixed=30000) +@retry(wait_fixed=seconds(LOOP_TIME)) def main(): logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s" 
log.info("sqswatcher startup")