<a href="https://colab.research.google.com/github/benjaminbrown038/Amazon/blob/main/notebooks/amazon/amazon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SageMaker

- Hugging Face
- PyTorch
- TensorFlow

## SageMaker

#### Imports

In [None]:
%%capture
!pip3 install sagemaker
import logging
import time

import boto3

import sagemaker
from sagemaker.xgboost import XGBoost
from sagemaker.experiments.run import Run
from sagemaker.analytics import ExperimentAnalytics

#### Role

In [None]:
# Execution role resolved by the SDK for the current environment; it is passed
# as the `role` argument of the estimators further down.
# The bare name on the last line displays the value as the cell output.
role = sagemaker.get_execution_role()
role

#### Region

In [None]:
# Region name configured on a default boto3 session; shown as the cell output.
region = boto3.Session().region_name
region

#### Session

In [None]:
# SageMaker session built with default settings; the bucket cell below reads
# its default bucket from it.
sagemaker_session = sagemaker.Session()
sagemaker_session

#### Bucket

In [None]:
# Name returned by the session's default_bucket(); shown as the cell output.
bucket_name = sagemaker_session.default_bucket()
bucket_name

In [None]:
# NOTE(review): presumably the S3 key prefix for this project's artifacts —
# no cell visible here consumes it; confirm where it is used.
prefix = "end-to-end-ml"

## Hugging Face

#### Imports

In [None]:
!pip3 install sagemaker --upgrade
import sagemaker
from sagemaker.huggingface import HuggingFace, TrainingCompilerConfig
import sagemaker
import boto3
import transformers
import datasets
import argparse
import os
from sagemaker.huggingface import HuggingFace
from sagemaker.s3 import S3Downloader

In [None]:
# One session is enough; the original cell created it twice.
sess = sagemaker.Session()

# Resolve the execution role. The SDK lookup only succeeds when the current AWS
# identity is a role (e.g. inside SageMaker); when it raises ValueError, fall
# back to looking up a named IAM role. Previously the IAM lookup always ran and
# overwrote the value with the placeholder role name below.
try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam_client = boto3.client('iam')
    # Replace the placeholder with the name of a role that has SageMaker permissions.
    role = iam_client.get_role(RoleName='role-name-of-your-iam-role-with-right-permissions')['Role']['Arn']

