Skip to content
Permalink
Browse files

Implement deployment update for sagemaker (#461)

* add update cli command

* Impl sagemaker update function

* add intergration test for update sagemaker deployment

* refactor func for sagemaker e2e

* add sample_data option for test function

* move the funcs into each script so sagemaker wont complain

* update validation to on par with proto

* move func to scripts

* update

* include previouse deployment for operator update func

* update var name and update sagemaker test script

* add testing for sagemaker update

* add more logger and format

* remove outdated docstring

* update exception message

* fix pep8

* add notes

* move update fields to a list

* move steps into try block

* update python api

* fix linter issues
  • Loading branch information
yubozhao authored and parano committed Jan 9, 2020
1 parent 513a88c commit 78de8cfa056a2966c4b934be495630d548bf54a1
@@ -46,6 +46,7 @@
get_deployment,
describe_deployment,
list_deployments,
update_sagemaker_deployment,
)
from bentoml.yatai import get_yatai_service

@@ -346,6 +347,7 @@ def create(
),
CLI_COLOR_ERROR,
)
return
else:
if wait:
result_state = get_state_after_await_action_complete(
@@ -372,6 +374,123 @@ def create(
_echo('Successfully created deployment {}'.format(name), CLI_COLOR_SUCCESS)
_print_deployment_info(result.deployment, output)

@deployment.command(help='Update existing deployment')
@click.argument("name", type=click.STRING, required=True)
@click.option(
'-n',
'--namespace',
type=click.STRING,
help='Deployment namespace managed by BentoML, default value is "default" which'
'can be changed in BentoML configuration file',
)
@click.option(
'-b',
'--bento',
'--bento-service-bundle',
type=click.STRING,
callback=parse_bento_tag_callback,
help='Target BentoService to be deployed, referenced by its name and version '
'in format of name:version. For example: "iris_classifier:v1.2.0"',
)
@click.option(
'--instance-type',
help='Type of instance will be used for inference. Option applicable to '
'platform: AWS SageMaker. Default to "m1.m4.xlarge"',
type=click.STRING,
)
@click.option(
'--instance-count',
help='Number of instance will be used. Option applicable to platform: AWS '
'SageMaker. Default value is 1',
type=click.INT,
)
@click.option(
'--num-of-gunicorn-workers-per-instance',
help='Number of gunicorn worker will be used per instance. Option applicable '
'to platform: AWS SageMaker. Default value for gunicorn worker is '
'based on the instance\' cpu core counts. The formula is num_of_cpu/2 + 1',
type=click.INT,
)
@click.option(
'--api-name',
help='User defined API function will be used for inference. Required for AWS '
'SageMaker',
)
@click.option('-o', '--output', type=click.Choice(['json', 'yaml']), default='json')
@click.option(
'--wait/--no-wait',
default=True,
help='Wait for apply action to complete or encounter an error.'
'If set to no-wait, CLI will return immediately. The default value is wait',
)
def update(
name,
namespace,
bento,
instance_type,
instance_count,
num_of_gunicorn_workers_per_instance,
api_name,
output,
wait,
):
yatai_service = get_yatai_service()
track_cli('deploy-update')
if bento:
bento_name, bento_version = bento.split(':')
else:
bento_name = None
bento_version = None
try:
result = update_sagemaker_deployment(
namespace=namespace,
deployment_name=name,
bento_name=bento_name,
bento_version=bento_version,
instance_count=instance_count,
instance_type=instance_type,
num_of_gunicorn_workers_per_instance=num_of_gunicorn_workers_per_instance, # noqa E501
api_name=api_name,
yatai_service=yatai_service,
)
except BentoMLException as e:
_echo(f'Failed to update deployment {name}: {str(e)}', CLI_COLOR_ERROR)
return
if result.status.status_code != status_pb2.Status.OK:
update_deployment_status = result.status
_echo(
f'Failed to update deployment {name}. '
f'{status_pb2.Status.Code.Name(update_deployment_status.status_code)}:'
f'{update_deployment_status.error_message}',
CLI_COLOR_ERROR,
)
return
else:
if wait:
result_state = get_state_after_await_action_complete(
yatai_service=yatai_service,
name=name,
namespace=namespace,
message='Updating deployment',
)
if result_state.status.status_code != status_pb2.Status.OK:
describe_deployment_status = result_state.status
_echo(
f'Updated deployment {name}. Failed to retrieve latest status. '
f'{status_pb2.Status.Code.Name(describe_deployment_status.status_code)}:' # noqa E501
f'{describe_deployment_status.error_message}'
)
return
result.deployment.state.CopyFrom(result_state.state)
track_cli(
'deploy-update-success',
deploy_platform=DeploymentSpec.DeploymentOperator.Name(
result.deployment.spec.operator
),
)
_echo(f'Successfully updated deployment {name}', CLI_COLOR_SUCCESS)
_print_deployment_info(result.deployment, output)

@deployment.command(help='Apply model service deployment from yaml file')
@click.option(
'-f',
@@ -65,7 +65,7 @@ def add(self, deployment_pb):
"""

@abstractmethod
def update(self, deployment_pb):
def update(self, deployment_pb, previous_deployment):
"""
Update existing deployment based on deployment_pb spec
"""
@@ -437,6 +437,19 @@ def _create_sagemaker_endpoint(sagemaker_client, endpoint_name, endpoint_config_
)


def _update_sagemaker_endpoint(sagemaker_client, endpoint_name, endpoint_config_name):
try:
logger.debug("Updating sagemaker endpoint %s", endpoint_name)
update_endpoint_response = sagemaker_client.update_endpoint(
EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)
logger.debug("AWS update endpoint response: %s", str(update_endpoint_response))
except ClientError as e:
raise _aws_client_error_to_bentoml_exception(
e, "Failed to update sagemaker endpoint"
)


class SageMakerDeploymentOperator(DeploymentOperatorBase):
def add(self, deployment_pb):
try:
@@ -526,11 +539,160 @@ def _add(self, deployment_pb, bento_pb, bento_path):

return ApplyDeploymentResponse(status=Status.OK(), deployment=deployment_pb)

def update(self, deployment_pb):
raise NotImplementedError(
"Updating AWS SageMaker deployment is not supported in current version of "
"BentoML"
)
def update(self, deployment_pb, previous_deployment):
try:
ensure_docker_available_or_raise()
deployment_spec = deployment_pb.spec
bento_pb = self.yatai_service.GetBento(
GetBentoRequest(
bento_name=deployment_spec.bento_name,
bento_version=deployment_spec.bento_version,
)
)
if bento_pb.bento.uri.type not in (BentoUri.LOCAL, BentoUri.S3):
raise BentoMLException(
'BentoML currently not support {} repository'.format(
BentoUri.StorageType.Name(bento_pb.bento.uri.type)
)
)
return self._update(
deployment_pb, previous_deployment, bento_pb, bento_pb.bento.uri.uri
)
except BentoMLException as error:
deployment_pb.state.state = DeploymentState.ERROR
deployment_pb.state.error_message = (
f'Error updating SageMaker deployment: {str(error)}'
)
return ApplyDeploymentResponse(
status=error.status_proto, deployment=deployment_pb
)

def _update(self, deployment_pb, current_deployment, bento_pb, bento_path):
if loader._is_remote_path(bento_path):
with loader._resolve_remote_bundle_path(bento_path) as local_path:
return self._update(
deployment_pb, current_deployment, bento_pb, local_path
)
updated_deployment_spec = deployment_pb.spec
updated_sagemaker_config = updated_deployment_spec.sagemaker_operator_config
sagemaker_client = boto3.client('sagemaker', updated_sagemaker_config.region)

try:
raise_if_api_names_not_found_in_bento_service_metadata(
bento_pb.bento.bento_service_metadata,
[updated_sagemaker_config.api_name],
)
describe_latest_deployment_state = self.describe(deployment_pb)
current_deployment_spec = current_deployment.spec
current_sagemaker_config = current_deployment_spec.sagemaker_operator_config
latest_deployment_state = json.loads(
describe_latest_deployment_state.state.info_json
)

current_ecr_image_tag = latest_deployment_state['ProductionVariants'][0][
'DeployedImages'
][0]['SpecifiedImage']
if (
updated_deployment_spec.bento_name != current_deployment_spec.bento_name
or updated_deployment_spec.bento_version
!= current_deployment_spec.bento_version
):
logger.debug(
'BentoService tag is different from current deployment, '
'creating new docker image and push to ECR'
)
with TempDirectory() as temp_dir:
sagemaker_project_dir = os.path.join(
temp_dir, updated_deployment_spec.bento_name
)
_init_sagemaker_project(sagemaker_project_dir, bento_path)
ecr_image_path = create_and_push_docker_image_to_ecr(
updated_sagemaker_config.region,
updated_deployment_spec.bento_name,
updated_deployment_spec.bento_version,
sagemaker_project_dir,
)
else:
logger.debug('Using existing ECR image for Sagemaker model')
ecr_image_path = current_ecr_image_tag

(
updated_sagemaker_model_name,
updated_sagemaker_endpoint_config_name,
sagemaker_endpoint_name,
) = _get_sagemaker_resource_names(deployment_pb)
(
current_sagemaker_model_name,
current_sagemaker_endpoint_config_name,
_,
) = _get_sagemaker_resource_names(current_deployment)

if (
updated_sagemaker_config.api_name != current_sagemaker_config.api_name
or updated_sagemaker_config.num_of_gunicorn_workers_per_instance
!= current_sagemaker_config.num_of_gunicorn_workers_per_instance
or ecr_image_path != current_ecr_image_tag
):
logger.debug(
'Sagemaker model requires update. Delete current sagemaker model %s'
'and creating new model %s',
current_sagemaker_model_name,
updated_sagemaker_model_name,
)
_delete_sagemaker_model_if_exist(
sagemaker_client, current_sagemaker_model_name
)
_create_sagemaker_model(
sagemaker_client,
updated_sagemaker_model_name,
ecr_image_path,
updated_sagemaker_config,
)
# When bento service tag is not changed, we need to delete the current
# endpoint configuration in order to create new one to avoid name collation
if (
current_sagemaker_endpoint_config_name
== updated_sagemaker_endpoint_config_name
):
logger.debug(
'Current sagemaker config name %s is same as updated one, '
'delete it before create new endpoint config',
current_sagemaker_endpoint_config_name,
)
_delete_sagemaker_endpoint_config_if_exist(
sagemaker_client, current_sagemaker_endpoint_config_name
)
logger.debug(
'Create new endpoint configuration %s',
updated_sagemaker_endpoint_config_name,
)
_create_sagemaker_endpoint_config(
sagemaker_client,
updated_sagemaker_model_name,
updated_sagemaker_endpoint_config_name,
updated_sagemaker_config,
)
logger.debug(
'Updating endpoint to new endpoint configuration %s',
updated_sagemaker_endpoint_config_name,
)
_update_sagemaker_endpoint(
sagemaker_client,
sagemaker_endpoint_name,
updated_sagemaker_endpoint_config_name,
)
logger.debug(
'Delete old sagemaker endpoint config %s',
current_sagemaker_endpoint_config_name,
)
_delete_sagemaker_endpoint_config_if_exist(
sagemaker_client, current_sagemaker_endpoint_config_name
)
except AWSServiceError as e:
_try_clean_up_sagemaker_deployment_resource(deployment_pb)
raise e

return ApplyDeploymentResponse(status=Status.OK(), deployment=deployment_pb)

def delete(self, deployment_pb):
try:
@@ -19,13 +19,15 @@
from cerberus import Validator

from bentoml.utils import ProtoMessageToDict
from bentoml.proto.deployment_pb2 import DeploymentSpec
from bentoml.proto.deployment_pb2 import DeploymentSpec, DeploymentState

deployment_schema = {
'name': {'type': 'string', 'required': True, 'minlength': 4},
'namespace': {'type': 'string', 'required': True, 'minlength': 3},
'labels': {'type': 'dict', 'allow_unknown': True},
'annotations': {'type': 'dict', 'allow_unknown': True},
'created_at': {'type': 'string'},
'last_updated_at': {'type': 'string'},
'spec': {
'type': 'dict',
'required': True,
@@ -80,9 +82,10 @@
'state': {
'type': 'dict',
'schema': {
'state': {'type': 'integer'},
'state': {'type': 'string', 'allowed': DeploymentState.State.keys()},
'error_message': {'type': 'string'},
'info_json': {'type': 'string'},
'timestamp': {'type': 'string'},
},
},
}
@@ -26,6 +26,15 @@

logger = logging.getLogger(__name__)

SPEC_FIELDS_AVAILABLE_FOR_UPDATE = ['bento_name', 'bento_version']

SAGEMAKER_FIELDS_AVAILABLE_FOR_UPDATE = [
'api_name',
'instance_type',
'instance_count',
'num_of_gunicorn_workers_per_instance',
]


def deployment_dict_to_pb(deployment_dict):
deployment_pb = Deployment()
@@ -62,13 +71,10 @@ def deployment_dict_to_pb(deployment_dict):
'api_name',
'instance_type',
'num_of_gunicorn_workers_per_instance',
'instance_count',
]:
if sagemaker_config.get(field):
sagemaker_config_pb.__setattr__(field, sagemaker_config.get(field))
if sagemaker_config.get('instance_count'):
sagemaker_config_pb.instance_count = int(
sagemaker_config.get('instance_count')
)
elif deployment_pb.spec.operator == DeploymentSpec.AWS_LAMBDA:
lambda_conf = spec_dict.get('aws_lambda_operator_config', {})
for field in ['region', 'api_name', 'memory_size', 'timeout']:

0 comments on commit 78de8cf

Please sign in to comment.
You can’t perform that action at this time.