# model.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Model classes for SageMaker hosting, including the abstract ``ModelBase`` and ``Model``."""
from __future__ import absolute_import
import abc
import json
import logging
import os
import copy
from typing import List, Dict, Optional, Union
import sagemaker
from sagemaker import (
fw_utils,
local,
s3,
session,
utils,
git_utils,
)
from sagemaker.session import Session
from sagemaker.model_metrics import ModelMetrics
from sagemaker.deprecations import removed_kwargs
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.metadata_properties import MetadataProperties
from sagemaker.predictor import PredictorBase
from sagemaker.serverless import ServerlessInferenceConfig
from sagemaker.transformer import Transformer
from sagemaker.jumpstart.utils import add_jumpstart_tags, get_jumpstart_base_name_if_jumpstart_model
from sagemaker.utils import (
unique_name_from_base,
update_container_with_inference_params,
to_string,
)
from sagemaker.async_inference import AsyncInferenceConfig
from sagemaker.predictor_async import AsyncPredictor
from sagemaker.workflow import is_pipeline_variable
from sagemaker.workflow.entities import PipelineVariable
from sagemaker.workflow.pipeline_context import runnable_by_pipeline, PipelineSession
from sagemaker.inference_recommender.inference_recommender_mixin import InferenceRecommenderMixin
# Logger used by this module; it is registered under the "sagemaker" name.
LOGGER = logging.getLogger("sagemaker")

# Framework names grouped under the "NEO_ALLOWED" label.
# NOTE(review): presumably the frameworks accepted for Neo compilation; the
# consumer of this set is outside the visible part of the file.
NEO_ALLOWED_FRAMEWORKS = {
    "mxnet",
    "tensorflow",
    "keras",
    "pytorch",
    "onnx",
    "xgboost",
    "tflite",
}

# Target device identifiers (underscore-separated instance families).
# NOTE(review): usage is outside the visible part of the file.
NEO_IOC_TARGET_DEVICES = [
    "ml_c4",
    "ml_c5",
    "ml_m4",
    "ml_m5",
    "ml_p2",
    "ml_p3",
    "ml_g4dn",
]
class ModelBase(abc.ABC):
    """Abstract interface for a trained model that can be deployed.

    A model can be deployed to a compute service such as a SageMaker
    ``Endpoint`` or Lambda, where it serves real-time inference requests.
    Concrete subclasses must implement both ``deploy`` and ``delete_model``.
    """

    @abc.abstractmethod
    def deploy(self, *args, **kwargs) -> PredictorBase:
        """Deploy this model to a compute service and return a predictor for it."""

    @abc.abstractmethod
    def delete_model(self, *args, **kwargs) -> None:
        """Tear down the resources that belong to this model."""
