diff --git a/.github/scripts/github_common.py b/.github/scripts/github_common.py index aa9f96ffc1..6075d3a06c 100644 --- a/.github/scripts/github_common.py +++ b/.github/scripts/github_common.py @@ -53,12 +53,17 @@ def deregister_runners(gh_token: str, runner_name: str) -> None: if runner_name in runner["name"]: delete_runner(gh_token, runner) -def issue_post(gh_token: str, body: str) -> None: +# obtain issue number separately since workflow-monitor shouldn't query the GH-A runner area +# since it's separate from it +def get_issue_number() -> int: with open(ci_env['GITHUB_EVENT_PATH']) as f: event_payload = json.load(f) gh_issue_id = event_payload["number"] + return gh_issue_id + raise Exception(f"Unable to return an issue number using {ci_env['GITHUB_EVENT_PATH']}") - res = requests.post(f"{gh_issues_api_url}/{gh_issue_id}/comments", +def issue_post(gh_token: str, issue_num: int, body: str) -> None: + res = requests.post(f"{gh_issues_api_url}/{issue_num}/comments", json={"body": body}, headers=get_header(gh_token)) if res.status_code != 201: raise Exception(f"HTTP POST error: {res} {res.json()}\nUnable to post GitHub PR comment.") diff --git a/.github/scripts/platform_lib.py b/.github/scripts/platform_lib.py index fd4ec4e2c8..1b4974589c 100644 --- a/.github/scripts/platform_lib.py +++ b/.github/scripts/platform_lib.py @@ -11,7 +11,7 @@ from xmlrpc.client import DateTime from ci_variables import ci_env -from github_common import issue_post +from github_common import issue_post, get_issue_number from azure.mgmt.resource import ResourceManagementClient from azure.identity import DefaultAzureCredential @@ -147,7 +147,7 @@ def get_manager_hostname(self, workflow_tag: str) -> str: return f"centos@{self.get_manager_ip(workflow_tag)}" @abc.abstractmethod - def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str) -> None: + def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str, issue_id: int) -> None: """ Check if run farm instances are running past a `timeout` minutes designated time. If so, then terminate them. """ raise NotImplementedError @@ -182,7 +182,7 @@ def check_manager_exists(self, workflow_tag: str) -> bool: return not (inst is None) def find_manager(self, workflow_tag: str): - instances = get_instances_with_filter([self.get_filter(workflow_tag), manager_filter]) + instances = get_instances_with_filter([self.get_filter(workflow_tag), manager_filter], allowed_states = ['pending', 'running', 'stopping', 'stopped']) if instances: assert len(instances) == 1 # this must be called before any new instances are launched by workflow return instances[0] @@ -191,12 +191,12 @@ def find_manager(self, workflow_tag: str): def find_all_workflow_instances(self, workflow_tag: str) -> List: """ Grabs a list of all instance dicts sharing the CI workflow run's unique tag """ - return get_instances_with_filter([self.get_filter(workflow_tag)]) + return get_instances_with_filter([self.get_filter(workflow_tag)], allowed_states = ['pending', 'running', 'stopping', 'stopped']) def find_all_ci_instances(self) -> List: """ Grabs a list of all instances across all CI using the CI unique tag key""" all_ci_instances_filter = self.get_filter('*') - all_ci_instances = get_instances_with_filter([all_ci_instances_filter], allowed_states=['*']) + all_ci_instances = get_instances_with_filter([all_ci_instances_filter], allowed_states = ['pending', 'running', 'stopping', 'stopped']) return all_ci_instances def find_run_farm_ci_instances(self, workflow_tag: str = '*') -> List: @@ -205,7 +205,7 @@ def find_run_farm_ci_instances(self, workflow_tag: str = '*') -> List: {'Name': 'tag:fsimcluster', 'Values': [f'*{workflow_tag}*']}, {'Name': 'instance-type', 'Values': ['f1.2xlarge', 'f1.4xlarge', 'f1.16xlarge']}, ] - ci_instances = get_instances_with_filter(instances_filter, allowed_states=['*']) + ci_instances = get_instances_with_filter(instances_filter, allowed_states = ['pending', 'running', 'stopping', 'stopped']) return ci_instances def get_manager_ip(self, workflow_tag: str) -> str: @@ -281,7 +281,7 @@ def instance_metadata_str(self, instance) -> str: return static_md + dynamic_md - def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str) -> None: + def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str, issue_id: int) -> None: # We need this in case terminate is called in setup-self-hosted-workflow before aws-configure is run if self.client is None: self.client = boto3.client('ec2') @@ -293,13 +293,13 @@ def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str map(lambda x: (x, x['LaunchTime']), instances)) for inst in instances_to_terminate: - print("Uncaught run farm instance shutdown detected") + print(f"Uncaught run farm instance shutdown detected for inst: {inst}") self.client.terminate_instances(InstanceIds=[inst['InstanceId']]) print(f"Terminated run farm instance {inst['InstanceId']}") # post comment after instances are terminated just in case there is an issue with posting if len(instances_to_terminate) > 0: - issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], + issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], issue_id, f"Uncaught {len(instances_to_terminate)} FPGA instance shutdown(s) detected for CI run: {ci_env['GITHUB_RUN_ID']}. Verify CI state before submitting PR.") @@ -447,7 +447,7 @@ def terminate_azure_vms(self, resource_list: List) -> None: else: print(f"Succeeded in deleting VM {vm['name']}") - def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str) -> None: + def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str, issue_id: int) -> None: raise NotImplementedError def find_run_farm_ci_instances(self, workflow_tag: str = '*') -> List: diff --git a/.github/scripts/setup-workflow-monitor.py b/.github/scripts/setup-workflow-monitor.py index 576eb5db5b..888b9596ed 100755 --- a/.github/scripts/setup-workflow-monitor.py +++ b/.github/scripts/setup-workflow-monitor.py @@ -7,6 +7,7 @@ from common import manager_ci_dir, set_fabric_firesim_pem from platform_lib import Platform, get_platform_enum +from github_common import get_issue_number from ci_variables import RUN_LOCAL, ci_env def setup_workflow_monitor(platform: Platform, max_runtime: int) -> None: @@ -36,7 +37,7 @@ def setup_workflow_monitor(platform: Platform, max_runtime: int) -> None: run((f"screen -S ttl -dm bash -c \'sleep {int(max_runtime) * 3600};" f"./change-workflow-instance-states.py {platform} {ci_env['GITHUB_RUN_ID']} terminate {ci_env['PERSONAL_ACCESS_TOKEN']}\'") , pty=False) - run((f"screen -S workflow-monitor -L -Logfile {workflow_log} -dm bash -c \'./workflow-monitor.py {platform}\'") + run((f"screen -S workflow-monitor -L -Logfile {workflow_log} -dm bash -c \'./workflow-monitor.py {platform} {get_issue_number()}\'") , pty=False) time.sleep(30) diff --git a/.github/scripts/workflow-monitor.py b/.github/scripts/workflow-monitor.py index aab35cbdb8..7abc8a53f6 100755 --- a/.github/scripts/workflow-monitor.py +++ b/.github/scripts/workflow-monitor.py @@ -38,7 +38,7 @@ def wrap_in_code(wrap: str) -> str: return f"\n```\n{wrap}\n```" -def main(platform: Platform): +def main(platform: Platform, issue_id: int): consecutive_failures = 0 print("Workflow monitor started") @@ -59,16 +59,16 @@ def main(platform: Platform): print(f"Workflow {ci_env['GITHUB_RUN_ID']} status: {state_status} {state_concl}") # check that select instances are terminated on time - platform_lib.check_and_terminate_run_farm_instances(45, ci_env['GITHUB_RUN_ID']) + platform_lib.check_and_terminate_run_farm_instances(45, ci_env['GITHUB_RUN_ID'], issue_id) if state_status in ['completed']: if state_concl in TERMINATE_STATES: - platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID']) + platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'], issue_id) platform_lib.terminate_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID']) return elif state_concl in STOP_STATES: # if we stop then we should terminate the run farm instances - platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID']) + platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'], issue_id) platform_lib.stop_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID']) return elif state_concl not in NOP_STATES: @@ -87,14 +87,16 @@ def main(platform: Platform): post_str += f"**Exception Message:**{wrap_in_code(e)}\n" post_str += f"**Traceback Message:**{wrap_in_code(traceback.format_exc())}" - issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], post_str) + print(post_str) + issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], issue_id, post_str) - platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID']) + platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'], issue_id) platform_lib.terminate_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID']) post_str = f"Instances for CI run {ci_env['GITHUB_RUN_ID']} were supposedly terminated. Verify termination manually.\n" - issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], post_str) + print(post_str) + issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], issue_id, post_str) exit(1) @@ -105,7 +107,9 @@ def main(platform: Platform): parser.add_argument('platform', choices = platform_choices, help = "The platform CI is being run on") + parser.add_argument('issue_id', + help="Issue ID that is used for posting messages") args = parser.parse_args() platform = get_platform_enum(args.platform) - main(platform) + main(platform, args.issue_id)