Skip to content

Commit

Permalink
Fix issue number propagation | Catch only running/stopped CI instances
Browse files Browse the repository at this point in the history
  • Loading branch information
abejgonzalez committed Dec 10, 2022
1 parent 36d8b7e commit 46880ce
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 21 deletions.
9 changes: 7 additions & 2 deletions .github/scripts/github_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,17 @@ def deregister_runners(gh_token: str, runner_name: str) -> None:
if runner_name in runner["name"]:
delete_runner(gh_token, runner)

def issue_post(gh_token: str, body: str) -> None:
# Obtain the issue number separately: workflow-monitor is separate from the
# GH-A runner area and so shouldn't query it for this value itself.
def get_issue_number() -> int:
    """Return the issue/PR number stored in the GitHub event payload.

    Reads the JSON file whose path is held in ``ci_env['GITHUB_EVENT_PATH']``
    and returns the value of its top-level ``"number"`` key.

    Returns:
        The value found under ``"number"`` in the event payload.

    Raises:
        KeyError: if ``ci_env`` has no ``'GITHUB_EVENT_PATH'`` entry.
        Exception: if the event file cannot be opened or parsed, or if the
            payload has no ``"number"`` entry. The underlying error is
            chained as ``__cause__``.
    """
    event_path = ci_env['GITHUB_EVENT_PATH']
    try:
        with open(event_path) as f:
            event_payload = json.load(f)
        return event_payload["number"]
    except (OSError, ValueError, KeyError, TypeError) as err:
        # Report every failure mode (unreadable file, invalid JSON, payload
        # without a "number" entry) through one descriptive error.
        raise Exception(f"Unable to return an issue number using {event_path}") from err

res = requests.post(f"{gh_issues_api_url}/{gh_issue_id}/comments",
def issue_post(gh_token: str, issue_num: int, body: str) -> None:
    """Post `body` as a comment on the GitHub issue/PR numbered `issue_num`.

    Sends a POST to ``{gh_issues_api_url}/{issue_num}/comments`` with the
    headers produced by ``get_header(gh_token)``.

    Args:
        gh_token: token passed to ``get_header`` to build the request headers.
        issue_num: number of the issue/PR that receives the comment.
        body: text of the comment.

    Raises:
        Exception: if the response status code is anything other than 201.
    """
    res = requests.post(f"{gh_issues_api_url}/{issue_num}/comments",
        json={"body": body}, headers=get_header(gh_token))
    if res.status_code != 201:
        try:
            detail = res.json()
        except ValueError:
            # An error response is not guaranteed to carry a JSON body; fall
            # back to the raw text so the HTTP failure itself is reported
            # instead of a JSON decode error.
            detail = res.text
        raise Exception(f"HTTP POST error: {res} {detail}\nUnable to post GitHub PR comment.")
20 changes: 10 additions & 10 deletions .github/scripts/platform_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from xmlrpc.client import DateTime

from ci_variables import ci_env
from github_common import issue_post
from github_common import issue_post, get_issue_number

from azure.mgmt.resource import ResourceManagementClient
from azure.identity import DefaultAzureCredential
Expand Down Expand Up @@ -147,7 +147,7 @@ def get_manager_hostname(self, workflow_tag: str) -> str:
return f"centos@{self.get_manager_ip(workflow_tag)}"

@abc.abstractmethod
# NOTE(review): the two `def` lines below appear to be the removed and the added
# version of the same signature in this diff view; the second one, which takes
# `issue_id`, matches the three-argument calls made elsewhere in this commit.
def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str) -> None:
def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str, issue_id: int) -> None:
""" Check whether run farm instances have been running for longer than `timeout` minutes. If so, terminate them. """
# Abstract: concrete platform classes are expected to override this.
raise NotImplementedError

Expand Down Expand Up @@ -182,7 +182,7 @@ def check_manager_exists(self, workflow_tag: str) -> bool:
return not (inst is None)

