Skip to content

Commit

Permalink
Issue #3534: Tag compute resources according to the tool's metadata -…
Browse files Browse the repository at this point in the history
… support --custom_tags for reassign
  • Loading branch information
ekazachkova committed May 23, 2024
1 parent 574f423 commit d35113c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 38 deletions.
24 changes: 22 additions & 2 deletions scripts/autoscaling/aws/node_reassign.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,28 @@
KUBE_CONFIG_PATH = '~/.kube/config'


def find_and_tag_instance(ec2, old_id, new_id):
def build_custom_tags(input_tags):
if not input_tags:
return []
instance_tags = []
for input_tag in input_tags:
tag_parts = input_tag.split("=")
if len(tag_parts) == 1:
instance_tags.append({'Key': tag_parts[0]})
else:
instance_tags.append({
'Key': tag_parts[0],
'Value': tag_parts[1]
})
return instance_tags


def find_and_tag_instance(ec2, old_id, new_id, custom_tags):
response = ec2.describe_instances(Filters=[{'Name': 'tag:Name', 'Values': [old_id]},
{'Name': 'instance-state-name', 'Values': ['pending', 'running']}])
tags = [{'Key': 'Name', 'Value': new_id}]
if custom_tags:
tags.extend(custom_tags)
if len(response['Reservations']) > 0:
ins_id = response['Reservations'][0]['Instances'][0]['InstanceId']
ec2.create_tags(
Expand Down Expand Up @@ -111,14 +129,16 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument("--old_id", "-kid", type=str, required=True)
parser.add_argument("--new_id", "-nid", type=str, required=True)
parser.add_argument("--custom_tags", type=str, default=[], required=False, action='append')
args, unknown = parser.parse_known_args()
old_id = args.old_id
new_id = args.new_id
custom_tags = build_custom_tags(args.custom_tags)

kube_api = get_kube_api()
aws_region = get_aws_region(kube_api, old_id)
ec2 = boto3.client('ec2', region_name=aws_region)
ins_id = find_and_tag_instance(ec2, old_id, new_id)
ins_id = find_and_tag_instance(ec2, old_id, new_id, custom_tags)
nodename = verify_regnode(kube_api, ec2, ins_id)
change_label(kube_api, nodename, new_id, aws_region)

Expand Down
73 changes: 37 additions & 36 deletions scripts/autoscaling/aws/nodeup.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,18 +404,6 @@ def run_id_tag(run_id, pool_id):
return tags


def build_custom_tags(tags):
instance_tags = []
if not tags:
return instance_tags
for tag_name, tag_value in tags.items():
tag = {'Key': tag_name}
if tag_value:
tag.update({'Value': tag_value})
instance_tags.append(tag)
return instance_tags


def get_tags(run_id, cloud_region, pool_id, custom_tags):
tags = run_id_tag(run_id, pool_id)
res_tags = resource_tags(cloud_region)
Expand Down Expand Up @@ -451,25 +439,25 @@ def run_instance(api_url, api_token, api_user, bid_price, ec2, aws_region, ins_h
is_spot, num_rep, run_id, pool_id, time_rep, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token, kube_client,
global_distribution_url, pre_pull_images, instance_additional_spec,
availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network,
instance_tags):
custom_tags):
swap_size = get_swap_size(aws_region, ins_type, is_spot)
user_data_script = get_user_data_script(api_url, api_token, api_user, aws_region, ins_type, ins_img, ins_platform, kube_ip,
kubeadm_token, kubeadm_cert_hash, kube_node_token,
global_distribution_url, swap_size, pre_pull_images, node_ssh_port,
run_id)
if is_spot:
ins_id, ins_ip = find_spot_instance(ec2, aws_region, bid_price, run_id, pool_id, ins_img, ins_type, ins_key, ins_hdd, kms_encyr_key_id,
user_data_script, num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, instance_tags)
user_data_script, num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, custom_tags)
else:
ins_id, ins_ip = run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd, kms_encyr_key_id, run_id, pool_id, user_data_script,
num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, instance_tags)
num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, custom_tags)
return ins_id, ins_ip