# Parameter names for script-mode hosting. ``Model._script_mode_env_vars``
# upper-cases some of these to build the container's environment-variable keys.
# Name of the entry-point script.
SCRIPT_PARAM_NAME = "sagemaker_program"
# Location of the code: an S3 prefix, a ``file://`` path or a container path.
DIR_PARAM_NAME = "sagemaker_submit_directory"
# Log level used inside the container.
CONTAINER_LOG_LEVEL_PARAM_NAME = "sagemaker_container_log_level"
# NOTE(review): the next two names are not referenced in the visible part of
# this file; presumably they are consumed by subclasses or other modules.
JOB_NAME_PARAM_NAME = "sagemaker_job_name"
MODEL_SERVER_WORKERS_PARAM_NAME = "sagemaker_model_server_workers"
# AWS region name, taken from the session's ``boto_region_name``.
SAGEMAKER_REGION_PARAM_NAME = "sagemaker_region"
# NOTE(review): not referenced in the visible part of this file.
SAGEMAKER_OUTPUT_LOCATION = "sagemaker_s3_output"
class Model(ModelBase, InferenceRecommenderMixin):
"""A SageMaker ``Model`` that can be deployed to an ``Endpoint``."""
def __init__(
    self,
    image_uri: Union[str, PipelineVariable],
    model_data: Optional[Union[str, PipelineVariable]] = None,
    role: Optional[str] = None,
    predictor_cls: Optional[callable] = None,
    env: Optional[Dict[str, Union[str, PipelineVariable]]] = None,
    name: Optional[str] = None,
    vpc_config: Optional[Dict[str, List[Union[str, PipelineVariable]]]] = None,
    sagemaker_session: Optional[Session] = None,
    enable_network_isolation: Union[bool, PipelineVariable] = False,
    model_kms_key: Optional[str] = None,
    image_config: Optional[Dict[str, Union[str, PipelineVariable]]] = None,
    source_dir: Optional[str] = None,
    code_location: Optional[str] = None,
    entry_point: Optional[str] = None,
    container_log_level: Union[int, PipelineVariable] = logging.INFO,
    dependencies: Optional[List[str]] = None,
    git_config: Optional[Dict[str, str]] = None,
):
    """Initialize a SageMaker ``Model``.

    The constructor stores the configuration on the instance. When
    ``git_config`` is given it also clones the repository through
    ``git_utils.git_clone_repo`` and replaces ``entry_point``, ``source_dir``
    and ``dependencies`` with the values that call returns.

    Args:
        image_uri (str or PipelineVariable): A Docker image URI.
        model_data (str or PipelineVariable): The S3 location of a SageMaker
            model data ``.tar.gz`` file (default: None).
        role (str): An AWS IAM role (either name or full ARN). The Amazon
            SageMaker training jobs and APIs that create Amazon SageMaker
            endpoints use this role to access training data and model
            artifacts. After the endpoint is created, the inference code
            might use the IAM role if it needs to access some AWS resources.
            It can be null if this model is passed to a ``PipelineModel``,
            which has its own role field (default: None).
        predictor_cls (callable[string, sagemaker.session.Session]): A
            function to call to create a predictor (default: None). If not
            None, ``deploy`` returns the result of invoking this function on
            the created endpoint name.
        env (dict[str, str] or dict[str, PipelineVariable]): Environment
            variables to run with ``image_uri`` when hosted in SageMaker
            (default: None, stored as an empty dict).
        name (str): The model name. If None, a default model name is selected
            on each ``deploy``.
        vpc_config (dict[str, list[str]] or dict[str, list[PipelineVariable]]):
            The VpcConfig set on the model (default: None).

            * 'Subnets' (list[str]): List of subnet ids.
            * 'SecurityGroupIds' (list[str]): List of security group ids.
        sagemaker_session (sagemaker.session.Session): A SageMaker Session
            object, used for SageMaker interactions (default: None). If not
            specified, one is created using the default AWS configuration
            chain.
        enable_network_isolation (Boolean or PipelineVariable): If True,
            enables network isolation in the endpoint, isolating the model
            container. No inbound or outbound network calls can be made to or
            from the model container (default: False).
        model_kms_key (str): KMS key ARN used to encrypt the repacked model
            archive file if the model is repacked.
        image_config (dict[str, str] or dict[str, PipelineVariable]): Specifies
            whether the image of the model container is pulled from ECR or
            from a private registry in your VPC. By default the image is
            pulled from ECR (default: None).
        source_dir (str): The absolute, relative, or S3 URI path to a
            directory with any other source code dependencies aside from the
            entry point file (default: None). If ``source_dir`` is an S3 URI,
            it must point to a tar.gz file and no code is uploaded; the S3
            location is used instead. Structure within this directory is
            preserved. If ``git_config`` is provided, ``source_dir`` should be
            a relative location to a directory in the Git repo.

            .. admonition:: Example

                With the following GitHub repo directory structure:

                >>> |----- README.md
                >>> |----- src
                >>>         |----- inference.py
                >>>         |----- test.py

                You can assign entry_point='inference.py', source_dir='src'.
        code_location (str): S3 location where custom code is uploaded
            (default: None). When given, it is parsed with
            ``s3.parse_s3_url`` into ``self.bucket`` and ``self.key_prefix``.
            If not specified, the default bucket created by
            ``sagemaker.session.Session`` is used.
        entry_point (str): The absolute or relative path to the local Python
            source file that should be executed as the entry point to model
            hosting (default: None). If ``source_dir`` is specified, then
            ``entry_point`` must point to a file located at the root of
            ``source_dir``. If ``git_config`` is provided, ``entry_point``
            should be a relative location to the Python source file in the
            Git repo.

            .. admonition:: Example

                With the following GitHub repo directory structure:

                >>> |----- README.md
                >>> |----- src
                >>>         |----- inference.py
                >>>         |----- test.py

                You can assign entry_point='src/inference.py'.
        container_log_level (int or PipelineVariable): Log level to use within
            the container (default: logging.INFO). Valid values are defined in
            the Python logging module.
        dependencies (list[str]): A list of absolute or relative paths to
            directories with any additional libraries that should be exported
            to the container (default: []). The library folders are copied to
            SageMaker in the same folder where the entry point is copied. If
            ``git_config`` is provided, ``dependencies`` should be a list of
            relative locations to directories in the Git repo. If
            ``source_dir`` points to S3, the S3 location is used instead.
            This is not supported with "local code" in Local Mode.

            .. admonition:: Example

                The following call

                >>> Model(entry_point='inference.py',
                ...       dependencies=['my/libs/common', 'virtual-env'])

                results in the following structure inside the container:

                >>> $ ls
                >>> opt/ml/code
                >>>     |------ inference.py
                >>>     |------ common
                >>>     |------ virtual-env
        git_config (dict[str, str]): Git configurations used for cloning
            files, including ``repo``, ``branch``, ``commit``,
            ``2FA_enabled``, ``username``, ``password`` and ``token``. Only
            ``repo`` is required; it names the Git repository where your
            script is stored. If ``branch`` is omitted, 'master' is used. If
            ``commit`` is omitted, the latest commit in the specified branch
            is used.

            .. admonition:: Example

                The following config:

                >>> git_config = {'repo': 'https://github.com/aws/sagemaker-python-sdk.git',
                >>>               'branch': 'test-branch-git-config',
                >>>               'commit': '329bfcf884482002c05ff7f44f62599ebc9f445a'}

                results in cloning the repo specified in 'repo', then
                checking out the requested branch, and then the specified
                commit.

            ``2FA_enabled``, ``username``, ``password`` and ``token`` are used
            for authentication. For GitHub (or other Git) accounts, set
            ``2FA_enabled`` to 'True' if two-factor authentication is enabled
            for the account, otherwise set it to 'False'. If no value is
            given, 'False' is used. CodeCommit does not support two-factor
            authentication, so do not provide ``2FA_enabled`` with CodeCommit
            repositories.

            For GitHub and other Git repos, when SSH URLs are provided, it
            does not matter whether 2FA is enabled. You should either have no
            passphrase for the SSH key pairs or have the ssh-agent configured
            so that you are not prompted for the passphrase when running
            'git clone' with SSH URLs. When HTTPS URLs are provided and 2FA is
            disabled, either ``token`` or ``username`` and ``password`` are
            used for authentication if provided; ``token`` is prioritized. If
            2FA is enabled, only ``token`` is used if provided. If the
            required authentication info is not provided, the SageMaker
            Python SDK attempts to use local credentials to authenticate. If
            that fails, an error is raised.

            For CodeCommit repos, 2FA is not supported, so ``2FA_enabled``
            should not be provided. There is no token in CodeCommit, so
            ``token`` should also not be provided. When ``repo`` is an SSH
            URL, the requirements are the same as for GitHub repos. When
            ``repo`` is an HTTPS URL, ``username`` and ``password`` are used
            if provided; otherwise the SDK attempts to use either the
            CodeCommit credential helper or local credential storage.
    """
    # Container, identity and networking configuration.
    self.model_data = model_data
    self.image_uri = image_uri
    self.role = role
    self.predictor_cls = predictor_cls
    self.env = env if env else {}
    self.name = name
    self._base_name = None
    self.vpc_config = vpc_config
    self.sagemaker_session = sagemaker_session

    # State that later operations fill in.
    self.endpoint_name = None
    self._is_compiled_model = False
    self._compilation_job_name = None
    self._is_edge_packaged_model = False
    self.inference_recommender_job_results = self.inference_recommendations = None

    self._enable_network_isolation = enable_network_isolation
    self.model_kms_key = model_kms_key
    self.image_config = image_config

    # Script-mode configuration.
    self.entry_point = entry_point
    self.source_dir = source_dir
    self.dependencies = dependencies if dependencies else []
    self.git_config = git_config
    self.container_log_level = container_log_level

    code_bucket, code_prefix = (
        s3.parse_s3_url(code_location) if code_location else (None, None)
    )
    self.bucket = code_bucket
    self.key_prefix = code_prefix

    if self.git_config:
        cloned = git_utils.git_clone_repo(
            self.git_config, self.entry_point, self.source_dir, self.dependencies
        )
        # Replace the user-supplied paths with the ones returned by the clone.
        for attr in ("entry_point", "source_dir", "dependencies"):
            setattr(self, attr, cloned[attr])

    self.uploaded_code = self.repacked_model_data = None
