Skip to content

Commit

Permalink
Issue #3534: Tag compute resources according to the tool's metadata -…
Browse files Browse the repository at this point in the history
… support --custom_tags for nodeup
  • Loading branch information
ekazachkova committed May 23, 2024
1 parent e89bdcb commit 574f423
Showing 1 changed file with 40 additions and 15 deletions.
55 changes: 40 additions & 15 deletions scripts/autoscaling/aws/nodeup.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,25 @@ def run_id_tag(run_id, pool_id):
return tags


def get_tags(run_id, cloud_region, pool_id):
def build_custom_tags(tags):
    """Turn a name -> value mapping into a list of ``{'Key': ..., 'Value': ...}`` dicts.

    The result has the shape passed as ``Tags`` to the EC2 calls in this file.

    :param tags: mapping of tag names to tag values; may be ``None`` or empty.
    :return: a new list with one dict per entry of ``tags``, in iteration order.
             Entries whose value is falsy (``None``, ``''``) carry only the
             ``'Key'`` field; an empty list is returned when ``tags`` is falsy.
    """
    # Guard clause: nothing to convert.
    if not tags:
        return []
    # 'Value' is only emitted for truthy values, so empty tags stay key-only.
    return [
        {'Key': name, 'Value': value} if value else {'Key': name}
        for name, value in tags.items()
    ]


def get_tags(run_id, cloud_region, pool_id, custom_tags):
    """Assemble the full tag list for a compute resource.

    Starts from the tags produced by ``run_id_tag(run_id, pool_id)`` and then
    appends, in this order, the tags from ``resource_tags(cloud_region)`` and
    the caller-supplied ``custom_tags``. Either source is skipped when it is
    falsy (``None`` or empty).

    :param run_id: run identifier forwarded to ``run_id_tag``.
    :param cloud_region: region forwarded to ``resource_tags``.
    :param pool_id: pool identifier forwarded to ``run_id_tag``.
    :param custom_tags: already-built list of tag dicts to append; may be falsy.
    :return: the list returned by ``run_id_tag``, extended in place.
    """
    tags = run_id_tag(run_id, pool_id)
    # Both optional sources are handled identically: append only when non-empty.
    for extra_tags in (resource_tags(cloud_region), custom_tags):
        if extra_tags:
            tags.extend(extra_tags)
    return tags


Expand Down Expand Up @@ -436,25 +450,26 @@ def get_random_subnet(ec2):
def run_instance(api_url, api_token, api_user, bid_price, ec2, aws_region, ins_hdd, kms_encyr_key_id, ins_img, ins_platform, ins_key, ins_type,
is_spot, num_rep, run_id, pool_id, time_rep, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token, kube_client,
global_distribution_url, pre_pull_images, instance_additional_spec,
availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network):
availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network,
instance_tags):
swap_size = get_swap_size(aws_region, ins_type, is_spot)
user_data_script = get_user_data_script(api_url, api_token, api_user, aws_region, ins_type, ins_img, ins_platform, kube_ip,
kubeadm_token, kubeadm_cert_hash, kube_node_token,
global_distribution_url, swap_size, pre_pull_images, node_ssh_port,
run_id)
if is_spot:
ins_id, ins_ip = find_spot_instance(ec2, aws_region, bid_price, run_id, pool_id, ins_img, ins_type, ins_key, ins_hdd, kms_encyr_key_id,
user_data_script, num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network)
user_data_script, num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, instance_tags)
else:
ins_id, ins_ip = run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd, kms_encyr_key_id, run_id, pool_id, user_data_script,
num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network)
num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, instance_tags)
return ins_id, ins_ip


def run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd,
kms_encyr_key_id, run_id, pool_id, user_data_script, num_rep, time_rep, swap_size,
kube_client, instance_additional_spec, availability_zone, security_groups, subnet,
network_interface, is_dedicated, performance_network):
network_interface, is_dedicated, performance_network, instance_tags):
pipe_log('Creating on demand instance')
allowed_networks = get_networks_config(ec2, aws_region, ins_type)
additional_args = instance_additional_spec if instance_additional_spec else {}
Expand Down Expand Up @@ -531,6 +546,7 @@ def run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd,
})

