In [None]:
import os
import io
import json
import numpy as np
import pandas as pd
from sklearn import datasets
from sklearn.model_selection import train_test_split

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.session import TrainingInput
from sagemaker.tensorflow import TensorFlow, TensorFlowModel

In [None]:
# A single SageMaker session is shared by the whole notebook; `sess` is kept as
# an alias so that existing references to either name keep working.
sagemaker_session = sagemaker.Session()
sess = sagemaker_session
s3 = boto3.resource('s3')

# IAM role the notebook runs under; it is passed to the estimator and the model.
role = get_execution_role()
region = sagemaker_session.boto_session.region_name

bucket = sess.default_bucket()  # this could also be a hard-coded bucket name
prefix = 'frontier'  # key prefix under which every artifact of this notebook is stored
print("Using bucket " + bucket)

In [None]:
# Generate training data
# A fixed seed keeps the synthetic dataset identical across kernel restarts.
X, y = datasets.make_moons(1000, noise=0.2, random_state=42)

# Hold out 20% of the samples as the test set.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Carve the validation set out of the remaining training portion only.
# Splitting the full (X, y) again with the same seed would just reproduce the
# test split, leaving the validation and test sets identical.
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)


In [None]:
# np.save does not create missing directories, so make sure ./data exists.
os.makedirs('./data', exist_ok=True)

# Arrays to persist, keyed by the file stem that train.py expects to find.
# The validation files are written from the validation split (not the test split).
arrays_to_upload = {
    'X_train': X_train,
    'y_train': y_train,
    'X_val': X_val,
    'y_val': y_val,
}

# S3 prefix handed to the estimator as its training input.
training_data_uri = f"s3://{bucket}/{prefix}/input"

for stem, array in arrays_to_upload.items():
    local_path = f'./data/{stem}.npy'
    np.save(local_path, array)
    s3.meta.client.upload_file(local_path, bucket, f'{prefix}/input/{stem}.npy')

# Train

In [None]:
%%writefile ./src/train.py

import os
import json
import argparse
import numpy as np
import pandas as pd
import tensorflow as tf


def model(x_train, y_train, x_test, y_test):
    """Build, train and evaluate a small binary classifier.

    The network is a Sequential model with one 2-unit ReLU hidden layer and a
    single sigmoid output unit, compiled with Adam (learning rate 0.001) and
    binary cross-entropy loss, tracking accuracy.

    Args:
        x_train: Training features; the size of axis 1 defines the input width.
        y_train: Training labels.
        x_test: Features used for the post-training evaluation.
        y_test: Labels used for the post-training evaluation.

    Returns:
        The fitted tf.keras model.
    """
    # Named `classifier` so the local does not shadow this function's own name.
    classifier = tf.keras.models.Sequential(
        [
            tf.keras.Input(shape=[x_train.shape[1]]),
            tf.keras.layers.Dense(2, activation=tf.nn.relu),
            tf.keras.layers.Dense(1, activation=tf.nn.sigmoid),
        ]
    )

    # summary() prints the layer table itself; printing the uncalled attribute
    # would only show the bound method's repr.
    classifier.summary()
    opt = tf.keras.optimizers.Adam(learning_rate=0.001)
    classifier.compile(optimizer=opt, loss="binary_crossentropy", metrics=["accuracy"])
    # fit() is called without extra arguments, i.e. with Keras' default settings.
    classifier.fit(x_train, y_train)
    classifier.evaluate(x_test, y_test)

    return classifier

def _load_training_data(base_dir):
    """Read the training features and labels from .npy files in base_dir.

    Args:
        base_dir: Directory containing X_train.npy and y_train.npy.

    Returns:
        A (features, labels) tuple of numpy arrays.
    """
    features, labels = (
        np.load(os.path.join(base_dir, file_name))
        for file_name in ("X_train.npy", "y_train.npy")
    )
    return features, labels


def _load_validation_data(base_dir):
    """Read the validation features and labels from .npy files in base_dir.

    Args:
        base_dir: Directory containing X_val.npy and y_val.npy.

    Returns:
        A (features, labels) tuple of numpy arrays.
    """
    features_path = os.path.join(base_dir, "X_val.npy")
    labels_path = os.path.join(base_dir, "y_val.npy")
    return np.load(features_path), np.load(labels_path)

def _parse_args():
    """Parse the command-line options of the training script.

    Recognised options:
        --model_dir: Passed in by SageMaker; by default an S3 path under the
            default bucket. No default is set here.
        --sm-model-dir: Defaults to the SM_MODEL_DIR environment variable.
        --train: Defaults to the SM_CHANNEL_TRAINING environment variable.

    Returns:
        The (namespace, unrecognised_args) pair from parse_known_args, so
        options this script does not declare are collected instead of
        raising an error.
    """
    env = os.environ
    option_specs = (
        ("--model_dir", {}),
        ("--sm-model-dir", {"default": env.get("SM_MODEL_DIR")}),
        ("--train", {"default": env.get("SM_CHANNEL_TRAINING")}),
    )

    parser = argparse.ArgumentParser()
    for flag, extra_kwargs in option_specs:
        parser.add_argument(flag, type=str, **extra_kwargs)

    return parser.parse_known_args()

