In [2]:
%cd /root/DeepLearningModelDeployment/notebooks/

/root/DeepLearningModelDeployment/notebooks


In [3]:
!pip install --upgrade pip 
!pip install -q sagemaker-experiments

Collecting pip
  Using cached pip-22.1-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 21.1.2
    Uninstalling pip-21.1.2:
      Successfully uninstalled pip-21.1.2
Successfully installed pip-22.1
[0m

In [4]:
import sagemaker
import json
import boto3

role = sagemaker.get_execution_role()
sess = sagemaker.Session()
region = sess.boto_region_name
bucket = sess.default_bucket()
prefix = 'tf-deep-imdb-model'

In [5]:
import numpy as np
import tensorflow as tf
import os
from tensorflow.keras.preprocessing import sequence
from tensorflow.python.keras.datasets import imdb

In [6]:
# Load imdb data and assign to train and test

max_features = 20000
maxlen = 400

(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
print(len(x_train), 'train sequences')
print(len(x_test), 'test sequences')

x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)

25000 train sequences
25000 test sequences
x_train shape: (25000, 400)
x_test shape: (25000, 400)


In [7]:
"""
Create folder structure and copy train and test data to npy files.
npy files contains an array saved in the NumPy (NPY) file format. 
NPY files store all the information required to reconstruct an array on any computer, 
which includes dtype and shape information
"""

data_dir = os.path.join(os.getcwd(), 'imdb_data')
os.makedirs(data_dir, exist_ok=True)

train_dir = os.path.join(os.getcwd(), 'imdb_data/train')
os.makedirs(train_dir, exist_ok=True)

test_dir = os.path.join(os.getcwd(), 'imdb_data/test')
os.makedirs(test_dir, exist_ok=True)

csv_test_dir = os.path.join(os.getcwd(), 'imdb_data/csv-test')
os.makedirs(csv_test_dir, exist_ok=True)

np.save(os.path.join(train_dir, 'x_train.npy'), x_train)
np.save(os.path.join(train_dir, 'y_train.npy'), y_train)
np.save(os.path.join(test_dir, 'x_test.npy'), x_test)
np.save(os.path.join(test_dir, 'y_test.npy'), y_test)
np.savetxt(os.path.join(csv_test_dir, 'csv-test.csv'), 
           np.array(x_test[:100], dtype=np.int32), fmt='%d', delimiter=",")

In [8]:
# Upload train and test data into default s3 bucket

traindata_s3_prefix = f'{prefix}/imdb_data/train'
testdata_s3_prefix = f'{prefix}/imdb_data/test'

train_s3 = sess.upload_data(path='./imdb_data/train/', key_prefix=traindata_s3_prefix)
test_s3 = sess.upload_data(path='./imdb_data/test/', key_prefix=testdata_s3_prefix)

In [9]:
!mkdir code

mkdir: cannot create directory ‘code’: File exists


In [10]:
%%writefile code/smdp_tensorflow_sentiment.py
import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
import argparse
import codecs
import json
import numpy as np
import os
import tensorflow as tf

import smdistributed.dataparallel.tensorflow as sdp

max_features = 20000
maxlen = 400
embedding_dims = 300
filters = 256
kernel_size = 3
hidden_dims = 256

def parse_args():
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument('--epochs', type=int, default=1)
    parser.add_argument('--batch_size', type=int, default=64)
    parser.add_argument('--learning_rate', type=float, default=0.01)
    parser.add_argument('--drop_out_rate', type=float, default=0.2)

    # data directories
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--test', type=str, default=os.environ.get('SM_CHANNEL_TEST'))

    # model directory /opt/ml/model default set by SageMaker
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))

    return parser.parse_known_args()


def get_train_data(train_dir, batch_size):
    x_train = np.load(os.path.join(train_dir, 'x_train.npy'))
    y_train = np.load(os.path.join(train_dir, 'y_train.npy'))
    print(f'x train {x_train.shape} y train {y_train.shape}')
    
    dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    dataset = dataset.batch(batch_size, drop_remainder=True)

    return dataset


def get_test_data(test_dir):
    x_test = np.load(os.path.join(test_dir, 'x_test.npy'))
    y_test = np.load(os.path.join(test_dir, 'y_test.npy'))
    print(f'x test {x_test.shape} y test {y_test.shape}')

    return x_test, y_test


def get_model(args):
    embedding_layer = tf.keras.layers.Embedding(max_features,
                                                embedding_dims,
                                                input_length=maxlen)

    sequence_input = tf.keras.Input(shape=(maxlen,), dtype='int32')
    embedded_sequences = embedding_layer(sequence_input)
    x = tf.keras.layers.Dropout(args.drop_out_rate)(embedded_sequences)
    x = tf.keras.layers.Conv1D(filters, kernel_size, padding='valid', activation='relu', strides=1)(x)
    x = tf.keras.layers.MaxPooling1D()(x)
    x = tf.keras.layers.GlobalMaxPooling1D()(x)
    x = tf.keras.layers.Dense(hidden_dims, activation='relu')(x)
    x = tf.keras.layers.Dropout(args.drop_out_rate)(x)
    preds = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    model = tf.keras.Model(sequence_input, preds)

    return model


