Skip to content

Commit

Permalink
Improved fleet error handling + smaller fixes (#388)
Browse files Browse the repository at this point in the history
* Improved error handling

* Version bump

* Fix tests

* Enforce newer Benchmark

* Revert idle timeout parameters
  • Loading branch information
alexander-veit committed Mar 22, 2023
1 parent 85b27b6 commit d9da66d
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 172 deletions.
10 changes: 5 additions & 5 deletions awsf3/cloudwatch_agent_config.json
@@ -1,6 +1,6 @@
{
"agent": {
"metrics_collection_interval": 60,
"metrics_collection_interval": 120,
"run_as_user": "root",
"logfile": "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log",
"debug": true
Expand All @@ -19,15 +19,15 @@
"measurement": [
"usage_active"
],
"metrics_collection_interval": 60,
"metrics_collection_interval": 120,
"totalcpu": true
},
"disk": {
"measurement": [
"used",
"used_percent"
],
"metrics_collection_interval": 60,
"metrics_collection_interval": 120,
"resources": [
"/mnt/data1"
]
Expand All @@ -36,7 +36,7 @@
"measurement": [
"read_bytes"
],
"metrics_collection_interval": 60,
"metrics_collection_interval": 120,
"resources": [
"*"
]
Expand All @@ -47,7 +47,7 @@
"used",
"available"
],
"metrics_collection_interval": 60
"metrics_collection_interval": 120
}
}
}
Expand Down
242 changes: 118 additions & 124 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tibanna"
version = "3.2.2"
version = "3.3.0"
description = "Tibanna runs portable pipelines (in CWL/WDL) on the AWS Cloud."
authors = ["4DN-DCIC Team <support@4dnucleome.org>"]
license = "MIT"
Expand Down Expand Up @@ -32,7 +32,7 @@ python-lambda-4dn = "0.12.3"
boto3 = "^1.9.0"
botocore = "^1.12.1"
requests = "2.27.1"
Benchmark-4dn = "^0.5.8"
Benchmark-4dn = "^0.5.21"
tomlkit = "^0.11.0"

[tool.poetry.dev-dependencies]
Expand Down
29 changes: 23 additions & 6 deletions tests/tibanna/unicorn/test_ec2_utils.py
Expand Up @@ -656,7 +656,7 @@ def test_launch_and_get_instance_id(test_delete_fleet, test_create_fleet, test_c
execution = Execution(input_dict, dryrun=True)
with pytest.raises(Exception) as ex:
execution.launch_and_get_instance_id()
assert 'Failed to launch instance for job' in str(ex.value)
assert 'Unexpected result from create_fleet command' in str(ex.value)

test_create_fleet.return_value = {
"FleetId": "fid",
Expand Down Expand Up @@ -693,6 +693,23 @@ def test_launch_and_get_instance_id(test_delete_fleet, test_create_fleet, test_c
assert execution.cfg.behavior_on_capacity_limit == 'fail'
assert execution.cfg.spot_instance == False

test_create_fleet.return_value = {
"FleetId": "fid",
"Instances": [],
"Errors":[{
"ErrorCode": "InvalidFleetConfiguration",
"ErrorMessage": "InvalidFleetConfiguration"
},
{
"ErrorCode": "InvalidFleetConfiguration",
"ErrorMessage": "InvalidFleetConfiguration"
}]
}
execution = Execution(input_dict, dryrun=True)
with pytest.raises(Exception) as ex:
execution.launch_and_get_instance_id()
assert 'Invalid fleet configuration' in str(ex.value)


def test_create_fleet_spec():
"""Test fleet specs for different input configs"""
Expand Down Expand Up @@ -820,15 +837,15 @@ def test_instance_types():
input_dict['config']['cpu'] = 1
input_dict['config']['mem_as_is'] = True
execution = Execution(input_dict, dryrun=True)
possible_instance_types = ['t3.small', 't3.medium', 't3.large', 'c5.large', 'm5a.large']
assert len(execution.instance_type_list) == 5
possible_instance_types = ['t3.small', 't3.medium', 'c6a.large', 't3.large', 'c6i.large', 'c5.large', 'm5a.large', 'm6a.large']
assert len(execution.instance_type_list) == 8
for instance in possible_instance_types:
assert instance in execution.instance_type_list

input_dict['config']['EBS_optimized'] = False
execution = Execution(input_dict, dryrun=True)
possible_instance_types = ['t3.small', 't2.small', 't3.medium', 't2.medium', 'm3.medium', 't3.large', 'c5.large',
'm5a.large', 'm1.medium', 't2.large']
possible_instance_types = ['t3.small', 't2.small', 't3.medium', 't2.medium', 'c6a.large', 't3.large', 'c6i.large', 'c5.large', 'm5a.large', 'm6a.large']

for instance in possible_instance_types:
assert instance in execution.instance_type_list

Expand All @@ -849,7 +866,7 @@ def test_instance_types():
'mem': 2, 'cpu': 1
}
execution = Execution(input_dict, dryrun=True)
assert len(execution.instance_type_list) == 11
assert len(execution.instance_type_list) == 1 # instance_type has priority


def test_upload_workflow_to_s3(run_task_awsem_event_cwl_upload):
Expand Down
8 changes: 8 additions & 0 deletions tibanna/check_task.py
Expand Up @@ -77,6 +77,8 @@ def run(self):
if does_key_exist(bucket_name, job_aborted):
try:
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json)
# Instance should already be terminated here. Sending a second signal just in case
boto3.client('ec2').terminate_instances(InstanceIds=[instance_id])
except Exception as e:
logger.warning("error occurred while handling postrun json but continuing. %s" % str(e))
raise JobAbortedException("job aborted")
Expand All @@ -87,6 +89,10 @@ def run(self):
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json)
except Exception as e:
logger.warning("error occurred while handling postrun json but continuing. %s" % str(e))