@runnable_by_pipeline
def register(
    self,
    content_types: List[Union[str, PipelineVariable]],
    response_types: List[Union[str, PipelineVariable]],
    inference_instances: Optional[List[Union[str, PipelineVariable]]] = None,
    transform_instances: Optional[List[Union[str, PipelineVariable]]] = None,
    model_package_name: Optional[Union[str, PipelineVariable]] = None,
    model_package_group_name: Optional[Union[str, PipelineVariable]] = None,
    image_uri: Optional[Union[str, PipelineVariable]] = None,
    model_metrics: Optional[ModelMetrics] = None,
    metadata_properties: Optional[MetadataProperties] = None,
    marketplace_cert: bool = False,
    approval_status: Optional[Union[str, PipelineVariable]] = None,
    description: Optional[str] = None,
    drift_check_baselines: Optional[DriftCheckBaselines] = None,
    customer_metadata_properties: Optional[Dict[str, Union[str, PipelineVariable]]] = None,
    validation_specification: Optional[Union[str, PipelineVariable]] = None,
    domain: Optional[Union[str, PipelineVariable]] = None,
    task: Optional[Union[str, PipelineVariable]] = None,
    sample_payload_url: Optional[Union[str, PipelineVariable]] = None,
    framework: Optional[Union[str, PipelineVariable]] = None,
    framework_version: Optional[Union[str, PipelineVariable]] = None,
    nearest_model_name: Optional[Union[str, PipelineVariable]] = None,
    data_input_configuration: Optional[Union[str, PipelineVariable]] = None,
):
    """Create a model package for creating SageMaker models or listing on Marketplace.

    Args:
        content_types (list[str] or list[PipelineVariable]): The supported MIME
            types for the input data.
        response_types (list[str] or list[PipelineVariable]): The supported MIME
            types for the output data.
        inference_instances (list[str] or list[PipelineVariable]): A list of the
            instance types that are used to generate inferences in real-time
            (default: None).
        transform_instances (list[str] or list[PipelineVariable]): A list of the
            instance types on which a transformation job can be run or on which
            an endpoint can be deployed (default: None).
        model_package_name (str or PipelineVariable): Model Package name,
            exclusive to `model_package_group_name`. Using `model_package_name`
            makes the Model Package un-versioned (default: None).
        model_package_group_name (str or PipelineVariable): Model Package Group
            name, exclusive to `model_package_name`. Using
            `model_package_group_name` makes the Model Package versioned
            (default: None).
        image_uri (str or PipelineVariable): Inference image URI for the
            container. When given, it overwrites ``self.image_uri``; otherwise
            the model's existing ``self.image_uri`` is used (default: None).
        model_metrics (ModelMetrics): ModelMetrics object (default: None).
        metadata_properties (MetadataProperties): MetadataProperties object
            (default: None).
        marketplace_cert (bool): Whether the Model Package is certified for AWS
            Marketplace (default: False).
        approval_status (str or PipelineVariable): Model Approval Status, values
            can be "Approved", "Rejected", or "PendingManualApproval"
            (default: "PendingManualApproval").
        description (str): Model Package description (default: None).
        drift_check_baselines (DriftCheckBaselines): DriftCheckBaselines object
            (default: None).
        customer_metadata_properties (dict[str, str] or dict[str, PipelineVariable]):
            A dictionary of key-value paired metadata properties (default: None).
        validation_specification (str or PipelineVariable): Forwarded unchanged
            to ``sagemaker.get_model_package_args`` (default: None).
        domain (str or PipelineVariable): Domain values can be "COMPUTER_VISION",
            "NATURAL_LANGUAGE_PROCESSING", "MACHINE_LEARNING" (default: None).
        task (str or PipelineVariable): Task values which are supported by
            Inference Recommender are "FILL_MASK", "IMAGE_CLASSIFICATION",
            "OBJECT_DETECTION", "TEXT_GENERATION", "IMAGE_SEGMENTATION",
            "CLASSIFICATION", "REGRESSION", "OTHER" (default: None).
        sample_payload_url (str or PipelineVariable): The S3 path where the
            sample payload is stored (default: None).
        framework (str or PipelineVariable): Machine learning framework of the
            model package container image (default: None). Only applied when
            `model_package_group_name` is given.
        framework_version (str or PipelineVariable): Framework version of the
            Model Package Container Image (default: None). Only applied when
            `model_package_group_name` is given.
        nearest_model_name (str or PipelineVariable): Name of a pre-trained
            machine learning model benchmarked by Amazon SageMaker Inference
            Recommender (default: None). Only applied when
            `model_package_group_name` is given.
        data_input_configuration (str or PipelineVariable): Input object for the
            model (default: None). Only applied when `model_package_group_name`
            is given.

    Returns:
        A `sagemaker.model.ModelPackage` instance. When the model was built
        with :class:`~sagemaker.workflow.pipeline_context.PipelineSession`,
        this body returns ``None`` (the ``runnable_by_pipeline`` decorator
        presumably supplies the pipeline step arguments in that case).

    Raises:
        ValueError: If ``self.model_data`` is None.
    """
    if self.model_data is None:
        raise ValueError("SageMaker Model Package cannot be created without model data.")

    if image_uri is not None:
        self.image_uri = image_uri

    if model_package_group_name is None:
        # Un-versioned package: a minimal container definition is enough.
        container_def = {"Image": self.image_uri, "ModelDataUrl": self.model_data}
    else:
        # Versioned package: build the full definition, then attach the
        # Inference Recommender parameters.
        base_container = self.prepare_container_def()
        container_def = update_container_with_inference_params(
            framework=framework,
            framework_version=framework_version,
            nearest_model_name=nearest_model_name,
            data_input_configuration=data_input_configuration,
            container_def=base_container,
        )

    package_options = {
        "inference_instances": inference_instances,
        "transform_instances": transform_instances,
        "model_package_name": model_package_name,
        "model_package_group_name": model_package_group_name,
        "model_metrics": model_metrics,
        "metadata_properties": metadata_properties,
        "marketplace_cert": marketplace_cert,
        "approval_status": approval_status,
        "description": description,
        "container_def_list": [container_def],
        "drift_check_baselines": drift_check_baselines,
        "customer_metadata_properties": customer_metadata_properties,
        "validation_specification": validation_specification,
        "domain": domain,
        "sample_payload_url": sample_payload_url,
        "task": task,
    }
    model_pkg_args = sagemaker.get_model_package_args(
        content_types, response_types, **package_options
    )
    model_package = self.sagemaker_session.create_model_package_from_containers(
        **model_pkg_args
    )

    if not isinstance(self.sagemaker_session, PipelineSession):
        return ModelPackage(
            role=self.role,
            model_data=self.model_data,
            model_package_arn=model_package.get("ModelPackageArn"),
        )
    return None