def train(train_dataset, args):
    model = get_model(args)
    
    loss = tf.losses.BinaryCrossentropy(name = 'binary_crossentropy')
    acc = tf.metrics.BinaryAccuracy(name = 'accuracy')
    optimizer = tf.optimizers.Adam(learning_rate = args.learning_rate)
    
    @tf.function
    def training_step(x_train, y_train, first_batch):
        with tf.GradientTape() as tape:
            probs = model(x_train, training=True)
            loss_value = loss(y_train, probs)
            acc_value = acc(y_train, probs)

        # SMDataParallel: Wrap tf.GradientTape with SMDataParallel's DistributedGradientTape
        tape = sdp.DistributedGradientTape(tape, sparse_as_dense = True)
        grads = tape.gradient(loss_value, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if first_batch:
            print('first batch')
            # SMDataParallel: Broadcast model and optimizer variables
            sdp.broadcast_variables(model.variables, root_rank=0)
            sdp.broadcast_variables(optimizer.variables(), root_rank=0)

        # SMDataParallel: all_reduce call
        loss_value = sdp.oob_allreduce(loss_value)  # Average the loss across workers
        acc_value = sdp.oob_allreduce(acc_value)
 
        return loss_value, acc_value
    
    for epoch in range(args.epochs):
        for batch, (x_train, y_train) in enumerate(train_dataset.take(len(train_dataset)//sdp.size())):
            is_first_batch = (epoch == 0) and (batch == 0)
            loss_value, acc_value = training_step(x_train, y_train, is_first_batch)

            if batch % 10 == 0 and sdp.rank() == 0:
                print('Epoch #%d, Step #%d\tLoss: %.6f, Acc: %.6f (batch_size=%d)' % (epoch, batch, loss_value, acc_value, len(y_train)))

    # SMDataParallel: Save checkpoints only from master node.
    if sdp.rank() == 0:
        model.save(os.path.join(args.model_dir, '1'))
    

if __name__ == "__main__":

    args, _ = parse_args()

    # initialize sagemaker data parallel (dist)
    sdp.init()

    # ping each GPU to a single smdistributed.dataparallel process with local_rank
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
        
    # scale the learning rate by number of workers
    print('sdp.size() = %s' % sdp.size())
    args.learning_rate = args.learning_rate * sdp.size()
    
    train_dataset = get_train_data(args.train, args.batch_size)

    train(train_dataset, args)

Overwriting code/smdp_tensorflow_sentiment.py


In [15]:
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from botocore.exceptions import ClientError
from time import gmtime, strftime
import time

experiment_name = 'imdb-experiment-tfdeepmode2test'

try:
    experiment = Experiment.create(
        experiment_name=experiment_name, 
        description='Training a sentiment classification model using imdb dataset.')
except ClientError as e:
    print(f'{experiment_name} experiment already exists! Reusing the existing experiment.')

In [16]:
from sagemaker.tensorflow import TensorFlow
from time import gmtime, strftime
import time

exp_datetime = strftime('%Y-%m-%d-%H-%M-%S', gmtime())
jobname = f'imdb-tf-deepmodel-job-{exp_datetime}'

s3_output_location = f's3://{bucket}/{prefix}/{jobname}'
code_dir = f's3://{bucket}/{prefix}/{jobname}'

train_instance_type = 'ml.p3.16xlarge'
hyperparameters = {'epochs': 30, 'batch_size': 512, 
                   'learning_rate': 0.001, 'drop_out_rate': 0.2}
distribution = {'smdistributed': {'dataparallel': {'enabled': True}}}

estimator = TensorFlow(source_dir='code',
                       entry_point='smdp_tensorflow_sentiment.py',
                       output_path=s3_output_location,
                       code_location=code_dir,
                       instance_type=train_instance_type,
                       instance_count=1,
                       enable_sagemaker_metrics=True,
                       hyperparameters=hyperparameters,
                       sagemaker_session=sess,
                       role=role,
                       framework_version='2.4',
                       py_version='py37', 
                       distribution=distribution)

data_channels = {'train':train_s3, 'test': test_s3}
print(data_channels)

{'train': 's3://sagemaker-us-east-1-104877823522/tf-deep-imdb-model/imdb_data/train', 'test': 's3://sagemaker-us-east-1-104877823522/tf-deep-imdb-model/imdb_data/test'}


In [17]:
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial

exp_trial = Trial.create(experiment_name=experiment_name, 
                         trial_name=jobname)

experiment_config={'ExperimentName': experiment_name,
                   'TrialName': exp_trial.trial_name,
                   'TrialComponentDisplayName': 'Training-Experiment-Trial'}

estimator.fit(inputs=data_channels,
              job_name=jobname,
              experiment_config=experiment_config,
              logs=True)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating training-job with name: imdb-tf-deepmodel-job-2022-05-18-03-44-09


ResourceLimitExceeded: An error occurred (ResourceLimitExceeded) when calling the CreateTrainingJob operation: The account-level service limit 'ml.p3.16xlarge for training job usage' is 0 Instances, with current utilization of 0 Instances and a request delta of 1 Instances. Please contact AWS support to request an increase for this limit.

In [None]:
!mkdir ./imdb_data/model -p
!aws s3 cp {estimator.model_data} ./imdb_data/model.tar.gz
!tar -xzf ./imdb_data/model.tar.gz -C ./imdb_data/model/

In [None]:
my_model=tf.keras.models.load_model('./imdb_data/model/1/')

In [None]:
my_model.summary()

In [None]:
loss, acc=my_model.evaluate(x_test, y_test, verbose=2)
print('Restored model, accuracy: {:5.2f}%'.format(100 * acc))

Below is the deployment process

In [None]:
max_features = 2000
maxlen = 100

(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
print(len(x_train), 'train sequences')
print(len(x_test), 'test sequences')

x_test = sequence.pad_sequences(x_test, maxlen=maxlen)
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)

csv_test_dir_prefix = 'imdb_data/infernce'
csv_test_filename = 'test.csv'
csv_test_dir = os.path.join(os.getcwd(), csv_test_dir_prefix)
os.makedirs(csv_test_dir, exist_ok=True)

np.savetxt(os.path.join(csv_test_dir, csv_test_filename), 
           np.array(x_test, dtype=np.int32), fmt='%d', delimiter=",")

test_data_s3prefix = f'{prefix}/infernce/csv_test'
test_data_s3 = sess.upload_data(path=csv_test_dir, 
                                key_prefix=test_data_s3prefix)
print(test_data_s3)

In [None]:
# In Experiments and trials, you should see your training job as a trial in the list.
training_job_name='imdb-tf-deepmodel-job-2022-04-30-19-10-49'

# Once you have attached training_job_name and reload estimator, 
# you should see the history of the job printed in the output.

estimator_deploy = TensorFlow.attach(training_job_name) # It gives details about the train

In [None]:
exp_datetime = strftime('%Y-%m-%d-%H-%M-%S', gmtime())

# Creating a new trial for the experiment
exp_trial = Trial.load(trial_name=training_job_name)

experiment_config={
    'ExperimentName': experiment_name,
    'TrialName': exp_trial.trial_name,
    'TrialComponentDisplayName': 'tf-model-inference-batchTransform'}

In [None]:
exp_datetime = strftime('%Y-%m-%d-%H-%M-%S', gmtime())
jobname = f'imdb-tf-deep-model-batach-transform-{exp_datetime}'

s3_output_location = f's3://{bucket}/{prefix}/{jobname}'

# Run SageMaker batch transform
# Below method creates a Transformer object with the compute resource desired for the inference.
# The max_payload argument allows us to control the size of each mini-batch 
# that SageMaker Batch Transform is splitting.
transformer = estimator_deploy.transformer(instance_count=1, 
                                    instance_type='ml.c4.xlarge',
                                    max_payload = 2,
                                    accept = 'application/jsonlines',
                                    output_path = s3_output_location,
                                    assemble_with = 'Line')

transformer.transform(test_data_s3, 
                      content_type='text/csv', 
                      split_type = 'Line', 
                      job_name = jobname,
                      experiment_config = experiment_config)
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)

In [None]:
# the below code for error info
#job_name = 'imdb-tf-deepmodel-job-2022-04-30-19-10-49'
#sage = boto3.client('sagemaker')
#sage.describe_training_job(TrainingJobName=job_name)['FailureReason']

In [None]:
"""
SageMaker batch transform saves the results to the 
specified S3 location with .out appended to the input filename. 
We can access the full S3 path in transformer.output_path attribute. 
SageMaker uses TensorFlow Serving, a model serving framework developed by TensorFlow,
for model serving, the model output is written in JSON format.
The output has the sentiment probabilities in an array with predictions as the JSON key.
We can inspect the batch transform results with the following code:
"""
output = transformer.output_path
output_prefix = 'imdb_data/test_output'
!mkdir -p {output_prefix}
!aws s3 cp --recursive {output} {output_prefix}
!head {output_prefix}/{csv_test_filename}.out

In [None]:
results=[]
with open(f'{output_prefix}/{csv_test_filename}.out', 'r') as f:
    lines = f.readlines()
    for line in lines:
        print(line)
        json_output = json.loads(line)
        result = [float('%.3f'%(item)) for sublist in json_output['predictions'] 
                                       for item in sublist]
        results += result

print(results)

In [None]:
def get_sentiment(score):
    return 'positive' if score > 0.5 else 'negative' 

In [None]:
import re

regex = re.compile(r'^[\?\s]+')
word_index = imdb.get_word_index()

In [None]:
data_index=199
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
first_decoded_review = ' '.join([reverse_word_index.get(i - 3, '?') 
                                 for i in x_test[data_index]])
regex.sub('', first_decoded_review)

In [None]:
print(f'Labeled sentiment for this review is {get_sentiment(y_test[data_index])}')
print(f'Predicted sentiment is {get_sentiment(results[data_index])}')

In [None]:
"""
Fully managed mini-batching helps make inferences on a large dataset efficiently.
You can use a separate SageMaker-managed compute infrastructure that is different from your notebook instance. You can easily run prediction with a cluster of instances for faster prediction.
You only pay for the runtime of a batch transform job, even with a much larger compute cluster.
You can schedule and kick off a model prediction independently in the cloud with SageMaker batch transform. It is not necessary to use a Python notebook in SageMaker Studio to start a prediction job.
"""

Hosting real-time endpoints

In [None]:
# In Experiments and trials, you should see your training job as a trial in the list.
training_job_name='imdb-tf-deepmodel-job-2022-04-30-19-10-49'

# Once you have attached training_job_name and reload estimator, 
# you should see the history of the job printed in the output.

estimator_real_time_deploy = TensorFlow.attach(training_job_name) # It gives details about the train

In [None]:
predictor = estimator_real_time_deploy.deploy(initial_instance_count=1, 
                             instance_type='ml.c4.xlarge')

In [None]:
predictor.endpoint_name

In [None]:
data_index=199

In [None]:
prediction=predictor.predict(x_test[data_index])
print(prediction)

In [None]:
def get_sentiment(score):
    return 'positive' if score > 0.5 else 'negative' 

import re

regex = re.compile(r'^[\?\s]+')
word_index = imdb.get_word_index()

reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])
first_decoded_review = ' '.join([reverse_word_index.get(i - 3, '?') 
                                 for i in x_test[data_index]])