In [None]:
if __name__ == "__main__":

    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--per_device_train_batch_size", type=int, default=32)
    parser.add_argument("--model_name_or_path", type=str)

    # data, model, and output directories
    # Defaults are read from SM_* environment variables; a missing variable raises
    # KeyError right here, so this only runs where those variables are set.
    # "--model_dir" is accepted as an alias so every flag can be spelled with
    # underscores like the others; both spellings populate `args.model_dir`.
    parser.add_argument("--model-dir", "--model_dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--training_dir", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
    parser.add_argument("--test_dir", type=str, default=os.environ["SM_CHANNEL_TEST"])

    # The original cell declared the arguments but never parsed them.
    # parse_known_args() ignores flags this script does not declare.
    args, _ = parser.parse_known_args()

In [None]:
# Hyperparameters handed to the estimator; the training script receives them
# as command-line arguments.
hyperparameters = dict(
    epochs=1,
    per_device_train_batch_size=32,
    model_name_or_path="distilbert-base-uncased",
)

In [None]:
# Single-instance estimator that runs ./scripts/train.py with the
# hyperparameters defined in the previous cell.
huggingface_estimator = HuggingFace(
    # what to run
    entry_point="train.py",
    source_dir="./scripts",
    hyperparameters=hyperparameters,
    # framework / runtime versions
    transformers_version="4.26",
    pytorch_version="1.13",
    py_version="py39",
    # where and as whom to run it
    role=role,
    instance_type="ml.p3.2xlarge",
    instance_count=1,
)

In [None]:
# Start training. Each key names an input channel and maps to its S3 prefix.
# NOTE(review): the bucket name embeds a region and what looks like an account
# id — confirm it is readable from your account before running.
huggingface_estimator.fit(
    inputs={
        "train": "s3://sagemaker-us-east-1-558105141721/samples/datasets/imdb/train",
        "test": "s3://sagemaker-us-east-1-558105141721/samples/datasets/imdb/test",
    }
)

In [None]:
# Not Python: this is a shell command line, kept as a comment so the cell no
# longer raises a SyntaxError. Presumably it is what the training job runs for
# the hyperparameters defined above — confirm against the job logs.
#
#   /opt/conda/bin/python train.py --epochs 1 --model_name_or_path distilbert-base-uncased --per_device_train_batch_size 32

In [None]:
# Download the artifact referenced by huggingface_estimator.model_data into the
# current working directory, using the session created earlier.
S3Downloader.download(
    sagemaker_session=sess,
    s3_uri=huggingface_estimator.model_data,
    local_path=".",
)

In [None]:
# `distribution` setting that switches on the smdistributed data-parallel option;
# it is passed to the estimator in the next cell.
distribution = {
    "smdistributed": {
        "dataparallel": {"enabled": True},
    },
}

In [None]:
# Two-instance estimator using the `distribution` setting defined above.
huggingface_estimator = HuggingFace(
    # what to run
    entry_point="train.py",
    source_dir="./scripts",
    hyperparameters=hyperparameters,
    distribution=distribution,
    # framework / runtime versions
    transformers_version="4.26.0",
    pytorch_version="1.13.1",
    py_version="py39",
    # where and as whom to run it
    role=role,
    instance_type="ml.p3dn.24xlarge",
    instance_count=2,
)

In [None]:
# MPI settings; combined with the model-parallel options into `distribution`
# two cells below.
mpi_options = dict(enabled=True, processes_per_host=8)

In [None]:
# Model-parallel settings; placed under "smdistributed" -> "modelparallel"
# in the `distribution` dict built in the next cell.
smp_options = dict(
    enabled=True,
    parameters=dict(
        microbatches=4,
        placement_strategy="spread",
        pipeline="interleaved",
        optimize="speed",
        partitions=4,
        ddp=True,
    ),
)

In [None]:
# Combine the model-parallel and MPI option dicts from the two previous cells.
distribution = dict(
    smdistributed=dict(modelparallel=smp_options),
    mpi=mpi_options,
)

In [None]:
# Two-instance estimator using the model-parallel `distribution` built above.
huggingface_estimator = HuggingFace(
    # what to run
    entry_point="train.py",
    source_dir="./scripts",
    hyperparameters=hyperparameters,
    distribution=distribution,
    # framework / runtime versions
    transformers_version="4.26.0",
    pytorch_version="1.13.1",
    py_version="py39",
    # where and as whom to run it
    role=role,
    instance_type="ml.p3dn.24xlarge",
    instance_count=2,
)

In [None]:
# Hyperparameters for the spot-instance example in the next cell; `output_dir`
# points at /opt/ml/checkpoints.
hyperparameters = dict(
    epochs=1,
    train_batch_size=32,
    model_name="distilbert-base-uncased",
    output_dir="/opt/ml/checkpoints",
)

In [None]:
# Spot-instance estimator with checkpoints written under the session's default bucket.
# Fixes a SyntaxError in the original cell: the `checkpoint_s3_uri` line was
# tab-indented and was missing its trailing comma.
huggingface_estimator = HuggingFace(
        entry_point='train.py',
        source_dir='./scripts',
        instance_type='ml.p3.2xlarge',
        instance_count=1,
        checkpoint_s3_uri=f's3://{sess.default_bucket()}/checkpoints',
        use_spot_instances=True,
        max_wait=3600,
        max_run=1000,
        role=role,
        transformers_version='4.26',
        pytorch_version='1.13',
        py_version='py39',
        hyperparameters = hyperparameters)

In [None]:
# Repository and tag that the estimator in the next cell checks out.
# The tag is aligned with transformers_version='4.26' used by that estimator;
# the previous value ('v4.4.2') did not match it.
git_config = {'repo': 'https://github.com/huggingface/transformers.git','branch': 'v4.26.0'}

In [None]:
# Estimator whose entry point (run_glue.py) comes from the Git repository
# described by `git_config` rather than from a local ./scripts directory.
huggingface_estimator = HuggingFace(
    # what to run
    git_config=git_config,
    source_dir="./examples/pytorch/text-classification",
    entry_point="run_glue.py",
    hyperparameters=hyperparameters,
    # framework / runtime versions
    transformers_version="4.26",
    pytorch_version="1.13",
    py_version="py39",
    # where and as whom to run it
    role=role,
    instance_type="ml.p3.2xlarge",
    instance_count=1,
)

In [None]:
# Name/regex pairs passed to the estimator as `metric_definitions`.
# Raw strings keep `\D` as a regex token; in a normal string literal it is an
# invalid escape sequence that Python warns about.
metric_definitions = [
    {"Name": "train_runtime", "Regex": r"train_runtime.*=\D*(.*?)$"},
    {"Name": "eval_accuracy", "Regex": r"eval_accuracy.*=\D*(.*?)$"},
    {"Name": "eval_loss", "Regex": r"eval_loss.*=\D*(.*?)$"},
]

In [None]:
# Same single-instance estimator as before, now also passing the
# `metric_definitions` list from the previous cell.
huggingface_estimator = HuggingFace(
    # what to run
    entry_point="train.py",
    source_dir="./scripts",
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
    # framework / runtime versions
    transformers_version="4.26",
    pytorch_version="1.13",
    py_version="py39",
    # where and as whom to run it
    role=role,
    instance_type="ml.p3.2xlarge",
    instance_count=1,
)

In [None]:
# Estimator with a Training Compiler configuration attached.
# The original call passed only `compiler_config`, leaving out the entry point,
# role, instance and version arguments every other estimator in this notebook
# supplies; they are filled in here with the same values used above.
# NOTE(review): confirm this transformers/PyTorch/instance combination is
# supported by SageMaker Training Compiler before running.
huggingface_estimator = HuggingFace(
    entry_point='train.py',
    source_dir='./scripts',
    instance_type='ml.p3.2xlarge',
    instance_count=1,
    role=role,
    transformers_version='4.26',
    pytorch_version='1.13',
    py_version='py39',
    hyperparameters=hyperparameters,
    compiler_config=TrainingCompilerConfig(),
)

# PyTorch
### Use PyTorch with the SageMaker Python SDK

- Train

- Deploy

In [None]:
import argparse
import os

def str2bool(value):
    """Convert a command-line string such as "True"/"false"/"1"/"0" to a bool.

    argparse's ``type=bool`` calls ``bool()`` on the raw string, so any
    non-empty value — including "False" — becomes True. This converter
    interprets the text instead.

    Args:
        value: a bool, or a string spelling a boolean (case-insensitive).

    Returns:
        The corresponding bool.

    Raises:
        argparse.ArgumentTypeError: if the text is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ('true', 't', 'yes', 'y', '1'):
        return True
    if text in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('expected a boolean value, got {!r}'.format(value))


if __name__ =='__main__':

    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument('--epochs', type=int, default=50)
    parser.add_argument('--batch-size', type=int, default=64)
    parser.add_argument('--learning-rate', type=float, default=0.05)
    # str2bool (not bool) so that "--use-cuda False" really disables CUDA
    parser.add_argument('--use-cuda', type=str2bool, default=False)

    # Data, model, and output directories
    # Defaults are read from SM_* environment variables; a missing variable
    # raises KeyError here.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])
    parser.add_argument('--test', type=str, default=os.environ['SM_CHANNEL_TEST'])

    # parse_known_args() ignores flags this script does not declare
    args, _ = parser.parse_known_args()

    # ... load from args.train and args.test, train a model, write model to args.model_dir.

In [None]:
import argparse
import os
import torch

if __name__=='__main__':
    # Build the parser in this cell; the original relied on a `parser` object
    # created in a different cell, which fails on a fresh kernel.
    parser = argparse.ArgumentParser()

    # default to the value in environment variable `SM_MODEL_DIR`. Using args makes the script more portable.
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    args, _ = parser.parse_known_args()

    # ... train `model`, then save it to `model_dir`
    # NOTE: `model` is a placeholder for your trained torch.nn.Module; it is not
    # defined anywhere in this cell.
    with open(os.path.join(args.model_dir, 'model.pth'), 'wb') as f:
        torch.save(model.state_dict(), f)

In [None]:
import os
import torch

# ... train `model`, then save it to `model_dir`
# Keep the directory and the file path in separate names. The original
# overwrote `model_dir` with the file path, so re-running the cell appended
# "model.pt" a second time.
# NOTE: `model` and `model_dir` are placeholders supplied by your training code.
model_path = os.path.join(model_dir, "model.pt")
torch.jit.save(model, model_path)

In [None]:
# `PyTorch` was used here before any cell imported it; import it in this cell
# so it runs on a fresh kernel.
from sagemaker.pytorch import PyTorch

# `role=role` added for consistency with the other estimators in this notebook,
# which all pass the execution role explicitly.
pytorch_estimator = PyTorch('pytorch-train.py',
                            role=role,
                            instance_type='ml.p3.2xlarge',
                            instance_count=1,
                            framework_version='1.8.0',
                            py_version='py3',
                            hyperparameters = {'epochs': 20, 'batch-size': 64, 'learning-rate': 0.1})
# One S3 location per input channel (placeholder bucket names).
pytorch_estimator.fit({'train': 's3://my-data-bucket/path/to/my/training/data',
                       'test': 's3://my-data-bucket/path/to/my/test/data'})

In [None]:
# Illustrative value only: a dict with one S3 location per named key, in the
# same shape as the argument passed to fit() in the previous cell. It is not
# assigned to anything; as the cell's last expression it is simply displayed.
{'train':'s3://my-bucket/my-training-data',
 'eval':'s3://my-bucket/my-evaluation-data'}

In [None]:
import torch.distributed as dist

# NOTE(review): `args` (with .distributed, .hosts, .current_host, .backend) and
# `os` are not defined in this cell — presumably this fragment belongs inside a
# training script that has already parsed its arguments; confirm.
if args.distributed:
    # Initialize the distributed environment.
    # World size is the number of entries in args.hosts, exported as a string.
    world_size = len(args.hosts)
    os.environ['WORLD_SIZE'] = str(world_size)
    # Rank is this host's position in the args.hosts list.
    host_rank = args.hosts.index(args.current_host)
    dist.init_process_group(backend=args.backend, rank=host_rank)

In [None]:
from sagemaker.pytorch import PyTorch

# Two-instance estimator with the "pytorchddp" distribution option enabled.
pt_estimator = PyTorch(
    # what to run
    entry_point="train_ptddp.py",
    distribution={"pytorchddp": {"enabled": True}},
    # framework / runtime versions
    framework_version="1.12.0",
    py_version="py38",
    # where and as whom to run it
    role="SageMakerRole",
    instance_type="ml.p4d.24xlarge",
    instance_count=2,
)

In [None]:
from sagemaker.pytorch import PyTorch

# Two-instance estimator with the "torch_distributed" distribution option enabled.
pt_estimator = PyTorch(
    # what to run
    entry_point="train_ptddp.py",
    distribution={"torch_distributed": {"enabled": True}},
    # framework / runtime versions
    framework_version="1.13.1",
    py_version="py38",
    # where and as whom to run it
    role="SageMakerRole",
    instance_type="ml.p4d.24xlarge",
    instance_count=2,
)

In [None]:
import torch.distributed as dist

# Initialise the default process group with the backend named 'xla'.
# NOTE(review): 'xla' is not one of torch.distributed's built-in backends;
# presumably it must first be registered by importing the torch_xla backend
# module in the training script — confirm.
dist.init_process_group('xla')

In [None]:
from sagemaker.pytorch import PyTorch

# Single-instance estimator on ml.trn1.2xlarge with "torch_distributed" enabled.
pt_estimator = PyTorch(
    # what to run
    entry_point="train_torch_distributed.py",
    distribution={"torch_distributed": {"enabled": True}},
    # framework / runtime versions
    framework_version="1.11.0",
    py_version="py38",
    # where and as whom to run it
    role="SageMakerRole",
    instance_type="ml.trn1.2xlarge",
    instance_count=1,
)

# Start training from a single S3 location (placeholder path).
pt_estimator.fit(inputs="s3://bucket/path/to/training/data")

In [None]:
from sagemaker.pytorch import PyTorch

# Two-instance estimator on ml.trn1.32xlarge with "torch_distributed" enabled.
pt_estimator = PyTorch(
    # what to run
    entry_point="train_torch_distributed.py",
    distribution={"torch_distributed": {"enabled": True}},
    # framework / runtime versions
    framework_version="1.11.0",
    py_version="py38",
    # where and as whom to run it
    role="SageMakerRole",
    instance_type="ml.trn1.32xlarge",
    instance_count=2,
)

# Start training from a single S3 location (placeholder path).
pt_estimator.fit(inputs="s3://bucket/path/to/training/data")

In [None]:
# Train my estimator
# `role=role` added for consistency with the other estimators in this notebook,
# which all pass the execution role explicitly.
pytorch_estimator = PyTorch(entry_point='train_and_deploy.py',
                            role=role,
                            instance_type='ml.p3.2xlarge',
                            instance_count=1,
                            framework_version='1.8.0',
                            py_version='py3')
pytorch_estimator.fit('s3://my_bucket/my_training_data/')

# Deploy my estimator to a SageMaker Endpoint and get a Predictor
predictor = pytorch_estimator.deploy(instance_type='ml.m4.xlarge',
                                     initial_instance_count=1)

# `data` is a NumPy array or a Python list.
# `response` is a NumPy array.
# NOTE: `data` is a placeholder — define it before running this line.
response = predictor.predict(data)

In [None]:
# Deploy the trained estimator to one ml.m4.xlarge instance with an
# ml.eia2.medium accelerator attached.
predictor = pytorch_estimator.deploy(
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    accelerator_type="ml.eia2.medium",
)

In [None]:
def model_fn(model_dir, context):
    """Signature of the model-loading hook (illustration only).

    The original line had no colon and no body, which is a SyntaxError. The
    cells that follow provide concrete implementations.

    Args:
        model_dir: directory containing the saved model files.
        context: extra serving-context object passed to the hook.

    Raises:
        NotImplementedError: always; this stub only documents the signature.
    """
    raise NotImplementedError("model_fn is a signature placeholder")

In [None]:
import torch
import os

def model_fn(model_dir, context):
    """Load the model whose state dict is stored as ``model.pth`` in ``model_dir``.

    Args:
        model_dir: directory containing ``model.pth``.
        context: extra serving-context object; not used by this implementation.

    Returns:
        A ``Your_Model`` instance with the saved weights loaded.
    """
    # `Your_Model` is a placeholder for your own torch.nn.Module subclass.
    model = Your_Model()
    with open(os.path.join(model_dir, 'model.pth'), 'rb') as f:
        # map_location keeps loading from failing on a CPU-only host when the
        # checkpoint was saved from a GPU; load_state_dict then copies the
        # values into the model's existing parameters.
        model.load_state_dict(torch.load(f, map_location=torch.device('cpu')))
    return model

In [None]:
import torch


def model_fn(model_dir):
    """Load a TorchScript model stored as ``model.pth`` inside ``model_dir``.

    The original ignored ``model_dir`` and loaded ``'model.pth'`` relative to the
    current working directory.

    Args:
        model_dir: directory containing ``model.pth``.

    Returns:
        The loaded TorchScript module, mapped to CPU. When running PyTorch
        1.5.1 it is additionally put in eval mode and passed through
        ``torcheia.jit.attach_eia``.
    """
    import os  # local import: this cell only imports torch

    model = torch.jit.load(os.path.join(model_dir, 'model.pth'), map_location=torch.device('cpu'))
    if torch.__version__ == '1.5.1':
        import torcheia
        model = model.eval()
        # attach_eia() is introduced in PyTorch Elastic Inference 1.5.1,
        model = torcheia.jit.attach_eia(model, 0)
    return model

In [None]:
# Illustrative call sequence only: request_body, request_content_type,
# response_content_type, context and model are not defined in this cell, and
# the three hooks are introduced in the cells below.
# NOTE(review): presumably this mirrors how the serving container chains the
# hooks for each request — confirm against the SDK documentation.

# Deserialize the Invoke request body into an object we can perform prediction on
input_object = input_fn(request_body, request_content_type, context)

# Perform prediction on the deserialized object, with the loaded model
prediction = predict_fn(input_object, model, context)

# Serialize the prediction result into the desired response content type
output = output_fn(prediction, response_content_type, context)

In [None]:
def input_fn(request_body, request_content_type, context):
    """Signature of the request-deserialisation hook (illustration only).

    The original line had no colon and no body, which is a SyntaxError. The
    next cell provides a concrete implementation.

    Args:
        request_body: raw body of the incoming request.
        request_content_type: content type declared for the request.
        context: extra serving-context object passed to the hook.

    Raises:
        NotImplementedError: always; this stub only documents the signature.
    """
    raise NotImplementedError("input_fn is a signature placeholder")

In [None]:
import numpy as np
import torch
from six import BytesIO

def input_fn(request_body, request_content_type):
    """An input_fn that loads a pickled tensor.

    Args:
        request_body: bytes of the request payload.
        request_content_type: content type declared for the request.

    Returns:
        The object produced by ``torch.load`` on the payload.

    Raises:
        ValueError: if the content type is not ``application/python-pickle``.
            The original fell through and returned None, which only surfaced
            later as an unrelated error in the prediction step.
    """
    if request_content_type == 'application/python-pickle':
        # SECURITY: torch.load unpickles the payload; unpickling data from an
        # untrusted caller can execute arbitrary code. Only expose this content
        # type to trusted clients.
        return torch.load(BytesIO(request_body))

    # Handle other content-types above this line; anything else is rejected.
    raise ValueError('Unsupported content type: {}'.format(request_content_type))

In [None]:
def predict_fn(input_object, model, context):
    """Signature of the prediction hook (illustration only).

    The original line had no colon and no body, which is a SyntaxError. The
    cells that follow provide concrete implementations.

    Args:
        input_object: deserialised request produced by ``input_fn``.
        model: model object produced by ``model_fn``.
        context: extra serving-context object passed to the hook.

    Raises:
        NotImplementedError: always; this stub only documents the signature.
    """
    raise NotImplementedError("predict_fn is a signature placeholder")

In [None]:
import torch
import numpy as np

def predict_fn(input_data, model):
    """Run ``model`` on ``input_data`` without tracking gradients.

    Uses CUDA when ``torch.cuda.is_available()`` is true, otherwise CPU. The
    model is moved to that device and switched to eval mode before the call.

    Args:
        input_data: tensor to feed to the model.
        model: the torch module to evaluate.

    Returns:
        Whatever ``model(...)`` returns for the input on the chosen device.
    """
    use_gpu = torch.cuda.is_available()
    target = torch.device('cuda' if use_gpu else 'cpu')

    model.to(target)
    model.eval()

    with torch.no_grad():
        batch = input_data.to(target)
        return model(batch)

In [None]:
import torch
import numpy as np

def predict_fn(input_data, model, context):
    """Run ``model`` on ``input_data`` on the device chosen from ``context``.

    When CUDA is available the device is ``"cuda:<gpu_id>"``, with ``gpu_id``
    read from ``context.system_properties``; otherwise it is ``"cpu"``
    (``context`` is not touched in that case).

    Args:
        input_data: tensor to feed to the model.
        model: the torch module to evaluate.
        context: object exposing ``system_properties.get("gpu_id")``.

    Returns:
        Whatever ``model(...)`` returns, computed without gradient tracking.
    """
    if torch.cuda.is_available():
        device_name = "cuda:" + str(context.system_properties.get("gpu_id"))
    else:
        device_name = "cpu"
    target = torch.device(device_name)

    model.to(target)
    model.eval()

    with torch.no_grad():
        batch = input_data.to(target)
        return model(batch)

In [None]:
import torch
import numpy as np

def predict_fn(input_data, model):
    """Run ``model`` on ``input_data`` inside the Elastic Inference execution context.

    Fixes two defects in the original: it read an undefined name ``data``
    instead of the ``input_data`` parameter, and it computed ``output`` without
    returning it.

    Args:
        input_data: tensor to feed to the model.
        model: the TorchScript module to evaluate.

    Returns:
        The model output.
    """
    device = torch.device("cpu")
    model = model.to(device)
    input_data = input_data.to(device)
    model.eval()
    # NOTE(review): the two-argument form of optimized_execution (with a
    # "target_device" dict) is presumably specific to the Elastic Inference
    # build of PyTorch — confirm it exists in the runtime you deploy to.
    with torch.jit.optimized_execution(True, {"target_device": "eia:0"}):
        output = model(input_data)
    return output

In [None]:
import numpy as np
import torch


def predict_fn(input_data, model):
    """Run ``model.forward`` on ``input_data`` with ``torcheia`` imported.

    Fixes two defects in the original: it read an undefined name ``data``
    instead of the ``input_data`` parameter, and it computed ``output`` without
    returning it.

    Args:
        input_data: tensor to feed to the model.
        model: the TorchScript module to evaluate.

    Returns:
        The model output.
    """
    device = torch.device("cpu")
    input_data = input_data.to(device)
    # make sure torcheia is imported so that Elastic Inference api call will be invoked
    import torcheia
    # we need to set the profiling executor for EIA
    torch._C._jit_set_profiling_executor(False)
    with torch.jit.optimized_execution(True):
        output = model.forward(input_data)
    return output

In [None]:
def output_fn(prediction, content_type, context):
    """Signature of the response-serialisation hook (illustration only).

    The original line had no colon and no body, which is a SyntaxError.

    Args:
        prediction: result produced by ``predict_fn``.
        content_type: content type requested for the response.
        context: extra serving-context object passed to the hook.

    Raises:
        NotImplementedError: always; this stub only documents the signature.
    """
    raise NotImplementedError("output_fn is a signature placeholder")

In [None]:
from sagemaker import get_execution_role
# PyTorchModel was used below without ever being imported.
from sagemaker.pytorch import PyTorchModel

role = get_execution_role()

# Model built from an existing artifact in S3 (placeholder path) plus an
# inference script.
# NOTE(review): framework_version/py_version were added because the model needs
# them to pick a serving image; the values mirror the PyTorch estimator cells
# above — set them to the versions the model was actually trained with.
pytorch_model = PyTorchModel(model_data='s3://my-bucket/my-path/model.tar.gz', role=role,
                             entry_point='inference.py',
                             framework_version='1.8.0',
                             py_version='py3')

predictor = pytorch_model.deploy(instance_type='ml.c4.xlarge', initial_instance_count=1)

In [None]:
# Re-create an estimator object from the training job with this name.
my_training_job_name = "MyAwesomePyTorchTrainingJob"
pytorch_estimator = PyTorch.attach(training_job_name=my_training_job_name)

## TensorFlow

#### Deploying to TensorFlow Serving Endpoints

In [None]:
from sagemaker.tensorflow import TensorFlow

# Placeholder training input; replace with the S3 location of your data.
# (`inputs` was not defined anywhere in the original cell.)
inputs = 's3://my-data-bucket/path/to/my/training/data'

# The original call contained a bare `...` after a keyword argument, which is a
# SyntaxError; it is replaced with the execution role the other estimators pass.
estimator = TensorFlow(
    entry_point="tf-train.py",
    role=role,
    instance_count=1,
    instance_type="ml.c4.xlarge",
    framework_version="2.2",
    py_version="py37",
)

estimator.fit(inputs)

# Deploy the trained model to a single-instance endpoint.
predictor = estimator.deploy(initial_instance_count=1, instance_type="ml.c5.xlarge")

In [None]:
from sagemaker.tensorflow import TensorFlowModel

# Build a model from an existing artifact in S3 (placeholder path and role)
# and deploy it to a single-instance endpoint.
# NOTE(review): no framework_version/image_uri is given — confirm the installed
# SDK version accepts that.
model = TensorFlowModel(
    role="MySageMakerRole",
    model_data="s3://mybucket/model.tar.gz",
)

predictor = model.deploy(
    instance_type="ml.c5.xlarge",
    initial_instance_count=1,
)

In [None]:
from sagemaker.tensorflow import TensorFlowModel

# Same as the previous cell, with an ml.eia1.medium accelerator attached at
# deploy time.
model = TensorFlowModel(
    role="MySageMakerRole",
    model_data="s3://mybucket/model.tar.gz",
)

predictor = model.deploy(
    instance_type="ml.c5.xlarge",
    initial_instance_count=1,
    accelerator_type="ml.eia1.medium",
)

In [None]:
# Request body with an 'instances' list.
# Named `payload` rather than `input` so the built-in input() is not shadowed
# for the rest of the notebook session.
payload = {
  'instances': [1.0, 2.0, 5.0]
}
result = predictor.predict(payload)

In [None]:
# Illustrative response shape for the previous cell. This literal is not
# computed from the endpoint; as the cell's only expression it is simply
# displayed.
{
  'predictions': [3.5, 4.0, 5.5]
}

In [None]:
# payload matches the Classify and Regress API
# (named `payload` rather than `input` so the built-in input() is not shadowed)
payload = {
  'signature_name': 'tensorflow/serving/regress',
  'examples': [{'x': 1.0}, {'x': 2.0}]
}

result = predictor.regress(payload)  # or predictor.classify(...)

# result contains (illustrative; kept as a comment so the cell output is not
# mistaken for a real response):
# {
#   'results': [3.5, 4.0]
# }

In [None]:
# Request body with several rows under 'instances'.
# (named `payload` rather than `input` so the built-in input() is not shadowed)
payload = {
  'instances': [
    [1.0, 2.0, 5.0],
    [1.0, 2.0, 5.0],
    [1.0, 2.0, 5.0]
  ]
}
result = predictor.predict(payload)

# result contains (illustrative; kept as a comment so the cell output is not
# mistaken for a real response):
# {
#   'predictions': [
#     [3.5, 4.0, 5.5],
#     [3.5, 4.0, 5.5],
#     [3.5, 4.0, 5.5]
#   ]
# }

In [None]:
# Request body given as a bare list of rows.
# (named `payload` rather than `input` so the built-in input() is not shadowed)
payload = [
  [1.0, 2.0, 5.0],
  [1.0, 2.0, 5.0]
]
result = predictor.predict(payload)

# result contains (illustrative; kept as a comment so the cell output is not
# mistaken for a real response):
# {
#   'predictions': [
#     [3.5, 4.0, 5.5],
#     [3.5, 4.0, 5.5]
#   ]
# }

In [None]:
# 'x' must match name of input tensor in your SavedModel graph
# for models with multiple named inputs, just include all the keys in the input dict
# (named `payload` rather than `input` so the built-in input() is not shadowed)
payload = {
  'x': [1.0, 2.0, 5.0]
}

# result contains (illustrative; kept as a comment so the cell output is not
# mistaken for a real response):
# {
#   'predictions': [
#     [3.5, 4.0, 5.5]
#   ]
# }

In [None]:
# create a Predictor without JSON serialization
# `Predictor` was never imported in this notebook; import it here together with
# the pass-through serializer.
from sagemaker.predictor import Predictor
from sagemaker.serializers import IdentitySerializer

# The content type is carried by the serializer. The original passed
# serializer=None plus a separate content_type keyword, which leaves the
# predictor without a serializer to call.
predictor = Predictor(
    'endpoint-name',
    serializer=IdentitySerializer(content_type='application/jsonlines'),
)

# One JSON document per line. JSON requires double-quoted keys; the original
# used single quotes, which is not valid JSON.
# (named `payload` rather than `input` so the built-in input() is not shadowed)
payload = '''{"x": [1.0, 2.0, 5.0]}
{"x": [1.0, 2.0, 5.0]}
{"x": [1.0, 2.0, 5.0]}'''

result = predictor.predict(payload)

# result contains (illustrative; kept as a comment so the cell output is not
# mistaken for a real response):
# {
#   'predictions': [
#     [3.5, 4.0, 5.5],
#     [3.5, 4.0, 5.5],
#     [3.5, 4.0, 5.5]
#   ]
# }

In [None]:
# create a Predictor with CSV serialization
# (the original comment said "JSON", but the serializer below is the CSV one)
# `Predictor` was never imported in this notebook; import it here.
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

predictor = Predictor('endpoint-name', serializer=CSVSerializer())

# CSV-formatted string input
# (named `payload` rather than `input` so the built-in input() is not shadowed)
payload = '1.0,2.0,5.0\n1.0,2.0,5.0\n1.0,2.0,5.0'

result = predictor.predict(payload)

# result contains (illustrative; kept as a comment so the cell output is not
# mistaken for a real response):
# {
#   'predictions': [
#     [3.5, 4.0, 5.5],
#     [3.5, 4.0, 5.5],
#     [3.5, 4.0, 5.5]
#   ]
# }

In [None]:
from sagemaker.tensorflow import TensorFlowModel

# Model with a custom inference script.
# Uses TensorFlowModel — the class this cell imports; the original called an
# undefined name `Model`.
model = TensorFlowModel(entry_point='inference.py',
                        model_data='s3://mybucket/model.tar.gz',
                        role='MySageMakerRole')

In [None]:
import json

def input_handler(data, context):
    """Turn an incoming request into the body sent to the TensorFlow Serving REST API.

    Args:
        data (obj): readable request payload; ``data.read()`` must return bytes
        context (Context): object exposing ``request_content_type``

    Returns:
        (str): for ``application/json`` the decoded body unchanged; for
        ``text/csv`` a JSON string with the comma-separated values parsed as
        floats under the ``instances`` key

    Raises:
        ValueError: for any other content type, or when a CSV field is not a
            valid float
    """
    content_type = context.request_content_type

    if content_type == 'application/json':
        # JSON is forwarded untouched (no validation is performed here).
        body = data.read().decode('utf-8')
        return body if len(body) else ''

    if content_type == 'text/csv':
        # Minimal CSV support: one flat list of comma-separated numbers.
        fields = data.read().decode('utf-8').split(',')
        instances = list(map(float, fields))
        return json.dumps({'instances': instances})

    message = '{{"error": "unsupported content type {}"}}'.format(
        context.request_content_type or "unknown")
    raise ValueError(message)


def output_handler(data, context):
    """Turn the TensorFlow Serving response into what is returned to the client.

    Args:
        data (obj): serving response exposing ``status_code`` and ``content``
        context (Context): object exposing ``accept_header``

    Returns:
        (bytes, string): the raw response content and the content type taken
        from ``context.accept_header``

    Raises:
        ValueError: when the status code is not 200; the message is the decoded
            response content
    """
    if data.status_code == 200:
        return data.content, context.accept_header

    raise ValueError(data.content.decode('utf-8'))

In [None]:
import json
import requests


def handler(data, context):
    """Handle one request end to end.

    Pre-processes the payload, POSTs it to ``context.rest_uri`` and
    post-processes the reply.

    Args:
        data (obj): the request data
        context (Context): object exposing ``rest_uri`` plus the attributes the
            two helper functions read

    Returns:
        (bytes, string): data to return to client, response content type
    """
    request_body = _process_input(data, context)
    serving_response = requests.post(context.rest_uri, data=request_body)
    return _process_output(serving_response, context)


def _process_input(data, context):
    """Convert the request payload into the body posted to the serving REST API.

    ``application/json`` bodies are decoded and forwarded unchanged;
    ``text/csv`` bodies are parsed as one comma-separated list of floats and
    wrapped under the ``instances`` key. Any other content type raises
    ``ValueError``.
    """
    content_type = context.request_content_type

    if content_type == 'application/json':
        # JSON is forwarded untouched (no validation is performed here).
        body = data.read().decode('utf-8')
        return body if len(body) else ''

    if content_type == 'text/csv':
        # Minimal CSV support: one flat list of comma-separated numbers.
        fields = data.read().decode('utf-8').split(',')
        instances = list(map(float, fields))
        return json.dumps({'instances': instances})

    message = '{{"error": "unsupported content type {}"}}'.format(
        context.request_content_type or "unknown")
    raise ValueError(message)


def _process_output(data, context):
    """Convert the serving response into ``(content, content_type)``.

    Returns the raw response content together with ``context.accept_header``
    when the status code is 200; otherwise raises ``ValueError`` carrying the
    decoded response content.
    """
    if data.status_code == 200:
        return data.content, context.accept_header

    raise ValueError(data.content.decode('utf-8'))

In [None]:
from sagemaker.tensorflow import TensorFlowModel

# Model whose inference script lives in a source directory.
# Uses TensorFlowModel — the class this cell imports; the original called an
# undefined name `Model`.
model = TensorFlowModel(entry_point='inference.py',
                        source_dir='source/directory',
                        model_data='s3://mybucket/model.tar.gz',
                        role='MySageMakerRole')

In [None]:
from sagemaker.tensorflow import TensorFlowModel

# Model that ships an extra dependency folder alongside the inference script.
# Uses TensorFlowModel — the class this cell imports; the original called an
# undefined name `Model`.
model = TensorFlowModel(entry_point='inference.py',
                        dependencies=['/path/to/folder/named/lib'],
                        model_data='s3://mybucket/model.tar.gz',
                        role='MySageMakerRole')

In [None]:
%%bash
# Shell commands: the %%bash cell magic makes this cell runnable from the
# Python kernel (as plain cell content it was a SyntaxError).
# Download both model archives and unpack each into its own directory under multi/.
aws s3 cp s3://mybucket/models/model1/model.tar.gz model1.tar.gz
aws s3 cp s3://mybucket/models/model2/model.tar.gz model2.tar.gz
mkdir -p multi/model1
mkdir -p multi/model2

tar xvf model1.tar.gz -C ./multi/model1
tar xvf model2.tar.gz -C ./multi/model2

In [None]:
%%bash
# Shell commands: the %%bash cell magic makes this cell runnable from the
# Python kernel (as plain cell content it was a SyntaxError).
# Move the contents of export/Servo up to the model directory, then remove
# the now-unneeded export/ tree.
mv multi/model1/export/Servo/* multi/model1/
mv multi/model2/export/Servo/* multi/model2/
rm -fr multi/model1/export
rm -fr multi/model2/export

In [None]:
%%bash
# Shell command: the %%bash cell magic makes this cell runnable from the
# Python kernel.
# -C switches into multi/ before members are added, so the members must be
# named relative to that directory. The original listed `multi/`, which tar
# then looks for inside multi/ itself, where it does not exist.
# NOTE(review): this puts model1/ and model2/ at the archive root — confirm
# that is the layout the serving container expects.
tar -C "$PWD/multi/" -czvf multi.tar.gz model1 model2

In [None]:
%%bash
# Shell command: the %%bash cell magic makes this cell runnable from the
# Python kernel.
# Upload the combined archive (placeholder bucket name).
aws s3 cp multi.tar.gz s3://mybucket/models/multi.tar.gz

In [None]:
from sagemaker.tensorflow import TensorFlowModel, TensorFlowPredictor

# change this to the name or ARN of your SageMaker execution role
role = 'SageMakerRole'

model_data = 's3://mybucket/models/multi.tar.gz'

# For multi-model endpoints, you should set the default model name in
# an environment variable. If it isn't set, the endpoint will work,
# but the model it will select as default is unpredictable.
env = {
  'SAGEMAKER_TFS_DEFAULT_MODEL_NAME': 'model1'
}

# Uses TensorFlowModel — the class this cell imports; the original called an
# undefined name `Model`.
model = TensorFlowModel(model_data=model_data, role=role, framework_version='1.11', env=env)
predictor = model.deploy(initial_instance_count=1, instance_type='ml.c5.xlarge')

In [None]:
# ... continuing from the previous example
# Imported here so the cell does not depend on the previous cell's import line.
from sagemaker.tensorflow import TensorFlowPredictor

# get the endpoint name from the default predictor
endpoint = predictor.endpoint_name

# get a predictor for 'model2'
# TensorFlowPredictor is the predictor class imported alongside TensorFlowModel;
# the original used a bare `Predictor` name that was never imported.
model2_predictor = TensorFlowPredictor(endpoint, model_name='model2')

# note: that will work for actual SageMaker endpoints, but if you are using
# local mode you need to create the new Predictor this way:
#
# model2_predictor = TensorFlowPredictor(endpoint, model_name='model2',
#                                        sagemaker_session=predictor.sagemaker_session)


# result is prediction from 'model2'
# NOTE: `...` is a placeholder — pass a real request payload.
result = model2_predictor.predict(...)

In [None]:
%%bash
# Shell commands: the %%bash cell magic makes this cell runnable from the
# Python kernel, and bash is needed for the >(cat) process substitution.
# NOTE(review): depending on the AWS CLI version, --body may be interpreted as
# base64 unless a raw binary format is configured — confirm for your CLI.

# TensorFlow Serving REST API - predict request
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name my-endpoint \
    --content-type 'application/json' \
    --body '{"instances": [1.0, 2.0, 5.0]}' \
    >(cat) 1>/dev/null

# Predict request for specific model name
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name my-endpoint \
    --content-type 'application/json' \
    --body '{"instances": [1.0, 2.0, 5.0]}' \
    --custom-attributes 'tfs-model-name=other_model' \
    >(cat) 1>/dev/null

# TensorFlow Serving REST API - regress request
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name my-endpoint \
    --content-type 'application/json' \
    --body '{"signature_name": "tensorflow/serving/regress","examples": [{"x": 1.0}]}' \
    --custom-attributes 'tfs-method=regress' \
    >(cat) 1>/dev/null

# Simple json request (2 instances)
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name my-endpoint \
    --content-type 'application/json' \
    --body '[[1.0, 2.0, 5.0],[2.0, 3.0, 4.0]]' \
    >(cat) 1>/dev/null

# CSV request (2 rows)
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name my-endpoint \
    --content-type 'text/csv' \
    --body "1.0,2.0,5.0"$'\n'"2.0,3.0,4.0" \
    >(cat) 1>/dev/null

# Line delimited JSON from an input file
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name my-endpoint \
    --content-type 'application/jsons' \
    --body "$(cat input.jsons)" \
    results.json

# TensorFlow

###