# Batch Transform on Amazon SageMaker Pipelines Integrated with PrestoDB


***This notebook works best with the `Data Science 3.0` kernel on an `ml.t3.medium` instance type***.

Run the [0_model_training_pipeline](./0_model_training_pipeline.ipynb) notebook prior to running this notebook. This notebook runs a batch transform using the model trained in the previous notebook. It does so by running the following steps:

1. Extract the latest approved model from the SageMaker model registry.

1. Read raw data for inference from PrestoDB and store it in an Amazon S3 bucket.

1. Create a SageMaker pipeline with a data processing step and a batch transform step to provide inference on the data. The inference results are also stored in S3.

In [None]:
#import sys
#!{sys.executable} -m pip install -r requirements.txt

In [None]:
## Import the boto3 and sagemaker libraries (plus helpers) needed to initialize the session and build the pipeline
import os
import json
import boto3
import time
import logging
import sagemaker
from pathlib import Path
import sagemaker.session
from typing import Dict, List
from datetime import datetime, timedelta
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline_context import PipelineSession
from utils import load_config, print_pipeline_execution_summary

from sagemaker.workflow.functions import Join
from sagemaker.processing import  ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.execution_variables import ExecutionVariables

from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.transformer import Transformer

In [None]:
## Configure logging once for the whole notebook. Each record carries the
## timestamp, process id, source file and line number, level and message.
LOG_FORMAT = '[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

## Logger shared by the cells below
logger = logging.getLogger(__name__)

### Load the `config.yml` file, which contains the settings used across this pipeline

In [None]:
# Read config.yml through the project's load_config helper, then log the
# loaded settings as indented JSON so the run records what it used.
config = load_config('config.yml')
config_as_json = json.dumps(config, indent=2)
logger.info(config_as_json)

In [None]:
## SageMaker sessions: a regular session (used here for the region lookup)
## and a PipelineSession that is handed to the pipeline steps.
session = sagemaker.session.Session()
pipeline_session = PipelineSession()
region = session.boto_region_name

## The caller identity from STS supplies the account id for the templates below
ci = boto3.client('sts').get_caller_identity()
account_id = ci['Account']
aws_config = config['aws']

## Fill in the execution role ARN template and write the result back into
## the config so later lookups see the resolved ARN.
role_name = aws_config['sagemaker_execution_role_name']
role = aws_config['sagemaker_execution_role_arn'].format(account_id=account_id, role=role_name)
aws_config['sagemaker_execution_role_arn'] = role

## Bucket name (templated on account id and region) and key prefix for S3 artifacts
bucket = aws_config['s3_bucket'].format(account_id=account_id, region=region)
prefix = aws_config['s3_prefix']

logger.info(f"bucket={bucket}, prefix={prefix}, role={role}")

In [None]:
presto_config = config['presto']
training_config = config['training_step']

# ParameterString only holds strings, so the feature list is JSON-encoded first
training_features_str = json.dumps(training_config['training_features'])
logger.info(f"the training features being used for this pipeline --> {training_features_str}")

# Presto connection parameters
host_parameter = ParameterString(name="HostParameter", default_value=presto_config['host'])
# NOTE(review): the port default is read from the `parameter` key of the presto
# section -- confirm this matches the key name used in config.yml
port_parameter = ParameterString(name="PortParameter", default_value=presto_config['parameter'])

# Target column and (JSON-encoded) feature list
target_parameter = ParameterString(name="Target", default_value=training_config['training_target'])
feature_parameter = ParameterString(name="Feature", default_value=training_features_str)

# Presto credentials key and AWS region
presto_parameter = ParameterString(name="PrestoParameter", default_value=presto_config['presto_credentials'])
region_parameter = ParameterString(name="Region", default_value=config['aws']['region'])

# Write the batch inference query into the scripts directory as a Python
# module that defines BATCH_INFERENCE_QUERY.
fpath: str = os.path.join(config['scripts']['source_dir'], config['scripts']['query'])
logger.info(f"writing transform step query to {fpath}")
query_module_source = f"BATCH_INFERENCE_QUERY=\"\"\"{config['transform_step']['query']}\"\"\""
Path(fpath).write_text(query_module_source)

# Catalog and schema to use when connecting to the Presto server
presto_catalog_parameter = ParameterString(name="Catalog", default_value=presto_config['catalog'])
presto_schema_parameter = ParameterString(name="Schema", default_value=presto_config['schema'])

<a id='parameters'></a>

### Pipeline input parameters

Pipeline parameters are the inputs supplied when triggering a pipeline execution. They need to be explicitly defined when creating the pipeline and contain default values.

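Create parameters for the inputs to the pipeline. In this case, parameters will be used for:

- `ProcessingInstanceType` - What EC2 instance type to use for processing.
- `HostParameter`, `PortParameter`, `PrestoParameter`, `Catalog`, `Schema` - PrestoDB connection settings (defined in the previous cell).
- `Region`, `Target`, `Feature` - The AWS region, the training target, and the JSON-encoded training features (defined in the previous cell).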

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

processing_config = config['data_processing_step']

# Pipeline parameter: instance type for the processing job, defaulting to the
# value in the config file.
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value=processing_config['processing_instance_type'],
)

# A FrameworkProcessor is built around the scikit-learn estimator class.
# The framework version, IAM role, instance settings, tags and base job name
# all come from the configuration loaded above.
est_cls = sagemaker.sklearn.estimator.SKLearn

processor_settings = {
    "estimator_cls": est_cls,
    "framework_version": config['training_step']['sklearn_framework_version'],
    "role": role,
    "instance_type": processing_instance_type,
    "instance_count": processing_config['instance_count'],
    "tags": processing_config['tags'],
    "sagemaker_session": pipeline_session,
    "base_job_name": config['pipeline']['base_job_name'],
}
sklearn_processor = FrameworkProcessor(**processor_settings)

#### Create an Image URI object to use while creating the model from the approved model in the registry

In [None]:
# Look up the scikit-learn container image URI for the configured framework
# version; it is used further down as the image of the batch-inference Model.
image_lookup_args = {
    "framework": "sklearn",
    "region": config['aws']['region'],
    "version": config['training_step']['sklearn_framework_version'],
    "py_version": "py3",
    "instance_type": config['data_processing_step']['processing_instance_type'],
}
image_uri = sagemaker.image_uris.retrieve(**image_lookup_args)
logger.info(f"processing step image_uri={image_uri}")

### Next, approve the latest model
---
Find the latest model package in the model group, then approve it to launch the model deployment process.

In [None]:
sm = boto3.client("sagemaker")

# Walk every page of model packages in the configured group, newest first,
# and flatten the per-page summaries into a single list.
package_pages = sm.get_paginator('list_model_packages').paginate(
    ModelPackageGroupName=config['register_model_step']['model_group'],
    SortBy="CreationTime",
    SortOrder="Descending",
)
model_packages = [
    summary
    for page in package_pages
    for summary in page["ModelPackageSummaryList"]
]

# Nothing registered in the group means there is nothing to approve
if not model_packages:
    raise Exception(f"No model package is found for {config['register_model_step']['model_group']} model package group")

# Results are sorted by creation time, descending, so the first entry is the latest
newest_package = model_packages[0]
latest_model_package_arn = newest_package["ModelPackageArn"]
logger.info(f"for model_group={config['register_model_step']['model_group']}, latest_model_package_arn={latest_model_package_arn}")

The following statement sets the ModelApprovalStatus for the model package to Approved. The model package state change will launch the EventBridge rule and the rule will launch the CodePipeline CI/CD pipeline with model deployment.

In [None]:
## Mark the latest model package as Approved so it can be used for batch inference
approval_request = {
    "ModelPackageArn": latest_model_package_arn,
    "ModelApprovalStatus": "Approved",
}
model_package_update_response = sm.update_model_package(**approval_request)

## PART 2: Batch Transform Pipeline: Prepare Batch Data & Perform Batch Inference

### The first step is to get the latest batch data from Presto and use it for the batch transform step

In [None]:
# Batch data preparation: run the Presto extraction script as a SageMaker
# Pipelines ProcessingStep using the processor created above.

## Destination for the prepared batch data:
## s3://<bucket>/<prefix>/<pipeline execution id>/batch
batch_s3_destination = Join(
    on="/",
    values=[
        "s3://{}".format(bucket),
        prefix,
        ExecutionVariables.PIPELINE_EXECUTION_ID,
        "batch",
    ],
)

## Single processing output named "batch", read from the container path below
batch_output = [
    ProcessingOutput(
        output_name="batch",
        source="/opt/ml/processing/batch",
        destination=batch_s3_destination,
    ),
]

## Command-line arguments passed to the script, all backed by pipeline parameters
presto_script_arguments = [
    "--host", host_parameter,
    "--port", port_parameter,
    "--presto_credentials_key", presto_parameter,
    "--region", region_parameter,
    "--presto_catalog", presto_catalog_parameter,
    "--presto_schema", presto_schema_parameter,
]

# Calling run() on a processor bound to a PipelineSession yields the step
# arguments that are passed to the ProcessingStep.
step_args = sklearn_processor.run(
    code=config['scripts']['batch_transform_get_data'],
    source_dir=config['scripts']['source_dir'],
    outputs=batch_output,
    arguments=presto_script_arguments,
)

batch_data_prep = ProcessingStep(
    name=config['data_processing_step']['step_name'],
    step_args=step_args,
)

### Batch Transform Configuration begins below:
---

1. Create the model object with the model image URI and the `inference.py` entry-point script, which determines the features to use while making predictions.

2. Create the model step, which creates the SageMaker model from the approved model package's artifacts so that the transform step can reference it.

3. Run the transformer step on the created model and store the inference results in S3.

In [None]:
client = boto3.client("sagemaker")

# Request the packages newest-first explicitly instead of relying on the API's
# default ordering, so that index 0 is the most recently created package.
list_model_packages_response = client.list_model_packages(
    ModelPackageGroupName=config['register_model_step']['model_group'],
    SortBy="CreationTime",
    SortOrder="Descending",
)
logger.info(f"list_model_packages_response={list_model_packages_response}")

# Fail with a clear message rather than a bare IndexError when the group is empty
model_package_summaries = list_model_packages_response["ModelPackageSummaryList"]
if not model_package_summaries:
    raise ValueError(
        f"No model package is found for {config['register_model_step']['model_group']} model package group"
    )

latest_model_version_arn = model_package_summaries[0]["ModelPackageArn"]
logger.info(f"latest_model_version_arn={latest_model_version_arn}")

In [None]:
# Describe the latest model package and make sure it has been approved before
# its model artifacts are used for batch inference.
try:
    latest_approved_model_package = client.describe_model_package(ModelPackageName=latest_model_version_arn)
    approval_status = latest_approved_model_package['ModelApprovalStatus']

    # Only an Approved package may be used; anything else is an error
    if approval_status != "Approved":
        raise ValueError(f"ModelApprovalStatus is not Approved. Current status: {approval_status}")

    logger.info(f"The latest approved model package is --> {latest_approved_model_package}")
    # S3 location of the model artifacts for the package's first container
    model_data_url = latest_approved_model_package['InferenceSpecification']['Containers'][0]['ModelDataUrl']
    logger.info(f"The model data for the latest approved model arn {latest_model_version_arn} is stored in {model_data_url}")

except Exception as e:
    # Log once here (covers both API failures and the status check), then
    # re-raise with the original traceback intact.
    logger.error(f"An error occurred while tracking the approved model: {str(e)}")
    raise



In [None]:
## Build the Model from the approved package's artifacts, with the batch
## inference script from the config as its entry point.
model_settings = {
    "image_uri": image_uri,
    "entry_point": config['scripts']['batch_inference'],
    "model_data": model_data_url,
    "sagemaker_session": pipeline_session,
    "role": role,
}
model = Model(**model_settings)

#### Create the model image from the approved model for batch inference in the next step

In [None]:
# Pipeline step that creates the model; the instance type comes from the
# transform step configuration.
create_model_step_name = config['register_model_step']['model_name']
create_model_args = model.create(instance_type=config['transform_step']['instance_type'])
step_create_model = ModelStep(name=create_model_step_name, step_args=create_model_args)

### Define a Transform Step to Perform Batch Transformation

Now that a model instance is defined, create a Transformer instance with the appropriate model type, compute instance type, and desired output S3 URI.

Specifically, pass in the ModelName from the CreateModelStep, step_create_model properties. The CreateModelStep properties attribute matches the object model of the DescribeModel response object.

In [None]:


transform_config = config['transform_step']

# Time window handed to the transform job through environment variables.
# It ends at the moment this cell runs (UTC) and reaches back the configured
# number of hours. The values are fixed when the cell runs and are not
# recomputed per pipeline execution.
et = datetime.utcnow()
st = et - timedelta(hours=transform_config['num_hours_to_go_back'])
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
time_window_env = {
    'START_TIME_UTC': st.strftime(TIME_FORMAT),
    'END_TIME_UTC': et.strftime(TIME_FORMAT),
}

# The transformer uses the model created by step_create_model and writes
# CSV output to the bucket.
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type=transform_config['instance_type'],
    instance_count=transform_config['instance_count'],
    strategy="MultiRecord",
    accept="text/csv",
    assemble_with="Line",
    output_path=f"s3://{bucket}",
    tags=transform_config['tags'],
    env=time_window_env,
)

### Pass in the transformer instance and a TransformInput that points to the `batch` output of the data preparation step.

In [None]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

# S3 URI of the "batch" output written by the batch data preparation step
batch_data_s3_uri = batch_data_prep.properties.ProcessingOutputConfig.Outputs["batch"].S3Output.S3Uri

# The input is CSV, split line by line
transform_input = TransformInput(
    data=batch_data_s3_uri,
    split_type="Line",
    content_type="text/csv",
)

step_transform = TransformStep(
    inputs=transform_input,
    transformer=transformer,
    name=config['transform_step']['step_name'],
)

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = config['pipeline']['transform_pipeline_name']

# Every parameter declared on the pipeline
pipeline_parameters = [
    processing_instance_type,
    host_parameter,
    presto_parameter,
    region_parameter,
    port_parameter,
    target_parameter,
    feature_parameter,
    presto_catalog_parameter,
    presto_schema_parameter,
]

# Steps: prepare the batch data, create the model, run the batch transform
pipeline_steps = [batch_data_prep, step_create_model, step_transform]

batch_transform_pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [None]:
# Upsert the pipeline under the execution role, applying the tags from the config
pipeline_tags = config['pipeline']['tags']
batch_transform_pipeline.upsert(role_arn=role, tags=pipeline_tags)

In [None]:
# Start an execution of the pipeline by name through a boto3 SageMaker client,
# and print the response (its execution ARN is used for status polling).
session = boto3.Session()
sagemaker_client = session.client(service_name="sagemaker")
start_request = {"PipelineName": batch_transform_pipeline.name}
response = sagemaker_client.start_pipeline_execution(**start_request)
print(response)


In [None]:
# Poll the pipeline execution until it reaches a terminal status and report
# how long that took.
# 'Stopping' is a transitional status, so it is treated as still in progress.
IN_PROGRESS_STATUSES = ('Executing', 'Stopping')
POLL_INTERVAL_SECONDS = 30

st = time.perf_counter()
logger.info(f"starting pipeline={batch_transform_pipeline.name}")
while True:
    resp = client.describe_pipeline_execution(
        PipelineExecutionArn=response['PipelineExecutionArn']
    )
    status = resp['PipelineExecutionStatus']
    print(status)
    # Check before sleeping so a finished execution is not followed by a
    # pointless extra wait that would also inflate the elapsed time.
    if status not in IN_PROGRESS_STATUSES:
        break
    time.sleep(POLL_INTERVAL_SECONDS)
elapsed_time = time.perf_counter() - st
logger.info(f"pipeline={batch_transform_pipeline.name} took {elapsed_time:.2f} seconds to run")

In [None]:
# Log the last describe_pipeline_execution response as indented JSON.
# default=str stringifies values that json cannot serialize natively.
execution_summary = json.dumps(resp, default=str, indent=2)
logger.info(execution_summary)