@runnable_by_pipeline
def create(
    self,
    instance_type: Optional[str] = None,
    accelerator_type: Optional[str] = None,
    serverless_inference_config: Optional[ServerlessInferenceConfig] = None,
    tags: Optional[List[Dict[str, Union[str, PipelineVariable]]]] = None,
):
    """Create a SageMaker Model Entity.

    This is a thin public wrapper that forwards every argument to
    ``_create_sagemaker_model``.

    Args:
        instance_type (str): The EC2 instance type that this Model will be
            used for. This is only used to determine if the image needs GPU
            support or not (default: None).
        accelerator_type (str): Type of Elastic Inference accelerator to
            attach to an endpoint for model loading and inference, for
            example, 'ml.eia1.medium'. If not specified, no Elastic
            Inference accelerator will be attached to the endpoint
            (default: None).
        serverless_inference_config (ServerlessInferenceConfig):
            Specifies configuration related to serverless endpoint. Instance
            type is not provided in serverless inference, so this is used to
            find image URIs (default: None).
        tags (list[dict[str, str]] or list[dict[str, PipelineVariable]]): The
            list of tags to add to the model (default: None). Example::

                tags = [{'Key': 'tagname', 'Value':'tagvalue'}]

            For more information about tags, see the boto3 documentation for
            ``SageMaker.Client.add_tags``:
            https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.add_tags

    Returns:
        None or pipeline step arguments in case the Model instance is built with
        :class:`~sagemaker.workflow.pipeline_context.PipelineSession`
    """
    # TODO: we should replace _create_sagemaker_model() with create()
    forwarded = {
        "instance_type": instance_type,
        "accelerator_type": accelerator_type,
        "tags": tags,
        "serverless_inference_config": serverless_inference_config,
    }
    self._create_sagemaker_model(**forwarded)
