In [None]:
%%sh
pip install -q sagemaker --upgrade

In [None]:
import json
import time
from time import gmtime, strftime

In [None]:
import sagemaker

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.inputs import TrainingInput, CreateModelInput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.step_collections import RegisterModel

from sagemaker import image_uris
from sagemaker.estimator import Estimator

# Show which SDK version the kernel actually imported.
print(sagemaker.__version__)

# Shared handles reused by the cells below.
session = sagemaker.Session()
role = sagemaker.get_execution_role()  # passed to the processor, estimator, model and pipeline below
bucket = session.default_bucket()  # passed to the ingest/query scripts and used for the training output path

## Upload customer reviews to one of our S3 buckets

In [None]:
%%sh
# Copy the Camera-category reviews archive from the amazon-reviews-pds bucket
# into /tmp. Dataset description:
# https://s3.amazonaws.com/amazon-reviews-pds/readme.html
aws s3 cp s3://amazon-reviews-pds/tsv/amazon_reviews_us_Camera_v1_00.tsv.gz /tmp

In [None]:
# S3 key prefix under which the raw review archive is stored.
prefix = 'amazon-reviews-camera'

# Upload the archive downloaded to /tmp; the returned value is supplied later
# as the pipeline's InputData parameter.
input_data_uri = session.upload_data(
    key_prefix=prefix,
    path='/tmp/amazon_reviews_us_Camera_v1_00.tsv.gz',
)

## Define workflow parameters

In [None]:
# Pipeline parameters. Each one may be overridden when an execution is started
# (pipeline.start(parameters=...)); otherwise its default_value, if any, applies.

# Default to the region of the current SageMaker session instead of a
# hard-coded one, so the default follows wherever this notebook (and the
# session's default bucket) actually runs. It can still be overridden.
region = ParameterString(
    name='Region',
    default_value=session.boto_region_name
)

# Sizing of the scikit-learn processing jobs.
processing_instance_count = ParameterInteger(
    name='ProcessingInstanceCount',
    default_value=1
)

processing_instance_type = ParameterString(
    name='ProcessingInstanceType',
    default_value='ml.m5.4xlarge'
)

# Sizing of the training job.
training_instance_type = ParameterString(
    name='TrainingInstanceType',
    default_value='ml.p3.2xlarge'
)

training_instance_count = ParameterInteger(
    name='TrainingInstanceCount',
    default_value=1
)

# Approval status given to the model package at registration time.
model_approval_status = ParameterString(
    name='ModelApprovalStatus',
    default_value='PendingManualApproval'
)

# No defaults: both values are supplied when the execution is started.
input_data = ParameterString(
    name='InputData'
)

model_name = ParameterString(
    name='ModelName'
)

## Define the preprocessing step

In [None]:
# A single scikit-learn processor is shared by all three processing steps.
# Its instance type and count are pipeline parameters.
sklearn_processor = SKLearnProcessor(
    role=role,
    framework_version='0.23-1',
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
)

In [None]:
# Preprocessing step: runs preprocessing.py on the data referenced by the
# InputData parameter and declares two outputs, 'bt_data' and 'fs_data'.
step_process = ProcessingStep(
    name='process-customer-reviews',
    processor=sklearn_processor,
    code='preprocessing.py',
    inputs=[
        ProcessingInput(
            source=input_data,
            destination='/opt/ml/processing/input',
        ),
    ],
    outputs=[
        ProcessingOutput(output_name='bt_data', source='/opt/ml/processing/output/bt'),
        ProcessingOutput(output_name='fs_data', source='/opt/ml/processing/output/fs'),
    ],
    job_arguments=[
        '--filename', 'amazon_reviews_us_Camera_v1_00.tsv.gz',
        # Optional, currently disabled: '--num-reviews', '10000'
        # (presumably caps how many reviews are processed -- confirm in
        # preprocessing.py).
        '--library', 'spacy',  # 'spacy' or 'nltk'
    ],
)

## Define the feature ingestion step

In [None]:
# Suffix the feature group name with the current UTC day-hour-minute-second so
# that separate runs of the notebook get different names.
feature_group_name = '-'.join(
    ['amazon-reviews-feature-group', strftime('%d-%H-%M-%S', gmtime())]
)

In [None]:
# Ingestion step: runs ingesting.py over the 'fs_data' output of the
# preprocessing step. Its only declared output, 'feature_group_name', is what
# the dataset step reads next.
# The property path used for `source` mirrors the DescribeProcessingJob response:
# https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html
step_ingest = ProcessingStep(
    name='ingest-customer-reviews',
    processor=sklearn_processor,
    code='ingesting.py',
    inputs=[
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs['fs_data'].S3Output.S3Uri,
            destination='/opt/ml/processing/input',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='feature_group_name',
            source='/opt/ml/processing/output/',
        ),
    ],
    job_arguments=[
        '--region', region,
        '--bucket', bucket,  # For offline store
        '--role', role,
        '--feature-group-name', feature_group_name,
        '--max-workers', '32',
    ],
)

## Define the dataset step

In [None]:
# Dataset step: runs querying.py with the ingestion step's
# 'feature_group_name' output as input, and declares the 'training' and
# 'validation' outputs that feed the training step.
step_build_dataset = ProcessingStep(
    name='build-dataset',
    processor=sklearn_processor,
    code='querying.py',
    inputs=[
        ProcessingInput(
            source=step_ingest.properties.ProcessingOutputConfig.Outputs['feature_group_name'].S3Output.S3Uri,
            destination='/opt/ml/processing/input',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='training',
            source='/opt/ml/processing/output/training',
        ),
        ProcessingOutput(
            output_name='validation',
            source='/opt/ml/processing/output/validation',
        ),
    ],
    job_arguments=[
        '--region', region,
        '--bucket', bucket,  # For query results
    ],
)

