In [7]:
from sagemaker.sklearn.estimator import SKLearn
from sagemaker import get_execution_role
from time import gmtime, strftime
import pandas as pd
import numpy as np
import sagemaker
import boto3
sm_boto3 = boto3.client('sagemaker')

# Execution role and SDK session; the account id and region both come from the
# session's underlying boto3 session.
role = get_execution_role()
sess = sagemaker.Session()
boto_session = sess.boto_session
account = boto_session.client('sts').get_caller_identity()['Account']
region = boto_session.region_name
# A dedicated bucket is created below instead of using sess.default_bucket().


def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region.

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1). Passing region='us-east-1' explicitly is also
    accepted and is treated the same way.

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False (the S3 error is logged)
    """
    # Imported locally: the notebook's import cell does not bring `logging` in.
    import logging

    if region is None:
        s3_client = boto3.client('s3')
    else:
        s3_client = boto3.client('s3', region_name=region)

    create_kwargs = {'Bucket': bucket_name}
    # S3 rejects an explicit 'us-east-1' LocationConstraint; that region is
    # selected by omitting CreateBucketConfiguration entirely.
    if region is not None and region != 'us-east-1':
        create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region}

    try:
        s3_client.create_bucket(**create_kwargs)
    except s3_client.exceptions.ClientError as e:
        # The client exposes botocore's ClientError, so no botocore import is
        # needed. S3 API errors (e.g. a name collision) are reported through the
        # return value instead of being raised.
        logging.error(e)
        return False
    return True





ENDPOINT_NAME = "text-encoder"
WORKFLOW_DATE_TIME = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
BUCKET = "{}-{}".format(ENDPOINT_NAME, account)

# SageMaker writes the training output to this bucket, and the bucket must be in
# the same region as the training job, so create it in the session's region.
# us-east-1 is passed as None (no LocationConstraint) because S3 rejects an
# explicit 'us-east-1' constraint.
create_bucket(bucket_name=BUCKET, region=None if region == 'us-east-1' else region)


print("BUCKET: ", BUCKET)

BUCKET:  text-encoder-227921966468


### Create Entry-Point Script

In [8]:
%%writefile text_encoder.py
import tarfile
import boto3
import argparse
import numpy as np
import pandas as pd
import os
from sklearn.externals import joblib
from sklearn.neighbors import NearestNeighbors
import subprocess
import sys

from sagemaker_containers.beta.framework import worker, encoders
#from six import BytesIO
from io import StringIO

def _run_pip(*args):
    """Run ``python -m pip <args>`` with this interpreter and return pip's exit code.

    A failure stays best-effort (nothing is raised), but it is reported so that
    a later ImportError has context.
    """
    cmd = [sys.executable, "-m", "pip"] + list(args)
    returncode = subprocess.call(cmd)
    if returncode != 0:
        print('WARNING: pip exited with code {}: {}'.format(returncode, ' '.join(cmd)))
    return returncode


def pip_upgrade(package):
    """Upgrade ``package`` in this interpreter's environment; return pip's exit code."""
    return _run_pip("install", "--upgrade", package)


def pip_install(package):
    """Quietly install ``package`` in this interpreter's environment; return pip's exit code."""
    return _run_pip("install", "--quiet", package)

print('********** Pip Install TF-Hub **********')
# setuptools is upgraded first, then the runtime dependencies are installed in order.
pip_upgrade('setuptools')
for _requirement in ('tensorflow>=1.7', 'tensorflow_hub', 'sentencepiece'):
    pip_install(_requirement)
# NOTE(review): 'tensorflow>=1.7' is unpinned, so the TF version installed depends
# on when the container starts; consider pinning a tested version.

import tensorflow as tf
import tensorflow_hub as hub
import sentencepiece as spm

import glob
import hashlib
# TF-Hub handle of the Universal Sentence Encoder (lite) served by this script,
# and the directory TF-Hub caches downloaded modules in (GRAPH_PB_PATH points
# into the same location).
print('********** Copying BERT-Lite **********')
os.environ['TFHUB_CACHE_DIR'] = "/opt/ml/bert_lite_module_cache"
module_url = "https://tfhub.dev/google/universal-sentence-encoder-lite/2"

# Reduce logging output.
#tf.logging.set_verbosity(tf.logging.ERROR)

def process_to_IDs_in_sparse_format(sp, sentences):
    """Tokenize ``sentences`` with SentencePiece into a tf.SparseTensor-like triple.

    :param sp: object with an ``EncodeAsIds(text) -> list[int]`` method
        (a loaded SentencePieceProcessor).
    :param sentences: iterable of texts, one per batch row.
    :return: ``(values, indices, dense_shape)``:
        - ``values``: all token ids, row-major.
        - ``indices``: ``[row, col]`` for each value.
        - ``dense_shape``: ``(n_sentences, longest_token_count)``.
        An empty batch yields ``([], [], (0, 0))``.
    """
    ids = [sp.EncodeAsIds(x) for x in sentences]
    # default=0 lets an empty batch produce a valid empty triple instead of
    # raising "max() arg is an empty sequence".
    max_len = max((len(x) for x in ids), default=0)
    dense_shape = (len(ids), max_len)
    values = [item for sublist in ids for item in sublist]
    indices = [[row, col] for row, sublist in enumerate(ids) for col in range(len(sublist))]
    return (values, indices, dense_shape)



# Import-time graph construction. Building hub.Module(module_url) here fetches the
# encoder (presumably into the TFHUB_CACHE_DIR set above; the GRAPH_PB_PATH listing
# below relies on that). model_fn builds its own graph, placeholder, encodings and
# session, so the objects created here (g, encodings, sp, init_op) are not reused.
# NOTE(review): this runs whenever the script is loaded, including when it runs
# as the (no-op) training entry point.
g = tf.Graph()
with g.as_default():
    module = hub.Module(module_url)
    # Token ids in sparse form: row = sentence, column = token position
    # (matches the [row, col] indices from process_to_IDs_in_sparse_format).
    input_placeholder = tf.compat.v1.sparse_placeholder(tf.int64, shape=[None, None])
    encodings = module(inputs=dict(values=input_placeholder.values,
                                   indices=input_placeholder.indices,
                                   dense_shape=input_placeholder.dense_shape
                                  )
                      )

    # Load the SentencePiece model.
    # This model is conveniently stored inside the module's assets.
    # It has to be loaded in order to initialize the processor.
    with tf.compat.v1.Session() as sess:
        spm_path = sess.run(module(signature="spm_path"))
        sp = spm.SentencePieceProcessor()
        sp.Load(spm_path)
        print("SentencePiece model loaded at {}.".format(spm_path))

    # Variable + lookup-table initializers (never run at module level).
    init_op = tf.group([tf.compat.v1.global_variables_initializer(), tf.compat.v1.tables_initializer()])
# Make the graph read-only.
g.finalize()


# Local directory holding the cached copy of module_url, using the cache layout
# this script relies on: <TFHUB_CACHE_DIR>/<sha1 of the handle>. It is derived
# from the TFHUB_CACHE_DIR value set above rather than repeating the literal path,
# so the two cannot drift apart. model_fn loads the encoder from here.
GRAPH_PB_PATH = os.path.join(os.environ['TFHUB_CACHE_DIR'],
                             hashlib.sha1(module_url.encode("utf8")).hexdigest())

print('********** Encoder saved at: **********')
print(GRAPH_PB_PATH)
print('********** Listing BERT-Lite artifacts on host **********')
encoder_files = glob.glob(os.path.join(GRAPH_PB_PATH, "*"))
print(encoder_files)
    

def model_fn(model_dir):
    """SageMaker model-loading hook: build the sentence-encoder inference graph.

    ``model_dir`` is not used. The encoder is loaded from the local TF-Hub
    cache directory GRAPH_PB_PATH (presumably populated by the module-level
    ``hub.Module(module_url)`` call), not from the training job's artifact.

    :param model_dir: directory of the extracted model artifact (ignored).
    :return: tuple ``(input_placeholder, encodings, sp, session)`` as unpacked
        by predict_fn:
        - the sparse int64 input placeholder,
        - the encoder output tensor,
        - the loaded SentencePieceProcessor,
        - a tf.compat.v1.Session on the finalized graph with initializers
          already run. The session is not closed here.
    """
    
    g = tf.Graph()
    with g.as_default():
        # Load from the local cache path instead of the remote module_url.
        module = hub.Module(GRAPH_PB_PATH)
        # Token ids in sparse form: row = sentence, column = token position.
        input_placeholder = tf.compat.v1.sparse_placeholder(tf.int64, shape=[None, None])
        encodings = module(inputs=dict(values=input_placeholder.values,
                                       indices=input_placeholder.indices,
                                       dense_shape=input_placeholder.dense_shape
                                      )
                          )

        # Load the SentencePiece model.
        # This model is conveniently stored inside the module's assets.
        # It has to be loaded in order to initialize the processor.
        with tf.compat.v1.Session() as sess:
            spm_path = sess.run(module(signature="spm_path"))
            sp = spm.SentencePieceProcessor()
            sp.Load(spm_path)
            print("SentencePiece model loaded at {}.".format(spm_path))

        # Variable + lookup-table initializers, run once on the returned session below.
        init_op = tf.group([tf.compat.v1.global_variables_initializer(), tf.compat.v1.tables_initializer()])
    # Make the graph read-only before it is handed to the serving session.
    g.finalize()

    # Create session and initialize.
    session = tf.compat.v1.Session(graph=g)
    session.run(init_op)
    
    return input_placeholder, encodings, sp, session


def input_fn(input_data, content_type):
    """Deserialize a request body into a one-column DataFrame for predict_fn.

    Only 'text/csv' is supported: one sentence per line, no header. Every cell
    is read as literal text, so inputs such as "123" or "NA" stay strings
    instead of becoming ints or NaN. predict_fn passes each value to
    sp.EncodeAsIds, which expects text.

    :param input_data: request body as str (bytes are decoded as UTF-8).
    :param content_type: MIME type of the request body.
    :return: DataFrame with a single 'summary' column.
    :raises ValueError: for any content type other than 'text/csv'.
    """
    if content_type != 'text/csv':
        raise ValueError("{} not supported by script!".format(content_type))

    if isinstance(input_data, bytes):
        input_data = input_data.decode('utf-8')

    # Read the raw input data as CSV, keeping every field as text.
    deserialized_input_data = pd.read_csv(StringIO(input_data),
                                          header=None,
                                          dtype=str,
                                          keep_default_na=False
                                          )
    deserialized_input_data.columns = ['summary']

    return deserialized_input_data


def predict_fn(input_data, model):
    """Encode every row of ``input_data.summary`` with the TF-Hub sentence encoder.

    :param input_data: DataFrame from input_fn with a 'summary' text column.
    :param model: the ``(input_placeholder, encodings, sp, session)`` tuple
        returned by model_fn.
    :return: 2-D numpy array, assumed to hold one encoding row per input
        sentence (assumes the encoder outputs (n_sentences, dim); TODO confirm).
    """
    # Parse artifacts
    input_placeholder, encodings, sp, session = model

    # Encode concept using tfhub sentence encoder
    sentences = list(input_data.summary)
    values, indices, dense_shape = process_to_IDs_in_sparse_format(sp, sentences)

    encoded_concept = session.run(encodings,
                                  feed_dict={input_placeholder.values: values,
                                             input_placeholder.indices: indices,
                                             input_placeholder.dense_shape: dense_shape
                                             }
                                  )
    # Keep one row per sentence. reshape(1, -1) would concatenate all sentences
    # of a multi-row request into a single meaningless vector. For a single
    # sentence, atleast_2d gives the same (1, dim) result.
    return np.atleast_2d(encoded_concept)


def _npy_dumps(data):
    """Serializes a numpy array into a stream of npy-formatted bytes."""
    buffer = BytesIO()
    np.save(buffer, data)
    return buffer.getvalue()


def output_fn(prediction_output, accept):
    """Serialize predict_fn's result for the requested Accept type.

    'application/x-npy' returns ``(npy_bytes, mimetype)``; 'application/json'
    returns a worker.Response built with the container's encoders. Any other
    Accept value raises ValueError.
    """
    supported = ('application/x-npy', 'application/json')
    if accept not in supported:
        raise ValueError('Accept header must be application/x-npy or application/json, but it is {}'.format(accept))

    print('output_fn input is', prediction_output, 'in format', accept)
    if accept == 'application/json':
        return worker.Response(encoders.encode(prediction_output, accept), accept, mimetype=accept)
    return _npy_dumps(prediction_output), 'application/x-npy'


# Reached when SageMaker runs this file as the training entry point. There is
# nothing to fit: the serving functions above load a pre-trained TF-Hub encoder.
if __name__ == '__main__':
    notice = '*********No training needed, we will simply pull the model from tfhub at deployment***********'
    print(notice)

Overwriting text_encoder.py


### Run the entry-point training script

In [9]:
# Training output (model.tar.gz) lands under s3://BUCKET/<job name>/output/.
knn_estimator = SKLearn(
    entry_point='text_encoder.py',
    framework_version='0.20.0',
    role=role,
    instance_type="ml.c5.9xlarge",
    instance_count=1,
    output_path='s3://{}'.format(BUCKET),
)

In [10]:
# Timestamped job name, e.g. text-encoder-2021-01-26-19-18-48; it also names the
# S3 prefix that holds the job's output artifact.
job_name = "-".join((ENDPOINT_NAME, WORKFLOW_DATE_TIME))
knn_estimator.fit(job_name=job_name)

2021-01-26 19:18:53 Starting - Starting the training job...
2021-01-26 19:19:16 Starting - Launching requested ML instancesProfilerReport-1611688732: InProgress
.........
2021-01-26 19:20:37 Starting - Preparing the instances for training...
2021-01-26 19:21:17 Downloading - Downloading input data
2021-01-26 19:21:17 Training - Downloading the training image.....[34m2021-01-26 19:21:58,309 sagemaker-training-toolkit INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2021-01-26 19:21:58,311 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2021-01-26 19:21:58,319 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2021-01-26 19:21:58,531 botocore.utils INFO     IMDS ENDPOINT: http://169.254.169.254/[0m
[34m2021-01-26 19:21:58,710 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2021-01-26 19:22:04,996 sagemaker-training-toolkit INFO     No GPUs

### Note: we must manually upload a `model.tar.gz` file to serve as the model artifact. The archive is empty, because the endpoint uses a pre-trained model from TF-Hub.

In [11]:
# Ask SageMaker where the finished training job reports its model artifact.
latest_job_name = knn_estimator.latest_training_job.name
job_description = sm_boto3.describe_training_job(TrainingJobName=latest_job_name)
artifact = job_description['ModelArtifacts']['S3ModelArtifacts']

print('Model artifact is expected at ' + artifact)

Model artifact is expected at s3://text-encoder-227921966468/text-encoder-2021-01-26-19-18-48/output/model.tar.gz


In [14]:
import tarfile
from urllib.parse import urlparse

# Build an empty model.tar.gz. The endpoint loads its encoder from TF-Hub, so the
# artifact only needs to exist. `with` guarantees the archive is closed (and the
# gzip stream finalized) even if an error occurs.
source = 'model.tar.gz'
with tarfile.open(source, 'w:gz'):
    pass

# Upload to the exact S3 location the training job reported (`artifact`, from the
# previous cell) instead of re-deriving the key from `job_name`, so the upload
# target cannot drift from the path SageMaker reported.
artifact_location = urlparse(artifact)
s3_path = artifact_location.path.lstrip('/')
s3 = boto3.client('s3')
s3.upload_file(source, artifact_location.netloc, s3_path)

### Create endpoint

In [None]:
# Single-instance real-time endpoint with a fixed name, so the query cells below
# can address it by ENDPOINT_NAME.
deploy_kwargs = dict(initial_instance_count=1,
                     instance_type='ml.c5.9xlarge',
                     endpoint_name=ENDPOINT_NAME)
predictor = knn_estimator.deploy(**deploy_kwargs)

----

### Query Endpoint

In [None]:
import json
import boto3
import pandas as pd

# Low-level SageMaker runtime client, used below to invoke the endpoint directly.
runtime = boto3.client('sagemaker-runtime')

In [None]:
# One headerless CSV line per sentence, matching what text_encoder.input_fn
# reads into its single 'summary' column.
summary_text = "When CIA analyst Jack Ryan stumbles upon a suspicious series of bank transfers his search for answers pulls him from the safety of his desk job and catapults him into a deadly game of cat"
payload = pd.DataFrame([summary_text])
csv_text = payload.to_csv(header=False, index=False)
serialized_payload = csv_text.encode('utf-8')

serialized_payload

In [None]:
# Request JSON explicitly. The next cell parses the body with json.loads, and
# text_encoder.output_fn also supports application/x-npy, so don't rely on the
# container's default Accept type.
response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                   Body=serialized_payload,
                                   ContentType='text/csv',
                                   Accept='application/json'
                                  )

In [None]:
body_stream = response['Body']
# json.loads accepts UTF-8 bytes directly, so no explicit .decode() is needed.
response_string = body_stream.read()

# Despite its name, this is the decoded JSON value; the next cell indexes it by row.
response_dict = json.loads(response_string)
response_dict

In [None]:
# Length of the first returned row. For the single-sentence payload above, this
# is the size of that sentence's encoding.
len(response_dict[0])