def _init_sagemaker_session_if_does_not_exist(self, instance_type=None):
    """Populate ``self.sagemaker_session`` when it has not been set yet.

    An existing (truthy) session is left untouched. Otherwise a
    ``LocalSession`` is created for the ``"local"`` and ``"local_gpu"``
    instance types, and a regular ``Session`` for everything else.

    Args:
        instance_type (str): Instance type used to pick the session class
            (default: None).
    """
    if not self.sagemaker_session:
        wants_local = instance_type in ("local", "local_gpu")
        session_factory = local.LocalSession if wants_local else session.Session
        self.sagemaker_session = session_factory()
def prepare_container_def(
    self,
    instance_type=None,
    accelerator_type=None,
    serverless_inference_config=None,
):  # pylint: disable=unused-argument
    """Build the container definition dict via ``sagemaker.container_def()``.

    When any of ``source_dir``, ``dependencies``, ``entry_point`` or
    ``git_config`` is set, the code is first uploaded (or repacked with the
    model) and the script-mode environment variables are merged into a copy
    of ``self.env``. Subclasses can override this to provide custom container
    definitions for deployment to a specific instance type.

    Args:
        instance_type (str): The EC2 instance type to deploy this Model to.
            For example, 'ml.p2.xlarge'. Unused by this implementation.
        accelerator_type (str): The Elastic Inference accelerator type to
            deploy to the instance for loading and making inferences to the
            model. For example, 'ml.eia1.medium'. Unused by this
            implementation.
        serverless_inference_config (sagemaker.serverless.ServerlessInferenceConfig):
            Specifies configuration related to serverless endpoint. Instance
            type is not provided in serverless inference, so this is used to
            find image URIs. Unused by this implementation.

    Returns:
        dict: A container definition object usable with the CreateModel API.
    """
    code_key_prefix = fw_utils.model_code_key_prefix(
        self.key_prefix, self.name, self.image_uri
    )
    # Work on a copy so the model's own ``env`` is never mutated.
    container_env = copy.deepcopy(self.env)

    uses_script_mode = (
        self.source_dir or self.dependencies or self.entry_point or self.git_config
    )
    if uses_script_mode:
        # Repack only when both a source dir and an entry point are given and
        # neither a code location prefix nor a git config is in play.
        is_repack = (
            self.source_dir
            and self.entry_point
            and (not self.key_prefix and not self.git_config)
        )
        self._upload_code(code_key_prefix, repack=is_repack)
        container_env.update(self._script_mode_env_vars())

    model_location = self.repacked_model_data or self.model_data
    return sagemaker.container_def(
        self.image_uri,
        model_location,
        container_env,
        image_config=self.image_config,
    )
def _upload_code(self, key_prefix: str, repack: bool = False) -> None:
    """Upload code to S3 to be used with script mode with SageMaker inference.

    Depending on the arguments and the session, this method does one of:

    * clears ``self.uploaded_code`` (local mode with local code, or no
      entry point),
    * tars and uploads the code directory (``repack`` is falsy),
    * defers repacking to pipeline execution (``model_data`` is a pipeline
      variable under a ``PipelineSession``),
    * repacks the model artifact together with the code and records the
      result in ``self.repacked_model_data``.

    Args:
        key_prefix (str): The S3 key associated with the ``code_location``
            parameter of the ``Model`` class.
        repack (bool): Optional. Set to a truthy value to indicate that the
            source code and model artifact should be repackaged into a new S3
            object. Only its truthiness is used (default: False).
    """
    local_code = utils.get_config_value("local.local_code", self.sagemaker_session.config)
    bucket = self.bucket or self.sagemaker_session.default_bucket()

    if (self.sagemaker_session.local_mode and local_code) or self.entry_point is None:
        self.uploaded_code = None
    elif not repack:
        self.uploaded_code = fw_utils.tar_and_upload_dir(
            session=self.sagemaker_session.boto_session,
            bucket=bucket,
            s3_key_prefix=key_prefix,
            script=self.entry_point,
            directory=self.source_dir,
            dependencies=self.dependencies,
            settings=self.sagemaker_session.settings,
        )

    if repack and self.model_data is not None and self.entry_point is not None:
        if is_pipeline_variable(self.model_data):
            # model is not yet there, defer repacking to later during pipeline execution
            if not isinstance(self.sagemaker_session, PipelineSession):
                # Use the module logger rather than the root-level
                # ``logging.warning``: the latter implicitly calls
                # ``logging.basicConfig()`` when the root logger has no
                # handlers, which library code should not do.
                LOGGER.warning(
                    "The model_data is a Pipeline variable of type %s, "
                    "which should be used under `PipelineSession` and "
                    "leverage `ModelStep` to create or register model. "
                    "Otherwise some functionalities e.g. "
                    "runtime repack may be missing. For more, see: "
                    "https://sagemaker.readthedocs.io/en/stable/"
                    "amazon_sagemaker_model_building_pipeline.html#model-step",
                    type(self.model_data),
                )
                return
            self.sagemaker_session.context.need_runtime_repack.add(id(self))
            self.sagemaker_session.context.runtime_repack_output_prefix = s3.s3_path_join(
                "s3://", bucket, key_prefix
            )
            # Add the uploaded_code and repacked_model_data to update the container env
            self.repacked_model_data = self.model_data
            self.uploaded_code = fw_utils.UploadedCode(
                s3_prefix=self.repacked_model_data,
                script_name=os.path.basename(self.entry_point),
            )
            return

        if local_code and self.model_data.startswith("file://"):
            # Local code with a local model: repack in place.
            repacked_model_data = self.model_data
        else:
            repacked_model_data = "s3://" + "/".join([bucket, key_prefix, "model.tar.gz"])
            self.uploaded_code = fw_utils.UploadedCode(
                s3_prefix=repacked_model_data, script_name=os.path.basename(self.entry_point)
            )

        utils.repack_model(
            inference_script=self.entry_point,
            source_directory=self.source_dir,
            dependencies=self.dependencies,
            model_uri=self.model_data,
            repacked_model_uri=repacked_model_data,
            sagemaker_session=self.sagemaker_session,
            kms_key=self.model_kms_key,
        )

        self.repacked_model_data = repacked_model_data