## Define the training step

In [None]:
# The BlazingText image URI is resolved here, while the pipeline is being
# defined, so image_uris.retrieve() needs a concrete region string.
# `region` is a ParameterString placeholder: str() on it is not a reliable way
# to get a value (newer SDK releases raise an error for str() on pipeline
# variables), so its default_value is read explicitly instead.
# NOTE: because the URI is fixed at definition time, overriding the 'Region'
# parameter for a single execution does not change the training image.
container = image_uris.retrieve('blazingtext', region.default_value)

# Training artifacts are written under this prefix in the default bucket.
prefix = 'blazing-text-amazon-reviews'
s3_output = 's3://{}/{}/output/'.format(bucket, prefix)

# Instance type and count are pipeline parameters.
bt = Estimator(
    container,
    role,
    instance_count=training_instance_count,
    instance_type=training_instance_type,
    output_path=s3_output,
)

# 'supervised' is BlazingText's text classification mode.
bt.set_hyperparameters(mode='supervised')

In [None]:
# Training step: fits the BlazingText estimator on the 'training' and
# 'validation' outputs of the dataset step, both passed as plain text.
step_train = TrainingStep(
    name='train-blazing-text',
    estimator=bt,
    inputs={
        'train': TrainingInput(
            content_type='text/plain',
            s3_data=step_build_dataset.properties.ProcessingOutputConfig.Outputs['training'].S3Output.S3Uri,
        ),
        'validation': TrainingInput(
            content_type='text/plain',
            s3_data=step_build_dataset.properties.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri,
        ),
    },
)

## Define the model creation step

In [None]:
from sagemaker.model import Model

# Pair the training image with the artifact produced by the training step.
# The model name comes from the ModelName pipeline parameter.
model = Model(
    name=model_name,
    role=role,
    image_uri=container,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=session,
)

# Model creation step; no extra creation inputs are passed.
step_create_model = CreateModelStep(name='create-model', model=model, inputs=None)

## Define the registration step

In [None]:
# Registration step: registers the trained artifact in the model package
# group below, with the approval status taken from the ModelApprovalStatus
# pipeline parameter.
step_register = RegisterModel(
    name='register-model',
    estimator=bt,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name='blazing-text-on-amazon-customer-reviews-package',
    approval_status=model_approval_status,
    content_types=['text/plain'],
    response_types=['application/json'],
    inference_instances=['ml.t2.medium'],
    transform_instances=['ml.m5.xlarge'],
)

## Assemble the pipeline

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = 'blazing-text-amazon-customer-reviews'

# Assemble the pipeline from every parameter and step defined above.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        region,
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        training_instance_count,
        model_approval_status,
        input_data,
        model_name,
    ],
    steps=[
        step_process,
        step_ingest,
        step_build_dataset,
        step_train,
        step_create_model,
        step_register,
    ],
)

## Run pipeline

In [None]:
# Create the pipeline, or update it if it already exists.
pipeline.upsert(role_arn=role)

# Start an execution, supplying the two parameters that have no default value.
execution = pipeline.start(
    parameters={
        'InputData': input_data_uri,
        'ModelName': 'blazing-text-amazon-reviews',
    }
)

In [None]:
# Display the steps of the execution started above; re-run this cell to
# refresh the view.
execution.list_steps()

In [None]:
from sagemaker.lineage.visualizer import LineageTableVisualizer

# Walk the execution's steps in reverse list order, printing each step and
# then displaying its lineage table.
viz = LineageTableVisualizer(session)
for step_info in reversed(execution.list_steps()):
    print(step_info)
    lineage_table = viz.show(pipeline_execution_step=step_info)
    display(lineage_table)
    # Pause between lookups (presumably to avoid API throttling -- confirm).
    time.sleep(5)

## Deploy model

In [None]:
# This info is available in the model registry

# Placeholder: replace with the ARN of the registered model package before
# running the deployment cells below.
model_package_arn = 'MODEL_PACKAGE_ARN'

In [None]:
from sagemaker import ModelPackage

# Build a deployable model from the registered model package ARN.
model = ModelPackage(role=role, model_package_arn=model_package_arn)

# Deploy it to a single-instance endpoint; the predictor cell further down
# uses the same endpoint name.
model.deploy(
    endpoint_name='blazing-text-on-amazon-reviews',
    instance_type='ml.t2.medium',
    initial_instance_count=1,
)

In [None]:
# Sample reviews sent to the endpoint. Words and punctuation marks are
# separated by single spaces.
instances = [
    ' I really love this camera , it takes amazing pictures . ',
    ' this camera is ok , it gets the job done . Nothing fancy . ',
    ' Poor quality , the camera stopped working after a couple of days .',
]

In [None]:
from sagemaker.predictor import Predictor

# Client for the endpoint deployed above: request payloads are serialized to
# JSON and responses are deserialized from JSON.
bt_predictor = Predictor(
    endpoint_name='blazing-text-on-amazon-reviews',
    deserializer=sagemaker.deserializers.JSONDeserializer(),
    serializer=sagemaker.serializers.JSONSerializer(),
)

## Predict with model

In [None]:
import pprint

# Request body: the sample reviews plus a configuration with k=3
# (presumably the number of labels returned per review -- confirm against the
# BlazingText inference documentation).
payload = dict(instances=instances, configuration=dict(k=3))
response = bt_predictor.predict(payload)

pprint.pprint(response)

In [None]:
# Delete the endpoint once the predictions above have been made.
bt_predictor.delete_endpoint()