
Commit 13eb8e4

Rexrexcsn authored and committed

Handling EC2 Health Scheduled Events

* Modified sqswatcher to retrieve and process messages from the health queue. Lock the instance if there is an EC2 health scheduled event, so that the instance can be replaced by nodewatcher as soon as possible.
* Modified nodewatcher to consider nodes that are locked and have no jobs as down nodes.
* Modified the unit tests to adapt to the above modifications.
* This feature can be turned off by specifying `disable_health_check = True` in `/etc/sqswatcher.cfg` and restarting sqswatcher.

Signed-off-by: Rex <shuningc@amazon.com>
1 parent 0df444f commit 13eb8e4
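
For reference, turning the health-check handling off would look roughly like the snippet below. This is a minimal sketch, assuming the option sits alongside the existing keys in the `[sqswatcher]` section of `/etc/sqswatcher.cfg`; only the `disable_health_check` key itself comes from this commit.

```
# /etc/sqswatcher.cfg (sketch; existing keys omitted)
[sqswatcher]
disable_health_check = True
```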

File tree

17 files changed: +725, -106 lines


src/common/schedulers/slurm_commands.py

Lines changed: 44 additions & 1 deletion

```diff
@@ -12,11 +12,14 @@

 import logging
 import math
+import subprocess
 from textwrap import wrap

 from common.schedulers.converters import ComparableObject, from_table_to_obj_list
-from common.utils import check_command_output
+from common.utils import check_command_output, run_command

+SLURM_NODE_ERROR_STATES = ["down", "drained", "fail"]
+SLURM_NODE_DISABLED_STATES = ["draining", "drained"]
 PENDING_RESOURCES_REASONS = [
     "Resources",
     "Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions",
@@ -59,6 +62,19 @@ def get_jobs_info(job_state_filter=None):
     return SlurmJob.from_table(output)


+def get_node_state(hostname):
+    # retrieves the state of a specific node
+    # https://slurm.schedmd.com/sinfo.html#lbAG
+    # Output format:
+    # down*
+    try:
+        command = "/bin/bash -c \"/opt/slurm/bin/sinfo --noheader -o '%T' -n {}\"".format(hostname)
+        output = check_command_output(command).strip()
+        return output
+    except Exception as e:
+        logging.error("Failed when checking node {} state with exception {}.".format(hostname, e))
+
+
 def get_pending_jobs_info(
     instance_properties=None, max_nodes_filter=None, filter_by_pending_reasons=None, log_pending_jobs=True
 ):
@@ -269,6 +285,33 @@ def job_runnable_on_given_node(job_resources_per_node, resources_available, exis
     return True


+def lock_node(hostname, unlock=False, note=None):
+    # hostname format: ip-10-0-0-114.eu-west-1.compute.internal
+    hostname = hostname.split(".")[0]
+    if unlock:
+        logging.info("Unlocking host %s", hostname)
+        command = [
+            "/opt/slurm/bin/scontrol",
+            "update",
+            "NodeName={0}".format(hostname),
+            "State=RESUME",
+            "Reason={}".format(note if note else '"Unlocking"'),
+        ]
+    else:
+        logging.info("Locking host %s", hostname)
+        command = [
+            "/opt/slurm/bin/scontrol",
+            "update",
+            "NodeName={0}".format(hostname),
+            "State=DRAIN",
+            "Reason={}".format(note if note else '"Shutting down"'),
+        ]
+    try:
+        run_command(command)
+    except subprocess.CalledProcessError:
+        logging.error("Error %s host %s", "unlocking" if unlock else "locking", hostname)
+
+
 class SlurmJob(ComparableObject):
     # This is the format after being processed by reformat_table function
     # JOBID|ST|NODES|CPUS|TASKS|CPUS_PER_TASK|MIN_CPUS|REASON|TRES_PER_JOB|TRES_PER_TASK
```
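
To illustrate how the two new slurm_commands helpers fit together, here is a short usage sketch. It is not part of the commit; the hostname and the lock reason are made up, and it assumes it runs on a host where `/opt/slurm/bin` and the `common` package are available.

```python
from common.schedulers.slurm_commands import SLURM_NODE_ERROR_STATES, get_node_state, lock_node

fqdn = "ip-10-0-0-114.eu-west-1.compute.internal"

# Drain the node so Slurm stops placing new jobs on it. The quoted reason mirrors
# the quoting used by the function's own defaults.
lock_node(fqdn, note='"EC2 scheduled maintenance"')

# get_node_state() expects the short node name and returns sinfo's %T output, e.g. "drained*".
state = get_node_state(fqdn.split(".")[0])
if state and any(error_state in state for error_state in SLURM_NODE_ERROR_STATES):
    print("Node {} is in an error state: {}".format(fqdn, state))

# Once the event has passed, return the node to service.
lock_node(fqdn, unlock=True)
```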

src/common/schedulers/torque_commands.py

Lines changed: 16 additions & 1 deletion

```diff
@@ -17,7 +17,8 @@
 from common.schedulers.converters import ComparableObject, from_xml_to_obj
 from common.utils import check_command_output, run_command

-TORQUE_NODE_ERROR_STATES = ("down", "offline", "unknown")
+TORQUE_NODE_ERROR_STATES = ("down", "unknown")
+TORQUE_NODE_DISABLED_STATE = "offline"
 TORQUE_NODE_STATES = (
     "free",
     "offline",
@@ -129,6 +130,20 @@ def delete_nodes(hosts):
     return succeeded_hosts


+def lock_node(hostname, unlock=False, note=None):
+    # hostname format: ip-10-0-0-114.eu-west-1.compute.internal
+    hostname = hostname.split(".")[0]
+    mod = unlock and "-c" or "-o"
+    command = [TORQUE_BIN_DIR + "pbsnodes", mod, hostname]
+    if note:
+        command.append("-N '{}'".format(note))
+    try:
+        run_command(command)
+    except subprocess.CalledProcessError:
+        logging.error("Error %s host %s", "unlocking" if unlock else "locking", hostname)
+        raise
+
+
 def update_cluster_limits(max_nodes, node_slots):
     try:
         logging.info("Updating cluster limits: max_nodes=%d, node_slots=%d", max_nodes, node_slots)
```

src/common/utils.py

Lines changed: 36 additions & 0 deletions

```diff
@@ -43,6 +43,7 @@ class EventType(Enum):

 Host = collections.namedtuple("Host", ["instance_id", "hostname", "slots", "gpus"])
 UpdateEvent = collections.namedtuple("UpdateEvent", ["action", "message", "host"])
+INSTANCE_ALIVE_STATE = ["pending", "running"]


 def load_module(module):
@@ -389,3 +390,38 @@ def retrieve_max_cluster_size(region, proxy_config, asg_name, fallback):
     )
     log.critical(error_msg)
     raise CriticalError(error_msg)
+
+
+def get_cluster_instance_info(cluster_name, region, include_master=False):
+    """Return a dict of instance_id to nodename."""
+    try:
+        instances_in_cluster = {}
+        ec2_client = boto3.client("ec2", region_name=region)
+        nodes_to_include = ["Compute", "Master"] if include_master else ["Compute"]
+        next_token = None
+        while True:
+            function_args = {
+                "Filters": [
+                    {"Name": "tag:Application", "Values": [cluster_name]},
+                    {"Name": "tag:Name", "Values": nodes_to_include},
+                ],
+                "MaxResults": 1000,
+            }
+            if next_token:
+                function_args["NextToken"] = next_token
+            response = ec2_client.describe_instances(**function_args)
+            for reservation in response.get("Reservations"):
+                for instance in reservation.get("Instances"):
+                    is_alive = instance.get("State").get("Name") in INSTANCE_ALIVE_STATE
+                    instance_id = instance.get("InstanceId")
+                    hostname = instance.get("PrivateDnsName").split(".")[0]
+                    if is_alive:
+                        instances_in_cluster[instance_id] = hostname
+            next_token = response.get("NextToken")
+            if not next_token or next_token == "null":
+                break
+
+        return instances_in_cluster
+
+    except Exception as e:
+        logging.error("Failed retrieving instance_ids for cluster {} with exception: {}".format(cluster_name, e))
```

src/nodewatcher/nodewatcher.py

Lines changed: 5 additions & 0 deletions

```diff
@@ -277,6 +277,11 @@ def _init_idletime():


 def _lock_and_terminate(region, proxy_config, scheduler_module, hostname, instance_id):
+    # handle the case where the instance was locked by a scheduled event and still has jobs running
+    if _has_jobs(scheduler_module, hostname):
+        log.info("Instance has active jobs.")
+        return
+    # handle the case where the instance has no jobs running to begin with
     _lock_host(scheduler_module, hostname)
     if _has_jobs(scheduler_module, hostname):
         log.info("Instance has active jobs.")
```

src/nodewatcher/plugins/sge.py

Lines changed: 5 additions & 0 deletions

```diff
@@ -14,6 +14,7 @@
 import subprocess

 from common.schedulers.sge_commands import (
+    SGE_DISABLED_STATE,
     SGE_ERROR_STATES,
     SGE_HOLD_STATE,
     get_compute_nodes_info,
@@ -86,7 +87,11 @@ def is_node_down():

         node = nodes.get(host_fqdn, nodes.get(hostname))
         log.info("Node is in state: '{0}'".format(node.state))
+        # check if any error state is present
         if all(error_state not in node.state for error_state in SGE_ERROR_STATES):
+            # Consider the node down if it's in disabled state and there is no job running
+            if SGE_DISABLED_STATE in node.state and not has_jobs(hostname):
+                return True
             return False
     except Exception as e:
         log.error("Failed when checking if node is down with exception %s. Reporting node as down.", e)
```

src/nodewatcher/plugins/slurm.py

Lines changed: 12 additions & 33 deletions

```diff
@@ -12,8 +12,14 @@
 import logging
 import subprocess

-from common.schedulers.slurm_commands import PENDING_RESOURCES_REASONS, get_pending_jobs_info
-from common.utils import check_command_output, run_command
+from common.schedulers.slurm_commands import (
+    PENDING_RESOURCES_REASONS,
+    SLURM_NODE_ERROR_STATES,
+    get_node_state,
+    get_pending_jobs_info,
+    lock_node,
+)
+from common.utils import check_command_output

 log = logging.getLogger(__name__)

@@ -54,43 +60,16 @@ def has_pending_jobs(instance_properties, max_size):


 def lock_host(hostname, unlock=False):
-    # hostname format: ip-10-0-0-114.eu-west-1.compute.internal
-    hostname = hostname.split(".")[0]
-    if unlock:
-        log.info("Unlocking host %s", hostname)
-        command = [
-            "/opt/slurm/bin/scontrol",
-            "update",
-            "NodeName={0}".format(hostname),
-            "State=RESUME",
-            'Reason="Unlocking"',
-        ]
-    else:
-        log.info("Locking host %s", hostname)
-        command = [
-            "/opt/slurm/bin/scontrol",
-            "update",
-            "NodeName={0}".format(hostname),
-            "State=DRAIN",
-            'Reason="Shutting down"',
-        ]
-    try:
-        run_command(command)
-    except subprocess.CalledProcessError:
-        log.error("Error %s host %s", "unlocking" if unlock else "locking", hostname)
+    lock_node(hostname, unlock=unlock)


 def is_node_down():
     """Check if node is down according to scheduler."""
     try:
-        # retrieves the state of a specific node
-        # https://slurm.schedmd.com/sinfo.html#lbAG
-        # Output format:
-        # down*
-        command = "/bin/bash -c \"/opt/slurm/bin/sinfo --noheader -o '%T' -n $(hostname)\""
-        output = check_command_output(command).strip()
+        hostname = check_command_output("hostname").strip()
+        output = get_node_state(hostname)
         log.info("Node is in state: '{0}'".format(output))
-        if output and all(state not in output for state in ["down", "drained", "fail"]):
+        if output and all(state not in output for state in SLURM_NODE_ERROR_STATES):
             return False
     except Exception as e:
         log.error("Failed when checking if node is down with exception %s. Reporting node as down.", e)
```

src/nodewatcher/plugins/torque.py

Lines changed: 8 additions & 11 deletions

```diff
@@ -10,18 +10,18 @@
 # limitations under the License.

 import logging
-import subprocess

 from common.schedulers.torque_commands import (
-    TORQUE_BIN_DIR,
+    TORQUE_NODE_DISABLED_STATE,
     TORQUE_NODE_ERROR_STATES,
     TORQUE_RUNNING_JOB_STATE,
     TORQUE_SUSPENDED_JOB_STATE,
     get_compute_nodes_info,
     get_jobs_info,
     get_pending_jobs_info,
+    lock_node,
 )
-from common.utils import check_command_output, run_command
+from common.utils import check_command_output

 log = logging.getLogger(__name__)

@@ -56,14 +56,7 @@ def has_pending_jobs(instance_properties, max_size):


 def lock_host(hostname, unlock=False):
-    # hostname format: ip-10-0-0-114.eu-west-1.compute.internal
-    hostname = hostname.split(".")[0]
-    mod = unlock and "-c" or "-o"
-    command = [TORQUE_BIN_DIR + "pbsnodes", mod, hostname]
-    try:
-        run_command(command)
-    except subprocess.CalledProcessError:
-        log.error("Error %s host %s", "unlocking" if unlock else "locking", hostname)
+    lock_node(hostname, unlock=unlock)


 def is_node_down():
@@ -74,6 +67,10 @@ def is_node_down():
         if node:
             log.info("Node is in state: '{0}'".format(node.state))
             if all(error_state not in node.state for error_state in TORQUE_NODE_ERROR_STATES):
+                # Consider the node down if it is in the disabled state placed by a scheduled event
+                # and does not have jobs
+                if TORQUE_NODE_DISABLED_STATE in node.state and not has_jobs(hostname):
+                    return True
                 return False
         else:
             log.warning("Node is not attached to scheduler. Reporting as down")
```

src/sqswatcher/plugins/sge.py

Lines changed: 47 additions & 0 deletions

```diff
@@ -9,13 +9,17 @@
 # OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import socket

 from common.schedulers.sge_commands import (
     QCONF_COMMANDS,
+    SGE_DISABLED_STATE,
     add_host_slots,
     add_hosts_to_group,
     exec_qconf_command,
+    get_compute_nodes_info,
     install_sge_on_compute_nodes,
+    lock_host,
     remove_hosts_from_group,
     remove_hosts_from_queue,
 )
@@ -99,5 +103,48 @@ def update_cluster(max_cluster_size, cluster_user, update_events, instance_prope
     return failed, succeeded


+def _is_node_locked(hostname):
+    node_info = get_compute_nodes_info(hostname_filter=hostname)
+    node = node_info.get(socket.getfqdn(hostname), node_info.get(hostname))
+    if SGE_DISABLED_STATE in node.state:
+        return True
+    return False
+
+
+def perform_health_actions(health_events):
+    """Lock the hosts referenced by EC2 scheduled maintenance events."""
+    failed = []
+    succeeded = []
+    for event in health_events:
+        try:
+            # TODO: ignore a failure to lock if the node is not known to the scheduler
+            if _is_node_locked(event.host.hostname):
+                log.error(
+                    "Instance {}/{} is currently in disabled state 'd'. "
+                    "Locking it now for the scheduled event risks having the lock released by nodewatcher. "
+                    "Marking the event as failed to retry later.".format(event.host.instance_id, event.host.hostname)
+                )
+                failed.append(event)
+                continue
+            lock_host(event.host.hostname)
+            if _is_node_locked(event.host.hostname):
+                succeeded.append(event)
+                log.info(
+                    "Successfully locked {} in response to scheduled maintenance event".format(event.host.hostname)
+                )
+            else:
+                failed.append(event)
+                log.info("Failed to lock {} in response to scheduled maintenance event".format(event.host.hostname))
+        except Exception as e:
+            log.error(
+                "Encountered exception when locking {} because of a scheduled maintenance event: {}".format(
+                    event.host.hostname, e
+                )
+            )
+            failed.append(event)
+
+    return failed, succeeded
+
+
 def init():
     pass
```
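
A test-style sketch of the new health path (not one of the commit's unit tests). The UpdateEvent/Host shapes follow the common.utils diff above; the import paths and the action label are assumptions.

```python
from unittest.mock import patch

from common.utils import Host, UpdateEvent
from sqswatcher.plugins import sge

event = UpdateEvent(
    action="EC2_SCHEDULED_EVENT",  # assumed label; sqswatcher sets the real one
    message={},                    # raw SQS message body, unused by this code path
    host=Host("i-0123456789abcdef0", "ip-10-0-0-114.eu-west-1.compute.internal", 4, 0),
)

# First _is_node_locked() call says the node is not locked yet; the second, after
# lock_host(), confirms the lock took effect.
with patch.object(sge, "_is_node_locked", side_effect=[False, True]), patch.object(sge, "lock_host"):
    failed, succeeded = sge.perform_health_actions([event])

assert succeeded == [event] and failed == []
```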
