Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions jobwatcher/jobwatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
from configparser import ConfigParser
from retrying import retry

from common.time_utils import seconds
from common.utils import get_asg_name, get_asg_settings, get_compute_instance_type, get_instance_properties, load_module

LOOP_TIME = 60
UPDATE_INSTANCE_PROPERTIES_INTERVAL = 180

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -76,14 +80,20 @@ def _poll_scheduler_status(config, asg_name, scheduler_module):
:param scheduler_module: scheduler module
"""
instance_type = None
instance_properties = None
update_instance_properties_timer = 0
while True:
# Get instance properties
new_instance_type = get_compute_instance_type(
config.region, config.proxy_config, config.stack_name, fallback=instance_type
)
if new_instance_type != instance_type:
instance_type = new_instance_type
instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type)
if not instance_properties or update_instance_properties_timer >= UPDATE_INSTANCE_PROPERTIES_INTERVAL:
logging.info("Refreshing compute instance properties")
update_instance_properties_timer = 0
new_instance_type = get_compute_instance_type(
config.region, config.proxy_config, config.stack_name, fallback=instance_type
)
if new_instance_type != instance_type:
instance_type = new_instance_type
instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type)
update_instance_properties_timer += LOOP_TIME

# get current limits
_, current_desired, max_size = get_asg_settings(config.region, config.proxy_config, asg_name)
Expand Down Expand Up @@ -123,10 +133,10 @@ def _poll_scheduler_status(config, asg_name, scheduler_module):
asg_client = boto3.client("autoscaling", region_name=config.region, config=config.proxy_config)
asg_client.update_auto_scaling_group(AutoScalingGroupName=asg_name, DesiredCapacity=requested)

time.sleep(60)
time.sleep(LOOP_TIME)


@retry(wait_fixed=60000)
@retry(wait_fixed=seconds(LOOP_TIME))
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s")
log.info("jobwatcher startup")
Expand Down
18 changes: 6 additions & 12 deletions nodewatcher/nodewatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,7 @@
from retrying import RetryError, retry

from common.time_utils import minutes, seconds
from common.utils import (
CriticalError,
get_asg_name,
get_asg_settings,
get_compute_instance_type,
get_instance_properties,
load_module,
)
from common.utils import CriticalError, get_asg_name, get_asg_settings, get_instance_properties, load_module

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -281,7 +274,7 @@ def _init_idletime():
return idletime


def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id):
def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id, instance_type):
"""
Verify instance/scheduler status and self-terminate the instance.

Expand All @@ -291,12 +284,12 @@ def _poll_instance_status(config, scheduler_module, asg_name, hostname, instance
:param asg_name: ASG name
:param hostname: current hostname
:param instance_id: current instance id
:param instance_type: current instance type
"""
_wait_for_stack_ready(config.stack_name, config.region, config.proxy_config)
_terminate_if_down(scheduler_module, config, asg_name, instance_id, INITIAL_TERMINATE_TIMEOUT)

idletime = _init_idletime()
instance_type = get_compute_instance_type(config.region, config.proxy_config, config.stack_name)
instance_properties = get_instance_properties(config.region, config.proxy_config, instance_type)
while True:
time.sleep(60)
Expand Down Expand Up @@ -356,10 +349,11 @@ def main():

instance_id = _get_metadata("instance-id")
hostname = _get_metadata("local-hostname")
log.info("Instance id is %s, hostname is %s", instance_id, hostname)
instance_type = _get_metadata("instance-type")
log.info("Instance id is %s, hostname is %s, instance type is %s", instance_id, hostname, instance_type)
asg_name = get_asg_name(config.stack_name, config.region, config.proxy_config)

_poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id)
_poll_instance_status(config, scheduler_module, asg_name, hostname, instance_id, instance_type)
except Exception as e:
log.critical("An unexpected error occurred: %s", e)
raise
Expand Down
36 changes: 25 additions & 11 deletions sqswatcher/sqswatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from configparser import ConfigParser
from retrying import retry

from common.time_utils import seconds
from common.utils import (
CriticalError,
get_asg_name,
Expand All @@ -35,6 +36,9 @@
load_module,
)

LOOP_TIME = 30
CLUSTER_PROPERTIES_REFRESH_INTERVAL = 180


class QueryConfigError(Exception):
pass
Expand Down Expand Up @@ -336,17 +340,27 @@ def _poll_queue(sqs_config, queue, table, asg_name):

max_cluster_size = None
instance_type = None
cluster_properties_refresh_timer = 0
while True:
force_cluster_update = False
# dynamically retrieve max_cluster_size and compute_instance_type
new_max_cluster_size = _retrieve_max_cluster_size(sqs_config, asg_name, fallback=max_cluster_size)
new_instance_type = get_compute_instance_type(
sqs_config.region, sqs_config.proxy_config, sqs_config.stack_name, fallback=instance_type
)
force_cluster_update = new_max_cluster_size != max_cluster_size or new_instance_type != instance_type
if new_instance_type != instance_type:
instance_type = new_instance_type
instance_properties = get_instance_properties(sqs_config.region, sqs_config.proxy_config, instance_type)
max_cluster_size = new_max_cluster_size
if (
not max_cluster_size
or not instance_type
or cluster_properties_refresh_timer >= CLUSTER_PROPERTIES_REFRESH_INTERVAL
):
cluster_properties_refresh_timer = 0
logging.info("Refreshing cluster properties")
new_max_cluster_size = _retrieve_max_cluster_size(sqs_config, asg_name, fallback=max_cluster_size)
new_instance_type = get_compute_instance_type(
sqs_config.region, sqs_config.proxy_config, sqs_config.stack_name, fallback=instance_type
)
force_cluster_update = new_max_cluster_size != max_cluster_size or new_instance_type != instance_type
if new_instance_type != instance_type:
instance_type = new_instance_type
instance_properties = get_instance_properties(sqs_config.region, sqs_config.proxy_config, instance_type)
max_cluster_size = new_max_cluster_size
cluster_properties_refresh_timer += LOOP_TIME

messages = _retrieve_all_sqs_messages(queue)
update_events = _parse_sqs_messages(messages, table)
Expand All @@ -360,10 +374,10 @@ def _poll_queue(sqs_config, queue, table, asg_name):
instance_properties,
force_cluster_update,
)
time.sleep(30)
time.sleep(LOOP_TIME)


@retry(wait_fixed=30000)
@retry(wait_fixed=seconds(LOOP_TIME))
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s [%(module)s:%(funcName)s] %(message)s")
log.info("sqswatcher startup")
Expand Down