if __name__ == "__main__":
    # Unrecognised flags are collected separately and intentionally ignored.
    cli_args, _unrecognised = _parse_args()

    # Training and validation arrays are both read from the same directory.
    data_dir = cli_args.train
    x_tr, y_tr = _load_training_data(data_dir)
    x_ev, y_ev = _load_validation_data(data_dir)

    trained = model(x_tr, y_tr, x_ev, y_ev)

    # The model is written to a "000000001" sub-directory of the model dir
    # (presumably the numeric version folder TensorFlow Serving expects -- confirm).
    export_dir = os.path.join(cli_args.sm_model_dir, "000000001")
    trained.save(export_dir)

In [None]:
# # test locally
# !mkdir model
# ! python src/train.py  --sm-model-dir ./model/ \
#                    --train ./data/

In [None]:
# Settings for the managed training job that runs ./src/train.py.
estimator_settings = dict(
    entry_point="./src/train.py",
    role=role,
    instance_count=1,
    instance_type="ml.p3.2xlarge",
    framework_version="2.1.0",
    py_version="py3",
    output_path=f"s3://{bucket}/{prefix}/output",
)
estimator = TensorFlow(**estimator_settings)

# The S3 prefix holding the .npy files is passed as the job's only input;
# train.py reads its data directory from SM_CHANNEL_TRAINING.
estimator.fit(training_data_uri)

In [None]:
# Name of the training job launched by estimator.fit(), read through the
# public attribute rather than the private `_current_job_name`.
estimator.latest_training_job.name

In [None]:
sm_boto3 = boto3.client("sagemaker")

# Ask SageMaker where the training job launched above stored its model archive.
# The job name comes from the estimator's public `latest_training_job` handle.
training_job_name = estimator.latest_training_job.name
artifact = sm_boto3.describe_training_job(
    TrainingJobName=training_job_name)["ModelArtifacts"]["S3ModelArtifacts"]

print("Model artifact persisted at " + artifact)

# Batch inference

In [None]:
# Save test data as jsonlines
# Each record carries the row's position in X_test as `id` plus its feature
# values. Rows are taken from X_test itself; indexing the full dataset X here
# would send samples that are not part of the test split.
test_data = [
    {'id': row_id, 'data': [float(value) for value in row]}
    for row_id, row in enumerate(X_test)
]

# One JSON object per line.
with open('./data/test_data.jsonl', 'w') as f:
    for entry in test_data:
        f.write(json.dumps(entry) + '\n')

s3.meta.client.upload_file('./data/test_data.jsonl', bucket, f'{prefix}/X_test.jsonl')
test_s3_uri = f"s3://{bucket}/{prefix}/X_test.jsonl"

In [None]:
# Use the artifact URI resolved from the training job in the cell above rather
# than a hard-coded S3 path tied to one account and one past run. To reuse an
# earlier job without retraining, assign `artifact` its model.tar.gz URI first.
# No custom entry_point is passed to the model.
model = TensorFlowModel(
    model_data=artifact,
    role=role,
    framework_version="2.1.0",
)

In [None]:
# S3 prefix that receives the batch transform results.
batch_output = f's3://{bucket}/{prefix}/transform/'
print(batch_output)

tf_transformer = model.transformer(
    instance_count=1,
    instance_type='ml.m4.xlarge',
    accept='application/jsonlines',
    output_path=batch_output,
)

# The input file is split per line and only the `data` field of each record
# is kept by the input filter. assemble_with, output_filter and join_source
# are deliberately left unset.
transform_options = {
    'content_type': 'application/jsonlines',
    'split_type': 'Line',
    'input_filter': "$.data",
}
tf_transformer.transform(test_s3_uri, **transform_options)



I am checking with the service team to confirm when native support for joining sources with TensorFlow and JSON Lines input will be added. In the meantime, you can use the following workaround.


In [None]:
!mkdir ./output
!aws s3 cp {batch_output} ./output --recursive

In [None]:
# Collect the predictions from every non-empty line of the transform output.
# Keeping only the last parsed line would silently drop earlier predictions
# whenever the output spans more than one line.
# NOTE(review): assumes each line is a JSON object with a 'predictions' list -- confirm
# against an actual output file.
all_predictions = []
with open('./output/X_test.jsonl.out') as f:
    for line in f:
        if line.strip():
            all_predictions.extend(json.loads(line)['predictions'])

# Same shape the following cell reads: a dict with one 'predictions' list.
j_content = {'predictions': all_predictions}

In [None]:
# Pair each input record with the prediction at the same position; the first
# element of each prediction entry is stored as the record's score.
for position, record in enumerate(test_data):
    record['prediction'] = j_content['predictions'][position][0]

___________

In [None]:
# %%writefile ./src/inference.py

# import json
# import requests

# def handler(data, context):
#     """Handle request.
#     Args:
#         data (obj): the request data
#         context (Context): an object containing request and configuration details
#     Returns:
#         (bytes, string): data to return to client, (optional) response content type
#     """
#     processed_input = _process_input(data, context)
#     response = requests.post(context.rest_uri, data=processed_input)
#     print(response.content)
#     return _process_output(response, context)


# def _process_input(data, context):
#     if context.request_content_type == 'application/jsonlines':
#         # pass through json (assumes it's correctly formed)
#         d = data.read().decode('utf-8')
# #         print(type(d), len(d), "*******", "\n", d)
#         d = [json.loads(l)['data'] for l in d.splitlines()]
#         d = '\n'.join(str(i) for i in d)
#         return d if len(d) else ''

#     raise ValueError('{{"error": "unsupported content type {}"}}'.format(
#         context.request_content_type or "unknown"))


# def _process_output(data, context):
#     if data.status_code != 200:
#         raise ValueError(data.content.decode('utf-8'))

#     response_content_type = context.accept_header
#     prediction = data.content
#     return prediction, response_content_type