def _script_mode_env_vars(self):
    """Return the environment variables needed for script mode execution.

    The script name and code directory are derived from ``self.uploaded_code``
    when code was uploaded, and otherwise from ``self.entry_point`` and
    ``self.source_dir``. Missing values become empty strings.

    Returns:
        dict: Maps the upper-cased script, directory, log-level and region
        parameter names to their values.
    """
    script_name = dir_name = None
    code = self.uploaded_code

    if code:
        script_name = code.script_name
        # Repacked or network-isolated models find their code inside the
        # model directory instead of at the S3 prefix.
        code_inside_model = self.repacked_model_data or self.enable_network_isolation()
        dir_name = "/opt/ml/model/code" if code_inside_model else code.s3_prefix
    elif self.entry_point is not None:
        script_name = self.entry_point
        if self.source_dir is not None:
            if self.source_dir.startswith("s3://"):
                dir_name = self.source_dir
            else:
                dir_name = "file://" + self.source_dir

    env_vars = {}
    env_vars[SCRIPT_PARAM_NAME.upper()] = script_name or ""
    env_vars[DIR_PARAM_NAME.upper()] = dir_name or ""
    env_vars[CONTAINER_LOG_LEVEL_PARAM_NAME.upper()] = to_string(self.container_log_level)
    env_vars[SAGEMAKER_REGION_PARAM_NAME.upper()] = self.sagemaker_session.boto_region_name
    return env_vars
def enable_network_isolation(self):
    """Report whether network isolation is enabled for this Model.

    Returns:
        bool: The ``enable_network_isolation`` value stored on the instance
        as ``_enable_network_isolation``.
    """
    isolation_flag = self._enable_network_isolation
    return isolation_flag
def _create_sagemaker_model(
    self, instance_type=None, accelerator_type=None, tags=None, serverless_inference_config=None
):
    """Create a SageMaker Model Entity.

    The session is resolved first, then the container definition is built.
    Outside a ``PipelineSession`` a model name is also generated when needed,
    and finally ``create_model`` is called on the session.

    Args:
        instance_type (str): The EC2 instance type that this Model will be
            used for. This is only used to determine if the image needs GPU
            support or not.
        accelerator_type (str): Type of Elastic Inference accelerator to
            attach to an endpoint for model loading and inference, for
            example, 'ml.eia1.medium'. If not specified, no Elastic
            Inference accelerator will be attached to the endpoint.
        tags (List[dict[str, str]]): Optional. The list of tags to add to
            the model. Example: >>> tags = [{'Key': 'tagname', 'Value':
            'tagvalue'}] For more information about tags, see
            https://boto3.amazonaws.com/v1/documentation
            /api/latest/reference/services/sagemaker.html#SageMaker.Client.add_tags
        serverless_inference_config (sagemaker.serverless.ServerlessInferenceConfig):
            Specifies configuration related to serverless endpoint. Instance type is
            not provided in serverless inference. So this is used to find image URIs.
    """
    # Resolve the session before anything else: ``prepare_container_def`` may
    # upload code and read the session's region, both of which fail while
    # ``self.sagemaker_session`` is still ``None``. This is a no-op when a
    # session was already supplied.
    self._init_sagemaker_session_if_does_not_exist(instance_type)

    container_def = self.prepare_container_def(
        instance_type,
        accelerator_type=accelerator_type,
        serverless_inference_config=serverless_inference_config,
    )

    if not isinstance(self.sagemaker_session, PipelineSession):
        # _base_name, model_name are not needed under PipelineSession.
        # the model_data may be Pipeline variable
        # which may break the _base_name generation
        self._ensure_base_name_if_needed(
            image_uri=container_def["Image"],
            script_uri=self.source_dir,
            model_uri=self.model_data,
        )
        self._set_model_name_if_needed()

    enable_network_isolation = self.enable_network_isolation()

    create_model_args = {
        "name": self.name,
        "role": self.role,
        "container_defs": container_def,
        "vpc_config": self.vpc_config,
        "enable_network_isolation": enable_network_isolation,
        "tags": tags,
    }
    self.sagemaker_session.create_model(**create_model_args)