def find_manager(self, workflow_tag: str):
instances = get_instances_with_filter([self.get_filter(workflow_tag), manager_filter])
instances = get_instances_with_filter([self.get_filter(workflow_tag), manager_filter], allowed_states = ['pending', 'running', 'stopping', 'stopped'])
if instances:
assert len(instances) == 1 # this must be called before any new instances are launched by workflow
return instances[0]
Expand All @@ -191,12 +191,12 @@ def find_manager(self, workflow_tag: str):

def find_all_workflow_instances(self, workflow_tag: str) -> List:
""" Grabs a list of all instance dicts sharing the CI workflow run's unique tag """
# NOTE(review): the first `return` appears to be the removed line of this diff
# (no `allowed_states` argument passed).
return get_instances_with_filter([self.get_filter(workflow_tag)])
# NOTE(review): the second `return` appears to be the added line; it passes an
# explicit `allowed_states` list of pending/running/stopping/stopped.
return get_instances_with_filter([self.get_filter(workflow_tag)], allowed_states = ['pending', 'running', 'stopping', 'stopped'])

def find_all_ci_instances(self) -> List:
""" Grabs a list of all instances across all CI using the CI unique tag key"""
# '*' is passed as the tag value; presumably a wildcard matching every
# workflow tag — confirm against get_filter.
all_ci_instances_filter = self.get_filter('*')
# NOTE(review): the first assignment appears to be the removed line of this
# diff (allowed_states=['*']).
all_ci_instances = get_instances_with_filter([all_ci_instances_filter], allowed_states=['*'])
# NOTE(review): the second assignment appears to be the added line; it passes
# an explicit list of pending/running/stopping/stopped instead of '*'.
all_ci_instances = get_instances_with_filter([all_ci_instances_filter], allowed_states = ['pending', 'running', 'stopping', 'stopped'])
return all_ci_instances

def find_run_farm_ci_instances(self, workflow_tag: str = '*') -> List:
Expand All @@ -205,7 +205,7 @@ def find_run_farm_ci_instances(self, workflow_tag: str = '*') -> List:
{'Name': 'tag:fsimcluster', 'Values': [f'*{workflow_tag}*']},
{'Name': 'instance-type', 'Values': ['f1.2xlarge', 'f1.4xlarge', 'f1.16xlarge']},
]
ci_instances = get_instances_with_filter(instances_filter, allowed_states=['*'])
ci_instances = get_instances_with_filter(instances_filter, allowed_states = ['pending', 'running', 'stopping', 'stopped'])
return ci_instances

def get_manager_ip(self, workflow_tag: str) -> str:
Expand Down Expand Up @@ -281,7 +281,7 @@ def instance_metadata_str(self, instance) -> str:

return static_md + dynamic_md

def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str) -> None:
def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str, issue_id: int) -> None:
# We need this in case terminate is called in setup-self-hosted-workflow before aws-configure is run
if self.client is None:
self.client = boto3.client('ec2')
Expand All @@ -293,13 +293,13 @@ def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str
map(lambda x: (x, x['LaunchTime']), instances))

for inst in instances_to_terminate:
print("Uncaught run farm instance shutdown detected")
print(f"Uncaught run farm instance shutdown detected for inst: {inst}")
self.client.terminate_instances(InstanceIds=[inst['InstanceId']])
print(f"Terminated run farm instance {inst['InstanceId']}")

# post comment after instances are terminated just in case there is an issue with posting
if len(instances_to_terminate) > 0:
issue_post(ci_env['PERSONAL_ACCESS_TOKEN'],
issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], issue_id,
f"Uncaught {len(instances_to_terminate)} FPGA instance shutdown(s) detected for CI run: {ci_env['GITHUB_RUN_ID']}. Verify CI state before submitting PR.")


Expand Down Expand Up @@ -447,7 +447,7 @@ def terminate_azure_vms(self, resource_list: List) -> None:
else:
print(f"Succeeded in deleting VM {vm['name']}")

# NOTE(review): the two `def` lines below appear to be the removed and the added
# version of the same signature in this diff view; the second adds `issue_id`.
def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str) -> None:
def check_and_terminate_run_farm_instances(self, timeout: int, workflow_tag: str, issue_id: int) -> None:
# Not implemented for this platform: calling it always raises.
raise NotImplementedError