regex.sub('', first_decoded_review)

In [None]:
print(f'Labeled sentiment for this review is {get_sentiment(y_test[data_index])}')
print(f'Predicted sentiment is {get_sentiment(prediction["predictions"][0][0])}')

In [None]:
#predictor.predict(x_test[:5000]) # this would throw an error due to large volume

In [None]:
# Implementing auto scaling 
sagemaker_client = sess.boto_session.client('sagemaker')
autoscaling_client = sess.boto_session.client('application-autoscaling')

endpoint_name = predictor.endpoint_name
response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
print(response)

In [None]:
# application autoscaling references the endpoint using string below
resource_id=f'endpoint/{endpoint_name}/variant/AllTraffic' 
response = autoscaling_client.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,
    MaxCapacity=4
)

response = autoscaling_client.put_scaling_policy(
    PolicyName='Invocations-ScalingPolicy',
    ServiceNamespace='sagemaker', # The namespace of the AWS service that provides the resource. 
    ResourceId=resource_id, # Endpoint name 
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling', # Other options are 'StepScaling'|'ScheduledScaling'
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 4000.0, # The target value for the metric below.
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance', 
        },
        'ScaleInCooldown': 600, 
        'ScaleOutCooldown': 300,
        'DisableScaleIn': False # If true, scale-in is disabled.
    }
)