# Instance should already be terminated here. Sending a second signal just in case
boto3.client('ec2').terminate_instances(InstanceIds=[instance_id])

eh = AWSEMErrorHandler()
if 'custom_errors' in self.input_json['args']:
eh.add_custom_errors(self.input_json['args']['custom_errors'])
Expand All @@ -102,6 +108,8 @@ def run(self):
if does_key_exist(bucket_name, job_success):
self.handle_postrun_json(bucket_name, jobid, self.input_json, public_read=public_postrun_json)
print("completed successfully")
# Instance should already be terminated here. Sending a second signal just in case
boto3.client('ec2').terminate_instances(InstanceIds=[instance_id])
return self.input_json

# checking if instance is terminated for no reason
Expand Down
33 changes: 19 additions & 14 deletions tibanna/cw_utils.py
Expand Up @@ -8,7 +8,7 @@
from .top import Top
from .vars import (
AWS_REGION,
EBS_MOUNT_POINT,
METRICS_COLLECTION_INTERVAL,
S3_ENCRYT_KEY_ID
)
from datetime import datetime
Expand Down Expand Up @@ -219,7 +219,7 @@ def max_memory_utilization_all_pts(self):
Dimensions=[{
'Name': 'InstanceId', 'Value': self.instance_id
}],
Period=60,
Period=METRICS_COLLECTION_INTERVAL,
Statistics=['Maximum'],
StartTime=self.starttime,
EndTime=self.endtime,
Expand All @@ -235,7 +235,7 @@ def max_memory_used_all_pts(self):
Dimensions=[{
'Name': 'InstanceId', 'Value': self.instance_id
}],
Period=60,
Period=METRICS_COLLECTION_INTERVAL,
Statistics=['Maximum'],
StartTime=self.starttime,
EndTime=self.endtime,
Expand All @@ -251,7 +251,7 @@ def min_memory_available_all_pts(self):
Dimensions=[{
'Name': 'InstanceId', 'Value': self.instance_id
}],
Period=60,
Period=METRICS_COLLECTION_INTERVAL,
Statistics=['Minimum'],
StartTime=self.starttime,
EndTime=self.endtime,
Expand All @@ -267,7 +267,7 @@ def max_cpu_utilization_all_pts(self):
Dimensions=[{
'Name': 'InstanceId', 'Value': self.instance_id
}],
Period=60,
Period=METRICS_COLLECTION_INTERVAL,
Statistics=['Maximum'],
StartTime=self.starttime,
EndTime=self.endtime,
Expand All @@ -283,7 +283,7 @@ def max_disk_space_utilization_all_pts(self):
Dimensions=[{
'Name': 'InstanceId', 'Value': self.instance_id
}],
Period=60,
Period=METRICS_COLLECTION_INTERVAL,
Statistics=['Maximum'],
StartTime=self.starttime,
EndTime=self.endtime,
Expand All @@ -299,7 +299,7 @@ def max_disk_space_used_all_pts(self):
Dimensions=[{
'Name': 'InstanceId', 'Value': self.instance_id
}],
Period=60,
Period=METRICS_COLLECTION_INTERVAL,
Statistics=['Maximum'],
StartTime=self.starttime,
EndTime=self.endtime,
Expand All @@ -315,7 +315,7 @@ def max_ebs_read_used_all_pts(self):
Dimensions=[{
'Name': 'InstanceId', 'Value': self.instance_id
}],
Period=60*5,
Period=METRICS_COLLECTION_INTERVAL,
Statistics=['Average'],
StartTime=self.starttime,
EndTime=self.endtime,
Expand Down Expand Up @@ -429,6 +429,7 @@ def write_html(self, instance_type, directory):
'---', # cost placeholder for now
cost_estimate, self.cost_estimate_type,
str(self.start), str(self.end), str(self.end - self.start),
METRICS_COLLECTION_INTERVAL,
max_mem_used_MB_js, min_mem_available_MB_js, max_disk_space_used_GB_js,
max_mem_utilization_percent_js, max_disk_space_utilization_percent_js, max_cpu_utilization_percent_js,
cpu_columns_js, cpu_data_js,
Expand Down Expand Up @@ -490,6 +491,7 @@ def update_html(cls, bucket, prefix, directory='.', upload_new=True):
cost,
estimated_cost, cost_estimate_type,
str(starttime), str(endtime), str(endtime-starttime),
METRICS_COLLECTION_INTERVAL,
max_mem_used_MB_js, min_mem_available_MB_js, max_disk_space_used_GB_js,
max_mem_utilization_percent_js, max_disk_space_utilization_percent_js, max_cpu_utilization_percent_js,
cpu_columns_js, cpu_data_js,
Expand Down Expand Up @@ -811,6 +813,9 @@ def create_html(cls):
//window.onload = function(){
// window.addEventListener('resize', onResize);
//}
const TIME_SCALING_FACTOR=Math.round(%s/60);
/* Functions definition */
function make_x_gridlines(x, n) {
var n_l = 0
Expand Down Expand Up @@ -863,24 +868,24 @@ def create_html(cls):
var n_cpu = data_cpu.length;
// X scale will use the index of our data
var xScale = d3.scaleLinear()
.domain([0, n]) // input
.domain([0, TIME_SCALING_FACTOR * n]) // input
.range([0, width]); // output
// X scale for CPU utilization
var xScale_cpu = d3.scaleLinear()
.domain([0, n_cpu]) // input
.domain([0, TIME_SCALING_FACTOR * n_cpu]) // input
.range([0, width*(n_cpu)/(n)]); // output
// Y scale will use the randomly generate number
var yScale = d3.scaleLinear()
.domain([0, 100]) // input
.range([height, 0]); // output
// d3's line generator
var line = d3.line()
.x(function(d, i) { return xScale(i) + xScale(1); }) // set the x values for the line generator
.x(function(d, i) { return TIME_SCALING_FACTOR * (xScale(i)+xScale(1)); }) // set the x values for the line generator
.y(function(d) { return yScale(d.y); }) // set the y values for the line generator
//.curve(d3.curveMonotoneX) // apply smoothing to the line
// d3's line generator for CPU utilization
var line_cpu = d3.line()
.x(function(d, i) { return xScale_cpu(i) + xScale(1); }) // set the x values for the line generator
.x(function(d, i) { return TIME_SCALING_FACTOR * (xScale_cpu(i)+xScale_cpu(1)); }) // set the x values for the line generator
.y(function(d) { return yScale(d.y); }) // set the y values for the line generator
//.curve(d3.curveMonotoneX) // apply smoothing to the line
// An array of objects of length N. Each object has key -> value pair, the key being "y" and the value is a random number
Expand Down Expand Up @@ -965,15 +970,15 @@ def create_html(cls):
}
// X scale will use the index of our data
var xScale = d3.scaleLinear()
.domain([0, n]) // input
.domain([0, TIME_SCALING_FACTOR * n]) // input
.range([0, width]); // output
// Y scale will use the randomly generate number
var yScale = d3.scaleLinear()
.domain([0, d3.max(data)]) // input
.range([height, 0]); // output
// d3's line generator
var line = d3.line()
.x(function(d, i) { return xScale(i) + xScale(1); }) // set the x values for the line generator
.x(function(d, i) { return TIME_SCALING_FACTOR * (xScale(i)+xScale(1)); }) // set the x values for the line generator
.y(function(d) { return yScale(d.y); }) // set the y values for the line generator
//.curve(d3.curveMonotoneX) // apply smoothing to the line
// An array of objects of length N. Each object has key -> value pair, the key being "y" and the value is a random number
Expand Down
51 changes: 33 additions & 18 deletions tibanna/ec2_utils.py
Expand Up @@ -409,7 +409,7 @@ def create_instance_type_list(self):
'EBS_optimized': self.cfg.EBS_optimized})