def run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd,
kms_encyr_key_id, run_id, pool_id, user_data_script, num_rep, time_rep, swap_size,
kube_client, instance_additional_spec, availability_zone, security_groups, subnet,
network_interface, is_dedicated, performance_network, instance_tags):
network_interface, is_dedicated, performance_network, custom_tags):
pipe_log('Creating on demand instance')
allowed_networks = get_networks_config(ec2, aws_region, ins_type)
additional_args = instance_additional_spec if instance_additional_spec else {}
Expand Down Expand Up @@ -546,7 +534,6 @@ def run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd,
})

response = {}
instance_tags = build_custom_tags(instance_tags)
try:
response = ec2.run_instances(
ImageId=ins_img,
Expand All @@ -559,7 +546,7 @@ def run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd,
TagSpecifications=[
{
'ResourceType': 'instance',
"Tags": get_tags(run_id, aws_region, pool_id, instance_tags)
"Tags": get_tags(run_id, aws_region, pool_id, custom_tags)
}
],
MetadataOptions={
Expand Down Expand Up @@ -600,8 +587,8 @@ def run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd,
pipe_log('Instance created. ID: {}, IP: {}\n-'.format(ins_id, ins_ip))

ebs_tags = resource_tags(aws_region)
if instance_tags:
ebs_tags.extend(instance_tags)
if custom_tags:
ebs_tags.extend(custom_tags)
if ebs_tags:
instance_description = ec2.describe_instances(InstanceIds=[ins_id])['Reservations'][0]['Instances'][0]
volumes = instance_description['BlockDeviceMappings']
Expand Down Expand Up @@ -1126,7 +1113,7 @@ def exit_if_spot_unavailable(run_id, last_status):
def find_spot_instance(ec2, aws_region, bid_price, run_id, pool_id, ins_img, ins_type, ins_key,
ins_hdd, kms_encyr_key_id, user_data_script, num_rep, time_rep, swap_size, kube_client,
instance_additional_spec, availability_zone, security_groups, subnet, network_interface,
is_dedicated, performance_network, instance_tags):
is_dedicated, performance_network, custom_tags):
pipe_log('Creating spot request')

pipe_log('- Checking spot prices for current region...')
Expand Down Expand Up @@ -1310,15 +1297,14 @@ def find_spot_instance(ec2, aws_region, bid_price, run_id, pool_id, ins_img, ins
sleep(time_rep)
continue
ins_ip = instance_reservation['PrivateIpAddress']
instance_tags = build_custom_tags(instance_tags)
ec2.create_tags(
Resources=[ins_id],
Tags=get_tags(run_id, aws_region, pool_id, instance_tags),
Tags=get_tags(run_id, aws_region, pool_id, custom_tags),
)

ebs_tags = resource_tags(aws_region)
if instance_tags:
ebs_tags.extend(instance_tags)
if custom_tags:
ebs_tags.extend(custom_tags)
if ebs_tags:
volumes = instance_reservation['BlockDeviceMappings']
for volume in volumes:
Expand Down Expand Up @@ -1374,7 +1360,7 @@ def wait_for_fulfilment(status):
or status == 'pending-fulfillment' or status == 'fulfilled'


def check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id, instance_tags):
def check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id, custom_tags):
pipe_log('Checking if spot request for RunID {} already exists...'.format(run_id))
for interation in range(0, 5):
spot_req = get_spot_req_by_run_id(ec2, run_id)
Expand All @@ -1384,7 +1370,7 @@ def check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_i
pipe_log('- Spot request for RunID {} already exists: SpotInstanceRequestId: {}, Status: {}'.format(run_id, request_id, status))
rep = 0
if status == 'request-canceled-and-instance-running' and instance_is_active(ec2, spot_req['InstanceId']):
return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, instance_tags)
return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, custom_tags)
if wait_for_fulfilment(status):
while status != 'fulfilled':
pipe_log('- Spot request ({}) is not yet fulfilled. Waiting...'.format(request_id))
Expand All @@ -1399,7 +1385,7 @@ def check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_i
exit_if_spot_unavailable(run_id, status)
return '', ''
if instance_is_active(ec2, spot_req['InstanceId']):
return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, instance_tags)
return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, custom_tags)
sleep(5)
pipe_log('No spot request for RunID {} found\n-'.format(run_id))
return '', ''
Expand All @@ -1414,16 +1400,15 @@ def get_spot_req_by_run_id(ec2, run_id):
return None


def tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, instance_tags):
def tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, custom_tags):
ins_id = spot_req['InstanceId']
pipe_log('Setting \"Name={}\" tag for instance {}'.format(run_id, ins_id))
instance = ec2.describe_instances(InstanceIds=[ins_id])
ins_ip = instance['Reservations'][0]['Instances'][0]['PrivateIpAddress']
if not tag_name_is_present(instance): # create tag name if not presents
instance_tags = build_custom_tags(instance_tags)
ec2.create_tags(
Resources=[ins_id],
Tags=get_tags(run_id, aws_region, pool_id, instance_tags),
Tags=get_tags(run_id, aws_region, pool_id, custom_tags),
)
pipe_log('Tag ({}) created for instance ({})\n-'.format(run_id, ins_id))
else:
Expand Down Expand Up @@ -1453,6 +1438,22 @@ def map_labels_to_dict(additional_labels_list):
return additional_labels_dict


def build_custom_tags(input_tags):
if not input_tags:
return []
instance_tags = []
for input_tag in input_tags:
tag_parts = input_tag.split("=")
if len(tag_parts) == 1:
instance_tags.append({'Key': tag_parts[0]})
else:
instance_tags.append({
'Key': tag_parts[0],
'Value': tag_parts[1]
})
return instance_tags


def main():
parser = argparse.ArgumentParser()
parser.add_argument("--ins_key", type=str, required=True)
Expand Down Expand Up @@ -1516,7 +1517,7 @@ def main():
pre_pull_images = args.image
additional_labels = map_labels_to_dict(args.label)
pool_id = additional_labels.get(POOL_ID_KEY)
instance_tags = map_labels_to_dict(args.custom_tags)
custom_tags = build_custom_tags(args.custom_tags)
global_distribution_url = os.getenv('GLOBAL_DISTRIBUTION_URL',
default='https://cloud-pipeline-oss-builds.s3.us-east-1.amazonaws.com/')

Expand Down Expand Up @@ -1588,11 +1589,11 @@ def main():
pipe_log('Found matching rule {instance_mask} for requested instance type {instance_type}'.format(instance_mask=allowed_instance["instance_mask"], instance_type=ins_type))
instance_additional_spec = allowed_instance["additional_spec"]
if instance_additional_spec:
pipe_log('Additional custom instance configuration will be added: {}'.format(instance_additional_spec))
pipe_log('Additional custom instance configuration will be added: {}'.format(instance_additional_spec))
if not ins_img or ins_img == 'null':
if allowed_instance and allowed_instance["instance_mask_ami"]:
ins_img = allowed_instance["instance_mask_ami"]
pipe_log('Instance image was not provided explicitly, {instance_image} will be used (retrieved for {instance_mask}/{instance_type} rule)'.format(instance_image=allowed_instance["instance_mask_ami"],
pipe_log('Instance image was not provided explicitly, {instance_image} will be used (retrieved for {instance_mask}/{instance_type} rule)'.format(instance_image=allowed_instance["instance_mask_ami"],
instance_mask=allowed_instance["instance_mask"],
instance_type=ins_type))
else:
Expand All @@ -1601,13 +1602,13 @@ def main():
ins_id, ins_ip = verify_run_id(ec2, run_id)
if not ins_id:
ins_id, ins_ip = check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id,
instance_tags)
custom_tags)

if not ins_id:
ins_id, ins_ip = run_instance(api_url, api_token, api_user, bid_price, ec2, aws_region, ins_hdd, kms_encyr_key_id, ins_img, ins_platform, ins_key, ins_type, is_spot,
num_rep, run_id, pool_id, time_rep, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token, api,
global_distribution_url, pre_pull_images, instance_additional_spec,
availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network, instance_tags)
availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network, custom_tags)

check_instance(ec2, ins_id, run_id, num_rep, time_rep, api)

Expand Down

0 comments on commit d35113c

Please sign in to comment.