response = {}
instance_tags = build_custom_tags(instance_tags)
try:
response = ec2.run_instances(
ImageId=ins_img,
Expand All @@ -543,7 +559,7 @@ def run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd,
TagSpecifications=[
{
'ResourceType': 'instance',
"Tags": get_tags(run_id, aws_region, pool_id)
"Tags": get_tags(run_id, aws_region, pool_id, instance_tags)
}
],
MetadataOptions={
Expand Down Expand Up @@ -584,6 +600,8 @@ def run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd,
pipe_log('Instance created. ID: {}, IP: {}\n-'.format(ins_id, ins_ip))

ebs_tags = resource_tags(aws_region)
if instance_tags:
ebs_tags.extend(instance_tags)
if ebs_tags:
instance_description = ec2.describe_instances(InstanceIds=[ins_id])['Reservations'][0]['Instances'][0]
volumes = instance_description['BlockDeviceMappings']
Expand Down Expand Up @@ -1108,7 +1126,7 @@ def exit_if_spot_unavailable(run_id, last_status):
def find_spot_instance(ec2, aws_region, bid_price, run_id, pool_id, ins_img, ins_type, ins_key,
ins_hdd, kms_encyr_key_id, user_data_script, num_rep, time_rep, swap_size, kube_client,
instance_additional_spec, availability_zone, security_groups, subnet, network_interface,
is_dedicated, performance_network):
is_dedicated, performance_network, instance_tags):
pipe_log('Creating spot request')

pipe_log('- Checking spot prices for current region...')
Expand Down Expand Up @@ -1292,12 +1310,15 @@ def find_spot_instance(ec2, aws_region, bid_price, run_id, pool_id, ins_img, ins
sleep(time_rep)
continue
ins_ip = instance_reservation['PrivateIpAddress']
instance_tags = build_custom_tags(instance_tags)
ec2.create_tags(
Resources=[ins_id],
Tags=get_tags(run_id, aws_region, pool_id),
Tags=get_tags(run_id, aws_region, pool_id, instance_tags),
)

ebs_tags = resource_tags(aws_region)
if instance_tags:
ebs_tags.extend(instance_tags)
if ebs_tags:
volumes = instance_reservation['BlockDeviceMappings']
for volume in volumes:
Expand Down Expand Up @@ -1353,7 +1374,7 @@ def wait_for_fulfilment(status):
or status == 'pending-fulfillment' or status == 'fulfilled'


def check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id):
def check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id, instance_tags):
pipe_log('Checking if spot request for RunID {} already exists...'.format(run_id))
for interation in range(0, 5):
spot_req = get_spot_req_by_run_id(ec2, run_id)
Expand All @@ -1363,7 +1384,7 @@ def check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_i
pipe_log('- Spot request for RunID {} already exists: SpotInstanceRequestId: {}, Status: {}'.format(run_id, request_id, status))
rep = 0
if status == 'request-canceled-and-instance-running' and instance_is_active(ec2, spot_req['InstanceId']):
return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id)
return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, instance_tags)
if wait_for_fulfilment(status):
while status != 'fulfilled':
pipe_log('- Spot request ({}) is not yet fulfilled. Waiting...'.format(request_id))
Expand All @@ -1378,7 +1399,7 @@ def check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_i
exit_if_spot_unavailable(run_id, status)
return '', ''
if instance_is_active(ec2, spot_req['InstanceId']):
return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id)
return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, instance_tags)
sleep(5)
pipe_log('No spot request for RunID {} found\n-'.format(run_id))
return '', ''
Expand All @@ -1393,15 +1414,16 @@ def get_spot_req_by_run_id(ec2, run_id):
return None


def tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id):
def tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, instance_tags):
ins_id = spot_req['InstanceId']
pipe_log('Setting \"Name={}\" tag for instance {}'.format(run_id, ins_id))
instance = ec2.describe_instances(InstanceIds=[ins_id])
ins_ip = instance['Reservations'][0]['Instances'][0]['PrivateIpAddress']
if not tag_name_is_present(instance): # create tag name if not presents
instance_tags = build_custom_tags(instance_tags)
ec2.create_tags(
Resources=[ins_id],
Tags=get_tags(run_id, aws_region, pool_id),
Tags=get_tags(run_id, aws_region, pool_id, instance_tags),
)
pipe_log('Tag ({}) created for instance ({})\n-'.format(run_id, ins_id))
else:
Expand Down Expand Up @@ -1460,6 +1482,7 @@ def main():
parser.add_argument("--node_ssh_port", type=str, default='')
parser.add_argument("--label", type=str, default=[], required=False, action='append')
parser.add_argument("--image", type=str, default=[], required=False, action='append')
parser.add_argument("--custom_tags", type=str, default=[], required=False, action='append')

args, unknown = parser.parse_known_args()
ins_key = args.ins_key
Expand Down Expand Up @@ -1493,6 +1516,7 @@ def main():
pre_pull_images = args.image
additional_labels = map_labels_to_dict(args.label)
pool_id = additional_labels.get(POOL_ID_KEY)
instance_tags = map_labels_to_dict(args.custom_tags)
global_distribution_url = os.getenv('GLOBAL_DISTRIBUTION_URL',
default='https://cloud-pipeline-oss-builds.s3.us-east-1.amazonaws.com/')

Expand Down Expand Up @@ -1576,13 +1600,14 @@ def main():

ins_id, ins_ip = verify_run_id(ec2, run_id)
if not ins_id:
ins_id, ins_ip = check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id)
ins_id, ins_ip = check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id,
instance_tags)

if not ins_id:
ins_id, ins_ip = run_instance(api_url, api_token, api_user, bid_price, ec2, aws_region, ins_hdd, kms_encyr_key_id, ins_img, ins_platform, ins_key, ins_type, is_spot,
num_rep, run_id, pool_id, time_rep, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token, api,
global_distribution_url, pre_pull_images, instance_additional_spec,
availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network)
availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network, instance_tags)

check_instance(ec2, ins_id, run_id, num_rep, time_rep, api)

Expand Down

0 comments on commit 574f423

Please sign in to comment.