def _ensure_base_name_if_needed(self, image_uri, script_uri, model_uri):
    """Derive ``self._base_name`` when the model has no explicit name.

    Nothing happens when ``self.name`` is set. Otherwise the first truthy
    candidate wins, in this order: the existing ``self._base_name``, the
    JumpStart base name for ``script_uri`` / ``model_uri``, and finally a
    base name derived from ``image_uri``.

    Args:
        image_uri (str): Image URI used for the fallback base name.
        script_uri (str): Script location checked for a JumpStart base name.
        model_uri (str): Model location checked for a JumpStart base name.
    """
    if self.name is not None:
        return

    resolved = self._base_name
    if not resolved:
        resolved = get_jumpstart_base_name_if_jumpstart_model(script_uri, model_uri)
    if not resolved:
        resolved = utils.base_name_from_image(image_uri, default_base_name=Model.__name__)
    self._base_name = resolved
def _set_model_name_if_needed(self):
    """Assign a freshly generated ``self.name`` when a base name is available.

    The name is produced by ``utils.name_from_base``. When ``self._base_name``
    is falsy, ``self.name`` is left as it is.
    """
    base = self._base_name
    if not base:
        return
    self.name = utils.name_from_base(base)
def _framework(self):
    """Return the ``_framework_name`` attribute, or ``None`` when it is not set."""
    try:
        return self._framework_name
    except AttributeError:
        return None
def _get_framework_version(self):
"""Placeholder docstring"""
return getattr(self, "framework_version", None)
def _edge_packaging_job_config(
self,
output_path,
role,
model_name,
model_version,
packaging_job_name,
compilation_job_name,
resource_key,
s3_kms_key,
tags,
):
"""Creates a request object for a packaging job.
Args:
output_path (str): where in S3 to store the output of the job
role (str): what role to use when executing the job
packaging_job_name (str): what to name the packaging job
compilation_job_name (str): what compilation job to source the model from
resource_key (str): the kms key to encrypt the disk with
s3_kms_key (str): the kms key to encrypt the output with
tags (list[dict]): List of tags for labeling an edge packaging job. For
more, see
https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html.
Returns:
dict: the request object to use when creating a packaging job
"""
output_model_config = {
"S3OutputLocation": output_path,
}
if s3_kms_key is not None:
output_model_config["KmsKeyId"] = s3_kms_key
return {
"output_model_config": output_model_config,
"role": role,
"tags": tags,
"model_name": model_name,
"model_version": model_version,
"job_name": packaging_job_name,
"compilation_job_name": compilation_job_name,
"resource_key": resource_key,
}
def _compilation_job_config(
self,
target_instance_type,
input_shape,
output_path,
role,
compile_max_run,
job_name,
framework,
tags,
target_platform_os=None,
target_platform_arch=None,
target_platform_accelerator=None,
compiler_options=None,
framework_version=None,
):
"""Placeholder Docstring"""
input_model_config = {
"S3Uri": self.model_data,
"DataInputConfig": json.dumps(input_shape)
if isinstance(input_shape, dict)
else input_shape,
"Framework": framework.upper(),
}
def multi_version_compilation_supported(
target_instance_type: str, framework: str, framework_version: str
):
if target_instance_type and framework and framework_version:
framework = framework.lower()
multi_version_frameworks_support_mapping = {
"inferentia": ["pytorch", "tensorflow", "mxnet"],
"neo_ioc_targets": ["pytorch", "tensorflow"],
}
if target_instance_type in NEO_IOC_TARGET_DEVICES:
return framework in multi_version_frameworks_support_mapping["neo_ioc_targets"]
if target_instance_type == "ml_inf1":
return framework in multi_version_frameworks_support_mapping["inferentia"]
return False
if multi_version_compilation_supported(target_instance_type, framework, framework_version):
input_model_config["FrameworkVersion"] = utils.get_short_version(framework_version)
role = self.sagemaker_session.expand_role(role)
output_model_config = {
"S3OutputLocation": output_path,
}
if target_instance_type is not None:
output_model_config["TargetDevice"] = target_instance_type
else:
if target_platform_os is None and target_platform_arch is None:
raise ValueError(
"target_instance_type or (target_platform_os and target_platform_arch) "
"should be provided"
)
target_platform = {
"Os": target_platform_os,
"Arch": target_platform_arch,
}
if target_platform_accelerator is not None:
target_platform["Accelerator"] = target_platform_accelerator
output_model_config["TargetPlatform"] = target_platform
if compiler_options is not None:
output_model_config["CompilerOptions"] = (
json.dumps(compiler_options)
if isinstance(compiler_options, dict)
else compiler_options
)
return {
"input_model_config": input_model_config,
"output_model_config": output_model_config,
"role": role,
"stop_condition": {"MaxRuntimeInSeconds": compile_max_run},
"tags": tags,
"job_name": job_name,
}
def package_for_edge(
    self,
    output_path,
    model_name,
    model_version,
    role=None,
    job_name=None,
    resource_key=None,
    s3_kms_key=None,
    tags=None,
):
    """Package this ``Model`` with SageMaker Edge.

    Creates a new EdgePackagingJob and waits for it to finish.
    ``model_data`` will then point to the packaged artifacts.

    Args:
        output_path (str): Specifies where to store the packaged model
        model_name (str): the name to attach to the model metadata
        model_version (str): the version to attach to the model metadata
        role (str): Execution role. When provided it is expanded through
            ``sagemaker_session.expand_role``; ``None`` is passed through
            unchanged.
        job_name (str): The name of the edge packaging job. When omitted it is
            derived from the compilation job name.
        resource_key (str): the kms key to encrypt the disk with
        s3_kms_key (str): the kms key to encrypt the output with
        tags (list[dict]): List of tags for labeling an edge packaging job. For
            more, see
            https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html.

    Returns:
        sagemaker.model.Model: A SageMaker ``Model`` object. See
        :func:`~sagemaker.model.Model` for full details.

    Raises:
        ValueError: If the model has not been compiled yet.
    """
    if self._compilation_job_name is None:
        raise ValueError("You must first compile this model")

    if job_name is None:
        job_name = f"packaging{self._compilation_job_name[11:]}"

    # The session must exist before it is used to expand the role.
    self._init_sagemaker_session_if_does_not_exist(None)

    # Only a caller-supplied role can be expanded; expanding ``None`` is
    # meaningless and was the result of an inverted condition.
    if role is not None:
        role = self.sagemaker_session.expand_role(role)

    config = self._edge_packaging_job_config(
        output_path,
        role,
        model_name,
        model_version,
        job_name,
        self._compilation_job_name,
        resource_key,
        s3_kms_key,
        tags,
    )
    self.sagemaker_session.package_model_for_edge(**config)
    job_status = self.sagemaker_session.wait_for_edge_packaging_job(job_name)
    self.model_data = job_status["ModelArtifact"]
    self._is_edge_packaged_model = True

    return self