# user specified mem and cpu - use the benchmark package to retrieve instance types
if self.cfg.mem and self.cfg.cpu:
elif self.cfg.mem and self.cfg.cpu:
mem = self.cfg.mem if self.cfg.mem_as_is else self.cfg.mem + 1
list0 = get_instance_types(self.cfg.cpu, mem, instance_list(exclude_t=False))
current_list = [i['instance_type'] for i in instance_type_dlist]
Expand Down Expand Up @@ -549,31 +549,48 @@ def launch_and_get_instance_id(self):
return instance_id

elif 'Errors' in fleet_result and len(fleet_result['Errors']) > 0:
error_code = fleet_result['Errors'][0]['ErrorCode']
error_msg = fleet_result['Errors'][0]['ErrorMessage']

error_codes = list(map(lambda err: err['ErrorCode'], fleet_result['Errors']))
error_msgs = list(map(lambda err: err['ErrorMessage'], fleet_result['Errors']))

num_unique_errors = len(set(error_codes))

self.delete_fleet(fleet_result['FleetId'])

if ('InsufficientInstanceCapacity' in error_code or 'InstanceLimitExceeded' in error_code
or 'is not supported in your requested Availability Zone' in error_msg
or 'UnfulfillableCapacity' in error_code):

if 'InvalidLaunchTemplate' in error_codes and invalid_launch_template_retries < 5:
invalid_launch_template_retries += 1
logger.info(f"LaunchTemplate not found. Retry #{invalid_launch_template_retries}")
# continue without creating a new launch template
continue