def find_run_farm_ci_instances(self, workflow_tag: str = '*') -> List:
Expand Down
3 changes: 2 additions & 1 deletion .github/scripts/setup-workflow-monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from common import manager_ci_dir, set_fabric_firesim_pem
from platform_lib import Platform, get_platform_enum
from github_common import get_issue_number
from ci_variables import RUN_LOCAL, ci_env

def setup_workflow_monitor(platform: Platform, max_runtime: int) -> None:
Expand Down Expand Up @@ -36,7 +37,7 @@ def setup_workflow_monitor(platform: Platform, max_runtime: int) -> None:
run((f"screen -S ttl -dm bash -c \'sleep {int(max_runtime) * 3600};"
f"./change-workflow-instance-states.py {platform} {ci_env['GITHUB_RUN_ID']} terminate {ci_env['PERSONAL_ACCESS_TOKEN']}\'")
, pty=False)
run((f"screen -S workflow-monitor -L -Logfile {workflow_log} -dm bash -c \'./workflow-monitor.py {platform}\'")
run((f"screen -S workflow-monitor -L -Logfile {workflow_log} -dm bash -c \'./workflow-monitor.py {platform} {get_issue_number()}\'")
, pty=False)

time.sleep(30)
Expand Down
20 changes: 12 additions & 8 deletions .github/scripts/workflow-monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
def wrap_in_code(wrap: object) -> str:
    """Return `wrap` rendered as text inside a triple-backtick fenced block.

    Args:
        wrap: value to embed. It is formatted with f-string interpolation, so
            non-string values such as exception objects are accepted and
            rendered through their string form.

    Returns:
        A string starting with a newline, then an opening fence line, the
        formatted value on its own line(s), and a closing fence.
    """
    return f"\n```\n{wrap}\n```"

def main(platform: Platform):
def main(platform: Platform, issue_id: int):
consecutive_failures = 0

print("Workflow monitor started")
Expand All @@ -59,16 +59,16 @@ def main(platform: Platform):
print(f"Workflow {ci_env['GITHUB_RUN_ID']} status: {state_status} {state_concl}")

# check that select instances are terminated on time
platform_lib.check_and_terminate_run_farm_instances(45, ci_env['GITHUB_RUN_ID'])
platform_lib.check_and_terminate_run_farm_instances(45, ci_env['GITHUB_RUN_ID'], issue_id)

if state_status in ['completed']:
if state_concl in TERMINATE_STATES:
platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'])
platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'], issue_id)
platform_lib.terminate_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID'])
return
elif state_concl in STOP_STATES:
# if we stop then we should terminate the run farm instances
platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'])
platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'], issue_id)
platform_lib.stop_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID'])
return
elif state_concl not in NOP_STATES:
Expand All @@ -87,14 +87,16 @@ def main(platform: Platform):
post_str += f"**Exception Message:**{wrap_in_code(e)}\n"
post_str += f"**Traceback Message:**{wrap_in_code(traceback.format_exc())}"

issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], post_str)
print(post_str)
issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], issue_id, post_str)

platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'])
platform_lib.check_and_terminate_run_farm_instances(0, ci_env['GITHUB_RUN_ID'], issue_id)
platform_lib.terminate_instances(ci_env['PERSONAL_ACCESS_TOKEN'], ci_env['GITHUB_RUN_ID'])

post_str = f"Instances for CI run {ci_env['GITHUB_RUN_ID']} were supposedly terminated. Verify termination manually.\n"

issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], post_str)
print(post_str)
issue_post(ci_env['PERSONAL_ACCESS_TOKEN'], issue_id, post_str)

exit(1)

Expand All @@ -105,7 +107,9 @@ def main(platform: Platform):
parser.add_argument('platform',
choices = platform_choices,
help = "The platform CI is being run on")
# Parse the issue ID as an integer so the value handed to main() matches its
# `issue_id: int` parameter; a non-numeric argument is rejected at parse time.
parser.add_argument('issue_id',
    type=int,
    help="Issue ID that is used for posting messages")
args = parser.parse_args()
platform = get_platform_enum(args.platform)

main(platform)
main(platform, args.issue_id)

0 comments on commit 46880ce

Please sign in to comment.