In [None]:
response = autoscaling_client.describe_scaling_policies(
    ServiceNamespace='sagemaker'
)

for i in response['ScalingPolicies']:
    print('')
    print(i['PolicyName'])
    print('')
    if('TargetTrackingScalingPolicyConfiguration' in i):
        print(i['TargetTrackingScalingPolicyConfiguration']) 
    else:
        print(i['StepScalingPolicyConfiguration'])
    print('')

In [None]:
# Typically it's recommended to delete the endpoint to stop incurring cost
predictor.delete_endpoint()

Hosting multi-model endpoints to save costs

In [None]:
"""
A multi-model endpoint is a type of real-time endpoint in SageMaker 
that allows multiple models to be deployed behind the same endpoint.
Hosting models trained for 50 US states in 1 endpoint instead of 50, that's a 98% cost saving
"""

Optimizing instance type and autoscaling with load testing

In [None]:
"""
We use a Python load testing framework called locust to perform the load testing in SageMaker Studio.
"""

In [None]:
import sagemaker
import json
import boto3

role = sagemaker.get_execution_role()
sess = sagemaker.Session()
region = sess.boto_region_name
bucket = sess.default_bucket()
prefix = 'sagemaker-smdb-dataparallel'

In [None]:
import numpy as np
import tensorflow as tf
import os
from tensorflow.keras.preprocessing import sequence
from tensorflow.python.keras.datasets import imdb

In [None]:
max_features = 20000
maxlen = 400

(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
print(len(x_train), 'train sequences')
print(len(x_test), 'test sequences')

x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)

In [None]:
data_dir = os.path.join(os.getcwd(), 'imdb_data')
os.makedirs(data_dir, exist_ok=True)

train_dir = os.path.join(data_dir, 'train')
os.makedirs(train_dir, exist_ok=True)

test_dir = os.path.join(data_dir, 'test')
os.makedirs(test_dir, exist_ok=True)

csv_test_dir = os.path.join(data_dir, 'csv-test')
os.makedirs(csv_test_dir, exist_ok=True)

np.save(os.path.join(train_dir, 'x_train.npy'), x_train)
np.save(os.path.join(train_dir, 'y_train.npy'), y_train)
np.save(os.path.join(test_dir, 'x_test.npy'), x_test)
np.save(os.path.join(test_dir, 'y_test.npy'), y_test)
np.savetxt(os.path.join(csv_test_dir, 'csv-test.csv'), 
           np.array(x_test[:100], dtype=np.int32), fmt='%d', delimiter=",")

In [None]:
traindata_s3_prefix = f'{prefix}/imdb_data/train'
testdata_s3_prefix = f'{prefix}/imdb_data/test'

train_s3 = sess.upload_data(path='./imdb_data/train/', key_prefix=traindata_s3_prefix)
test_s3 = sess.upload_data(path='./imdb_data/test/', key_prefix=testdata_s3_prefix)

In [None]:
!mkdir code

In [None]:
%%writefile code/smdp_tensorflow_sentiment.py
import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
import argparse
import codecs
import json
import numpy as np
import os
import tensorflow as tf

import smdistributed.dataparallel.tensorflow as sdp

max_features = 20000
maxlen = 400
embedding_dims = 300
filters = 256
kernel_size = 3
hidden_dims = 256

def parse_args():
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument('--epochs', type=int, default=1)
    parser.add_argument('--batch_size', type=int, default=64)
    parser.add_argument('--learning_rate', type=float, default=0.01)
    parser.add_argument('--drop_out_rate', type=float, default=0.2)

    # data directories
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--test', type=str, default=os.environ.get('SM_CHANNEL_TEST'))

    # model directory /opt/ml/model default set by SageMaker
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))

    return parser.parse_known_args()


def get_train_data(train_dir, batch_size):
    x_train = np.load(os.path.join(train_dir, 'x_train.npy'))
    y_train = np.load(os.path.join(train_dir, 'y_train.npy'))
    print(f'x train {x_train.shape} y train {y_train.shape}')
    
    dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    dataset = dataset.batch(batch_size, drop_remainder=True)

    return dataset


def get_test_data(test_dir):
    x_test = np.load(os.path.join(test_dir, 'x_test.npy'))
    y_test = np.load(os.path.join(test_dir, 'y_test.npy'))
    print(f'x test {x_test.shape} y test {y_test.shape}')

    return x_test, y_test


def get_model(args):
    embedding_layer = tf.keras.layers.Embedding(max_features,
                                                embedding_dims,
                                                input_length=maxlen)

    sequence_input = tf.keras.Input(shape=(maxlen,), dtype='int32')
    embedded_sequences = embedding_layer(sequence_input)
    x = tf.keras.layers.Dropout(args.drop_out_rate)(embedded_sequences)
    x = tf.keras.layers.Conv1D(filters, kernel_size, padding='valid', activation='relu', strides=1)(x)
    x = tf.keras.layers.MaxPooling1D()(x)
    x = tf.keras.layers.GlobalMaxPooling1D()(x)
    x = tf.keras.layers.Dense(hidden_dims, activation='relu')(x)
    x = tf.keras.layers.Dropout(args.drop_out_rate)(x)
    preds = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    model = tf.keras.Model(sequence_input, preds)

    return model


