Skip to content
Permalink
Browse files

AWS Lambda Deployment misc improvements (#407)

* Delete the deployment record when creation of the related cloud resources fails

* Improved error messages for deployment failures
  • Loading branch information
parano committed Nov 28, 2019
1 parent b646ec3 commit 0e04540a340d71301afb24d5c4129282c41df448
@@ -303,17 +303,24 @@ def create(
'timeout': timeout,
}
yatai_service = get_yatai_service()
result = create_deployment(
name,
namespace,
bento_name,
bento_version,
platform,
operator_spec,
parse_key_value_pairs(labels),
parse_key_value_pairs(annotations),
yatai_service,
)
try:
result = create_deployment(
name,
namespace,
bento_name,
bento_version,
platform,
operator_spec,
parse_key_value_pairs(labels),
parse_key_value_pairs(annotations),
yatai_service,
)
except BentoMLException as e:
_echo(
'Failed to create deployment {}.: {}'.format(name, str(e)),
CLI_COLOR_ERROR,
)
return

if result.status.status_code != status_pb2.Status.OK:
_echo(
@@ -44,7 +44,6 @@
from bentoml.exceptions import BentoMLException
from bentoml.proto.deployment_pb2 import (
ApplyDeploymentResponse,
Deployment,
DeploymentState,
DescribeDeploymentResponse,
DeleteDeploymentResponse,
@@ -307,31 +306,19 @@ def _apply(
)
return ApplyDeploymentResponse(status=Status.INTERNAL(str(e)))

logger.info('Finish deployed lambda project, fetching latest status')
res_deployment_pb = Deployment(state=DeploymentState())
res_deployment_pb.CopyFrom(deployment_pb)
state = self.describe(res_deployment_pb, yatai_service).state
res_deployment_pb.state.CopyFrom(state)
deployment_pb.state.state = DeploymentState.PENDING
return ApplyDeploymentResponse(status=Status.OK(), deployment=deployment_pb)
except BentoMLException as error:
deployment_pb.state = DeploymentState(
state=DeploymentState.ERROR, error_message='Error: {}'.format(error)
)
return ApplyDeploymentResponse(
status=Status.OK(), deployment=res_deployment_pb
status=exception_to_return_status(error), deployment=deployment_pb
)

except BentoMLException as error:
return ApplyDeploymentResponse(status=exception_to_return_status(error))

def delete(self, deployment_pb, yatai_service):
try:
logger.debug('Deleting AWS Lambda deployment')
describe_state_result = self.describe(deployment_pb, yatai_service).state
if describe_state_result.state != DeploymentState.RUNNING:
message = (
'Failed to delete, no active deployment {name}. '
'The current state is {state}'.format(
name=deployment_pb.name,
state=DeploymentState.State.Name(describe_state_result.state),
)
)
return DeleteDeploymentResponse(status=Status.ABORTED(message))

deployment_spec = deployment_pb.spec
lambda_deployment_config = deployment_spec.aws_lambda_operator_config
@@ -340,10 +327,11 @@ def delete(self, deployment_pb, yatai_service):
stack_name = generate_aws_compatible_string(
deployment_pb.namespace + '-' + deployment_pb.name
)
deployment_info_json = json.loads(describe_state_result.info_json)
bucket_name = deployment_info_json.get('s3_bucket')
if bucket_name:
_cleanup_s3_bucket(bucket_name, lambda_deployment_config.region)
if deployment_pb.state.info_json:
deployment_info_json = json.loads(deployment_pb.state.info_json)
bucket_name = deployment_info_json.get('s3_bucket')
if bucket_name:
_cleanup_s3_bucket(bucket_name, lambda_deployment_config.region)

logger.debug(
'Deleting AWS CloudFormation: %s that includes Lambda function '
@@ -33,10 +33,10 @@ def ensure_sam_available_or_raise():
try:
import samcli

if list(map(int, samcli.__version__.split('.'))) < [0, 33, 1]:
if samcli.__version__ != '0.33.1':
raise BentoMLException(
'aws-sam-cli package requires version 0.33.1 or '
'higher. Update the package with `pip install -U aws-sam-cli`'
'aws-sam-cli package requires version 0.33.1 '
'Install the package with `pip install -U aws-sam-cli==0.33.1`'
)
except ImportError:
raise ImportError(
@@ -172,12 +172,12 @@ def init_sam_project(
requirement_txt_path = os.path.join(bento_service_bundle_path, 'requirements.txt')
shutil.copy(requirement_txt_path, function_path)

# Copy bundled pip dependencies
logger.debug('Coping bundled_dependencies')
bundled_dep_path = os.path.join(
bento_service_bundle_path, 'bundled_pip_dependencies'
)
if os.path.isdir(bundled_dep_path):
# Copy bundled pip dependencies
logger.debug('Coping bundled_dependencies')
shutil.copytree(
bundled_dep_path, os.path.join(function_path, 'bundled_pip_dependencies')
)
@@ -70,7 +70,7 @@ def ensure_docker_available_or_raise():
subprocess.check_output(['docker', 'info'])
except subprocess.CalledProcessError as error:
raise BentoMLException(
'Error executing docker command: {}'.format(error.output)
'Error executing docker command: {}'.format(error.output.decode())
)
except not_found_error:
raise BentoMLMissingDependencyException(
@@ -213,90 +213,98 @@ def create_deployment(

yatai_service = get_yatai_service()

try:
# Make sure there is no active deployment with the same deployment name
get_deployment_pb = yatai_service.GetDeployment(
GetDeploymentRequest(deployment_name=deployment_name, namespace=namespace)
# Make sure there is no active deployment with the same deployment name
get_deployment_pb = yatai_service.GetDeployment(
GetDeploymentRequest(deployment_name=deployment_name, namespace=namespace)
)
if get_deployment_pb.status.status_code == status_pb2.Status.OK:
raise BentoMLDeploymentException(
'Deployment "{name}" already existed, use Update or Apply for updating '
'existing deployment, delete the deployment, or use a different deployment '
'name'.format(name=deployment_name)
)
if get_deployment_pb.status.status_code == status_pb2.Status.OK:
raise BentoMLDeploymentException(
'Deployment "{name}" already existed, use Update or Apply for updating'
'existing deployment, or create the deployment with a different name or'
'under a different deployment namespace'.format(name=deployment_name)
)
if get_deployment_pb.status.status_code != status_pb2.Status.NOT_FOUND:
raise BentoMLDeploymentException(
'Failed accesing YataiService deployment store. {error_code}:'
'{error_message}'.format(
error_code=Status.Name(get_deployment_pb.status.status_code),
error_message=get_deployment_pb.status.error_message,
)
if get_deployment_pb.status.status_code != status_pb2.Status.NOT_FOUND:
raise BentoMLDeploymentException(
'Failed accesing YataiService deployment store. {error_code}:'
'{error_message}'.format(
error_code=Status.Name(get_deployment_pb.status.status_code),
error_message=get_deployment_pb.status.error_message,
)
)

deployment_dict = {
"name": deployment_name,
"namespace": namespace or config().get('deployment', 'default_namespace'),
"labels": labels,
"annotations": annotations,
"spec": {
"bento_name": bento_name,
"bento_version": bento_version,
"operator": platform,
},
deployment_dict = {
"name": deployment_name,
"namespace": namespace or config().get('deployment', 'default_namespace'),
"labels": labels,
"annotations": annotations,
"spec": {
"bento_name": bento_name,
"bento_version": bento_version,
"operator": platform,
},
}

operator = platform.replace('-', '_').upper()
try:
operator_value = DeploymentSpec.DeploymentOperator.Value(operator)
except ValueError:
return ApplyDeploymentResponse(
status=Status.INVALID_ARGUMENT('Invalid platform "{}"'.format(platform))
)
if operator_value == DeploymentSpec.AWS_SAGEMAKER:
deployment_dict['spec']['sagemaker_operator_config'] = {
'region': operator_spec.get('region')
or config().get('aws', 'default_region'),
'instance_count': operator_spec.get('instance_count')
or config().getint('sagemaker', 'default_instance_count'),
'instance_type': operator_spec.get('instance_type')
or config().get('sagemaker', 'default_instance_type'),
'api_name': operator_spec.get('api_name', ''),
}
elif operator_value == DeploymentSpec.AWS_LAMBDA:
deployment_dict['spec']['aws_lambda_operator_config'] = {
'region': operator_spec.get('region')
or config().get('aws', 'default_region')
}
for field in ['api_name', 'memory_size', 'timeout']:
if operator_spec.get(field):
deployment_dict['spec']['aws_lambda_operator_config'][
field
] = operator_spec[field]
elif operator_value == DeploymentSpec.GCP_FCUNTION:
deployment_dict['spec']['gcp_function_operatorConfig'] = {
'region': operator_spec.get('region')
or config().get('google-cloud', 'default_region')
}
if operator_spec.get('api_name'):
deployment_dict['spec']['gcp_function_operator_config'][
'api_name'
] = operator_spec['api_name']
elif operator_value == DeploymentSpec.KUBERNETES:
deployment_dict['spec']['kubernetes_operator_config'] = {
'kube_namespace': operator_spec.get('kube_namespace', ''),
'replicas': operator_spec.get('replicas', 0),
'service_name': operator_spec.get('service_name', ''),
'service_type': operator_spec.get('service_type', ''),
}
else:
raise BentoMLDeploymentException(
'Platform "{}" is not supported in the current version of '
'BentoML'.format(platform)
)

operator = platform.replace('-', '_').upper()
try:
operator_value = DeploymentSpec.DeploymentOperator.Value(operator)
except ValueError:
return ApplyDeploymentResponse(
status=Status.INVALID_ARGUMENT('Invalid platform "{}"'.format(platform))
)
if operator_value == DeploymentSpec.AWS_SAGEMAKER:
deployment_dict['spec']['sagemaker_operator_config'] = {
'region': operator_spec.get('region')
or config().get('aws', 'default_region'),
'instance_count': operator_spec.get('instance_count')
or config().getint('sagemaker', 'default_instance_count'),
'instance_type': operator_spec.get('instance_type')
or config().get('sagemaker', 'default_instance_type'),
'api_name': operator_spec.get('api_name', ''),
}
elif operator_value == DeploymentSpec.AWS_LAMBDA:
deployment_dict['spec']['aws_lambda_operator_config'] = {
'region': operator_spec.get('region')
or config().get('aws', 'default_region')
}
for field in ['api_name', 'memory_size', 'timeout']:
if operator_spec.get(field):
deployment_dict['spec']['aws_lambda_operator_config'][
field
] = operator_spec[field]
elif operator_value == DeploymentSpec.GCP_FCUNTION:
deployment_dict['spec']['gcp_function_operatorConfig'] = {
'region': operator_spec.get('region')
or config().get('google-cloud', 'default_region')
}
if operator_spec.get('api_name'):
deployment_dict['spec']['gcp_function_operator_config'][
'api_name'
] = operator_spec['api_name']
elif operator_value == DeploymentSpec.KUBERNETES:
deployment_dict['spec']['kubernetes_operator_config'] = {
'kube_namespace': operator_spec.get('kube_namespace', ''),
'replicas': operator_spec.get('replicas', 0),
'service_name': operator_spec.get('service_name', ''),
'service_type': operator_spec.get('service_type', ''),
}
else:
raise BentoMLDeploymentException(
'Platform "{}" is not supported in the current version of '
'BentoML'.format(platform)
)
apply_response = apply_deployment(deployment_dict, yatai_service)

return apply_deployment(deployment_dict, yatai_service)
except BentoMLException as error:
return ApplyDeploymentResponse(status=Status.INTERNAL(str(error)))
if apply_response.status.status_code == status_pb2.Status.OK:
describe_response = describe_deployment(
deployment_name, namespace, yatai_service
)
if describe_response.status.status_code == status_pb2.Status.OK:
deployment_state = describe_response.state
apply_response.deployment.state.CopyFrom(deployment_state)
return apply_response

return apply_response


# TODO update_deployment is not finished. It will be working on along with cli command
@@ -117,8 +117,32 @@ def ApplyDeployment(self, request, context=None):
# deploying to target platform
response = operator.apply(request.deployment, self, previous_deployment)

# update deployment state
self.deployment_store.insert_or_update(response.deployment)
if response.status.status_code == status_pb2.Status.OK:
# update deployment state
if response and response.deployment:
self.deployment_store.insert_or_update(response.deployment)
else:
raise BentoMLException(
"DeploymentOperator Internal Error: Invalid Response"
)
logger.info(
"ApplyDeployment (%s, namespace %s) succeeded",
request.deployment.name,
request.deployment.namespace,
)
else:
if not previous_deployment:
# When failed to create the deployment, delete it from active
# deployments records
self.deployment_store.delete(
request.deployment.name, request.deployment.namespace
)
logger.debug(
"ApplyDeployment (%s, namespace %s) failed: %s",
request.deployment.name,
request.deployment.namespace,
response.status.error_message,
)

return response

@@ -299,6 +323,7 @@ def DangerouslyDeleteBento(self, request, context=None):
self.bento_metadata_store.dangerously_delete(
request.bento_name, request.bento_version
)
self.repo.dangerously_delete(request.bento_name, request.bento_version)
except BentoMLException as e:
logger.error("INTERNAL ERROR: %s", e)
return DangerouslyDeleteBentoResponse(status=Status.INTERNAL(str(e)))

0 comments on commit 0e04540

Please sign in to comment.
You can’t perform that action at this time.