diff --git a/bentoml/yatai/client/deployment_api.py b/bentoml/yatai/client/deployment_api.py index 9aa19be9c8a..97c298dadfb 100644 --- a/bentoml/yatai/client/deployment_api.py +++ b/bentoml/yatai/client/deployment_api.py @@ -32,7 +32,7 @@ ) from bentoml.exceptions import BentoMLException, YataiDeploymentException from bentoml.yatai.proto import status_pb2 -from bentoml.yatai.validator import validate_deployment_pb_schema +from bentoml.yatai.validator import validate_deployment_pb from bentoml.yatai.deployment_utils import ( deployment_yaml_string_to_pb, deployment_dict_to_pb, @@ -120,7 +120,7 @@ def create(self, deployment_info, wait): ) ) - validation_errors = validate_deployment_pb_schema(deployment_pb) + validation_errors = validate_deployment_pb(deployment_pb) if validation_errors: raise YataiDeploymentException( f'Failed to validate deployment {deployment_pb.name}: ' @@ -168,7 +168,7 @@ def apply(self, deployment_info, wait): ) ) - validation_errors = validate_deployment_pb_schema(deployment_pb) + validation_errors = validate_deployment_pb(deployment_pb) if validation_errors: raise YataiDeploymentException( f'Failed to validate deployment {deployment_pb.name}: ' diff --git a/bentoml/yatai/deployment/aws_lambda/__init__.py b/bentoml/yatai/deployment/aws_lambda/__init__.py index 38ad75f41c0..4b378e85c85 100644 --- a/bentoml/yatai/deployment/aws_lambda/__init__.py +++ b/bentoml/yatai/deployment/aws_lambda/__init__.py @@ -11,535 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import json -import os -import logging -import shutil -import uuid -from pathlib import Path - -from botocore.exceptions import ClientError - -import boto3 -from ruamel.yaml import YAML - -from bentoml.saved_bundle import loader -from bentoml.yatai.deployment.aws_lambda.utils import ( - ensure_sam_available_or_raise, - init_sam_project, - lambda_deploy, - lambda_package, - validate_lambda_template, - reduce_bundle_size_and_upload_extra_resources_to_s3, - total_file_or_directory_size, - LAMBDA_FUNCTION_LIMIT, - LAMBDA_FUNCTION_MAX_LIMIT, - FAILED_CLOUDFORMATION_STACK_STATUS, -) -from bentoml.yatai.deployment.operator import DeploymentOperatorBase -from bentoml.yatai.deployment.utils import ( - ensure_docker_available_or_raise, - generate_aws_compatible_string, - raise_if_api_names_not_found_in_bento_service_metadata, - get_default_aws_region, -) -from bentoml.exceptions import ( - BentoMLException, - InvalidArgument, - YataiDeploymentException, -) -from bentoml.yatai.proto import status_pb2 -from bentoml.yatai.proto.deployment_pb2 import ( - ApplyDeploymentResponse, - DeploymentState, - DescribeDeploymentResponse, - DeleteDeploymentResponse, -) -from bentoml.yatai.proto.repository_pb2 import GetBentoRequest, BentoUri -from bentoml.utils import status_pb_to_error_code_and_message -from bentoml.utils.s3 import create_s3_bucket_if_not_exists -from bentoml.utils.tempdir import TempDirectory -from bentoml.yatai.status import Status - - -logger = logging.getLogger(__name__) - - -def _create_aws_lambda_cloudformation_template_file( - project_dir, - namespace, - deployment_name, - deployment_path_prefix, - api_names, - bento_service_name, - s3_bucket_name, - py_runtime, - memory_size, - timeout, -): - template_file_path = os.path.join(project_dir, 'template.yaml') - yaml = YAML() - sam_config = { - 'AWSTemplateFormatVersion': '2010-09-09', - 'Transform': 'AWS::Serverless-2016-10-31', - 
'Globals': { - 'Function': {'Timeout': timeout, 'Runtime': py_runtime}, - 'Api': { - 'BinaryMediaTypes': ['image~1*'], - 'Cors': {'AllowOrigin': "'*'"}, - 'Auth': { - 'ApiKeyRequired': False, - 'DefaultAuthorizer': 'NONE', - 'AddDefaultAuthorizerToCorsPreflight': False, - }, - }, - }, - 'Resources': {}, - 'Outputs': { - 'S3Bucket': { - 'Value': s3_bucket_name, - 'Description': 'S3 Bucket for saving artifacts and lambda bundle', - } - }, - } - for api_name in api_names: - sam_config['Resources'][api_name] = { - 'Type': 'AWS::Serverless::Function', - 'Properties': { - 'Runtime': py_runtime, - 'CodeUri': deployment_name + '/', - 'Handler': 'app.{}'.format(api_name), - 'FunctionName': f'{namespace}-{deployment_name}-{api_name}', - 'Timeout': timeout, - 'MemorySize': memory_size, - 'Events': { - 'Api': { - 'Type': 'Api', - 'Properties': { - 'Path': '/{}'.format(api_name), - 'Method': 'post', - }, - } - }, - 'Policies': [{'S3ReadPolicy': {'BucketName': s3_bucket_name}}], - 'Environment': { - 'Variables': { - 'BENTOML_BENTO_SERVICE_NAME': bento_service_name, - 'BENTOML_API_NAME': api_name, - 'BENTOML_S3_BUCKET': s3_bucket_name, - 'BENTOML_DEPLOYMENT_PATH_PREFIX': deployment_path_prefix, - } - }, - }, - } - - yaml.dump(sam_config, Path(template_file_path)) - - # We add Outputs section separately, because the value should not - # have "'" around !Sub - with open(template_file_path, 'a') as f: - f.write( - """\ - EndpointUrl: - Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.\ -amazonaws.com/Prod" - Description: URL for endpoint -""" - ) - return template_file_path - - -def _cleanup_s3_bucket_if_exist(bucket_name, region): - s3_client = boto3.client('s3', region) - s3 = boto3.resource('s3') - try: - logger.debug('Removing all objects inside bucket %s', bucket_name) - s3.Bucket(bucket_name).objects.all().delete() - logger.debug('Deleting bucket %s', bucket_name) - s3_client.delete_bucket(Bucket=bucket_name) - except ClientError as e: - if e.response and e.response['Error']['Code'] == 'NoSuchBucket': - # If there is no bucket, we just let it silently fail, dont have to do - # any thing - return - else: - raise e - - -def _deploy_lambda_function( - deployment_pb, - bento_service_metadata, - deployment_spec, - lambda_s3_bucket, - lambda_deployment_config, - bento_path, -): - deployment_path_prefix = os.path.join(deployment_pb.namespace, deployment_pb.name) - - py_major, py_minor, _ = bento_service_metadata.env.python_version.split('.') - if py_major != '3': - raise BentoMLException('Python 2 is not supported for Lambda Deployment') - python_runtime = 'python{}.{}'.format(py_major, py_minor) - - artifact_types = [item.artifact_type for item in bento_service_metadata.artifacts] - if any( - i in ['TensorflowSavedModelArtifact', 'KerasModelArtifact'] - for i in artifact_types - ) and (py_major, py_minor) != ('3', '6'): - raise BentoMLException( - 'For Tensorflow and Keras model, only python3.6 is ' - 'supported for AWS Lambda deployment' - ) - - api_names = ( - [lambda_deployment_config.api_name] - if lambda_deployment_config.api_name - else [api.name for api in bento_service_metadata.apis] - ) - - raise_if_api_names_not_found_in_bento_service_metadata( - bento_service_metadata, api_names - ) - - with TempDirectory() as lambda_project_dir: - logger.debug( - 'Generating cloudformation template.yaml for lambda project at %s', - lambda_project_dir, - ) - template_file_path = _create_aws_lambda_cloudformation_template_file( - project_dir=lambda_project_dir, - 
namespace=deployment_pb.namespace, - deployment_name=deployment_pb.name, - deployment_path_prefix=deployment_path_prefix, - api_names=api_names, - bento_service_name=deployment_spec.bento_name, - s3_bucket_name=lambda_s3_bucket, - py_runtime=python_runtime, - memory_size=lambda_deployment_config.memory_size, - timeout=lambda_deployment_config.timeout, - ) - logger.debug('Validating generated template.yaml') - validate_lambda_template( - template_file_path, lambda_deployment_config.region, lambda_project_dir, - ) - logger.debug( - 'Initializing lambda project in directory: %s ...', lambda_project_dir, - ) - init_sam_project( - lambda_project_dir, - bento_path, - deployment_pb.name, - deployment_spec.bento_name, - api_names, - aws_region=lambda_deployment_config.region, - ) - for api_name in api_names: - build_directory = os.path.join( - lambda_project_dir, '.aws-sam', 'build', api_name - ) - logger.debug( - 'Checking is function "%s" bundle under lambda size ' 'limit', api_name, - ) - # Since we only use s3 get object in lambda function, and - # lambda function pack their own boto3/botocore modules, - # we will just delete those modules from function bundle - # directory - delete_list = ['boto3', 'botocore'] - for name in delete_list: - logger.debug('Remove module "%s" from build directory', name) - shutil.rmtree(os.path.join(build_directory, name)) - total_build_dir_size = total_file_or_directory_size(build_directory) - if total_build_dir_size > LAMBDA_FUNCTION_MAX_LIMIT: - raise BentoMLException( - 'Build function size is over 700MB, max size ' - 'capable for AWS Lambda function' - ) - if total_build_dir_size >= LAMBDA_FUNCTION_LIMIT: - logger.debug( - 'Function %s is over lambda size limit, attempting ' 'reduce it', - api_name, - ) - reduce_bundle_size_and_upload_extra_resources_to_s3( - build_directory=build_directory, - region=lambda_deployment_config.region, - s3_bucket=lambda_s3_bucket, - deployment_prefix=deployment_path_prefix, - function_name=api_name, - lambda_project_dir=lambda_project_dir, - ) - else: - logger.debug( - 'Function bundle is within Lambda limit, removing ' - 'download_extra_resources.py file from function bundle' - ) - os.remove(os.path.join(build_directory, 'download_extra_resources.py')) - logger.info('Packaging AWS Lambda project at %s ...', lambda_project_dir) - lambda_package( - lambda_project_dir, - lambda_deployment_config.region, - lambda_s3_bucket, - deployment_path_prefix, - ) - logger.info('Deploying lambda project') - stack_name = generate_aws_compatible_string( - deployment_pb.namespace + '-' + deployment_pb.name - ) - lambda_deploy( - lambda_project_dir, lambda_deployment_config.region, stack_name=stack_name, - ) - - -class AwsLambdaDeploymentOperator(DeploymentOperatorBase): - def add(self, deployment_pb): - try: - deployment_spec = deployment_pb.spec - deployment_spec.aws_lambda_operator_config.region = ( - deployment_spec.aws_lambda_operator_config.region - or get_default_aws_region() - ) - if not deployment_spec.aws_lambda_operator_config.region: - raise InvalidArgument('AWS region is missing') - - ensure_sam_available_or_raise() - ensure_docker_available_or_raise() - bento_pb = self.yatai_service.GetBento( - GetBentoRequest( - bento_name=deployment_spec.bento_name, - bento_version=deployment_spec.bento_version, - ) - ) - if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3): - raise BentoMLException( - 'BentoML currently not support {} repository'.format( - BentoUri.StorageType.Name(bento_pb.bento.uri.type) - ) - ) - - return 
self._add(deployment_pb, bento_pb, bento_pb.bento.uri.uri) - except BentoMLException as error: - deployment_pb.state.state = DeploymentState.ERROR - deployment_pb.state.error_message = f'Error: {str(error)}' - return ApplyDeploymentResponse( - status=error.status_proto, deployment=deployment_pb - ) - - def _add(self, deployment_pb, bento_pb, bento_path): - if loader._is_remote_path(bento_path): - with loader._resolve_remote_bundle_path(bento_path) as local_path: - return self._add(deployment_pb, bento_pb, local_path) - - deployment_spec = deployment_pb.spec - lambda_deployment_config = deployment_spec.aws_lambda_operator_config - bento_service_metadata = bento_pb.bento.bento_service_metadata - lambda_s3_bucket = generate_aws_compatible_string( - 'btml-{namespace}-{name}-{random_string}'.format( - namespace=deployment_pb.namespace, - name=deployment_pb.name, - random_string=uuid.uuid4().hex[:6].lower(), - ) - ) - try: - create_s3_bucket_if_not_exists( - lambda_s3_bucket, lambda_deployment_config.region - ) - _deploy_lambda_function( - deployment_pb=deployment_pb, - bento_service_metadata=bento_service_metadata, - deployment_spec=deployment_spec, - lambda_s3_bucket=lambda_s3_bucket, - lambda_deployment_config=lambda_deployment_config, - bento_path=bento_path, - ) - return ApplyDeploymentResponse(status=Status.OK(), deployment=deployment_pb) - except BentoMLException as error: - if lambda_s3_bucket and lambda_deployment_config: - _cleanup_s3_bucket_if_exist( - lambda_s3_bucket, lambda_deployment_config.region - ) - raise error - - def update(self, deployment_pb, previous_deployment): - try: - ensure_sam_available_or_raise() - ensure_docker_available_or_raise() - - deployment_spec = deployment_pb.spec - bento_pb = self.yatai_service.GetBento( - GetBentoRequest( - bento_name=deployment_spec.bento_name, - bento_version=deployment_spec.bento_version, - ) - ) - if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3): - raise BentoMLException( - 'BentoML currently not support {} repository'.format( - BentoUri.StorageType.Name(bento_pb.bento.uri.type) - ) - ) - - return self._update( - deployment_pb, previous_deployment, bento_pb, bento_pb.bento.uri.uri - ) - except BentoMLException as error: - deployment_pb.state.state = DeploymentState.ERROR - deployment_pb.state.error_message = f'Error: {str(error)}' - return ApplyDeploymentResponse( - status=error.status_code, deployment_pb=deployment_pb - ) - - def _update(self, deployment_pb, current_deployment, bento_pb, bento_path): - if loader._is_remote_path(bento_path): - with loader._resolve_remote_bundle_path(bento_path) as local_path: - return self._update( - deployment_pb, current_deployment, bento_pb, local_path - ) - updated_deployment_spec = deployment_pb.spec - updated_lambda_deployment_config = ( - updated_deployment_spec.aws_lambda_operator_config - ) - updated_bento_service_metadata = bento_pb.bento.bento_service_metadata - describe_result = self.describe(deployment_pb) - if describe_result.status.status_code != status_pb2.Status.OK: - error_code, error_message = status_pb_to_error_code_and_message( - describe_result.status - ) - raise YataiDeploymentException( - f'Failed fetching Lambda deployment current status - ' - f'{error_code}:{error_message}' - ) - latest_deployment_state = json.loads(describe_result.state.info_json) - if 's3_bucket' in latest_deployment_state: - lambda_s3_bucket = latest_deployment_state['s3_bucket'] - else: - raise BentoMLException( - 'S3 Bucket is missing in the AWS Lambda deployment, please make sure ' - 'it 
exists and try again' - ) - - _deploy_lambda_function( - deployment_pb=deployment_pb, - bento_service_metadata=updated_bento_service_metadata, - deployment_spec=updated_deployment_spec, - lambda_s3_bucket=lambda_s3_bucket, - lambda_deployment_config=updated_lambda_deployment_config, - bento_path=bento_path, - ) - - return ApplyDeploymentResponse(deployment=deployment_pb, status=Status.OK()) - - def delete(self, deployment_pb): - try: - logger.debug('Deleting AWS Lambda deployment') - - deployment_spec = deployment_pb.spec - lambda_deployment_config = deployment_spec.aws_lambda_operator_config - lambda_deployment_config.region = ( - lambda_deployment_config.region or get_default_aws_region() - ) - if not lambda_deployment_config.region: - raise InvalidArgument('AWS region is missing') - - cf_client = boto3.client('cloudformation', lambda_deployment_config.region) - stack_name = generate_aws_compatible_string( - deployment_pb.namespace, deployment_pb.name - ) - if deployment_pb.state.info_json: - deployment_info_json = json.loads(deployment_pb.state.info_json) - bucket_name = deployment_info_json.get('s3_bucket') - if bucket_name: - _cleanup_s3_bucket_if_exist( - bucket_name, lambda_deployment_config.region - ) - - logger.debug( - 'Deleting AWS CloudFormation: %s that includes Lambda function ' - 'and related resources', - stack_name, - ) - cf_client.delete_stack(StackName=stack_name) - return DeleteDeploymentResponse(status=Status.OK()) - - except BentoMLException as error: - return DeleteDeploymentResponse(status=error.status_proto) - - def describe(self, deployment_pb): - try: - deployment_spec = deployment_pb.spec - lambda_deployment_config = deployment_spec.aws_lambda_operator_config - lambda_deployment_config.region = ( - lambda_deployment_config.region or get_default_aws_region() - ) - if not lambda_deployment_config.region: - raise InvalidArgument('AWS region is missing') - - bento_pb = self.yatai_service.GetBento( - GetBentoRequest( - bento_name=deployment_spec.bento_name, - bento_version=deployment_spec.bento_version, - ) - ) - bento_service_metadata = bento_pb.bento.bento_service_metadata - api_names = ( - [lambda_deployment_config.api_name] - if lambda_deployment_config.api_name - else [api.name for api in bento_service_metadata.apis] - ) - - try: - cf_client = boto3.client( - 'cloudformation', lambda_deployment_config.region - ) - cloud_formation_stack_result = cf_client.describe_stacks( - StackName='{ns}-{name}'.format( - ns=deployment_pb.namespace, name=deployment_pb.name - ) - ) - stack_result = cloud_formation_stack_result.get('Stacks')[0] - # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/\ - # using-cfn-describing-stacks.html - success_status = ['CREATE_COMPLETE', 'UPDATE_COMPLETE'] - if stack_result['StackStatus'] in success_status: - if stack_result.get('Outputs'): - outputs = stack_result['Outputs'] - else: - return DescribeDeploymentResponse( - status=Status.ABORTED('"Outputs" field is not present'), - state=DeploymentState( - state=DeploymentState.ERROR, - error_message='"Outputs" field is not present', - ), - ) - elif stack_result['StackStatus'] in FAILED_CLOUDFORMATION_STACK_STATUS: - state = DeploymentState(state=DeploymentState.FAILED) - state.timestamp.GetCurrentTime() - return DescribeDeploymentResponse(status=Status.OK(), state=state) - else: - state = DeploymentState(state=DeploymentState.PENDING) - state.timestamp.GetCurrentTime() - return DescribeDeploymentResponse(status=Status.OK(), state=state) - except Exception as error: # pylint: 
disable=broad-except - state = DeploymentState( - state=DeploymentState.ERROR, error_message=str(error) - ) - state.timestamp.GetCurrentTime() - return DescribeDeploymentResponse( - status=Status.INTERNAL(str(error)), state=state - ) - outputs = {o['OutputKey']: o['OutputValue'] for o in outputs} - info_json = {} - - if 'EndpointUrl' in outputs: - info_json['endpoints'] = [ - outputs['EndpointUrl'] + '/' + api_name for api_name in api_names - ] - if 'S3Bucket' in outputs: - info_json['s3_bucket'] = outputs['S3Bucket'] - - state = DeploymentState( - state=DeploymentState.RUNNING, info_json=json.dumps(info_json) - ) - state.timestamp.GetCurrentTime() - return DescribeDeploymentResponse(status=Status.OK(), state=state) - except BentoMLException as error: - return DescribeDeploymentResponse(status=error.status_proto) diff --git a/bentoml/yatai/deployment/aws_lambda/operator.py b/bentoml/yatai/deployment/aws_lambda/operator.py new file mode 100644 index 00000000000..24dd54362d9 --- /dev/null +++ b/bentoml/yatai/deployment/aws_lambda/operator.py @@ -0,0 +1,546 @@ +# Copyright 2019 Atalaya Tech, Inc. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import shutil +import uuid +import logging +from pathlib import Path + +import boto3 +from botocore.exceptions import ClientError +from ruamel.yaml import YAML + +from bentoml.exceptions import ( + BentoMLException, + InvalidArgument, + YataiDeploymentException, +) +from bentoml.saved_bundle import loader +from bentoml.utils import status_pb_to_error_code_and_message +from bentoml.utils.s3 import create_s3_bucket_if_not_exists +from bentoml.utils.tempdir import TempDirectory + +from bentoml.yatai.deployment.aws_lambda.utils import ( + ensure_sam_available_or_raise, + init_sam_project, + lambda_deploy, + lambda_package, + validate_lambda_template, + reduce_bundle_size_and_upload_extra_resources_to_s3, + total_file_or_directory_size, + LAMBDA_FUNCTION_LIMIT, + LAMBDA_FUNCTION_MAX_LIMIT, + FAILED_CLOUDFORMATION_STACK_STATUS, +) + +from bentoml.yatai.deployment.operator import DeploymentOperatorBase +from bentoml.yatai.deployment.utils import ( + raise_if_api_names_not_found_in_bento_service_metadata, + generate_aws_compatible_string, + get_default_aws_region, + ensure_docker_available_or_raise, +) +from bentoml.yatai.proto import status_pb2 +from bentoml.yatai.proto.deployment_pb2 import ( + DeploymentState, + ApplyDeploymentResponse, + DeleteDeploymentResponse, + DescribeDeploymentResponse, +) +from bentoml.yatai.proto.repository_pb2 import GetBentoRequest, BentoUri +from bentoml.yatai.status import Status + + +logger = logging.getLogger(__name__) + + +def _create_aws_lambda_cloudformation_template_file( + project_dir, + namespace, + deployment_name, + deployment_path_prefix, + api_names, + bento_service_name, + s3_bucket_name, + py_runtime, + memory_size, + timeout, +): + template_file_path = os.path.join(project_dir, 'template.yaml') + yaml = YAML() + sam_config = { + 'AWSTemplateFormatVersion': '2010-09-09', + 'Transform': 
'AWS::Serverless-2016-10-31', + 'Globals': { + 'Function': {'Timeout': timeout, 'Runtime': py_runtime}, + 'Api': { + 'BinaryMediaTypes': ['image~1*'], + 'Cors': {'AllowOrigin': "'*'"}, + 'Auth': { + 'ApiKeyRequired': False, + 'DefaultAuthorizer': 'NONE', + 'AddDefaultAuthorizerToCorsPreflight': False, + }, + }, + }, + 'Resources': {}, + 'Outputs': { + 'S3Bucket': { + 'Value': s3_bucket_name, + 'Description': 'S3 Bucket for saving artifacts and lambda bundle', + } + }, + } + for api_name in api_names: + sam_config['Resources'][api_name] = { + 'Type': 'AWS::Serverless::Function', + 'Properties': { + 'Runtime': py_runtime, + 'CodeUri': deployment_name + '/', + 'Handler': 'app.{}'.format(api_name), + 'FunctionName': f'{namespace}-{deployment_name}-{api_name}', + 'Timeout': timeout, + 'MemorySize': memory_size, + 'Events': { + 'Api': { + 'Type': 'Api', + 'Properties': { + 'Path': '/{}'.format(api_name), + 'Method': 'post', + }, + } + }, + 'Policies': [{'S3ReadPolicy': {'BucketName': s3_bucket_name}}], + 'Environment': { + 'Variables': { + 'BENTOML_BENTO_SERVICE_NAME': bento_service_name, + 'BENTOML_API_NAME': api_name, + 'BENTOML_S3_BUCKET': s3_bucket_name, + 'BENTOML_DEPLOYMENT_PATH_PREFIX': deployment_path_prefix, + } + }, + }, + } + + yaml.dump(sam_config, Path(template_file_path)) + + # We add Outputs section separately, because the value should not + # have "'" around !Sub + with open(template_file_path, 'a') as f: + f.write( + """\ + EndpointUrl: + Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.\ +amazonaws.com/Prod" + Description: URL for endpoint +""" + ) + return template_file_path + + +def _cleanup_s3_bucket_if_exist(bucket_name, region): + s3_client = boto3.client('s3', region) + s3 = boto3.resource('s3') + try: + logger.debug('Removing all objects inside bucket %s', bucket_name) + s3.Bucket(bucket_name).objects.all().delete() + logger.debug('Deleting bucket %s', bucket_name) + s3_client.delete_bucket(Bucket=bucket_name) + except ClientError as e: + if e.response and e.response['Error']['Code'] == 'NoSuchBucket': + # If there is no bucket, we just let it silently fail, dont have to do + # any thing + return + else: + raise e + + +def _deploy_lambda_function( + deployment_pb, + bento_service_metadata, + deployment_spec, + lambda_s3_bucket, + lambda_deployment_config, + bento_path, +): + deployment_path_prefix = os.path.join(deployment_pb.namespace, deployment_pb.name) + + py_major, py_minor, _ = bento_service_metadata.env.python_version.split('.') + if py_major != '3': + raise BentoMLException('Python 2 is not supported for Lambda Deployment') + python_runtime = 'python{}.{}'.format(py_major, py_minor) + + artifact_types = [item.artifact_type for item in bento_service_metadata.artifacts] + if any( + i in ['TensorflowSavedModelArtifact', 'KerasModelArtifact'] + for i in artifact_types + ) and (py_major, py_minor) != ('3', '6'): + raise BentoMLException( + 'For Tensorflow and Keras model, only python3.6 is ' + 'supported for AWS Lambda deployment' + ) + + api_names = ( + [lambda_deployment_config.api_name] + if lambda_deployment_config.api_name + else [api.name for api in bento_service_metadata.apis] + ) + + raise_if_api_names_not_found_in_bento_service_metadata( + bento_service_metadata, api_names + ) + + with TempDirectory() as lambda_project_dir: + logger.debug( + 'Generating cloudformation template.yaml for lambda project at %s', + lambda_project_dir, + ) + template_file_path = _create_aws_lambda_cloudformation_template_file( + project_dir=lambda_project_dir, + 
namespace=deployment_pb.namespace, + deployment_name=deployment_pb.name, + deployment_path_prefix=deployment_path_prefix, + api_names=api_names, + bento_service_name=deployment_spec.bento_name, + s3_bucket_name=lambda_s3_bucket, + py_runtime=python_runtime, + memory_size=lambda_deployment_config.memory_size, + timeout=lambda_deployment_config.timeout, + ) + logger.debug('Validating generated template.yaml') + validate_lambda_template( + template_file_path, lambda_deployment_config.region, lambda_project_dir, + ) + logger.debug( + 'Initializing lambda project in directory: %s ...', lambda_project_dir, + ) + init_sam_project( + lambda_project_dir, + bento_path, + deployment_pb.name, + deployment_spec.bento_name, + api_names, + aws_region=lambda_deployment_config.region, + ) + for api_name in api_names: + build_directory = os.path.join( + lambda_project_dir, '.aws-sam', 'build', api_name + ) + logger.debug( + 'Checking is function "%s" bundle under lambda size ' 'limit', api_name, + ) + # Since we only use s3 get object in lambda function, and + # lambda function pack their own boto3/botocore modules, + # we will just delete those modules from function bundle + # directory + delete_list = ['boto3', 'botocore'] + for name in delete_list: + logger.debug('Remove module "%s" from build directory', name) + shutil.rmtree(os.path.join(build_directory, name)) + total_build_dir_size = total_file_or_directory_size(build_directory) + if total_build_dir_size > LAMBDA_FUNCTION_MAX_LIMIT: + raise BentoMLException( + 'Build function size is over 700MB, max size ' + 'capable for AWS Lambda function' + ) + if total_build_dir_size >= LAMBDA_FUNCTION_LIMIT: + logger.debug( + 'Function %s is over lambda size limit, attempting ' 'reduce it', + api_name, + ) + reduce_bundle_size_and_upload_extra_resources_to_s3( + build_directory=build_directory, + region=lambda_deployment_config.region, + s3_bucket=lambda_s3_bucket, + deployment_prefix=deployment_path_prefix, + function_name=api_name, + lambda_project_dir=lambda_project_dir, + ) + else: + logger.debug( + 'Function bundle is within Lambda limit, removing ' + 'download_extra_resources.py file from function bundle' + ) + os.remove(os.path.join(build_directory, 'download_extra_resources.py')) + logger.info('Packaging AWS Lambda project at %s ...', lambda_project_dir) + lambda_package( + lambda_project_dir, + lambda_deployment_config.region, + lambda_s3_bucket, + deployment_path_prefix, + ) + logger.info('Deploying lambda project') + stack_name = generate_aws_compatible_string( + deployment_pb.namespace + '-' + deployment_pb.name + ) + lambda_deploy( + lambda_project_dir, lambda_deployment_config.region, stack_name=stack_name, + ) + + +class AwsLambdaDeploymentOperator(DeploymentOperatorBase): + def add(self, deployment_pb): + try: + deployment_spec = deployment_pb.spec + deployment_spec.aws_lambda_operator_config.region = ( + deployment_spec.aws_lambda_operator_config.region + or get_default_aws_region() + ) + if not deployment_spec.aws_lambda_operator_config.region: + raise InvalidArgument('AWS region is missing') + + ensure_sam_available_or_raise() + ensure_docker_available_or_raise() + bento_pb = self.yatai_service.GetBento( + GetBentoRequest( + bento_name=deployment_spec.bento_name, + bento_version=deployment_spec.bento_version, + ) + ) + if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3): + raise BentoMLException( + 'BentoML currently not support {} repository'.format( + BentoUri.StorageType.Name(bento_pb.bento.uri.type) + ) + ) + + return 
self._add(deployment_pb, bento_pb, bento_pb.bento.uri.uri) + except BentoMLException as error: + deployment_pb.state.state = DeploymentState.ERROR + deployment_pb.state.error_message = f'Error: {str(error)}' + return ApplyDeploymentResponse( + status=error.status_proto, deployment=deployment_pb + ) + + def _add(self, deployment_pb, bento_pb, bento_path): + if loader._is_remote_path(bento_path): + with loader._resolve_remote_bundle_path(bento_path) as local_path: + return self._add(deployment_pb, bento_pb, local_path) + + deployment_spec = deployment_pb.spec + lambda_deployment_config = deployment_spec.aws_lambda_operator_config + bento_service_metadata = bento_pb.bento.bento_service_metadata + lambda_s3_bucket = generate_aws_compatible_string( + 'btml-{namespace}-{name}-{random_string}'.format( + namespace=deployment_pb.namespace, + name=deployment_pb.name, + random_string=uuid.uuid4().hex[:6].lower(), + ) + ) + try: + create_s3_bucket_if_not_exists( + lambda_s3_bucket, lambda_deployment_config.region + ) + _deploy_lambda_function( + deployment_pb=deployment_pb, + bento_service_metadata=bento_service_metadata, + deployment_spec=deployment_spec, + lambda_s3_bucket=lambda_s3_bucket, + lambda_deployment_config=lambda_deployment_config, + bento_path=bento_path, + ) + return ApplyDeploymentResponse(status=Status.OK(), deployment=deployment_pb) + except BentoMLException as error: + if lambda_s3_bucket and lambda_deployment_config: + _cleanup_s3_bucket_if_exist( + lambda_s3_bucket, lambda_deployment_config.region + ) + raise error + + def update(self, deployment_pb, previous_deployment): + try: + ensure_sam_available_or_raise() + ensure_docker_available_or_raise() + + deployment_spec = deployment_pb.spec + bento_pb = self.yatai_service.GetBento( + GetBentoRequest( + bento_name=deployment_spec.bento_name, + bento_version=deployment_spec.bento_version, + ) + ) + if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3): + raise BentoMLException( + 'BentoML currently not support {} repository'.format( + BentoUri.StorageType.Name(bento_pb.bento.uri.type) + ) + ) + + return self._update( + deployment_pb, previous_deployment, bento_pb, bento_pb.bento.uri.uri + ) + except BentoMLException as error: + deployment_pb.state.state = DeploymentState.ERROR + deployment_pb.state.error_message = f'Error: {str(error)}' + return ApplyDeploymentResponse( + status=error.status_code, deployment_pb=deployment_pb + ) + + def _update(self, deployment_pb, current_deployment, bento_pb, bento_path): + if loader._is_remote_path(bento_path): + with loader._resolve_remote_bundle_path(bento_path) as local_path: + return self._update( + deployment_pb, current_deployment, bento_pb, local_path + ) + updated_deployment_spec = deployment_pb.spec + updated_lambda_deployment_config = ( + updated_deployment_spec.aws_lambda_operator_config + ) + updated_bento_service_metadata = bento_pb.bento.bento_service_metadata + describe_result = self.describe(deployment_pb) + if describe_result.status.status_code != status_pb2.Status.OK: + error_code, error_message = status_pb_to_error_code_and_message( + describe_result.status + ) + raise YataiDeploymentException( + f'Failed fetching Lambda deployment current status - ' + f'{error_code}:{error_message}' + ) + latest_deployment_state = json.loads(describe_result.state.info_json) + if 's3_bucket' in latest_deployment_state: + lambda_s3_bucket = latest_deployment_state['s3_bucket'] + else: + raise BentoMLException( + 'S3 Bucket is missing in the AWS Lambda deployment, please make sure ' + 'it 
exists and try again' + ) + + _deploy_lambda_function( + deployment_pb=deployment_pb, + bento_service_metadata=updated_bento_service_metadata, + deployment_spec=updated_deployment_spec, + lambda_s3_bucket=lambda_s3_bucket, + lambda_deployment_config=updated_lambda_deployment_config, + bento_path=bento_path, + ) + + return ApplyDeploymentResponse(deployment=deployment_pb, status=Status.OK()) + + def delete(self, deployment_pb): + try: + logger.debug('Deleting AWS Lambda deployment') + + deployment_spec = deployment_pb.spec + lambda_deployment_config = deployment_spec.aws_lambda_operator_config + lambda_deployment_config.region = ( + lambda_deployment_config.region or get_default_aws_region() + ) + if not lambda_deployment_config.region: + raise InvalidArgument('AWS region is missing') + + cf_client = boto3.client('cloudformation', lambda_deployment_config.region) + stack_name = generate_aws_compatible_string( + deployment_pb.namespace, deployment_pb.name + ) + if deployment_pb.state.info_json: + deployment_info_json = json.loads(deployment_pb.state.info_json) + bucket_name = deployment_info_json.get('s3_bucket') + if bucket_name: + _cleanup_s3_bucket_if_exist( + bucket_name, lambda_deployment_config.region + ) + + logger.debug( + 'Deleting AWS CloudFormation: %s that includes Lambda function ' + 'and related resources', + stack_name, + ) + cf_client.delete_stack(StackName=stack_name) + return DeleteDeploymentResponse(status=Status.OK()) + + except BentoMLException as error: + return DeleteDeploymentResponse(status=error.status_proto) + + def describe(self, deployment_pb): + try: + deployment_spec = deployment_pb.spec + lambda_deployment_config = deployment_spec.aws_lambda_operator_config + lambda_deployment_config.region = ( + lambda_deployment_config.region or get_default_aws_region() + ) + if not lambda_deployment_config.region: + raise InvalidArgument('AWS region is missing') + + bento_pb = self.yatai_service.GetBento( + GetBentoRequest( + bento_name=deployment_spec.bento_name, + bento_version=deployment_spec.bento_version, + ) + ) + bento_service_metadata = bento_pb.bento.bento_service_metadata + api_names = ( + [lambda_deployment_config.api_name] + if lambda_deployment_config.api_name + else [api.name for api in bento_service_metadata.apis] + ) + + try: + cf_client = boto3.client( + 'cloudformation', lambda_deployment_config.region + ) + cloud_formation_stack_result = cf_client.describe_stacks( + StackName='{ns}-{name}'.format( + ns=deployment_pb.namespace, name=deployment_pb.name + ) + ) + stack_result = cloud_formation_stack_result.get('Stacks')[0] + # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/\ + # using-cfn-describing-stacks.html + success_status = ['CREATE_COMPLETE', 'UPDATE_COMPLETE'] + if stack_result['StackStatus'] in success_status: + if stack_result.get('Outputs'): + outputs = stack_result['Outputs'] + else: + return DescribeDeploymentResponse( + status=Status.ABORTED('"Outputs" field is not present'), + state=DeploymentState( + state=DeploymentState.ERROR, + error_message='"Outputs" field is not present', + ), + ) + elif stack_result['StackStatus'] in FAILED_CLOUDFORMATION_STACK_STATUS: + state = DeploymentState(state=DeploymentState.FAILED) + state.timestamp.GetCurrentTime() + return DescribeDeploymentResponse(status=Status.OK(), state=state) + else: + state = DeploymentState(state=DeploymentState.PENDING) + state.timestamp.GetCurrentTime() + return DescribeDeploymentResponse(status=Status.OK(), state=state) + except Exception as error: # pylint: 
disable=broad-except + state = DeploymentState( + state=DeploymentState.ERROR, error_message=str(error) + ) + state.timestamp.GetCurrentTime() + return DescribeDeploymentResponse( + status=Status.INTERNAL(str(error)), state=state + ) + outputs = {o['OutputKey']: o['OutputValue'] for o in outputs} + info_json = {} + + if 'EndpointUrl' in outputs: + info_json['endpoints'] = [ + outputs['EndpointUrl'] + '/' + api_name for api_name in api_names + ] + if 'S3Bucket' in outputs: + info_json['s3_bucket'] = outputs['S3Bucket'] + + state = DeploymentState( + state=DeploymentState.RUNNING, info_json=json.dumps(info_json) + ) + state.timestamp.GetCurrentTime() + return DescribeDeploymentResponse(status=Status.OK(), state=state) + except BentoMLException as error: + return DescribeDeploymentResponse(status=error.status_proto) diff --git a/bentoml/yatai/deployment/operator.py b/bentoml/yatai/deployment/operator.py index 2ce97d20a02..192c8d02f68 100644 --- a/bentoml/yatai/deployment/operator.py +++ b/bentoml/yatai/deployment/operator.py @@ -22,11 +22,15 @@ def get_deployment_operator(yatai_service, deployment_pb): operator = deployment_pb.spec.operator if operator == DeploymentSpec.AWS_SAGEMAKER: - from bentoml.yatai.deployment.sagemaker import SageMakerDeploymentOperator + from bentoml.yatai.deployment.sagemaker.operator import ( + SageMakerDeploymentOperator, + ) return SageMakerDeploymentOperator(yatai_service) elif operator == DeploymentSpec.AWS_LAMBDA: - from bentoml.yatai.deployment.aws_lambda import AwsLambdaDeploymentOperator + from bentoml.yatai.deployment.aws_lambda.operator import ( + AwsLambdaDeploymentOperator, + ) return AwsLambdaDeploymentOperator(yatai_service) elif operator == DeploymentSpec.GCP_FUNCTION: diff --git a/bentoml/yatai/deployment/sagemaker/__init__.py b/bentoml/yatai/deployment/sagemaker/__init__.py index 462359b60f4..4b378e85c85 100644 --- a/bentoml/yatai/deployment/sagemaker/__init__.py +++ b/bentoml/yatai/deployment/sagemaker/__init__.py @@ -11,741 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
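The sagemaker package is emptied the same way as aws_lambda above: only the license header stays in its __init__.py, and the operator implementation moves to a dedicated sagemaker/operator.py. The payoff is visible in the get_deployment_operator hunk earlier in this diff, which now imports each operator class lazily from its operator submodule, so importing bentoml.yatai.deployment no longer loads provider SDKs such as boto3 and docker at package-import time. A minimal, runnable sketch of that function-local-import dispatch pattern, using stdlib modules in place of the operator submodules:

    def get_serializer(kind):
        # Function-local imports defer module loading until dispatch time,
        # mirroring how get_deployment_operator avoids importing boto3 and
        # docker when the deployment package itself is imported.
        if kind == 'json':
            import json  # loaded only when a json serializer is requested
            return json.dumps
        if kind == 'pickle':
            import pickle
            return pickle.dumps
        raise ValueError('unknown serializer: {}'.format(kind))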
- -import os -import shutil -import base64 -import logging -import json -from urllib.parse import urlparse - -import boto3 -import docker - -from botocore.exceptions import ClientError - -from bentoml.saved_bundle import loader -from bentoml.yatai.deployment.utils import ( - process_docker_api_line, - ensure_docker_available_or_raise, - generate_aws_compatible_string, - raise_if_api_names_not_found_in_bento_service_metadata, - get_default_aws_region, -) -from bentoml.yatai.proto.repository_pb2 import GetBentoRequest, BentoUri -from bentoml.yatai.status import Status -from bentoml.utils.tempdir import TempDirectory -from bentoml.exceptions import ( - YataiDeploymentException, - BentoMLException, - AWSServiceError, - InvalidArgument, -) -from bentoml.yatai.deployment.operator import DeploymentOperatorBase -from bentoml.yatai.proto.deployment_pb2 import ( - ApplyDeploymentResponse, - DeleteDeploymentResponse, - DescribeDeploymentResponse, - DeploymentState, -) - -logger = logging.getLogger(__name__) - -# This should be kept in sync with BENTO_SERVICE_DOCKERFILE_CPU_TEMPLATE -BENTO_SERVICE_SAGEMAKER_DOCKERFILE = """\ -FROM {docker_base_image} - -# the env var $PORT is required by heroku container runtime -ENV PORT 8080 -EXPOSE $PORT - -RUN apt-get update --fix-missing && \ - apt-get install -y nginx && \ - apt-get clean - -# gevent required by AWS Sagemaker -RUN pip install gevent - -# copy over model files -COPY . /opt/program -WORKDIR /opt/program - -RUN if [ -f /opt/program/bentoml-init.sh ]; then /bin/bash -c /opt/program/bentoml-init.sh; fi - -ENV PATH="/opt/program:$PATH" -""" # noqa: E501 - - -def strip_scheme(url): - """ Stripe url's schema - e.g. http://some.url/path -> some.url/path - :param url: String - :return: String - """ - parsed = urlparse(url) - scheme = "%s://" % parsed.scheme - return parsed.geturl().replace(scheme, "", 1) - - -def get_arn_role_from_current_aws_user(): - sts_client = boto3.client("sts") - identity = sts_client.get_caller_identity() - sts_arn = identity["Arn"] - sts_arn_list = sts_arn.split(":") - type_role = sts_arn_list[-1].split("/") - iam_client = boto3.client("iam") - if type_role[0] in ("user", "root"): - role_list = iam_client.list_roles() - arn = None - for role in role_list["Roles"]: - policy_document = role["AssumeRolePolicyDocument"] - statement = policy_document["Statement"][0] - if ( - statement["Effect"] == "Allow" - and statement["Principal"].get("Service", None) - == "sagemaker.amazonaws.com" - ): - arn = role["Arn"] - if arn is None: - raise YataiDeploymentException( - "Can't find proper Arn role for Sagemaker, please create one and try " - "again" - ) - return arn - elif type_role[0] == "role": - role_response = iam_client.get_role(RoleName=type_role[1]) - return role_response["Role"]["Arn"] - - raise YataiDeploymentException( - "Not supported role type {}; sts arn is {}".format(type_role[0], sts_arn) - ) - - -def create_and_push_docker_image_to_ecr( - region, bento_name, bento_version, snapshot_path -): - """Create BentoService sagemaker image and push to AWS ECR - - Example: https://github.com/awslabs/amazon-sagemaker-examples/blob/\ - master/advanced_functionality/scikit_bring_your_own/container/build_and_push.sh - 1. get aws account info and login ecr - 2. create ecr repository, if not exist - 3. 
build tag and push docker image - - Args: - region(String) - bento_name(String) - bento_version(String) - snapshot_path(Path) - - Returns: - str: AWS ECR Tag - """ - ecr_client = boto3.client("ecr", region) - token = ecr_client.get_authorization_token() - logger.debug("Getting docker login info from AWS") - username, password = ( - base64.b64decode(token["authorizationData"][0]["authorizationToken"]) - .decode("utf-8") - .split(":") - ) - registry_url = token["authorizationData"][0]["proxyEndpoint"] - auth_config_payload = {"username": username, "password": password} - - docker_api = docker.APIClient() - - image_name = bento_name.lower() + "-sagemaker" - ecr_tag = strip_scheme( - "{registry_url}/{image_name}:{version}".format( - registry_url=registry_url, image_name=image_name, version=bento_version - ) - ) - - logger.debug("Building docker image: %s", ecr_tag) - for line in docker_api.build( - path=snapshot_path, dockerfile="Dockerfile-sagemaker", tag=ecr_tag - ): - process_docker_api_line(line) - - try: - ecr_client.describe_repositories(repositoryNames=[image_name])["repositories"] - except ecr_client.exceptions.RepositoryNotFoundException: - ecr_client.create_repository(repositoryName=image_name) - - logger.debug("Pushing image to AWS ECR at %s", ecr_tag) - for line in docker_api.push(ecr_tag, stream=True, auth_config=auth_config_payload): - process_docker_api_line(line) - logger.debug("Finished pushing image: %s", ecr_tag) - return ecr_tag - - -# Sagemaker response status: 'OutOfService'|'Creating'|'Updating'| -# 'SystemUpdating'|'RollingBack'|'InService'| -# 'Deleting'|'Failed' -ENDPOINT_STATUS_TO_STATE = { - "InService": DeploymentState.RUNNING, - "Deleting": DeploymentState.INACTIVATED, - "Creating": DeploymentState.PENDING, - "Updating": DeploymentState.PENDING, - "RollingBack": DeploymentState.PENDING, - "SystemUpdating": DeploymentState.PENDING, - "OutOfService": DeploymentState.INACTIVATED, - "Failed": DeploymentState.ERROR, -} - - -def _aws_client_error_to_bentoml_exception(e, message_prefix=None): - """parse botocore.exceptions.ClientError into Bento StatusProto - - We handle two most common errors when deploying to Sagemaker. - 1. Authenication issue/invalid access(InvalidSignatureException) - 2. 
resources not found (ValidationException) - It will return correlated StatusProto(NOT_FOUND, UNAUTHENTICATED) - - Args: - e: ClientError from botocore.exceptions - Returns: - StatusProto - """ - error_response = e.response.get('Error', {}) - error_code = error_response.get('Code', 'Unknown') - error_message = error_response.get('Message', 'Unknown') - error_log_message = ( - f'AWS ClientError - operation: {e.operation_name}, ' - f'code: {error_code}, message: {error_message}' - ) - if message_prefix: - error_log_message = f'{message_prefix}; {error_log_message}' - logger.error(error_log_message) - return AWSServiceError(error_log_message) - - -def _get_sagemaker_resource_names(deployment_pb): - sagemaker_model_name = generate_aws_compatible_string( - (deployment_pb.namespace, 10), - (deployment_pb.name, 12), - (deployment_pb.spec.bento_name, 20), - (deployment_pb.spec.bento_version, 18), - ) - sagemaker_endpoint_config_name = generate_aws_compatible_string( - (deployment_pb.namespace, 10), - (deployment_pb.name, 12), - (deployment_pb.spec.bento_name, 20), - (deployment_pb.spec.bento_version, 18), - ) - sagemaker_endpoint_name = generate_aws_compatible_string( - deployment_pb.namespace, deployment_pb.name - ) - return sagemaker_model_name, sagemaker_endpoint_config_name, sagemaker_endpoint_name - - -def _delete_sagemaker_model_if_exist(sagemaker_client, sagemaker_model_name): - try: - delete_model_response = sagemaker_client.delete_model( - ModelName=sagemaker_model_name - ) - logger.debug("AWS delete model response: %s", delete_model_response) - except ClientError as e: - error_response = e.response.get('Error', {}) - error_code = error_response.get('Code', 'Unknown') - error_message = error_response.get('Message', 'Unknown') - if ( - error_code == 'ValidationException' - and "Could not find model" in error_message - ): - # sagemaker model does not exist - return - - raise _aws_client_error_to_bentoml_exception( - e, f"Failed to cleanup sagemaker model '{sagemaker_model_name}'" - ) - - return - - -def _delete_sagemaker_endpoint_config_if_exist( - sagemaker_client, sagemaker_endpoint_config_name -): - try: - delete_endpoint_config_response = sagemaker_client.delete_endpoint_config( - EndpointConfigName=sagemaker_endpoint_config_name - ) - logger.debug( - "AWS delete endpoint config response: %s", delete_endpoint_config_response - ) - except ClientError as e: - error_response = e.response.get('Error', {}) - error_code = error_response.get('Code', 'Unknown') - error_message = error_response.get('Message', 'Unknown') - if ( - error_code == 'ValidationException' - and "Could not find endpoint configuration" in error_message - ): - # endpoint config does not exist - return - - raise _aws_client_error_to_bentoml_exception( - e, - f"Failed to cleanup sagemaker endpoint config " - f"'{sagemaker_endpoint_config_name}' after creation failed", - ) - return - - -def _delete_sagemaker_endpoint_if_exist(sagemaker_client, sagemaker_endpoint_name): - try: - delete_endpoint_response = sagemaker_client.delete_endpoint( - EndpointName=sagemaker_endpoint_name - ) - logger.debug("AWS delete endpoint response: %s", delete_endpoint_response) - except ClientError as e: - error_response = e.response.get('Error', {}) - error_code = error_response.get('Code', 'Unknown') - error_message = error_response.get('Message', 'Unknown') - if ( - error_code == 'ValidationException' - and "Could not find endpoint" in error_message - ): - # sagemaker endpoint does not exist - return - - raise 
_aws_client_error_to_bentoml_exception( - e, f"Failed to delete sagemaker endpoint '{sagemaker_endpoint_name}'" - ) - - -def delete_sagemaker_deployment_resources_if_exist(deployment_pb): - sagemaker_config = deployment_pb.spec.sagemaker_operator_config - sagemaker_client = boto3.client('sagemaker', sagemaker_config.region) - - ( - sagemaker_model_name, - sagemaker_endpoint_config_name, - sagemaker_endpoint_name, - ) = _get_sagemaker_resource_names(deployment_pb) - - _delete_sagemaker_model_if_exist(sagemaker_client, sagemaker_model_name) - _delete_sagemaker_endpoint_config_if_exist( - sagemaker_client, sagemaker_endpoint_config_name - ) - _delete_sagemaker_endpoint_if_exist(sagemaker_client, sagemaker_endpoint_name) - - -def _init_sagemaker_project(sagemaker_project_dir, bento_path, docker_base_image): - shutil.copytree(bento_path, sagemaker_project_dir) - - with open(os.path.join(sagemaker_project_dir, 'Dockerfile-sagemaker'), "w") as f: - f.write( - BENTO_SERVICE_SAGEMAKER_DOCKERFILE.format( - docker_base_image=docker_base_image - ) - ) - - nginx_conf_path = os.path.join(os.path.dirname(__file__), 'sagemaker_nginx.conf') - shutil.copy(nginx_conf_path, os.path.join(sagemaker_project_dir, 'nginx.conf')) - - wsgi_py_path = os.path.join(os.path.dirname(__file__), 'sagemaker_wsgi.py') - shutil.copy(wsgi_py_path, os.path.join(sagemaker_project_dir, 'wsgi.py')) - - serve_file_path = os.path.join(os.path.dirname(__file__), 'sagemaker_serve.py') - shutil.copy(serve_file_path, os.path.join(sagemaker_project_dir, 'serve')) - - # permission 755 is required for entry script 'serve' - permission = "755" - octal_permission = int(permission, 8) - os.chmod(os.path.join(sagemaker_project_dir, "serve"), octal_permission) - return sagemaker_project_dir - - -def _create_sagemaker_model( - sagemaker_client, sagemaker_model_name, ecr_image_path, spec -): - execution_role_arn = get_arn_role_from_current_aws_user() - - sagemaker_model_info = { - "ModelName": sagemaker_model_name, - "PrimaryContainer": { - "ContainerHostname": sagemaker_model_name, - "Image": ecr_image_path, - "Environment": { - "API_NAME": spec.api_name, - 'BENTOML_GUNICORN_TIMEOUT': str(spec.timeout), - }, - }, - "ExecutionRoleArn": execution_role_arn, - } - - # Will set envvar, if user defined gunicorn workers per instance. EnvVar needs - # to be string instead of the int. 
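As the comment above notes, the values in a SageMaker container's Environment map must be strings; the CreateModel API rejects integers, so numeric settings such as the gunicorn worker count are coerced with str() before the request is built. A standalone illustration of that constraint (the setting values here are hypothetical):

    # Hypothetical settings; SageMaker Environment values must all be strings.
    timeout = 60
    num_workers = 2

    environment = {
        'API_NAME': 'predict',
        'BENTOML_GUNICORN_TIMEOUT': str(timeout),  # int -> str before the API call
    }
    if num_workers:
        environment['BENTOML_GUNICORN_NUM_OF_WORKERS'] = str(num_workers)

    assert all(isinstance(v, str) for v in environment.values())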
- if spec.num_of_gunicorn_workers_per_instance: - sagemaker_model_info['PrimaryContainer']['Environment'][ - 'BENTOML_GUNICORN_NUM_OF_WORKERS' - ] = str(spec.num_of_gunicorn_workers_per_instance) - - try: - create_model_response = sagemaker_client.create_model(**sagemaker_model_info) - except ClientError as e: - raise _aws_client_error_to_bentoml_exception( - e, "Failed to create sagemaker model" - ) - logger.debug("AWS create model response: %s", create_model_response) - - -def _create_sagemaker_endpoint_config( - sagemaker_client, sagemaker_model_name, endpoint_config_name, sagemaker_config -): - production_variants = [ - { - "VariantName": sagemaker_model_name, - "ModelName": sagemaker_model_name, - "InitialInstanceCount": sagemaker_config.instance_count, - "InstanceType": sagemaker_config.instance_type, - } - ] - - logger.debug("Creating Sagemaker endpoint %s configuration", endpoint_config_name) - try: - create_config_response = sagemaker_client.create_endpoint_config( - EndpointConfigName=endpoint_config_name, - ProductionVariants=production_variants, - ) - except ClientError as e: - raise _aws_client_error_to_bentoml_exception( - e, "Failed to create sagemaker endpoint config" - ) - logger.debug("AWS create endpoint config response: %s", create_config_response) - - -def _create_sagemaker_endpoint(sagemaker_client, endpoint_name, endpoint_config_name): - try: - logger.debug("Creating sagemaker endpoint %s", endpoint_name) - create_endpoint_response = sagemaker_client.create_endpoint( - EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name - ) - logger.debug("AWS create endpoint response: %s", create_endpoint_response) - except ClientError as e: - raise _aws_client_error_to_bentoml_exception( - e, "Failed to create sagemaker endpoint" - ) - - -def _update_sagemaker_endpoint(sagemaker_client, endpoint_name, endpoint_config_name): - try: - logger.debug("Updating sagemaker endpoint %s", endpoint_name) - update_endpoint_response = sagemaker_client.update_endpoint( - EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name - ) - logger.debug("AWS update endpoint response: %s", str(update_endpoint_response)) - except ClientError as e: - raise _aws_client_error_to_bentoml_exception( - e, "Failed to update sagemaker endpoint" - ) - - -class SageMakerDeploymentOperator(DeploymentOperatorBase): - def add(self, deployment_pb): - try: - deployment_spec = deployment_pb.spec - sagemaker_config = deployment_spec.sagemaker_operator_config - sagemaker_config.region = ( - sagemaker_config.region or get_default_aws_region() - ) - if not sagemaker_config.region: - raise InvalidArgument('AWS region is missing') - - ensure_docker_available_or_raise() - if sagemaker_config is None: - raise YataiDeploymentException('Sagemaker configuration is missing.') - - bento_pb = self.yatai_service.GetBento( - GetBentoRequest( - bento_name=deployment_spec.bento_name, - bento_version=deployment_spec.bento_version, - ) - ) - if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3): - raise BentoMLException( - 'BentoML currently not support {} repository'.format( - BentoUri.StorageType.Name(bento_pb.bento.uri.type) - ) - ) - return self._add(deployment_pb, bento_pb, bento_pb.bento.uri.uri) - - except BentoMLException as error: - deployment_pb.state.state = DeploymentState.ERROR - deployment_pb.state.error_message = ( - f'Error creating SageMaker deployment: {str(error)}' - ) - return ApplyDeploymentResponse( - status=error.status_proto, deployment=deployment_pb - ) - - def _add(self, 
deployment_pb, bento_pb, bento_path): - if loader._is_remote_path(bento_path): - with loader._resolve_remote_bundle_path(bento_path) as local_path: - return self._add(deployment_pb, bento_pb, local_path) - - deployment_spec = deployment_pb.spec - sagemaker_config = deployment_spec.sagemaker_operator_config - - raise_if_api_names_not_found_in_bento_service_metadata( - bento_pb.bento.bento_service_metadata, [sagemaker_config.api_name] - ) - - sagemaker_client = boto3.client('sagemaker', sagemaker_config.region) - - with TempDirectory() as temp_dir: - sagemaker_project_dir = os.path.join(temp_dir, deployment_spec.bento_name) - _init_sagemaker_project( - sagemaker_project_dir, - bento_path, - bento_pb.bento.bento_service_metadata.env.docker_base_image, - ) - ecr_image_path = create_and_push_docker_image_to_ecr( - sagemaker_config.region, - deployment_spec.bento_name, - deployment_spec.bento_version, - sagemaker_project_dir, - ) - - try: - ( - sagemaker_model_name, - sagemaker_endpoint_config_name, - sagemaker_endpoint_name, - ) = _get_sagemaker_resource_names(deployment_pb) - - _create_sagemaker_model( - sagemaker_client, sagemaker_model_name, ecr_image_path, sagemaker_config - ) - _create_sagemaker_endpoint_config( - sagemaker_client, - sagemaker_model_name, - sagemaker_endpoint_config_name, - sagemaker_config, - ) - _create_sagemaker_endpoint( - sagemaker_client, - sagemaker_endpoint_name, - sagemaker_endpoint_config_name, - ) - except AWSServiceError as e: - delete_sagemaker_deployment_resources_if_exist(deployment_pb) - raise e - - return ApplyDeploymentResponse(status=Status.OK(), deployment=deployment_pb) - - def update(self, deployment_pb, previous_deployment): - try: - ensure_docker_available_or_raise() - deployment_spec = deployment_pb.spec - bento_pb = self.yatai_service.GetBento( - GetBentoRequest( - bento_name=deployment_spec.bento_name, - bento_version=deployment_spec.bento_version, - ) - ) - if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3): - raise BentoMLException( - 'BentoML currently not support {} repository'.format( - BentoUri.StorageType.Name(bento_pb.bento.uri.type) - ) - ) - return self._update( - deployment_pb, previous_deployment, bento_pb, bento_pb.bento.uri.uri - ) - except BentoMLException as error: - deployment_pb.state.state = DeploymentState.ERROR - deployment_pb.state.error_message = ( - f'Error updating SageMaker deployment: {str(error)}' - ) - return ApplyDeploymentResponse( - status=error.status_proto, deployment=deployment_pb - ) - - def _update(self, deployment_pb, current_deployment, bento_pb, bento_path): - if loader._is_remote_path(bento_path): - with loader._resolve_remote_bundle_path(bento_path) as local_path: - return self._update( - deployment_pb, current_deployment, bento_pb, local_path - ) - updated_deployment_spec = deployment_pb.spec - updated_sagemaker_config = updated_deployment_spec.sagemaker_operator_config - sagemaker_client = boto3.client('sagemaker', updated_sagemaker_config.region) - - try: - raise_if_api_names_not_found_in_bento_service_metadata( - bento_pb.bento.bento_service_metadata, - [updated_sagemaker_config.api_name], - ) - describe_latest_deployment_state = self.describe(deployment_pb) - current_deployment_spec = current_deployment.spec - current_sagemaker_config = current_deployment_spec.sagemaker_operator_config - latest_deployment_state = json.loads( - describe_latest_deployment_state.state.info_json - ) - - current_ecr_image_tag = latest_deployment_state['ProductionVariants'][0][ - 'DeployedImages' - 
][0]['SpecifiedImage'] - if ( - updated_deployment_spec.bento_name != current_deployment_spec.bento_name - or updated_deployment_spec.bento_version - != current_deployment_spec.bento_version - ): - logger.debug( - 'BentoService tag is different from current deployment, ' - 'creating new docker image and push to ECR' - ) - with TempDirectory() as temp_dir: - sagemaker_project_dir = os.path.join( - temp_dir, updated_deployment_spec.bento_name - ) - _init_sagemaker_project( - sagemaker_project_dir, - bento_path, - bento_pb.bento.bento_service_metadata.env.docker_base_image, - ) - ecr_image_path = create_and_push_docker_image_to_ecr( - updated_sagemaker_config.region, - updated_deployment_spec.bento_name, - updated_deployment_spec.bento_version, - sagemaker_project_dir, - ) - else: - logger.debug('Using existing ECR image for Sagemaker model') - ecr_image_path = current_ecr_image_tag - - ( - updated_sagemaker_model_name, - updated_sagemaker_endpoint_config_name, - sagemaker_endpoint_name, - ) = _get_sagemaker_resource_names(deployment_pb) - ( - current_sagemaker_model_name, - current_sagemaker_endpoint_config_name, - _, - ) = _get_sagemaker_resource_names(current_deployment) - - if ( - updated_sagemaker_config.api_name != current_sagemaker_config.api_name - or updated_sagemaker_config.num_of_gunicorn_workers_per_instance - != current_sagemaker_config.num_of_gunicorn_workers_per_instance - or ecr_image_path != current_ecr_image_tag - ): - logger.debug( - 'Sagemaker model requires update. Delete current sagemaker model %s' - 'and creating new model %s', - current_sagemaker_model_name, - updated_sagemaker_model_name, - ) - _delete_sagemaker_model_if_exist( - sagemaker_client, current_sagemaker_model_name - ) - _create_sagemaker_model( - sagemaker_client, - updated_sagemaker_model_name, - ecr_image_path, - updated_sagemaker_config, - ) - # When bento service tag is not changed, we need to delete the current - # endpoint configuration in order to create new one to avoid name collation - if ( - current_sagemaker_endpoint_config_name - == updated_sagemaker_endpoint_config_name - ): - logger.debug( - 'Current sagemaker config name %s is same as updated one, ' - 'delete it before create new endpoint config', - current_sagemaker_endpoint_config_name, - ) - _delete_sagemaker_endpoint_config_if_exist( - sagemaker_client, current_sagemaker_endpoint_config_name - ) - logger.debug( - 'Create new endpoint configuration %s', - updated_sagemaker_endpoint_config_name, - ) - _create_sagemaker_endpoint_config( - sagemaker_client, - updated_sagemaker_model_name, - updated_sagemaker_endpoint_config_name, - updated_sagemaker_config, - ) - logger.debug( - 'Updating endpoint to new endpoint configuration %s', - updated_sagemaker_endpoint_config_name, - ) - _update_sagemaker_endpoint( - sagemaker_client, - sagemaker_endpoint_name, - updated_sagemaker_endpoint_config_name, - ) - logger.debug( - 'Delete old sagemaker endpoint config %s', - current_sagemaker_endpoint_config_name, - ) - _delete_sagemaker_endpoint_config_if_exist( - sagemaker_client, current_sagemaker_endpoint_config_name - ) - except AWSServiceError as e: - delete_sagemaker_deployment_resources_if_exist(deployment_pb) - raise e - - return ApplyDeploymentResponse(status=Status.OK(), deployment=deployment_pb) - - def delete(self, deployment_pb): - try: - deployment_spec = deployment_pb.spec - sagemaker_config = deployment_spec.sagemaker_operator_config - sagemaker_config.region = ( - sagemaker_config.region or get_default_aws_region() - ) - if not 
sagemaker_config.region: - raise InvalidArgument('AWS region is missing') - - delete_sagemaker_deployment_resources_if_exist(deployment_pb) - - return DeleteDeploymentResponse(status=Status.OK()) - except BentoMLException as error: - return DeleteDeploymentResponse(status=error.status_proto) - - def describe(self, deployment_pb): - try: - deployment_spec = deployment_pb.spec - sagemaker_config = deployment_spec.sagemaker_operator_config - sagemaker_config.region = ( - sagemaker_config.region or get_default_aws_region() - ) - if not sagemaker_config.region: - raise InvalidArgument('AWS region is missing') - sagemaker_client = boto3.client('sagemaker', sagemaker_config.region) - _, _, sagemaker_endpoint_name = _get_sagemaker_resource_names(deployment_pb) - - try: - endpoint_status_response = sagemaker_client.describe_endpoint( - EndpointName=sagemaker_endpoint_name - ) - except ClientError as e: - raise _aws_client_error_to_bentoml_exception( - e, - f"Failed to fetch current status of sagemaker endpoint " - f"'{sagemaker_endpoint_name}'", - ) - - logger.debug("AWS describe endpoint response: %s", endpoint_status_response) - endpoint_status = endpoint_status_response["EndpointStatus"] - - service_state = ENDPOINT_STATUS_TO_STATE[endpoint_status] - - deployment_state = DeploymentState( - state=service_state, - info_json=json.dumps(endpoint_status_response, default=str), - ) - deployment_state.timestamp.GetCurrentTime() - - return DescribeDeploymentResponse( - state=deployment_state, status=Status.OK() - ) - except BentoMLException as error: - return DescribeDeploymentResponse(status=error.status_proto) diff --git a/bentoml/yatai/deployment/sagemaker/sagemaker_nginx.conf b/bentoml/yatai/deployment/sagemaker/nginx.conf similarity index 100% rename from bentoml/yatai/deployment/sagemaker/sagemaker_nginx.conf rename to bentoml/yatai/deployment/sagemaker/nginx.conf diff --git a/bentoml/yatai/deployment/sagemaker/operator.py b/bentoml/yatai/deployment/sagemaker/operator.py new file mode 100644 index 00000000000..1b2a44b1615 --- /dev/null +++ b/bentoml/yatai/deployment/sagemaker/operator.py @@ -0,0 +1,734 @@ +import base64 +import json +import logging +import os +import shutil +from urllib.parse import urlparse + +import boto3 +import docker +from botocore.exceptions import ClientError + +from bentoml.exceptions import ( + YataiDeploymentException, + AWSServiceError, + InvalidArgument, + BentoMLException, +) +from bentoml.saved_bundle import loader +from bentoml.utils.tempdir import TempDirectory +from bentoml.yatai.deployment.operator import DeploymentOperatorBase +from bentoml.yatai.deployment.utils import ( + process_docker_api_line, + generate_aws_compatible_string, + get_default_aws_region, + ensure_docker_available_or_raise, + raise_if_api_names_not_found_in_bento_service_metadata, +) +from bentoml.yatai.proto.deployment_pb2 import ( + DeploymentState, + ApplyDeploymentResponse, + DeleteDeploymentResponse, + DescribeDeploymentResponse, +) +from bentoml.yatai.proto.repository_pb2 import GetBentoRequest, BentoUri +from bentoml.yatai.status import Status + +logger = logging.getLogger(__name__) + + +BENTO_SERVICE_SAGEMAKER_DOCKERFILE = """\ +FROM {docker_base_image} + +# the env var $PORT is required by heroku container runtime +ENV PORT 8080 +EXPOSE $PORT + +RUN apt-get update --fix-missing && \ + apt-get install -y nginx && \ + apt-get clean + +# gevent required by AWS Sagemaker +RUN pip install gevent + +# copy over model files +COPY . 
/bento
+WORKDIR /bento
+
+RUN if [ -f /bento/bentoml-init.sh ]; then bash -c /bento/bentoml-init.sh; fi
+
+ENV PATH="/bento:$PATH"
+"""  # noqa: E501
+
+
+def strip_scheme(url):
+    """Strip the scheme from a URL,
+    e.g. http://some.url/path -> some.url/path
+    :param url: String
+    :return: String
+    """
+    parsed = urlparse(url)
+    scheme = "%s://" % parsed.scheme
+    return parsed.geturl().replace(scheme, "", 1)
+
+
+def get_arn_role_from_current_aws_user():
+    sts_client = boto3.client("sts")
+    identity = sts_client.get_caller_identity()
+    sts_arn = identity["Arn"]
+    sts_arn_list = sts_arn.split(":")
+    type_role = sts_arn_list[-1].split("/")
+    iam_client = boto3.client("iam")
+    if type_role[0] in ("user", "root"):
+        role_list = iam_client.list_roles()
+        arn = None
+        for role in role_list["Roles"]:
+            policy_document = role["AssumeRolePolicyDocument"]
+            statement = policy_document["Statement"][0]
+            if (
+                statement["Effect"] == "Allow"
+                and statement["Principal"].get("Service", None)
+                == "sagemaker.amazonaws.com"
+            ):
+                arn = role["Arn"]
+        if arn is None:
+            raise YataiDeploymentException(
+                "Can't find proper Arn role for Sagemaker, please create one and try "
+                "again"
+            )
+        return arn
+    elif type_role[0] == "role":
+        role_response = iam_client.get_role(RoleName=type_role[1])
+        return role_response["Role"]["Arn"]
+
+    raise YataiDeploymentException(
+        "Not supported role type {}; sts arn is {}".format(type_role[0], sts_arn)
+    )
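+
+
+# Illustrative note (hedged; the value below is hypothetical): the ARN resolved
+# above is later passed as ExecutionRoleArn when creating the SageMaker model,
+# e.g. an IAM role like 'arn:aws:iam::123456789012:role/my-sagemaker-role' whose
+# trust policy allows the principal 'sagemaker.amazonaws.com' to assume it.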
+def create_and_push_docker_image_to_ecr(
+    region, bento_name, bento_version, snapshot_path
+):
+    """Create the BentoService SageMaker image and push it to AWS ECR
+
+    Example: https://github.com/awslabs/amazon-sagemaker-examples/blob/\
+master/advanced_functionality/scikit_bring_your_own/container/build_and_push.sh
+    1. get AWS account info and log in to ECR
+    2. create the ECR repository, if it does not exist
+    3. build, tag, and push the docker image
+
+    Args:
+        region(String)
+        bento_name(String)
+        bento_version(String)
+        snapshot_path(Path)
+
+    Returns:
+        str: AWS ECR Tag
+    """
+    ecr_client = boto3.client("ecr", region)
+    token = ecr_client.get_authorization_token()
+    logger.debug("Getting docker login info from AWS")
+    username, password = (
+        base64.b64decode(token["authorizationData"][0]["authorizationToken"])
+        .decode("utf-8")
+        .split(":")
+    )
+    registry_url = token["authorizationData"][0]["proxyEndpoint"]
+    auth_config_payload = {"username": username, "password": password}
+
+    docker_api = docker.APIClient()
+
+    image_name = bento_name.lower() + "-sagemaker"
+    ecr_tag = strip_scheme(
+        "{registry_url}/{image_name}:{version}".format(
+            registry_url=registry_url, image_name=image_name, version=bento_version
+        )
+    )
+
+    logger.debug("Building docker image: %s", ecr_tag)
+    for line in docker_api.build(
+        path=snapshot_path, dockerfile="Dockerfile-sagemaker", tag=ecr_tag
+    ):
+        process_docker_api_line(line)
+
+    try:
+        ecr_client.describe_repositories(repositoryNames=[image_name])["repositories"]
+    except ecr_client.exceptions.RepositoryNotFoundException:
+        ecr_client.create_repository(repositoryName=image_name)
+
+    logger.debug("Pushing image to AWS ECR at %s", ecr_tag)
+    for line in docker_api.push(ecr_tag, stream=True, auth_config=auth_config_payload):
+        process_docker_api_line(line)
+    logger.debug("Finished pushing image: %s", ecr_tag)
+    return ecr_tag
+
+
+# Sagemaker response status: 'OutOfService'|'Creating'|'Updating'|
+#                            'SystemUpdating'|'RollingBack'|'InService'|
+#                            'Deleting'|'Failed'
+ENDPOINT_STATUS_TO_STATE = {
+    "InService": DeploymentState.RUNNING,
+    "Deleting": DeploymentState.INACTIVATED,
+    "Creating": DeploymentState.PENDING,
+    "Updating": DeploymentState.PENDING,
+    "RollingBack": DeploymentState.PENDING,
+    "SystemUpdating": DeploymentState.PENDING,
+    "OutOfService": DeploymentState.INACTIVATED,
+    "Failed": DeploymentState.ERROR,
+}
+
+
+def _aws_client_error_to_bentoml_exception(e, message_prefix=None):
+    """Parse botocore.exceptions.ClientError into a BentoML StatusProto
+
+    We handle the two most common errors when deploying to SageMaker:
+    1. Authentication issue/invalid access (InvalidSignatureException)
+    2.
resources not found (ValidationException) + It will return correlated StatusProto(NOT_FOUND, UNAUTHENTICATED) + + Args: + e: ClientError from botocore.exceptions + Returns: + StatusProto + """ + error_response = e.response.get('Error', {}) + error_code = error_response.get('Code', 'Unknown') + error_message = error_response.get('Message', 'Unknown') + error_log_message = ( + f'AWS ClientError - operation: {e.operation_name}, ' + f'code: {error_code}, message: {error_message}' + ) + if message_prefix: + error_log_message = f'{message_prefix}; {error_log_message}' + logger.error(error_log_message) + return AWSServiceError(error_log_message) + + +def _get_sagemaker_resource_names(deployment_pb): + sagemaker_model_name = generate_aws_compatible_string( + (deployment_pb.namespace, 10), + (deployment_pb.name, 12), + (deployment_pb.spec.bento_name, 20), + (deployment_pb.spec.bento_version, 18), + ) + sagemaker_endpoint_config_name = generate_aws_compatible_string( + (deployment_pb.namespace, 10), + (deployment_pb.name, 12), + (deployment_pb.spec.bento_name, 20), + (deployment_pb.spec.bento_version, 18), + ) + sagemaker_endpoint_name = generate_aws_compatible_string( + deployment_pb.namespace, deployment_pb.name + ) + return sagemaker_model_name, sagemaker_endpoint_config_name, sagemaker_endpoint_name + + +def _delete_sagemaker_model_if_exist(sagemaker_client, sagemaker_model_name): + try: + delete_model_response = sagemaker_client.delete_model( + ModelName=sagemaker_model_name + ) + logger.debug("AWS delete model response: %s", delete_model_response) + except ClientError as e: + error_response = e.response.get('Error', {}) + error_code = error_response.get('Code', 'Unknown') + error_message = error_response.get('Message', 'Unknown') + if ( + error_code == 'ValidationException' + and "Could not find model" in error_message + ): + # sagemaker model does not exist + return + + raise _aws_client_error_to_bentoml_exception( + e, f"Failed to cleanup sagemaker model '{sagemaker_model_name}'" + ) + + return + + +def _delete_sagemaker_endpoint_config_if_exist( + sagemaker_client, sagemaker_endpoint_config_name +): + try: + delete_endpoint_config_response = sagemaker_client.delete_endpoint_config( + EndpointConfigName=sagemaker_endpoint_config_name + ) + logger.debug( + "AWS delete endpoint config response: %s", delete_endpoint_config_response + ) + except ClientError as e: + error_response = e.response.get('Error', {}) + error_code = error_response.get('Code', 'Unknown') + error_message = error_response.get('Message', 'Unknown') + if ( + error_code == 'ValidationException' + and "Could not find endpoint configuration" in error_message + ): + # endpoint config does not exist + return + + raise _aws_client_error_to_bentoml_exception( + e, + f"Failed to cleanup sagemaker endpoint config " + f"'{sagemaker_endpoint_config_name}' after creation failed", + ) + return + + +def _delete_sagemaker_endpoint_if_exist(sagemaker_client, sagemaker_endpoint_name): + try: + delete_endpoint_response = sagemaker_client.delete_endpoint( + EndpointName=sagemaker_endpoint_name + ) + logger.debug("AWS delete endpoint response: %s", delete_endpoint_response) + except ClientError as e: + error_response = e.response.get('Error', {}) + error_code = error_response.get('Code', 'Unknown') + error_message = error_response.get('Message', 'Unknown') + if ( + error_code == 'ValidationException' + and "Could not find endpoint" in error_message + ): + # sagemaker endpoint does not exist + return + + raise 
_aws_client_error_to_bentoml_exception(
+            e, f"Failed to delete sagemaker endpoint '{sagemaker_endpoint_name}'"
+        )
+
+
+def delete_sagemaker_deployment_resources_if_exist(deployment_pb):
+    sagemaker_config = deployment_pb.spec.sagemaker_operator_config
+    sagemaker_client = boto3.client('sagemaker', sagemaker_config.region)
+
+    (
+        sagemaker_model_name,
+        sagemaker_endpoint_config_name,
+        sagemaker_endpoint_name,
+    ) = _get_sagemaker_resource_names(deployment_pb)
+
+    _delete_sagemaker_model_if_exist(sagemaker_client, sagemaker_model_name)
+    _delete_sagemaker_endpoint_config_if_exist(
+        sagemaker_client, sagemaker_endpoint_config_name
+    )
+    _delete_sagemaker_endpoint_if_exist(sagemaker_client, sagemaker_endpoint_name)
+
+
+def _init_sagemaker_project(sagemaker_project_dir, bento_path, docker_base_image):
+    shutil.copytree(bento_path, sagemaker_project_dir)
+
+    with open(os.path.join(sagemaker_project_dir, 'Dockerfile-sagemaker'), "w") as f:
+        f.write(
+            BENTO_SERVICE_SAGEMAKER_DOCKERFILE.format(
+                docker_base_image=docker_base_image
+            )
+        )
+
+    nginx_conf_path = os.path.join(os.path.dirname(__file__), 'nginx.conf')
+    shutil.copy(nginx_conf_path, os.path.join(sagemaker_project_dir, 'nginx.conf'))
+
+    wsgi_py_path = os.path.join(os.path.dirname(__file__), 'wsgi.py')
+    shutil.copy(wsgi_py_path, os.path.join(sagemaker_project_dir, 'wsgi.py'))
+
+    serve_file_path = os.path.join(os.path.dirname(__file__), 'serve')
+    shutil.copy(serve_file_path, os.path.join(sagemaker_project_dir, 'serve'))
+
+    # permission 755 is required for entry script 'serve'
+    os.chmod(os.path.join(sagemaker_project_dir, "serve"), 0o755)
+    return sagemaker_project_dir
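+
+
+# Illustrative sketch (paths are examples, not produced output): after
+# _init_sagemaker_project runs, the project directory roughly contains:
+#
+#   <temp_dir>/<bento_name>/
+#       Dockerfile-sagemaker   # rendered from BENTO_SERVICE_SAGEMAKER_DOCKERFILE
+#       nginx.conf             # copied from this package
+#       wsgi.py                # copied from this package
+#       serve                  # container entrypoint, chmod 0o755
+#       ...                    # plus the copied BentoService bundle files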
+
+
+def _create_sagemaker_model(
+    sagemaker_client, sagemaker_model_name, ecr_image_path, spec
+):
+    execution_role_arn = get_arn_role_from_current_aws_user()
+
+    sagemaker_model_info = {
+        "ModelName": sagemaker_model_name,
+        "PrimaryContainer": {
+            "ContainerHostname": sagemaker_model_name,
+            "Image": ecr_image_path,
+            "Environment": {
+                "API_NAME": spec.api_name,
+                'BENTOML_GUNICORN_TIMEOUT': str(spec.timeout),
+            },
+        },
+        "ExecutionRoleArn": execution_role_arn,
+    }
+
+    # If the user configured gunicorn workers per instance, pass it down as an
+    # env var. Environment variable values must be strings rather than ints.
+    if spec.num_of_gunicorn_workers_per_instance:
+        sagemaker_model_info['PrimaryContainer']['Environment'][
+            'BENTOML_GUNICORN_NUM_OF_WORKERS'
+        ] = str(spec.num_of_gunicorn_workers_per_instance)
+
+    try:
+        create_model_response = sagemaker_client.create_model(**sagemaker_model_info)
+    except ClientError as e:
+        raise _aws_client_error_to_bentoml_exception(
+            e, "Failed to create sagemaker model"
+        )
+    logger.debug("AWS create model response: %s", create_model_response)
+
+
+def _create_sagemaker_endpoint_config(
+    sagemaker_client, sagemaker_model_name, endpoint_config_name, sagemaker_config
+):
+    production_variants = [
+        {
+            "VariantName": sagemaker_model_name,
+            "ModelName": sagemaker_model_name,
+            "InitialInstanceCount": sagemaker_config.instance_count,
+            "InstanceType": sagemaker_config.instance_type,
+        }
+    ]
+
+    logger.debug("Creating Sagemaker endpoint %s configuration", endpoint_config_name)
+    try:
+        create_config_response = sagemaker_client.create_endpoint_config(
+            EndpointConfigName=endpoint_config_name,
+            ProductionVariants=production_variants,
+        )
+    except ClientError as e:
+        raise _aws_client_error_to_bentoml_exception(
+            e, "Failed to create sagemaker endpoint config"
+        )
+    logger.debug("AWS create endpoint config response: %s", create_config_response)
+
+
+def _create_sagemaker_endpoint(sagemaker_client, endpoint_name, endpoint_config_name):
+    try:
+        logger.debug("Creating sagemaker endpoint %s", endpoint_name)
+        create_endpoint_response = sagemaker_client.create_endpoint(
+            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
+        )
+        logger.debug("AWS create endpoint response: %s", create_endpoint_response)
+    except ClientError as e:
+        raise _aws_client_error_to_bentoml_exception(
+            e, "Failed to create sagemaker endpoint"
+        )
+
+
+def _update_sagemaker_endpoint(sagemaker_client, endpoint_name, endpoint_config_name):
+    try:
+        logger.debug("Updating sagemaker endpoint %s", endpoint_name)
+        update_endpoint_response = sagemaker_client.update_endpoint(
+            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
+        )
+        logger.debug("AWS update endpoint response: %s", str(update_endpoint_response))
+    except ClientError as e:
+        raise _aws_client_error_to_bentoml_exception(
+            e, "Failed to update sagemaker endpoint"
+        )
+
+
+class SageMakerDeploymentOperator(DeploymentOperatorBase):
+    def add(self, deployment_pb):
+        try:
+            deployment_spec = deployment_pb.spec
+            sagemaker_config = deployment_spec.sagemaker_operator_config
+            # validate the config before reading from it
+            if sagemaker_config is None:
+                raise YataiDeploymentException('Sagemaker configuration is missing.')
+            sagemaker_config.region = (
+                sagemaker_config.region or get_default_aws_region()
+            )
+            if not sagemaker_config.region:
+                raise InvalidArgument('AWS region is missing')
+
+            ensure_docker_available_or_raise()
+
+            bento_pb = self.yatai_service.GetBento(
+                GetBentoRequest(
+                    bento_name=deployment_spec.bento_name,
+                    bento_version=deployment_spec.bento_version,
+                )
+            )
+            if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3):
+                raise BentoMLException(
+                    'BentoML currently not support {} repository'.format(
+                        BentoUri.StorageType.Name(bento_pb.bento.uri.type)
+                    )
+                )
+            return self._add(deployment_pb, bento_pb, bento_pb.bento.uri.uri)
+
+        except BentoMLException as error:
+            deployment_pb.state.state = DeploymentState.ERROR
+            deployment_pb.state.error_message = (
+                f'Error creating SageMaker deployment: {str(error)}'
+            )
+            return ApplyDeploymentResponse(
+                status=error.status_proto, deployment=deployment_pb
+            )
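+
+    # Note (descriptive): _add below handles remote bundles by recursing once --
+    # when bento_path points at a remote location (e.g. an s3 uri),
+    # loader._resolve_remote_bundle_path downloads it to a temporary local path
+    # and _add is re-entered with that local path.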
+    def _add(self, deployment_pb, bento_pb, bento_path):
+        if loader._is_remote_path(bento_path):
+            with loader._resolve_remote_bundle_path(bento_path) as local_path:
+                return self._add(deployment_pb, bento_pb, local_path)
+
+        deployment_spec = deployment_pb.spec
+        sagemaker_config = deployment_spec.sagemaker_operator_config
+
+        raise_if_api_names_not_found_in_bento_service_metadata(
+            bento_pb.bento.bento_service_metadata, [sagemaker_config.api_name]
+        )
+
+        sagemaker_client = boto3.client('sagemaker', sagemaker_config.region)
+
+        with TempDirectory() as temp_dir:
+            sagemaker_project_dir = os.path.join(temp_dir, deployment_spec.bento_name)
+            _init_sagemaker_project(
+                sagemaker_project_dir,
+                bento_path,
+                bento_pb.bento.bento_service_metadata.env.docker_base_image,
+            )
+            ecr_image_path = create_and_push_docker_image_to_ecr(
+                sagemaker_config.region,
+                deployment_spec.bento_name,
+                deployment_spec.bento_version,
+                sagemaker_project_dir,
+            )
+
+        try:
+            (
+                sagemaker_model_name,
+                sagemaker_endpoint_config_name,
+                sagemaker_endpoint_name,
+            ) = _get_sagemaker_resource_names(deployment_pb)
+
+            _create_sagemaker_model(
+                sagemaker_client, sagemaker_model_name, ecr_image_path, sagemaker_config
+            )
+            _create_sagemaker_endpoint_config(
+                sagemaker_client,
+                sagemaker_model_name,
+                sagemaker_endpoint_config_name,
+                sagemaker_config,
+            )
+            _create_sagemaker_endpoint(
+                sagemaker_client,
+                sagemaker_endpoint_name,
+                sagemaker_endpoint_config_name,
+            )
+        except AWSServiceError as e:
+            delete_sagemaker_deployment_resources_if_exist(deployment_pb)
+            raise e
+
+        return ApplyDeploymentResponse(status=Status.OK(), deployment=deployment_pb)
+
+    def update(self, deployment_pb, previous_deployment):
+        try:
+            ensure_docker_available_or_raise()
+            deployment_spec = deployment_pb.spec
+            bento_pb = self.yatai_service.GetBento(
+                GetBentoRequest(
+                    bento_name=deployment_spec.bento_name,
+                    bento_version=deployment_spec.bento_version,
+                )
+            )
+            if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3):
+                raise BentoMLException(
+                    'BentoML currently not support {} repository'.format(
+                        BentoUri.StorageType.Name(bento_pb.bento.uri.type)
+                    )
+                )
+            return self._update(
+                deployment_pb, previous_deployment, bento_pb, bento_pb.bento.uri.uri
+            )
+        except BentoMLException as error:
+            deployment_pb.state.state = DeploymentState.ERROR
+            deployment_pb.state.error_message = (
+                f'Error updating SageMaker deployment: {str(error)}'
+            )
+            return ApplyDeploymentResponse(
+                status=error.status_proto, deployment=deployment_pb
+            )
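+
+    # Summary of the update flow implemented by _update below (descriptive only):
+    #   1. rebuild and push a new ECR image only when the bento name or version
+    #      changed; otherwise reuse the currently deployed image;
+    #   2. recreate the SageMaker model when api_name, the gunicorn worker count,
+    #      or the image changed;
+    #   3. create a fresh endpoint config, point the endpoint at it, then delete
+    #      the old endpoint config.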
+    def _update(self, deployment_pb, current_deployment, bento_pb, bento_path):
+        if loader._is_remote_path(bento_path):
+            with loader._resolve_remote_bundle_path(bento_path) as local_path:
+                return self._update(
+                    deployment_pb, current_deployment, bento_pb, local_path
+                )
+        updated_deployment_spec = deployment_pb.spec
+        updated_sagemaker_config = updated_deployment_spec.sagemaker_operator_config
+        sagemaker_client = boto3.client('sagemaker', updated_sagemaker_config.region)
+
+        try:
+            raise_if_api_names_not_found_in_bento_service_metadata(
+                bento_pb.bento.bento_service_metadata,
+                [updated_sagemaker_config.api_name],
+            )
+            describe_latest_deployment_state = self.describe(deployment_pb)
+            current_deployment_spec = current_deployment.spec
+            current_sagemaker_config = current_deployment_spec.sagemaker_operator_config
+            latest_deployment_state = json.loads(
+                describe_latest_deployment_state.state.info_json
+            )
+
+            current_ecr_image_tag = latest_deployment_state['ProductionVariants'][0][
+                'DeployedImages'
+            ][0]['SpecifiedImage']
+            if (
+                updated_deployment_spec.bento_name != current_deployment_spec.bento_name
+                or updated_deployment_spec.bento_version
+                != current_deployment_spec.bento_version
+            ):
+                logger.debug(
+                    'BentoService tag is different from the current deployment; '
+                    'building a new docker image and pushing it to ECR'
+                )
+                with TempDirectory() as temp_dir:
+                    sagemaker_project_dir = os.path.join(
+                        temp_dir, updated_deployment_spec.bento_name
+                    )
+                    _init_sagemaker_project(
+                        sagemaker_project_dir,
+                        bento_path,
+                        bento_pb.bento.bento_service_metadata.env.docker_base_image,
+                    )
+                    ecr_image_path = create_and_push_docker_image_to_ecr(
+                        updated_sagemaker_config.region,
+                        updated_deployment_spec.bento_name,
+                        updated_deployment_spec.bento_version,
+                        sagemaker_project_dir,
+                    )
+            else:
+                logger.debug('Using existing ECR image for Sagemaker model')
+                ecr_image_path = current_ecr_image_tag
+
+            (
+                updated_sagemaker_model_name,
+                updated_sagemaker_endpoint_config_name,
+                sagemaker_endpoint_name,
+            ) = _get_sagemaker_resource_names(deployment_pb)
+            (
+                current_sagemaker_model_name,
+                current_sagemaker_endpoint_config_name,
+                _,
+            ) = _get_sagemaker_resource_names(current_deployment)
+
+            if (
+                updated_sagemaker_config.api_name != current_sagemaker_config.api_name
+                or updated_sagemaker_config.num_of_gunicorn_workers_per_instance
+                != current_sagemaker_config.num_of_gunicorn_workers_per_instance
+                or ecr_image_path != current_ecr_image_tag
+            ):
+                logger.debug(
+                    'Sagemaker model requires update. Deleting current sagemaker '
+                    'model %s and creating new model %s',
+                    current_sagemaker_model_name,
+                    updated_sagemaker_model_name,
+                )
+                _delete_sagemaker_model_if_exist(
+                    sagemaker_client, current_sagemaker_model_name
+                )
+                _create_sagemaker_model(
+                    sagemaker_client,
+                    updated_sagemaker_model_name,
+                    ecr_image_path,
+                    updated_sagemaker_config,
+                )
+            # When the bento service tag has not changed, the new endpoint
+            # configuration would get the same name as the current one; delete the
+            # current one first to avoid a name collision
+            if (
+                current_sagemaker_endpoint_config_name
+                == updated_sagemaker_endpoint_config_name
+            ):
+                logger.debug(
+                    'Current sagemaker config name %s is the same as the updated '
+                    'one; deleting it before creating the new endpoint config',
+                    current_sagemaker_endpoint_config_name,
+                )
+                _delete_sagemaker_endpoint_config_if_exist(
+                    sagemaker_client, current_sagemaker_endpoint_config_name
+                )
+            logger.debug(
+                'Creating new endpoint configuration %s',
+                updated_sagemaker_endpoint_config_name,
+            )
+            _create_sagemaker_endpoint_config(
+                sagemaker_client,
+                updated_sagemaker_model_name,
+                updated_sagemaker_endpoint_config_name,
+                updated_sagemaker_config,
+            )
+            logger.debug(
+                'Updating endpoint to new endpoint configuration %s',
+                updated_sagemaker_endpoint_config_name,
+            )
+            _update_sagemaker_endpoint(
+                sagemaker_client,
+                sagemaker_endpoint_name,
+                updated_sagemaker_endpoint_config_name,
+            )
+            logger.debug(
+                'Deleting old sagemaker endpoint config %s',
+                current_sagemaker_endpoint_config_name,
+            )
+            _delete_sagemaker_endpoint_config_if_exist(
+                sagemaker_client, current_sagemaker_endpoint_config_name
+            )
+        except AWSServiceError as e:
+            delete_sagemaker_deployment_resources_if_exist(deployment_pb)
+            raise e
+
+        return ApplyDeploymentResponse(status=Status.OK(), deployment=deployment_pb)
+
+    def delete(self, deployment_pb):
+        try:
+            deployment_spec = deployment_pb.spec
+            sagemaker_config = deployment_spec.sagemaker_operator_config
+            sagemaker_config.region = (
+                sagemaker_config.region or get_default_aws_region()
+            )
+            if not
sagemaker_config.region: + raise InvalidArgument('AWS region is missing') + + delete_sagemaker_deployment_resources_if_exist(deployment_pb) + + return DeleteDeploymentResponse(status=Status.OK()) + except BentoMLException as error: + return DeleteDeploymentResponse(status=error.status_proto) + + def describe(self, deployment_pb): + try: + deployment_spec = deployment_pb.spec + sagemaker_config = deployment_spec.sagemaker_operator_config + sagemaker_config.region = ( + sagemaker_config.region or get_default_aws_region() + ) + if not sagemaker_config.region: + raise InvalidArgument('AWS region is missing') + sagemaker_client = boto3.client('sagemaker', sagemaker_config.region) + _, _, sagemaker_endpoint_name = _get_sagemaker_resource_names(deployment_pb) + + try: + endpoint_status_response = sagemaker_client.describe_endpoint( + EndpointName=sagemaker_endpoint_name + ) + except ClientError as e: + raise _aws_client_error_to_bentoml_exception( + e, + f"Failed to fetch current status of sagemaker endpoint " + f"'{sagemaker_endpoint_name}'", + ) + + logger.debug("AWS describe endpoint response: %s", endpoint_status_response) + endpoint_status = endpoint_status_response["EndpointStatus"] + + service_state = ENDPOINT_STATUS_TO_STATE[endpoint_status] + + deployment_state = DeploymentState( + state=service_state, + info_json=json.dumps(endpoint_status_response, default=str), + ) + deployment_state.timestamp.GetCurrentTime() + + return DescribeDeploymentResponse( + state=deployment_state, status=Status.OK() + ) + except BentoMLException as error: + return DescribeDeploymentResponse(status=error.status_proto) diff --git a/bentoml/yatai/deployment/sagemaker/sagemaker_serve.py b/bentoml/yatai/deployment/sagemaker/serve similarity index 96% rename from bentoml/yatai/deployment/sagemaker/sagemaker_serve.py rename to bentoml/yatai/deployment/sagemaker/serve index 5dba98a8b83..d354092f21e 100644 --- a/bentoml/yatai/deployment/sagemaker/sagemaker_serve.py +++ b/bentoml/yatai/deployment/sagemaker/serve @@ -37,7 +37,7 @@ def _serve(): subprocess.check_call(['ln', '-sf', '/dev/stdout', '/var/log/nginx/access.log']) subprocess.check_call(['ln', '-sf', '/dev/stderr', '/var/log/nginx/error.log']) - nginx = subprocess.Popen(['nginx', '-c', '/opt/program/nginx.conf']) + nginx = subprocess.Popen(['nginx', '-c', '/bento/nginx.conf']) gunicorn_app = subprocess.Popen( [ 'gunicorn', diff --git a/bentoml/yatai/deployment/sagemaker/sagemaker_wsgi.py b/bentoml/yatai/deployment/sagemaker/wsgi.py similarity index 95% rename from bentoml/yatai/deployment/sagemaker/sagemaker_wsgi.py rename to bentoml/yatai/deployment/sagemaker/wsgi.py index e4036a9a56e..98626fe3187 100644 --- a/bentoml/yatai/deployment/sagemaker/sagemaker_wsgi.py +++ b/bentoml/yatai/deployment/sagemaker/wsgi.py @@ -18,6 +18,6 @@ from bentoml.yatai.deployment.sagemaker.model_server import BentomlSagemakerServer api_name = os.environ.get('API_NAME', None) -model_service = load('/opt/program') +model_service = load('/bento') server = BentomlSagemakerServer(model_service, api_name) app = server.app diff --git a/bentoml/yatai/repository/__init__.py b/bentoml/yatai/repository/__init__.py index 18a33024079..4b378e85c85 100644 --- a/bentoml/yatai/repository/__init__.py +++ b/bentoml/yatai/repository/__init__.py @@ -11,281 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import os -import shutil -import boto3 -import logging -from pathlib import Path -from abc import abstractmethod, ABCMeta -from urllib.parse import urlparse - -from botocore.exceptions import ClientError - -from bentoml import config -from bentoml.exceptions import YataiRepositoryException -from bentoml.utils.s3 import is_s3_url -from bentoml.yatai.proto.repository_pb2 import BentoUri - - -logger = logging.getLogger(__name__) - - -class BentoRepositoryBase(object): - """ - BentoRepository is the interface for managing saved Bentos over file system or - cloud storage systems. - - A Bento is a BentoService serialized into a standard file format that can be - easily load back to a Python session, installed as PyPI package, or run in Conda - or docker environment with all dependencies configured automatically - """ - - __metaclass__ = ABCMeta - - @abstractmethod - def add(self, bento_name, bento_version): - """ - Proposing to add a saved BentoService to target repository by providing the - bento name and version. - Return value is an URL(file path or s3 path for example), that is ready for - the client to upload saved Bento files. - """ - - @abstractmethod - def get(self, bento_name, bento_version): - """ - Get a file path to the saved Bento files, path must be accessible form local - machine either through NFS or pre-downloaded to local machine - """ - - @abstractmethod - def dangerously_delete(self, bento_name, bento_version): - """ - Deleting the Bento files that was added to this repository earlier, this may - break existing deployments or create issues when doing deployment rollback - """ - - -class _LocalBentoRepository(BentoRepositoryBase): - def __init__(self, base_url): - if not os.path.exists(base_url): - # make sure local repo base path exist - os.mkdir(base_url) - - self.base_path = base_url - self.uri_type = BentoUri.LOCAL - - def add(self, bento_name, bento_version): - # Full path containing saved BentoService bundle, it the base path with service - # name and service version as prefix. 
e.g.: - # with base_path = '/tmp/my_bento_repo/', the saved bento will resolve in - # the directory: '/tmp/my_bento_repo/service_name/version/' - target_dir = os.path.join(self.base_path, bento_name, bento_version) - - # Ensure parent directory exist - Path(os.path.join(self.base_path), bento_name).mkdir( - parents=True, exist_ok=True - ) - - # Raise if target bento version already exist in storage - if os.path.exists(target_dir): - raise YataiRepositoryException( - "Existing BentoService bundle {name}:{version} found in repository: " - "{target_dir}".format( - name=bento_name, version=bento_version, target_dir=target_dir - ) - ) - - # Create target directory for upload - os.mkdir(target_dir) - - return BentoUri(type=self.uri_type, uri=target_dir) - - def get(self, bento_name, bento_version): - saved_path = os.path.join(self.base_path, bento_name, bento_version) - if not os.path.exists(saved_path): - raise YataiRepositoryException( - "Bento {}:{} not found in target repository".format( - bento_name, bento_version - ) - ) - return saved_path - - def dangerously_delete(self, bento_name, bento_version): - saved_path = os.path.join(self.base_path, bento_name, bento_version) - try: - return shutil.rmtree(saved_path) - except FileNotFoundError: - logger.warning( - "BentoService %s:%s has already been deleted from local storage", - bento_name, - bento_version, - ) - return - - -class _S3BentoRepository(BentoRepositoryBase): - def __init__(self, base_url, s3_endpoint_url=None): - self.uri_type = BentoUri.S3 - - parse_result = urlparse(base_url) - self.bucket = parse_result.netloc - self.base_path = parse_result.path.lstrip('/') - - s3_client_args = {} - signature_version = config('yatai_service').get('S3_SIGNATURE_VERSION') - s3_client_args['config'] = boto3.session.Config( - signature_version=signature_version - ) - if s3_endpoint_url is not None: - s3_client_args['endpoint_url'] = s3_endpoint_url - self.s3_client = boto3.client("s3", **s3_client_args) - - @property - def _expiration(self): - return config('yatai').getint('bento_uri_default_expiration') - - def _get_object_name(self, bento_name, bento_version): - if self.base_path: - return "/".join([self.base_path, bento_name, bento_version]) + '.tar.gz' - else: - return "/".join([bento_name, bento_version]) + '.tar.gz' - - def add(self, bento_name, bento_version): - # Generate pre-signed s3 path for upload - - object_name = self._get_object_name(bento_name, bento_version) - try: - response = self.s3_client.generate_presigned_url( - 'put_object', - Params={'Bucket': self.bucket, 'Key': object_name}, - ExpiresIn=self._expiration, - ) - except Exception as e: - raise YataiRepositoryException( - "Not able to get pre-signed URL on S3. 
Error: {}".format(e) - ) - - return BentoUri( - type=self.uri_type, - uri='s3://{}/{}'.format(self.bucket, object_name), - s3_presigned_url=response, - ) - - def get(self, bento_name, bento_version): - # Return s3 path containing uploaded Bento files - - object_name = self._get_object_name(bento_name, bento_version) - - try: - response = self.s3_client.generate_presigned_url( - 'get_object', - Params={'Bucket': self.bucket, 'Key': object_name}, - ExpiresIn=self._expiration, - ) - return response - except Exception: # pylint: disable=broad-except - logger.error( - "Failed generating presigned URL for downloading saved bundle from s3," - "falling back to using s3 path and client side credential for" - "downloading with boto3" - ) - return 's3://{}/{}'.format(self.bucket, object_name) - - def dangerously_delete(self, bento_name, bento_version): - # Remove s3 path containing related Bento files - - object_name = self._get_object_name(bento_name, bento_version) - - try: - response = self.s3_client.delete_object(Bucket=self.bucket, Key=object_name) - DELETE_MARKER = 'DeleteMarker' # whether object is successfully deleted. - - # Note: as of boto3 v1.13.13. delete_object returns an incorrect format as - # expected from documentation. - # Expected format: - # { - # 'DeleteMarker': True|False, - # 'VersionId': 'string', - # 'RequestCharged': 'requester' - # } - # Current return: - # { - # 'ResponseMetadata': { - # 'RequestId': '****************', - # 'HostId': '*****/******', - # 'HTTPStatusCode': 204, - # 'HTTPHeaders': { - # 'x-amz-id-2': '*****/xxxxx', - # 'x-amz-request-id': '332EE9F7AB555D2B', - # 'date': 'Tue, 19 May 2020 19:46:57 GMT', - # 'server': 'AmazonS3' - # }, - # 'RetryAttempts': 0 - # } - # } - # An open issue on github: https://github.com/boto/boto3/issues/759 - if DELETE_MARKER in response: - if response[DELETE_MARKER]: - return - else: - logger.warning( - f"BentoML has deleted service '{bento_name}:{bento_version}' " - f"from YataiService records, but it failed to delete the saved " - f"bundle files stored in s3://{self.bucket}/{object_name}, " - f"the files may have already been deleted by the user." - ) - return - elif 'ResponseMetadata' in response: - # Note: Use head_object to 'check' is the object deleted or not. - # head_object only try to retrieve the metadata without returning - # the object itself. - try: - self.s3_client.head_object(Bucket=self.bucket, Key=object_name) - logger.warning( - f"BentoML has deleted service '{bento_name}:{bento_version}' " - f"from YataiService records, but it failed to delete the saved " - f"bundle files stored in s3://{self.bucket}/{object_name}, " - f"the files may have already been deleted by the user." - ) - except ClientError as e: - # expected ClientError with Code 404, as target object should be - # deleted and 'head_object' should raise - error_response = e.response.get('Error', {}) - error_code = error_response.get('Code', None) - if error_code == '404': - # Error code 404 means target file object does not exist, as - # expected after delete_object call - return - else: - # unexpected boto3 ClientError - raise e - else: - raise YataiRepositoryException( - 'Unrecognized response format from s3 delete_object' - ) - except Exception as e: - raise YataiRepositoryException( - "Not able to delete object on S3. 
Error: {}".format(e) - ) - - -class BentoRepository(BentoRepositoryBase): - def __init__(self, base_url=None, s3_endpoint_url=None): - if base_url is None: - base_url = config().get('default_repository_base_url') - - if is_s3_url(base_url): - self._repo = _S3BentoRepository(base_url, s3_endpoint_url) - else: - self._repo = _LocalBentoRepository(base_url) - - def add(self, bento_name, bento_version): - return self._repo.add(bento_name, bento_version) - - def get(self, bento_name, bento_version): - return self._repo.get(bento_name, bento_version) - - def dangerously_delete(self, bento_name, bento_version): - return self._repo.dangerously_delete(bento_name, bento_version) diff --git a/bentoml/yatai/repository/base_repository.py b/bentoml/yatai/repository/base_repository.py new file mode 100644 index 00000000000..f53e913ee5d --- /dev/null +++ b/bentoml/yatai/repository/base_repository.py @@ -0,0 +1,52 @@ +# Copyright 2019 Atalaya Tech, Inc. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from abc import ABCMeta, abstractmethod + + +class BaseRepository(object): + """ + BaseRepository is the interface for managing BentoML saved bundle files over either + a file system or a cloud blob storage systems such as AWS S3 or MinIO + + A BentoML saved bundle is a standard file format that contains trained model files + as well as serving endpoint code, input/output spec, dependency specs and deployment + configs. + """ + + __metaclass__ = ABCMeta + + @abstractmethod + def add(self, bento_name, bento_version): + """ + Proposing to add a saved BentoService to target repository by providing the + bento name and version. + Return value is an URL(file path or s3 path for example), that is ready for + the client to upload saved Bento files. + """ + + @abstractmethod + def get(self, bento_name, bento_version): + """ + Get a file path to the saved Bento files, path must be accessible form local + machine either through NFS or pre-downloaded to local machine + """ + + @abstractmethod + def dangerously_delete(self, bento_name, bento_version): + """ + Deleting the Bento files that was added to this repository earlier, this may + break existing deployments or create issues when doing deployment rollback + """ diff --git a/bentoml/yatai/repository/local_repository.py b/bentoml/yatai/repository/local_repository.py new file mode 100644 index 00000000000..d170ff401ba --- /dev/null +++ b/bentoml/yatai/repository/local_repository.py @@ -0,0 +1,87 @@ +# Copyright 2019 Atalaya Tech, Inc. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import shutil
+import logging
+from pathlib import Path
+
+from bentoml.exceptions import YataiRepositoryException
+from bentoml.yatai.proto.repository_pb2 import BentoUri
+from bentoml.yatai.repository.base_repository import BaseRepository
+
+
+logger = logging.getLogger(__name__)
+
+
+class LocalRepository(BaseRepository):
+    def __init__(self, base_url):
+        """
+        :param base_url: local file system path that will be used as the root
+            directory of this saved bundle repository
+        """
+        if not os.path.exists(base_url):
+            # make sure local repo base path exists
+            os.mkdir(base_url)
+
+        self.base_path = base_url
+        self.uri_type = BentoUri.LOCAL
+
+    def add(self, bento_name, bento_version):
+        # Full path containing the saved BentoService bundle, i.e. the base path
+        # with the service name and service version as prefix. e.g.:
+        # with base_path = '/tmp/my_bento_repo/', the saved bento will resolve to
+        # the directory: '/tmp/my_bento_repo/service_name/version/'
+        target_dir = os.path.join(self.base_path, bento_name, bento_version)
+
+        # Ensure parent directory exists
+        Path(self.base_path, bento_name).mkdir(parents=True, exist_ok=True)
+
+        # Raise if target bento version already exists in storage
+        if os.path.exists(target_dir):
+            raise YataiRepositoryException(
+                "Existing BentoService bundle {name}:{version} found in repository: "
+                "{target_dir}".format(
+                    name=bento_name, version=bento_version, target_dir=target_dir
+                )
+            )
+
+        # Create target directory for upload
+        os.mkdir(target_dir)
+
+        return BentoUri(type=self.uri_type, uri=target_dir)
+
+    def get(self, bento_name, bento_version):
+        saved_path = os.path.join(self.base_path, bento_name, bento_version)
+        if not os.path.exists(saved_path):
+            raise YataiRepositoryException(
+                "Bento {}:{} not found in target repository".format(
+                    bento_name, bento_version
+                )
+            )
+        return saved_path
+
+    def dangerously_delete(self, bento_name, bento_version):
+        saved_path = os.path.join(self.base_path, bento_name, bento_version)
+        try:
+            return shutil.rmtree(saved_path)
+        except FileNotFoundError:
+            logger.warning(
+                "BentoService %s:%s has already been deleted from local storage",
+                bento_name,
+                bento_version,
+            )
+            return
diff --git a/bentoml/yatai/repository/repository.py b/bentoml/yatai/repository/repository.py
new file mode 100644
index 00000000000..a78f2a3e4c3
--- /dev/null
+++ b/bentoml/yatai/repository/repository.py
@@ -0,0 +1,46 @@
+# Copyright 2019 Atalaya Tech, Inc.
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
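+
+# Usage sketch (illustrative; the paths and service names are hypothetical):
+#
+#   from bentoml.yatai.repository.repository import Repository
+#
+#   repo = Repository(base_url='/tmp/my_bento_repo')      # local file system
+#   repo = Repository(base_url='s3://my-bucket/bentos/')  # delegates to S3Repository
+#   bento_uri = repo.add('IrisClassifier', '20200121_7FE54B')
+#   path_or_url = repo.get('IrisClassifier', '20200121_7FE54B')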
+
+from bentoml import config
+from bentoml.utils.s3 import is_s3_url
+from bentoml.yatai.repository.base_repository import BaseRepository
+from bentoml.yatai.repository.local_repository import LocalRepository
+from bentoml.yatai.repository.s3_repository import S3Repository
+
+
+class Repository(BaseRepository):
+    def __init__(self, base_url=None, s3_endpoint_url=None):
+        """
+        :param base_url: either a local file system path or an S3-compatible path
+            such as s3://my-bucket/some-prefix/
+        :param s3_endpoint_url: configures the S3Repository to talk to a specific
+            S3 endpoint
+        """
+
+        if base_url is None:
+            base_url = config().get('default_repository_base_url')
+
+        if is_s3_url(base_url):
+            self._repo = S3Repository(base_url, s3_endpoint_url)
+        else:
+            self._repo = LocalRepository(base_url)
+
+    def add(self, bento_name, bento_version):
+        return self._repo.add(bento_name, bento_version)
+
+    def get(self, bento_name, bento_version):
+        return self._repo.get(bento_name, bento_version)
+
+    def dangerously_delete(self, bento_name, bento_version):
+        return self._repo.dangerously_delete(bento_name, bento_version)
diff --git a/bentoml/yatai/repository/s3_repository.py b/bentoml/yatai/repository/s3_repository.py
new file mode 100644
index 00000000000..25097b62a41
--- /dev/null
+++ b/bentoml/yatai/repository/s3_repository.py
@@ -0,0 +1,172 @@
+# Copyright 2019 Atalaya Tech, Inc.
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
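+
+# Key layout sketch (illustrative, hypothetical values): with
+# base_url='s3://my-bucket/bentos/', a bundle IrisClassifier:20200121_7FE54B maps
+# to the object key 'bentos/IrisClassifier/20200121_7FE54B.tar.gz'; add() and
+# get() hand out pre-signed put_object/get_object URLs for that key, with the
+# expiration taken from the 'bento_uri_default_expiration' config value.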
+
+import logging
+from urllib.parse import urlparse
+
+import boto3
+from botocore.exceptions import ClientError
+
+from bentoml import config
+from bentoml.exceptions import YataiRepositoryException
+from bentoml.yatai.proto.repository_pb2 import BentoUri
+from bentoml.yatai.repository.base_repository import BaseRepository
+
+logger = logging.getLogger(__name__)
+
+
+class S3Repository(BaseRepository):
+    def __init__(self, base_url, s3_endpoint_url=None):
+        self.uri_type = BentoUri.S3
+
+        parse_result = urlparse(base_url)
+        self.bucket = parse_result.netloc
+        self.base_path = parse_result.path.lstrip('/')
+
+        s3_client_args = {}
+        signature_version = config('yatai_service').get('S3_SIGNATURE_VERSION')
+        s3_client_args['config'] = boto3.session.Config(
+            signature_version=signature_version
+        )
+        if s3_endpoint_url is not None:
+            s3_client_args['endpoint_url'] = s3_endpoint_url
+        self.s3_client = boto3.client("s3", **s3_client_args)
+
+    @property
+    def _expiration(self):
+        return config('yatai').getint('bento_uri_default_expiration')
+
+    def _get_object_name(self, bento_name, bento_version):
+        if self.base_path:
+            return "/".join([self.base_path, bento_name, bento_version]) + '.tar.gz'
+        else:
+            return "/".join([bento_name, bento_version]) + '.tar.gz'
+
+    def add(self, bento_name, bento_version):
+        # Generate pre-signed s3 path for upload
+
+        object_name = self._get_object_name(bento_name, bento_version)
+        try:
+            response = self.s3_client.generate_presigned_url(
+                'put_object',
+                Params={'Bucket': self.bucket, 'Key': object_name},
+                ExpiresIn=self._expiration,
+            )
+        except Exception as e:
+            raise YataiRepositoryException(
+                "Not able to get pre-signed URL on S3. Error: {}".format(e)
+            )
+
+        return BentoUri(
+            type=self.uri_type,
+            uri='s3://{}/{}'.format(self.bucket, object_name),
+            s3_presigned_url=response,
+        )
+
+    def get(self, bento_name, bento_version):
+        # Return s3 path containing uploaded Bento files
+
+        object_name = self._get_object_name(bento_name, bento_version)
+
+        try:
+            response = self.s3_client.generate_presigned_url(
+                'get_object',
+                Params={'Bucket': self.bucket, 'Key': object_name},
+                ExpiresIn=self._expiration,
+            )
+            return response
+        except Exception:  # pylint: disable=broad-except
+            logger.error(
+                "Failed generating presigned URL for downloading saved bundle "
+                "from s3; falling back to using the s3 path and client side "
+                "credentials for downloading with boto3"
+            )
+            return 's3://{}/{}'.format(self.bucket, object_name)
+
+    def dangerously_delete(self, bento_name, bento_version):
+        # Remove s3 path containing related Bento files
+
+        object_name = self._get_object_name(bento_name, bento_version)
+
+        try:
+            response = self.s3_client.delete_object(Bucket=self.bucket, Key=object_name)
+            DELETE_MARKER = 'DeleteMarker'  # whether object is successfully deleted.
+
+            # Note: as of boto3 v1.13.13, delete_object returns a response that
+            # does not match the format described in its documentation.
+            # Expected format:
+            # {
+            #   'DeleteMarker': True|False,
+            #   'VersionId': 'string',
+            #   'RequestCharged': 'requester'
+            # }
+            # Current return:
+            # {
+            #   'ResponseMetadata': {
+            #     'RequestId': '****************',
+            #     'HostId': '*****/******',
+            #     'HTTPStatusCode': 204,
+            #     'HTTPHeaders': {
+            #       'x-amz-id-2': '*****/xxxxx',
+            #       'x-amz-request-id': '332EE9F7AB555D2B',
+            #       'date': 'Tue, 19 May 2020 19:46:57 GMT',
+            #       'server': 'AmazonS3'
+            #     },
+            #     'RetryAttempts': 0
+            #   }
+            # }
+            # An open issue on github: https://github.com/boto/boto3/issues/759
+            if DELETE_MARKER in response:
+                if response[DELETE_MARKER]:
+                    return
+                else:
+                    logger.warning(
+                        f"BentoML has deleted service '{bento_name}:{bento_version}' "
+                        f"from YataiService records, but it failed to delete the saved "
+                        f"bundle files stored in s3://{self.bucket}/{object_name}, "
+                        f"the files may have already been deleted by the user."
+                    )
+                    return
+            elif 'ResponseMetadata' in response:
+                # Note: use head_object to check whether the object was actually
+                # deleted; head_object only tries to retrieve the metadata without
+                # returning the object itself.
+                try:
+                    self.s3_client.head_object(Bucket=self.bucket, Key=object_name)
+                    logger.warning(
+                        f"BentoML has deleted service '{bento_name}:{bento_version}' "
+                        f"from YataiService records, but it failed to delete the saved "
+                        f"bundle files stored in s3://{self.bucket}/{object_name}, "
+                        f"the files may have already been deleted by the user."
+                    )
+                except ClientError as e:
+                    # expected ClientError with Code 404, as the target object should
+                    # be deleted and 'head_object' should raise
+                    error_response = e.response.get('Error', {})
+                    error_code = error_response.get('Code', None)
+                    if error_code == '404':
+                        # Error code 404 means the target file object does not exist,
+                        # as expected after the delete_object call
+                        return
+                    else:
+                        # unexpected boto3 ClientError
+                        raise e
+            else:
+                raise YataiRepositoryException(
+                    'Unrecognized response format from s3 delete_object'
+                )
+        except Exception as e:
+            raise YataiRepositoryException(
+                "Not able to delete object on S3. Error: {}".format(e)
+            )
diff --git a/bentoml/yatai/validator/__init__.py b/bentoml/yatai/validator/__init__.py
index e1b276e1b80..e268516fbb2 100644
--- a/bentoml/yatai/validator/__init__.py
+++ b/bentoml/yatai/validator/__init__.py
@@ -12,131 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from cerberus import Validator +from bentoml.yatai.validator.deployment_pb_validator import validate_deployment_pb -from bentoml.utils import ProtoMessageToDict -from bentoml.yatai.proto.deployment_pb2 import DeploymentSpec, DeploymentState - -deployment_schema = { - 'name': {'type': 'string', 'required': True, 'minlength': 4}, - # namespace is optional - YataiService will fill-in the default namespace configured - # when it is missing in the apply deployment request - 'namespace': {'type': 'string', 'required': False, 'minlength': 3}, - 'labels': {'type': 'dict', 'allow_unknown': True}, - 'annotations': {'type': 'dict', 'allow_unknown': True}, - 'created_at': {'type': 'string'}, - 'last_updated_at': {'type': 'string'}, - 'spec': { - 'type': 'dict', - 'required': True, - 'schema': { - 'operator': { - 'type': 'string', - 'required': True, - 'allowed': DeploymentSpec.DeploymentOperator.keys(), - }, - 'bento_name': {'type': 'string', 'required': True}, - 'bento_version': { - 'type': 'string', - 'required': True, - 'bento_service_version': True, - }, - 'custom_operator_config': { - 'type': 'dict', - 'schema': { - 'name': {'type': 'string'}, - 'config': {'type': 'dict', 'allow_unknown': True}, - }, - }, - 'sagemaker_operator_config': { - 'type': 'dict', - 'schema': { - 'api_name': {'type': 'string', 'required': True, 'minlength': 3}, - 'instance_type': {'type': 'string', 'required': True}, - 'instance_count': {'type': 'integer', 'min': 1, 'required': True}, - 'region': {'type': 'string'}, - 'num_of_gunicorn_workers_per_instance': { - 'type': 'integer', - 'min': 1, - }, - 'timeout': {'type': 'integer', 'min': 1}, - }, - }, - 'aws_lambda_operator_config': { - 'type': 'dict', - 'schema': { - 'region': {'type': 'string'}, - 'api_name': {'type': 'string', 'minlength': 3}, - 'memory_size': {'type': 'integer', 'aws_lambda_memory': True}, - 'timeout': {'type': 'integer', 'min': 1, 'max': 900}, - }, - }, - 'kubernetes_operator_config': { - 'type': 'dict', - 'schema': { - 'kube_namespace': {'type': 'string'}, - 'replicas': {'type': 'integer', 'min': 1}, - 'service_name': {'type': 'string'}, - 'service_type': {'type': 'string'}, - }, - }, - }, - }, - 'state': { - 'type': 'dict', - 'schema': { - 'state': {'type': 'string', 'allowed': DeploymentState.State.keys()}, - 'error_message': {'type': 'string'}, - 'info_json': {'type': 'string'}, - 'timestamp': {'type': 'string'}, - }, - }, -} - - -class YataiDeploymentValidator(Validator): - def _validate_aws_lambda_memory(self, aws_lambda_memory, field, value): - """ Test the memory size restriction for AWS Lambda. 
- - The rule's arguments are validated against this schema: - {'type': 'boolean'} - """ - if aws_lambda_memory: - if value > 3008 or value < 128: - self._error( - field, - 'AWS Lambda memory must be between 128 MB to 3,008 MB, ' - 'in 64 MB increments.', - ) - if value % 64 > 0: - self._error( - field, - 'AWS Lambda memory must be between 128 MB to 3,008 MB, ' - 'in 64 MB increments.', - ) - - def _validate_bento_service_version(self, bento_service_version, field, value): - """ Test the given BentoService version is not "latest" - - The rule's arguments are validated against this schema: - {'type': 'boolean'} - """ - if bento_service_version and value.lower() == "latest": - self._error( - field, - 'Must use specific "bento_version" in deployment, using "latest" is ' - 'an anti-pattern.', - ) - - -def validate_pb_schema(pb, schema): - pb_dict = ProtoMessageToDict(pb) - v = YataiDeploymentValidator(schema) - if v.validate(pb_dict): - return None - else: - return v.errors - - -def validate_deployment_pb_schema(pb): - return validate_pb_schema(pb, deployment_schema) +__all__ = ["validate_deployment_pb"] diff --git a/bentoml/yatai/validator/deployment_pb_validator.py b/bentoml/yatai/validator/deployment_pb_validator.py new file mode 100644 index 00000000000..dd15cf40c71 --- /dev/null +++ b/bentoml/yatai/validator/deployment_pb_validator.py @@ -0,0 +1,138 @@ +# Copyright 2019 Atalaya Tech, Inc. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
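+
+# Usage sketch (illustrative): validate_deployment_pb below returns None when the
+# deployment proto satisfies the cerberus schema, or a dict of errors keyed by
+# field, e.g. {'name': ['required field']} when the deployment name is empty.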
+ +from cerberus import Validator + +from bentoml.utils import ProtoMessageToDict +from bentoml.yatai.proto.deployment_pb2 import DeploymentSpec, DeploymentState + +deployment_schema = { + 'name': {'type': 'string', 'required': True, 'minlength': 4}, + # namespace is optional - YataiService will fill-in the default namespace configured + # when it is missing in the apply deployment request + 'namespace': {'type': 'string', 'required': False, 'minlength': 3}, + 'labels': {'type': 'dict', 'allow_unknown': True}, + 'annotations': {'type': 'dict', 'allow_unknown': True}, + 'created_at': {'type': 'string'}, + 'last_updated_at': {'type': 'string'}, + 'spec': { + 'type': 'dict', + 'required': True, + 'schema': { + 'operator': { + 'type': 'string', + 'required': True, + 'allowed': DeploymentSpec.DeploymentOperator.keys(), + }, + 'bento_name': {'type': 'string', 'required': True}, + 'bento_version': { + 'type': 'string', + 'required': True, + 'bento_service_version': True, + }, + 'custom_operator_config': { + 'type': 'dict', + 'schema': { + 'name': {'type': 'string'}, + 'config': {'type': 'dict', 'allow_unknown': True}, + }, + }, + 'sagemaker_operator_config': { + 'type': 'dict', + 'schema': { + 'api_name': {'type': 'string', 'required': True, 'minlength': 3}, + 'instance_type': {'type': 'string', 'required': True}, + 'instance_count': {'type': 'integer', 'min': 1, 'required': True}, + 'region': {'type': 'string'}, + 'num_of_gunicorn_workers_per_instance': { + 'type': 'integer', + 'min': 1, + }, + 'timeout': {'type': 'integer', 'min': 1}, + }, + }, + 'aws_lambda_operator_config': { + 'type': 'dict', + 'schema': { + 'region': {'type': 'string'}, + 'api_name': {'type': 'string', 'minlength': 3}, + 'memory_size': {'type': 'integer', 'aws_lambda_memory': True}, + 'timeout': {'type': 'integer', 'min': 1, 'max': 900}, + }, + }, + 'kubernetes_operator_config': { + 'type': 'dict', + 'schema': { + 'kube_namespace': {'type': 'string'}, + 'replicas': {'type': 'integer', 'min': 1}, + 'service_name': {'type': 'string'}, + 'service_type': {'type': 'string'}, + }, + }, + }, + }, + 'state': { + 'type': 'dict', + 'schema': { + 'state': {'type': 'string', 'allowed': DeploymentState.State.keys()}, + 'error_message': {'type': 'string'}, + 'info_json': {'type': 'string'}, + 'timestamp': {'type': 'string'}, + }, + }, +} + + +class YataiDeploymentValidator(Validator): + def _validate_aws_lambda_memory(self, aws_lambda_memory, field, value): + """ Test the memory size restriction for AWS Lambda. 
+ + The rule's arguments are validated against this schema: + {'type': 'boolean'} + """ + if aws_lambda_memory: + if value > 3008 or value < 128: + self._error( + field, + 'AWS Lambda memory must be between 128 MB to 3,008 MB, ' + 'in 64 MB increments.', + ) + if value % 64 > 0: + self._error( + field, + 'AWS Lambda memory must be between 128 MB to 3,008 MB, ' + 'in 64 MB increments.', + ) + + def _validate_bento_service_version(self, bento_service_version, field, value): + """ Test the given BentoService version is not "latest" + + The rule's arguments are validated against this schema: + {'type': 'boolean'} + """ + if bento_service_version and value.lower() == "latest": + self._error( + field, + 'Must use specific "bento_version" in deployment, using "latest" is ' + 'an anti-pattern.', + ) + + +def validate_deployment_pb(pb): + pb_dict = ProtoMessageToDict(pb) + v = YataiDeploymentValidator(deployment_schema) + if v.validate(pb_dict): + return None + else: + return v.errors diff --git a/bentoml/yatai/yatai_service_impl.py b/bentoml/yatai/yatai_service_impl.py index d6227f44dc5..a0a6d1130e2 100644 --- a/bentoml/yatai/yatai_service_impl.py +++ b/bentoml/yatai/yatai_service_impl.py @@ -38,13 +38,13 @@ from bentoml.yatai.deployment.operator import get_deployment_operator from bentoml.yatai.deployment.store import DeploymentStore from bentoml.exceptions import BentoMLException, InvalidArgument -from bentoml.yatai.repository import BentoRepository +from bentoml.yatai.repository.repository import Repository from bentoml.yatai.repository.metadata_store import BentoMetadataStore from bentoml.yatai.db import init_db from bentoml.yatai.status import Status from bentoml.yatai.proto import status_pb2 from bentoml.utils import ProtoMessageToDict -from bentoml.yatai.validator import validate_deployment_pb_schema +from bentoml.yatai.validator import validate_deployment_pb from bentoml import __version__ as BENTOML_VERSION @@ -70,7 +70,7 @@ def __init__( default_namespace = default_namespace or cfg.get('default_namespace') self.default_namespace = default_namespace - self.repo = BentoRepository(repo_base_url, s3_endpoint_url) + self.repo = Repository(repo_base_url, s3_endpoint_url) self.sess_maker = init_db(db_url) self.deployment_store = DeploymentStore(self.sess_maker) self.bento_metadata_store = BentoMetadataStore(self.sess_maker) @@ -88,7 +88,7 @@ def ApplyDeployment(self, request, context=None): request.deployment.namespace or self.default_namespace ) - validation_errors = validate_deployment_pb_schema(request.deployment) + validation_errors = validate_deployment_pb(request.deployment) if validation_errors: raise InvalidArgument( 'Failed to validate deployment. 
diff --git a/tests/deployment/aws_lambda/test_aws_lambda_deployment_operator.py b/tests/deployment/aws_lambda/test_aws_lambda_deployment_operator.py
index 01955fa256c..7b18367261c 100644
--- a/tests/deployment/aws_lambda/test_aws_lambda_deployment_operator.py
+++ b/tests/deployment/aws_lambda/test_aws_lambda_deployment_operator.py
@@ -6,10 +6,10 @@
 from moto import mock_s3
 from ruamel.yaml import YAML
 
-from bentoml.yatai.deployment.aws_lambda import (
-    AwsLambdaDeploymentOperator,
+from bentoml.yatai.deployment.aws_lambda.utils import init_sam_project
+from bentoml.yatai.deployment.aws_lambda.operator import (
     _create_aws_lambda_cloudformation_template_file,
-    init_sam_project,
+    AwsLambdaDeploymentOperator,
 )
 from bentoml.yatai.proto.deployment_pb2 import Deployment, DeploymentState
 from bentoml.yatai.proto.repository_pb2 import (
@@ -176,7 +176,7 @@ def mock_lambda_related_operations(func):
     @patch('subprocess.check_output', MagicMock())
     @mock_s3
     @patch(
-        'bentoml.yatai.deployment.aws_lambda.get_default_aws_region',
+        'bentoml.yatai.deployment.aws_lambda.operator.get_default_aws_region',
         MagicMock(return_value='mock_region'),
     )
     def mock_wrapper(*args, **kwargs):
@@ -192,17 +192,18 @@ def mock_wrapper(*args, **kwargs):
 @patch('shutil.copytree', MagicMock())
 @patch('shutil.copy', MagicMock())
 @patch('os.listdir', MagicMock())
-@patch('bentoml.yatai.deployment.aws_lambda.init_sam_project', MagicMock())
-@patch('bentoml.yatai.deployment.aws_lambda.lambda_package', MagicMock())
+@patch('bentoml.yatai.deployment.aws_lambda.operator.init_sam_project', MagicMock())
+@patch('bentoml.yatai.deployment.aws_lambda.operator.lambda_package', MagicMock())
 @patch(
-    'bentoml.yatai.deployment.aws_lambda.validate_lambda_template',
+    'bentoml.yatai.deployment.aws_lambda.operator.validate_lambda_template',
     MagicMock(return_value=None),
 )
 @patch(
-    'bentoml.yatai.deployment.aws_lambda.lambda_deploy', MagicMock(return_value=None)
+    'bentoml.yatai.deployment.aws_lambda.operator.lambda_deploy',
+    MagicMock(return_value=None),
 )
 @patch(
-    'bentoml.yatai.deployment.aws_lambda.total_file_or_directory_size',
+    'bentoml.yatai.deployment.aws_lambda.operator.total_file_or_directory_size',
     MagicMock(return_value=250),
 )
 @patch('os.remove', MagicMock())
@@ -222,21 +223,22 @@ def test_aws_lambda_apply_under_bundle_size_limit_success():
 @patch('shutil.copytree', MagicMock())
 @patch('shutil.copy', MagicMock())
 @patch('os.listdir', MagicMock())
-@patch('bentoml.yatai.deployment.aws_lambda.init_sam_project', MagicMock())
-@patch('bentoml.yatai.deployment.aws_lambda.lambda_package', MagicMock())
+@patch('bentoml.yatai.deployment.aws_lambda.operator.init_sam_project', MagicMock())
+@patch('bentoml.yatai.deployment.aws_lambda.operator.lambda_package', MagicMock())
 @patch(
-    'bentoml.yatai.deployment.aws_lambda.validate_lambda_template',
+    'bentoml.yatai.deployment.aws_lambda.operator.validate_lambda_template',
     MagicMock(return_value=None),
 )
 @patch(
-    'bentoml.yatai.deployment.aws_lambda.lambda_deploy', MagicMock(return_value=None)
+    'bentoml.yatai.deployment.aws_lambda.operator.lambda_deploy',
+    MagicMock(return_value=None),
 )
 @patch(
-    'bentoml.yatai.deployment.aws_lambda.total_file_or_directory_size',
+    'bentoml.yatai.deployment.aws_lambda.operator.total_file_or_directory_size',
     MagicMock(return_value=249000001),
 )
 @patch(
-    'bentoml.yatai.deployment.aws_lambda.'
+    'bentoml.yatai.deployment.aws_lambda.operator.'
     'reduce_bundle_size_and_upload_extra_resources_to_s3',
     MagicMock(),
 )
@@ -261,7 +263,7 @@ def mock_cf_response(self, op_name, kwarg):
     yatai_service_mock = create_yatai_service_mock()
     test_deployment_pb = generate_lambda_deployment_pb()
     with patch(
-        'bentoml.yatai.deployment.aws_lambda.get_default_aws_region',
+        'bentoml.yatai.deployment.aws_lambda.operator.get_default_aws_region',
         MagicMock(return_value='mock_region'),
     ):
         with patch('botocore.client.BaseClient._make_api_call', new=mock_cf_response):
@@ -294,7 +296,7 @@ def mock_cf_response(self, op_name, kwarg):
     yatai_service_mock = create_yatai_service_mock()
     test_deployment_pb = generate_lambda_deployment_pb()
     with patch(
-        'bentoml.yatai.deployment.aws_lambda.get_default_aws_region',
+        'bentoml.yatai.deployment.aws_lambda.operator.get_default_aws_region',
         MagicMock(return_value='mock_region'),
     ):
         with patch('botocore.client.BaseClient._make_api_call', new=mock_cf_response):
diff --git a/tests/deployment/sagemaker/test_sagemaker.py b/tests/deployment/sagemaker/test_sagemaker.py
index e5eee1ffa36..0e024bca7d6 100644
--- a/tests/deployment/sagemaker/test_sagemaker.py
+++ b/tests/deployment/sagemaker/test_sagemaker.py
@@ -9,11 +9,11 @@
 import boto3
 from moto import mock_ecr, mock_iam, mock_sts
 
-from bentoml.yatai.deployment.sagemaker import (
+from bentoml.yatai.deployment.sagemaker.operator import (
     _aws_client_error_to_bentoml_exception,
     get_arn_role_from_current_aws_user,
-    SageMakerDeploymentOperator,
 )
+from bentoml.yatai.deployment.sagemaker.operator import SageMakerDeploymentOperator
 from bentoml.yatai.proto.deployment_pb2 import Deployment, DeploymentSpec
 from bentoml.yatai.proto.repository_pb2 import (
     Bento,
@@ -139,9 +139,12 @@ def mock_sagemaker_deployment_wrapper(func):
     @patch('subprocess.check_output', MagicMock())
     @patch('docker.APIClient.build', MagicMock())
     @patch('docker.APIClient.push', MagicMock())
-    @patch('bentoml.yatai.deployment.sagemaker._init_sagemaker_project', MagicMock())
     @patch(
-        'bentoml.yatai.deployment.sagemaker.get_default_aws_region',
+        'bentoml.yatai.deployment.sagemaker.operator._init_sagemaker_project',
+        MagicMock(),
+    )
+    @patch(
+        'bentoml.yatai.deployment.sagemaker.operator.get_default_aws_region',
        MagicMock(return_value='mock_region'),
     )
     def mock_wrapper(*args, **kwargs):
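Editor's note: the patch-target churn in the two test files above follows the standard mock rule: patch the name in the module where it is looked up, not where it is defined. The refactored operator modules bind helpers such as get_default_aws_region at import time, so the patch targets move with them. Sketch (the with-block body is elided):

from unittest.mock import MagicMock, patch

with patch(
    'bentoml.yatai.deployment.aws_lambda.operator.get_default_aws_region',
    MagicMock(return_value='mock_region'),
):
    # Code that goes through aws_lambda.operator now sees 'mock_region'.
    # Patching the old 'bentoml.yatai.deployment.aws_lambda' path would miss
    # the reference the operator module actually holds.
    ...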
diff --git a/tests/yatai/test_validators.py b/tests/yatai/test_validators.py
index d2bb94944d7..f11684b2f25 100644
--- a/tests/yatai/test_validators.py
+++ b/tests/yatai/test_validators.py
@@ -1,4 +1,4 @@
-from bentoml.yatai.validator import validate_deployment_pb_schema
+from bentoml.yatai.validator import validate_deployment_pb
 
 from bentoml.yatai.proto.deployment_pb2 import Deployment, DeploymentSpec
 
@@ -27,16 +27,16 @@ def _get_test_lambda_deployment_pb():
 
 def test_validate_deployment_pb_schema():
     deployment_pb = _get_test_sagemaker_deployment_pb()
-    assert validate_deployment_pb_schema(deployment_pb) is None
+    assert validate_deployment_pb(deployment_pb) is None
 
     deployment_pb_with_empty_name = _get_test_sagemaker_deployment_pb()
     deployment_pb_with_empty_name.name = ''
-    errors = validate_deployment_pb_schema(deployment_pb_with_empty_name)
+    errors = validate_deployment_pb(deployment_pb_with_empty_name)
     assert errors == {'name': ['required field']}
 
     deployment_pb_with_invalid_service_version = _get_test_sagemaker_deployment_pb()
     deployment_pb_with_invalid_service_version.spec.bento_version = 'latest'
-    errors = validate_deployment_pb_schema(deployment_pb_with_invalid_service_version)
+    errors = validate_deployment_pb(deployment_pb_with_invalid_service_version)
     assert errors['spec'][0]['bento_version'] == [
         'Must use specific "bento_version" in deployment, using "latest" is an '
         'anti-pattern.'
@@ -45,12 +45,12 @@ def test_validate_deployment_pb_schema():
 
 def test_validate_aws_lambda_schema():
     deployment_pb = _get_test_lambda_deployment_pb()
-    assert validate_deployment_pb_schema(deployment_pb) is None
+    assert validate_deployment_pb(deployment_pb) is None
 
     deployment_pb_with_bad_memory_size = _get_test_lambda_deployment_pb()
     deployment_pb_with_bad_memory_size.spec.aws_lambda_operator_config.timeout = 1000
     deployment_pb_with_bad_memory_size.spec.aws_lambda_operator_config.memory_size = 129
-    errors = validate_deployment_pb_schema(deployment_pb_with_bad_memory_size)
+    errors = validate_deployment_pb(deployment_pb_with_bad_memory_size)
     aws_spec_fail_msg = errors['spec'][0]['aws_lambda_operator_config'][0]
     assert 'AWS Lambda memory' in aws_spec_fail_msg['memory_size'][0]
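Editor's note: the [0] indexing in these assertions is a Cerberus convention — errors for a nested dict come back as a list containing the sub-document's error dict. An illustrative, hand-written error payload for the bad Lambda config above (not captured output; the timeout message is Cerberus's default for a 'max' violation):

errors = {
    'spec': [
        {
            'aws_lambda_operator_config': [
                {
                    'memory_size': [
                        'AWS Lambda memory must be between 128 MB and '
                        '3,008 MB, in 64 MB increments.'
                    ],
                    'timeout': ['max value is 900'],
                }
            ]
        }
    ]
}
assert 'AWS Lambda memory' in errors['spec'][0]['aws_lambda_operator_config'][0]['memory_size'][0]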