def train(train_dataset, args):
    model = get_model(args)
    
    loss = tf.losses.BinaryCrossentropy(name = 'binary_crossentropy')
    acc = tf.metrics.BinaryAccuracy(name = 'accuracy')
    optimizer = tf.optimizers.Adam(learning_rate = args.learning_rate)
    
    @tf.function
    def training_step(x_train, y_train, first_batch):
        with tf.GradientTape() as tape:
            probs = model(x_train, training=True)
            loss_value = loss(y_train, probs)
            acc_value = acc(y_train, probs)

        # SMDataParallel: Wrap tf.GradientTape with SMDataParallel's DistributedGradientTape
        tape = sdp.DistributedGradientTape(tape, sparse_as_dense = True)
        grads = tape.gradient(loss_value, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if first_batch:
            print('first batch')
            # SMDataParallel: Broadcast model and optimizer variables
            sdp.broadcast_variables(model.variables, root_rank=0)
            sdp.broadcast_variables(optimizer.variables(), root_rank=0)

        # SMDataParallel: all_reduce call
        loss_value = sdp.oob_allreduce(loss_value)  # Average the loss across workers
        acc_value = sdp.oob_allreduce(acc_value)
 
        return loss_value, acc_value
    
    for epoch in range(args.epochs):
        for batch, (x_train, y_train) in enumerate(train_dataset.take(len(train_dataset)//sdp.size())):
            is_first_batch = (epoch == 0) and (batch == 0)
            loss_value, acc_value = training_step(x_train, y_train, is_first_batch)

            if batch % 10 == 0 and sdp.rank() == 0:
                print('Epoch #%d, Step #%d\tLoss: %.6f, Acc: %.6f (batch_size=%d)' % (epoch, batch, loss_value, acc_value, len(y_train)))

    # SMDataParallel: Save checkpoints only from master node.
    if sdp.rank() == 0:
        model.save(os.path.join(args.model_dir, '1'))
    

if __name__ == "__main__":

    args, _ = parse_args()

    # initialize sagemaker data parallel (dist)
    sdp.init()

    # ping each GPU to a single smdistributed.dataparallel process with local_rank
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
        
    # scale the learning rate by number of workers
    print('sdp.size() = %s' % sdp.size())
    args.learning_rate = args.learning_rate * sdp.size()
    
    train_dataset = get_train_data(args.train, args.batch_size)

    train(train_dataset, args)

In [None]:
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from botocore.exceptions import ClientError

experiment_name = 'imdb-sentiment-analysis'

try:
    experiment = Experiment.create(
        experiment_name=experiment_name, 
        description='Training a sentiment classification model using imdb dataset.')
except ClientError as e:
    print(f'{experiment_name} experiment already exists! Reusing the existing experiment.')

In [None]:
from sagemaker.tensorflow import TensorFlow
from time import gmtime, strftime
import time

exp_datetime = strftime('%Y-%m-%d-%H-%M-%S', gmtime())
jobname = f'imdb-smdp-tf-{exp_datetime}'

s3_output_location = f's3://{bucket}/{prefix}/{jobname}'
code_dir = f's3://{bucket}/{prefix}/{jobname}'

# SMDP supports ml.p4d.24xlarge, ml.p3dn.24xlarge, and ml.p3.16xlarge
train_instance_type = 'ml.p3.16xlarge'
hyperparameters = {'epochs': 30, 'batch_size': 512, 
                   'learning_rate': 0.001, 'drop_out_rate': 0.2}

distribution = {'smdistributed': {'dataparallel': {'enabled': True}}}

estimator = TensorFlow(source_dir='code',
                       entry_point='smdp_tensorflow_sentiment.py',
                       output_path=s3_output_location,
                       code_location=code_dir,
                       instance_type=train_instance_type,
                       instance_count=1,
                       enable_sagemaker_metrics=True,
                       hyperparameters=hyperparameters,
                       sagemaker_session=sess,
                       role=role,
                       framework_version='2.4',
                       py_version='py37', 
                       distribution=distribution)

data_channels = {'train':train_s3, 'test': test_s3}
print(data_channels)

In [None]:
# Creating a new trial for the experiment
exp_trial = Trial.create(experiment_name=experiment_name, 
                         trial_name=jobname)

experiment_config={'ExperimentName': experiment_name,
                   'TrialName': exp_trial.trial_name,
                   'TrialComponentDisplayName': 'Training-smdb'}

estimator.fit(inputs=data_channels,
              job_name=jobname,
              experiment_config=experiment_config,
              wait=True)

Monitoring model training and compute resources with SageMaker Debugger

In [None]:
"""
SageMaker Debugger helps developers monitor the compute resource utilization, 
detect modeling-related issues, profile deep learning operations, 
and identify bottlenecks during the runtime of your training jobs
"""

spot_training_checkpointing

In [81]:
import sagemaker
import json
import boto3

role = sagemaker.get_execution_role()
sess = sagemaker.Session()
region = sess.boto_region_name
bucket = sess.default_bucket()
prefix = 'sagemaker-spottraining-checkpoint'

In [82]:
import numpy as np
import os
from time import gmtime, strftime
import time
import uuid

In [83]:
max_features = 20000
maxlen = 400

data_dir = os.path.join(os.getcwd(), 'imdb_data')
train_dir = os.path.join(data_dir, 'train')
train_file = os.path.join(train_dir, 'x_train.npy')
test_dir = os.path.join(data_dir, 'test')
test_file = os.path.join(test_dir, 'x_test.npy')

if not (os.path.isfile(train_file) and os.path.isfile(test_file)):
    print('Data not available locally. Creating...')
    import tensorflow as tf
    from tensorflow.keras.preprocessing import sequence
    from tensorflow.python.keras.datasets import imdb
    
    (x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
    print(len(x_train), 'train sequences')
    print(len(x_test), 'test sequences')

    x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
    x_test = sequence.pad_sequences(x_test, maxlen=maxlen)
    print('x_train shape:', x_train.shape)
    print('x_test shape:', x_test.shape)
    
    os.makedirs(data_dir, exist_ok=True)
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)
    
    np.save(os.path.join(train_dir, 'x_train.npy'), x_train)
    np.save(os.path.join(train_dir, 'y_train.npy'), y_train)
    np.save(os.path.join(test_dir, 'x_test.npy'), x_test)
    np.save(os.path.join(test_dir, 'y_test.npy'), y_test)
else:
    print('Data available locally.')

Data available locally.


In [84]:
traindata_s3_prefix = f'{prefix}/imdb_data/train'
testdata_s3_prefix = f'{prefix}/imdb_data/test'

train_s3 = sess.upload_data(path='./imdb_data/train/', key_prefix=traindata_s3_prefix)
test_s3 = sess.upload_data(path='./imdb_data/test/', key_prefix=testdata_s3_prefix)

In [86]:
!mkdir code

mkdir: cannot create directory ‘code’: File exists


In [None]:
%%writefile code/tensorflow_sentiment_with_checkpoint.py
import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
import argparse
import codecs
import json
import numpy as np
import os
import re
import tensorflow as tf

max_features = 20000
maxlen = 400
embedding_dims = 300
filters = 256
kernel_size = 3
hidden_dims = 256

def parse_args():
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument('--epochs', type=int, default=1)
    parser.add_argument('--batch_size', type=int, default=64)
    parser.add_argument('--learning_rate', type=float, default=0.01)
    parser.add_argument('--drop_out_rate', type=float, default=0.2)
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--test', type=str, default=os.environ.get('SM_CHANNEL_TEST'))
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--checkpoint_dir', type=str, default='/opt/ml/checkpoints', 
                        help='Path where checkpoints will be saved.')

    return parser.parse_known_args()


def save_history(path, history):
    history_for_json = {}
    # transform float values that aren't json-serializable
    for key in list(history.history.keys()):
        if type(history.history[key]) == np.ndarray:
            history_for_json[key] == history.history[key].tolist()
        elif type(history.history[key]) == list:
            if type(history.history[key][0]) == np.float32 or type(history.history[key][0]) == np.float64:
                history_for_json[key] = list(map(float, history.history[key]))

    with codecs.open(path, 'w', encoding='utf-8') as f:
        json.dump(history_for_json, f, separators=(',', ':'), sort_keys=True, indent=4) 


def get_train_data(train_dir):
    x_train = np.load(os.path.join(train_dir, 'x_train.npy'))
    y_train = np.load(os.path.join(train_dir, 'y_train.npy'))
    print(f'x train {x_train.shape} y train {y_train.shape}')

    return x_train, y_train


def get_test_data(test_dir):
    x_test = np.load(os.path.join(test_dir, 'x_test.npy'))
    y_test = np.load(os.path.join(test_dir, 'y_test.npy'))
    print(f'x test {x_test.shape} y test {y_test.shape}')

    return x_test, y_test


def get_model(args):
    embedding_layer = tf.keras.layers.Embedding(max_features,
                                                embedding_dims,
                                                input_length=maxlen)

    sequence_input = tf.keras.Input(shape=(maxlen,), dtype='int32')
    embedded_sequences = embedding_layer(sequence_input)
    x = tf.keras.layers.Dropout(args.drop_out_rate)(embedded_sequences)
    x = tf.keras.layers.Conv1D(filters, kernel_size, padding='valid', activation='relu', strides=1)(x)
    x = tf.keras.layers.MaxPooling1D()(x)
    x = tf.keras.layers.GlobalMaxPooling1D()(x)
    x = tf.keras.layers.Dense(hidden_dims, activation='relu')(x)
    x = tf.keras.layers.Dropout(args.drop_out_rate)(x)
    preds = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    model = tf.keras.Model(sequence_input, preds)
    optimizer = tf.keras.optimizers.Adam(args.learning_rate)
    model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    
    return model


def load_model_from_checkpoints(checkpoint_dir):
    checkpoint_files = [file for file in os.listdir(checkpoint_dir) if file.endswith('.' + 'h5')]
    print('------------------------------------------------------')
    print(f'Available checkpoint files: {checkpoint_files}')
    epoch_numbers = [re.search('(\.*([1-9]|[1-9][0-9]|[1-9][0-9][0-9]))(?=\.)', file).group() 
                     for file in checkpoint_files]
      
    max_epoch_number = max(epoch_numbers)
    max_epoch_index = epoch_numbers.index(max_epoch_number)
    max_epoch_filename = checkpoint_files[max_epoch_index]

    print(f'Latest epoch checkpoint file name: {max_epoch_filename}')
    print('Resuming training from epoch: {}'.format(int(max_epoch_number)+1))
    print('------------------------------------------------------')
    
    resumed_model_from_checkpoints = tf.keras.models.load_model(f'{checkpoint_dir}/{max_epoch_filename}')
    return resumed_model_from_checkpoints, int(max_epoch_number)


if __name__ == '__main__':

    args, _ = parse_args()
    print(args)

    if os.path.isdir(args.checkpoint_dir):
        print(f'Checkpointing directory {args.checkpoint_dir} exists.')
    else:
        print(f'Creating Checkpointing directory {args.checkpoint_dir}.')
        os.mkdir(args.checkpoint_dir)
        
    x_train, y_train = get_train_data(args.train)
    x_test, y_test = get_test_data(args.test)

    # Load model
    if not os.listdir(args.checkpoint_dir):
        model = get_model(args)
        initial_epoch_number = 0
    else:    
        model, initial_epoch_number = load_model_from_checkpoints(args.checkpoint_dir)

    callbacks = [tf.keras.callbacks.ModelCheckpoint(args.checkpoint_dir + '/checkpoint-{epoch}.h5')]
    
    history = model.fit(x_train, y_train,
                        batch_size=args.batch_size,
                        epochs=args.epochs,
                        initial_epoch=initial_epoch_number,
                        validation_data=(x_test, y_test),
                        callbacks=callbacks)

    save_history(args.model_dir + '/history.p', history)
    
    # create a TensorFlow SavedModel for deployment to a SageMaker endpoint with TensorFlow Serving
    model.save(args.model_dir + '/1')

In [87]:
from sagemaker.tensorflow import TensorFlow

exp_datetime = strftime('%Y-%m-%d-%H-%M-%S', gmtime())
jobname = f'imdb-tf-spot-{exp_datetime}'

s3_output_location = f's3://{bucket}/{prefix}'
code_dir = f's3://{bucket}/{prefix}'

train_instance_type = 'ml.c2.xlarge'
hyperparameters = {'epochs': 20, 'batch_size': 256, 'learning_rate': 0.01, 'drop_out_rate': 0.2}

use_spot_instances = True
max_run = 3600
max_wait = 3600

checkpoint_suffix = str(uuid.uuid4())[:8]
checkpoint_suffix = '02fa28a1'
checkpoint_s3_uri = f's3://{bucket}/{prefix}/checkpoint-{checkpoint_suffix}'
checkpoint_local_path = '/opt/ml/checkpoints/'
model_local_path = '/opt/ml/model'

estimator = TensorFlow(source_dir='code',
                       entry_point='tensorflow_sentiment_with_checkpoint.py',
                       output_path=s3_output_location,
                       model_dir=model_local_path,
                       code_location=code_dir,
                       instance_type=train_instance_type,
                       instance_count=1,
                       enable_sagemaker_metrics=True,
                       hyperparameters=hyperparameters,
                       role=role,
                       framework_version='2.1',
                       py_version='py3',
                       use_spot_instances=use_spot_instances,
                       checkpoint_s3_uri=checkpoint_s3_uri,
                       max_run=max_run,
                       max_wait=max_wait,
                       debugger_hook_config=False)

data_channels = {'train':train_s3, 'test': test_s3}
print(data_channels)

{'train': 's3://sagemaker-us-east-1-104877823522/sagemaker-spottraining-checkpoint/imdb_data/train', 'test': 's3://sagemaker-us-east-1-104877823522/sagemaker-spottraining-checkpoint/imdb_data/test'}


In [None]:
# Creating a new trial for the experiment
exp_trial = Trial.create(experiment_name=experiment_name, 
                         trial_name=jobname)

experiment_config={'ExperimentName': experiment_name,
                   'TrialName': exp_trial.trial_name,
                   'TrialComponentDisplayName': 'Training'}

estimator.fit(inputs=data_channels,
              job_name=jobname,
              experiment_config=experiment_config,
              wait=True)

Model Monitoring

In [2]:
!pip install --upgrade pip 
!pip install -q sagemaker-experiments

Collecting pip
  Using cached pip-22.0.4-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 21.1.2
    Uninstalling pip-21.1.2:
      Successfully uninstalled pip-21.1.2
Successfully installed pip-22.0.4
[0m

In [3]:
import sagemaker
import boto3

role = sagemaker.get_execution_role()
sess = sagemaker.Session()
region = sess.boto_region_name
bucket = sess.default_bucket()
local_prefix = 'monitoring'
prefix = f'sagemaker-monitoring/{local_prefix}'


In [4]:
from datetime import datetime, timedelta, timezone
import json, os, re, uuid
from time import sleep, gmtime, strftime
from threading import Thread

import pandas as pd
import numpy as np

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from botocore.exceptions import ClientError

from sagemaker import image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.predictor import Predictor
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer

In [5]:
# column names taken from https://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.names
columns = ['Sex', 'Length', 'Diameter', 'Height', 'WholeWeight', 
           'ShuckedWeight', 'VisceraWeight', 'ShellWeight', 'Rings']
df=pd.read_csv('https://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.data', 
               names=columns)

In [6]:
df_processed = df.copy()
# Convert Rings to float so that model prediction (regression) and 
# the ground truth are both of float type for model monitor to work with
df_processed['Rings']=df_processed['Rings'].astype(float)
df_processed['Sex'] = df_processed['Sex'].replace(to_replace=['M', 'F', 'I'], 
                                                  value=[2., 1., 0.])
# moving the target Rings to the first so that we can train with XGBoost.
columns=['Rings', 'Sex', 'Length', 'Diameter', 'Height', 'WholeWeight', 
         'ShuckedWeight', 'VisceraWeight', 'ShellWeight']
df_processed = df_processed[columns]

In [7]:
from sklearn.model_selection import train_test_split
df_build, df_test = train_test_split(df_processed, test_size=0.1, random_state=42, 
                                     shuffle=True, stratify=df_processed['Sex'])
df_train, df_val = train_test_split(df_build, test_size=1/9., random_state=42, 
                                    shuffle=True, stratify=df_build['Sex'])

In [8]:
columns_no_target = ['Sex', 'Length', 'Diameter', 'Height', 'WholeWeight', 
                     'ShuckedWeight', 'VisceraWeight', 'ShellWeight']

In [9]:
os.makedirs(local_prefix, exist_ok=True)
df_train.to_csv(f'./{local_prefix}/abalone_train.csv', index=False)
df_val.to_csv(f'./{local_prefix}/abalone_val.csv', index=False)
df_test.to_csv(f'./{local_prefix}/abalone_test.csv', index=False)

desired_s3_uri = f's3://{bucket}/{prefix}/data'
train_data_s3 = sagemaker.s3.S3Uploader.upload(local_path=f'./{local_prefix}/abalone_train.csv',
                                               desired_s3_uri=desired_s3_uri,
                                               sagemaker_session=sess)
val_data_s3 = sagemaker.s3.S3Uploader.upload(local_path=f'./{local_prefix}/abalone_val.csv',
                                             desired_s3_uri=desired_s3_uri,
                                             sagemaker_session=sess)
test_data_s3 = sagemaker.s3.S3Uploader.upload(local_path=f'./{local_prefix}/abalone_test.csv',
                                              desired_s3_uri=desired_s3_uri,
                                              sagemaker_session=sess)

In [10]:
image = image_uris.retrieve(region=region, framework='xgboost', version='1.3-1')

exp_datetime = strftime('%Y-%m-%d-%H-%M-%S', gmtime())
jobname = f'abalone-xgb-{exp_datetime}'

experiment_name = 'abalone-age-prediction'

try:
    experiment = Experiment.create(
        experiment_name=experiment_name, 
        description='Predicting age for abalone based on physical measurements.')
except ClientError as e:
    print(f'{experiment_name} experiment already exists! Reusing the existing experiment.')
    
# Creating a new trial for the experiment
exp_trial = Trial.create(experiment_name=experiment_name, 
                         trial_name=jobname)

experiment_config={'ExperimentName': experiment_name,
                   'TrialName': exp_trial.trial_name,
                   'TrialComponentDisplayName': 'Training'}

train_s3_output = f's3://{bucket}/{prefix}/abalone_data/training'

xgb = sagemaker.estimator.Estimator(image,
                                    role,
                                    instance_type='ml.m5.xlarge',
                                    instance_count=1,
                                    output_path=train_s3_output,
                                    enable_sagemaker_metrics=True,
                                    sagemaker_session=sess)

xgb.set_hyperparameters(objective='reg:squarederror', num_round=20)

train_input = sagemaker.inputs.TrainingInput(s3_data=train_data_s3, 
                                             content_type='csv')
val_input = sagemaker.inputs.TrainingInput(s3_data=val_data_s3, 
                                           content_type='csv')
data_channels={'train': train_input, 'validation': val_input}

xgb.fit(inputs=data_channels, 
        job_name=jobname, 
        experiment_config=experiment_config, 
        wait=True)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating training-job with name: abalone-xgb-2022-05-03-23-40-37


2022-05-03 23:40:37 Starting - Starting the training job...
2022-05-03 23:40:55 Starting - Preparing the instances for trainingProfilerReport-1651621237: InProgress
......
2022-05-03 23:42:05 Downloading - Downloading input data...
2022-05-03 23:42:30 Training - Downloading the training image.....
2022-05-03 23:43:37 Uploading - Uploading generated training model
2022-05-03 23:43:37 Completed - Training job completed
[34m[2022-05-03 23:43:19.847 ip-10-2-233-184.ec2.internal:1 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2022-05-03:23:43:19:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2022-05-03:23:43:19:INFO] Failed to parse hyperparameter objective value reg:squarederror to Json.[0m
[34mReturning the value itself[0m
[34m[2022-05-03:23:43:19:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2022-05-03:23:43:19:INFO] Running XGBoost Sagemaker in algorithm mode[0m
[34m[2022-05-03:23:43:19:INFO] Determined delimiter of CSV 

In [11]:
##S3 prefixes
data_capture_prefix = f'{prefix}/datacapture'
s3_capture_upload_path = f's3://{bucket}/{data_capture_prefix}'

ground_truth_upload_path = f's3://{bucket}/{prefix}/ground-truth-data/{exp_datetime}'

reports_prefix = f'{prefix}/reports'
s3_report_path = f's3://{bucket}/{reports_prefix}'

print(f'Capture path: {s3_capture_upload_path}')
print(f'Ground truth path: {ground_truth_upload_path}')
print(f'Report path: {s3_report_path}')

Capture path: s3://sagemaker-us-east-1-104877823522/sagemaker-monitoring/monitoring/datacapture
Ground truth path: s3://sagemaker-us-east-1-104877823522/sagemaker-monitoring/monitoring/ground-truth-data/2022-05-03-23-40-37
Report path: s3://sagemaker-us-east-1-104877823522/sagemaker-monitoring/monitoring/reports


In [12]:
from sagemaker.model_monitor import DataCaptureConfig

data_capture_config = DataCaptureConfig(enable_capture=True, 
                                        sampling_percentage=100, 
                                        destination_s3_uri=s3_capture_upload_path)

In [13]:
endpoint_name = f'abalone-xgb-{exp_datetime}'
print(f'EndpointName: {endpoint_name}')

predictor = xgb.deploy(initial_instance_count=1,
                       instance_type='ml.m5.large',
                       endpoint_name=endpoint_name,
                       serializer=CSVSerializer(),
                       data_capture_config=data_capture_config)

INFO:sagemaker:Creating model with name: sagemaker-xgboost-2022-05-03-23-51-53-287


EndpointName: abalone-xgb-2022-05-03-23-40-37


INFO:sagemaker:Creating endpoint with name abalone-xgb-2022-05-03-23-40-37


-------------!

In [15]:
predictor_np = Predictor(endpoint_name=endpoint_name, 
                         sagemaker_session=sess,
                         serializer=CSVSerializer(),
                         deserializer=CSVDeserializer())

In [16]:
pred=predictor_np.predict(df_val[columns_no_target].values)

In [17]:
pred_f = [float(i) for i in pred[0]]

In [32]:
df_val
#pred_f
#df_val



Unnamed: 0,Rings,Sex,Length,Diameter,Height,WholeWeight,ShuckedWeight,VisceraWeight,ShellWeight
2779,9.0,2.0,0.590,0.470,0.145,0.9235,0.4545,0.1730,0.254
429,18.0,1.0,0.575,0.450,0.170,1.0475,0.3775,0.1705,0.385
2171,6.0,0.0,0.190,0.130,0.030,0.0295,0.0155,0.0150,0.010
3449,7.0,2.0,0.520,0.395,0.125,0.8115,0.4035,0.1660,0.200
196,11.0,1.0,0.505,0.410,0.150,0.6440,0.2850,0.1450,0.210
...,...,...,...,...,...,...,...,...,...
2161,17.0,1.0,0.715,0.565,0.240,2.1995,0.7245,0.4650,0.885
4084,10.0,1.0,0.575,0.480,0.170,1.1000,0.5060,0.2485,0.310
1353,11.0,2.0,0.600,0.480,0.155,1.0140,0.4510,0.1885,0.325
2121,9.0,0.0,0.475,0.360,0.110,0.4555,0.1770,0.0965,0.145