elif 'InvalidLaunchTemplate' in error_codes and invalid_launch_template_retries >= 5:
self.delete_launch_template()
raise Exception(f"InvalidLaunchTemplate. Result from create_fleet command: {json.dumps(fleet_result)}")

elif num_unique_errors == 1 and 'InvalidFleetConfiguration' in error_codes:
# This error code includes the "Your requested instance type (xxx) is not supported in your requested Availability Zone (xxx)" error
# In this case there must be an issue with the general setup, otherwise we would get additional error codes, e.g., InsufficientInstanceCapacity
self.delete_launch_template()
raise Exception(f"Invalid fleet configuration. Result from create_fleet command: {json.dumps(fleet_result)}")

elif 'InsufficientInstanceCapacity' in error_codes or 'InstanceLimitExceeded' in error_codes or 'UnfulfillableCapacity' in error_codes:
# We ignore the 'InvalidFleetConfiguration' error here
behavior = self.cfg.behavior_on_capacity_limit
if behavior == 'fail':
self.delete_launch_template()
msg = "Instance limit exception - use 'behavior_on_capacity_limit' option " + \
"to change the behavior to wait_and_retry, " + \
"or retry_without_spot. %s" % error_msg
msg = "Instance limit exception - use 'behavior_on_capacity_limit' option to change the behavior to wait_and_retry, or retry_without_spot. Errors: "
msg += "; ".join(error_msgs)
raise EC2InstanceLimitException(msg)
elif behavior == 'wait_and_retry' or behavior == 'other_instance_types': # 'other_instance_types' is there for backwards compatibility
self.delete_launch_template()
msg = "Instance limit exception - wait and retry later: %s" % error_msg
msg = "Instance limit exception - wait and retry later. Errors: "
msg += "; ".join(error_msgs)
raise EC2InstanceLimitWaitException(msg)
elif behavior == 'retry_without_spot':
if not self.cfg.spot_instance:
self.delete_launch_template()
msg = "'behavior_on_capacity_limit': 'retry_without_spot' works only with " + \
"'spot_instance' : true. %s" % error_msg
msg = "'behavior_on_capacity_limit': 'retry_without_spot' works only with 'spot_instance' : true. Errors: "
msg += "; ".join(error_msgs)
raise Exception(msg)
else:
self.cfg.spot_instance = False
Expand All @@ -582,13 +599,11 @@ def launch_and_get_instance_id(self):
self.cfg.behavior_on_capacity_limit = 'fail'
logger.info("trying without spot...")
continue
elif 'InvalidLaunchTemplate' in error_code and invalid_launch_template_retries < 5:
invalid_launch_template_retries += 1
logger.info(f"LaunchTemplate not found. Retry #{invalid_launch_template_retries}")
continue

else:
self.delete_launch_template()
raise Exception(f"Failed to launch instance for job {self.jobid}. {error_code}: {error_msg}")
raise Exception(f"Unexpected result from create_fleet command: {json.dumps(fleet_result)}")

else:
self.delete_launch_template()
self.delete_fleet(fleet_result['FleetId'])
Expand Down
2 changes: 1 addition & 1 deletion tibanna/lambdas/requirements.txt
@@ -1,4 +1,4 @@
Benchmark-4dn>=0.5.8
Benchmark-4dn>=0.5.21
boto3>=1.9.0
botocore>=1.12.1
tomlkit>=0.11.0

0 comments on commit d9da66d

Please sign in to comment.