def compile(
self,
target_instance_family,
input_shape,
output_path,
role,
tags=None,
job_name=None,
compile_max_run=15 * 60,
framework=None,
framework_version=None,
target_platform_os=None,
target_platform_arch=None,
target_platform_accelerator=None,
compiler_options=None,
):
"""Compile this ``Model`` with SageMaker Neo.
Args:
target_instance_family (str): Identifies the device that you want to
run your model after compilation, for example: ml_c5. For allowed
strings see
https://docs.aws.amazon.com/sagemaker/latest/dg/API_OutputConfig.html.
Alternatively, you can select an OS, Architecture and Accelerator using
``target_platform_os``, ``target_platform_arch``,
and ``target_platform_accelerator``.
input_shape (dict): Specifies the name and shape of the expected
inputs for your trained model in json dictionary form, for
example: {'data': [1,3,1024,1024]}, or {'var1': [1,1,28,28],
'var2': [1,1,28,28]}
output_path (str): Specifies where to store the compiled model
role (str): Execution role
tags (list[dict]): List of tags for labeling a compilation job. For
more, see
https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html.
job_name (str): The name of the compilation job
compile_max_run (int): Timeout in seconds for compilation (default:
15 * 60). After this amount of time Amazon SageMaker Neo
terminates the compilation job regardless of its current status.
framework (str): The framework that is used to train the original
model. Allowed values: 'mxnet', 'tensorflow', 'keras', 'pytorch',
'onnx', 'xgboost'
framework_version (str): The version of framework, for example:
'1.5' for PyTorch
target_platform_os (str): Target Platform OS, for example: 'LINUX'.
For allowed strings see
https://docs.aws.amazon.com/sagemaker/latest/dg/API_OutputConfig.html.
It can be used instead of target_instance_family by setting target_instance
family to None.
target_platform_arch (str): Target Platform Architecture, for example: 'X86_64'.
For allowed strings see
https://docs.aws.amazon.com/sagemaker/latest/dg/API_OutputConfig.html.
It can be used instead of target_instance_family by setting target_instance
family to None.
target_platform_accelerator (str, optional): Target Platform Accelerator,
for example: 'NVIDIA'. For allowed strings see
https://docs.aws.amazon.com/sagemaker/latest/dg/API_OutputConfig.html.
It can be used instead of target_instance_family by setting target_instance
family to None.
compiler_options (dict, optional): Additional parameters for compiler.
Compiler Options are TargetPlatform / target_instance_family specific. See
https://docs.aws.amazon.com/sagemaker/latest/dg/API_OutputConfig.html for details.
Returns:
sagemaker.model.Model: A SageMaker ``Model`` object. See
:func:`~sagemaker.model.Model` for full details.
"""
framework = framework or self._framework()
if framework is None:
raise ValueError(
"You must specify framework, allowed values {}".format(NEO_ALLOWED_FRAMEWORKS)
)
if framework not in NEO_ALLOWED_FRAMEWORKS:
raise ValueError(
"You must provide valid framework, allowed values {}".format(NEO_ALLOWED_FRAMEWORKS)
)
if job_name is None:
raise ValueError("You must provide a compilation job name")
if self.model_data is None:
raise ValueError("You must provide an S3 path to the compressed model artifacts.")
framework_version = framework_version or self._get_framework_version()
self._init_sagemaker_session_if_does_not_exist(target_instance_family)
config = self._compilation_job_config(
target_instance_family,
input_shape,
output_path,
role,
compile_max_run,
job_name,
framework,
tags,
target_platform_os,
target_platform_arch,
target_platform_accelerator,
compiler_options,
framework_version,
)
self.sagemaker_session.compile_model(**config)
job_status = self.sagemaker_session.wait_for_compilation_job(job_name)
self.model_data = job_status["ModelArtifacts"]["S3ModelArtifacts"]
if target_instance_family is not None:
if target_instance_family == "ml_eia2":