# UDACITY SageMaker Essentials: Batch Transform

In the last exercise, we asked you to reflect on the disadvantages of preprocessing on a local machine. Beyond those disadvantages, such as user error and hardware limitations, you may also have run into another frustration: submitting a large amount of data to an endpoint. Your network may limit throughput, there may be security or privacy concerns, and parallelism offers an obvious performance advantage that can be difficult to implement yourself.

Batch transform addresses all of these issues. Its primary use case is running inference on an entire dataset rather than making many individual calls to an endpoint. As with the other tools we have encountered, Amazon SageMaker does the heavy lifting of reading the data and splitting the work among instances. All that's required of us is to point batch transform at the data we want to submit.

Alas, this dataset is not quite in the correct format to be digested by batch transform. Although the tool can read lists of JSON objects, it cannot perform the processing operations that we would ideally apply to them. So, yet again, we must preprocess the data.

## Exercise: Preprocess (again, again) and upload to S3

Among the functions provided in the cell below, two deserve attention. `split_sentences` preprocesses the reviews; you should already be very familiar with this function. Remember that BlazingText expects input in JSON format, so `cycle_data` formats each sentence as {'source': 'THIS IS A SAMPLE SENTENCE'} and writes it to a file, one object per line.

Use the cell to complete the following tasks:
* preprocess reviews_Musical_Instruments_5.json
* upload the file containing the data to S3
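
To make the target format concrete, here is a minimal sketch of the one-object-per-line layout that `cycle_data` produces. The helper name `to_json_lines` and the sample sentences are hypothetical, introduced only for illustration. Note that `json.dumps` emits double quotes, so the file contents differ slightly from the single-quoted Python dict shown above.

```python
import io
import json


def to_json_lines(sentences):
    """Serialize each sentence as one {"source": ...} JSON object per line."""
    buffer = io.StringIO()
    for sentence in sentences:
        # Same formatting as cycle_data, written to memory instead of a file
        buffer.write(json.dumps({"source": sentence}) + "\n")
    return buffer.getvalue()


# Hypothetical sample sentences, not taken from the dataset
sample = to_json_lines(["THIS IS A SAMPLE SENTENCE", "Great strings"])
print(sample, end="")
```

Each line can be parsed on its own with `json.loads`, which is what allows the input to be split by line later on.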

In [5]:
%load_ext dotenv
%dotenv

import boto3
import json
import sys
import os
import logging
import zipfile
from botocore.exceptions import ClientError

from src.paths import CODE_DIR, DATA_DIR, RAW_DATA_DIR, TRANSFORMED_DATA_DIR, TEST_DIR

# load BUCKET and ROLE from .env file
bucket = os.getenv("BUCKET")
role = os.getenv("ROLE")
region = os.getenv("REGION")

S3_LOCATION = f"s3://{bucket}/1"

# Adding custom folders to the system path for easy import
sys.path.extend([str(CODE_DIR)])

# Data file path
DATA_FILE_PATH = RAW_DATA_DIR / "reviews_Musical_Instruments_5.json.zip"
OUTPUT_FILE_PATH = TRANSFORMED_DATA_DIR / "reviews_Musical_Instruments_5.json"

The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [10]:
# Name of the file to write the data to
file_name = "music_instruments_reviews.txt"
s3_prefix = "1"
s3_output_path = s3_prefix + '/' + file_name # The key within the bucket


# Function below unzips the archive to the local directory. 

def unzip_data(input_file_path: str, output_file_path: str) -> None:
    """
    Unzip the archive at input_file_path into the directory output_file_path.
    """
    # Extract all files; extractall treats output_file_path as a target directory
    with zipfile.ZipFile(input_file_path, "r") as zip_ref:
        zip_ref.extractall(output_file_path)


def split_sentences(input_data):
    sentences = []
    # Use a context manager so the file handle is closed after reading
    with open(input_data, 'r') as f:
        for line in f:
            review = json.loads(line)
            helpful_votes = float(review['helpful'][0])
            total_votes = review['helpful'][1]
            # Filter out the same data as prior jobs: no votes, or exactly 50% helpful
            if total_votes != 0 and helpful_votes / total_votes != .5:
                for s in review['reviewText'].split("."):
                    if s:  # Skip empty strings, which are common with "..."
                        sentences.append(s)
    return sentences


# Format the data as {'source': 'THIS IS A SAMPLE SENTENCE'}
# And write the data into a file
def cycle_data(fp, data):
    for d in data:
        fp.write(json.dumps({'source':d}) + '\n')


# Uploads a file to an S3 bucket.
def upload_file_to_s3(local_file_path, bucket_name, s3_output_path):
    """
    Uploads a file to an S3 bucket.

    Parameters:
    local_file_path (str): The path to the local file to upload.
    bucket_name (str): The name of the S3 bucket to upload to.
    s3_output_path (str): The output path (key) in the S3 bucket.

    Returns:
    bool: True if the upload succeeded, False otherwise.
    """

    # Create an S3 client
    s3_client = boto3.client('s3')

    # Upload the file
    try:
        s3_client.upload_file(local_file_path, bucket_name, s3_output_path)
        print(f"File {local_file_path} uploaded to {bucket_name}/{s3_output_path}")
    except ClientError as e:
        logging.error(e)
        return False
    return True


# Unzips file.
unzip_data(DATA_FILE_PATH, TRANSFORMED_DATA_DIR)

# Preprocess reviews_Musical_Instruments_5.json
sentences = split_sentences(OUTPUT_FILE_PATH)

# Write data to a file and upload it to s3.
with open(str(TRANSFORMED_DATA_DIR / file_name), 'w') as f:
    cycle_data(f, sentences)

upload_file_to_s3(str(TRANSFORMED_DATA_DIR / file_name), bucket, s3_output_path)

# Adjust the batch_transform_input_path
batch_transform_input_path = f"s3://{bucket}/{s3_output_path}"
print(batch_transform_input_path)


File /Users/carlos/Projects/aws-ml-engineering/data/transformed/music_instruments_reviews.txt uploaded to sigmoidal-bucket/1/music_instruments_reviews.txt
s3://sigmoidal-bucket/1/music_instruments_reviews.txt
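
To see what the filter and the sentence split in `split_sentences` actually keep, the sketch below applies the same two rules to a few in-memory records. The helper names `keep_review` and `sentences_from` and the three records are hypothetical, made up for illustration rather than taken from the dataset.

```python
def keep_review(review):
    """Same rule as split_sentences: drop reviews with no votes or exactly 50% helpful."""
    helpful_votes, total_votes = review["helpful"]
    return total_votes != 0 and helpful_votes / total_votes != 0.5


def sentences_from(review):
    """Split on periods and drop the empty strings produced by runs such as '...'."""
    return [s for s in review["reviewText"].split(".") if s]


# Hypothetical records that mimic the fields used by split_sentences
records = [
    {"helpful": [0, 0], "reviewText": "No votes. Dropped."},
    {"helpful": [1, 2], "reviewText": "Exactly half. Dropped."},
    {"helpful": [3, 4], "reviewText": "Works well... Would buy again."},
]

kept = [s for r in records if keep_review(r) for s in sentences_from(r)]
print(kept)
```

Only the third record survives the filter. Its second sentence keeps a leading space, because the split does not strip whitespace; if that matters for your model, calling `s.strip()` before writing is one option.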


## Exercise: Use Batch Transform to perform an inference on the dataset

We use batch transform through a transformer object. Just as we initialized a predictor object in the last exercise, complete the code below to initialize a transformer object and launch a transform job.

You will need the following:

* As in the last exercise, you will need to get a BlazingText image URI from AWS. The method for doing so is identical to the last exercise.
* You will need to instantiate a "model" object.
* You will need to call the "transformer" method on the model object to create a transformer. We suggest using 1 instance of ml.m4.xlarge. If this isn't available in your region, feel free to use another instance type, such as ml.m5.large.
* You will need to use this transformer on the data we uploaded to S3. To do so, pass a data_type of "S3Prefix" and a content_type of "application/jsonlines", with split_type set to "Line".

Consult the following documentation: https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html

End-to-end, this process should take about 5 minutes on the whole dataset. While developing, consider uploading a subset of the data to S3 and running the job on that instead.
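
One way to produce such a subset is to copy only the first few lines of the JSON Lines file into a smaller file. The sketch below is a minimal illustration: the helper `write_subset`, its `max_lines` parameter, and the temporary file names are assumptions introduced here, not part of the exercise code.

```python
import itertools
import os
import tempfile


def write_subset(source_path, subset_path, max_lines=100):
    """Copy at most max_lines lines from source_path to subset_path; return the count."""
    written = 0
    with open(source_path, "r") as src, open(subset_path, "w") as dst:
        # islice stops after max_lines lines without reading the rest of the file
        for line in itertools.islice(src, max_lines):
            dst.write(line)
            written += 1
    return written


# Self-contained demonstration on a throwaway file with ten hypothetical records
with tempfile.TemporaryDirectory() as tmp_dir:
    full_path = os.path.join(tmp_dir, "full.txt")
    subset_path = os.path.join(tmp_dir, "subset.txt")
    with open(full_path, "w") as f:
        for i in range(10):
            f.write('{"source": "sentence %d"}\n' % i)

    copied = write_subset(full_path, subset_path, max_lines=3)
    with open(subset_path) as f:
        subset_lines = f.read().splitlines()

print(copied, subset_lines)
```

In the notebook, the resulting file could then be uploaded with `upload_file_to_s3` under a different key, so that the full dataset stays untouched in S3.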


In [11]:
from sagemaker import get_execution_role
from sagemaker.model import Model
from sagemaker import image_uris

# Get the image uri using the "blazingtext" algorithm in your region. 

image_uri = image_uris.retrieve(
    framework='blazingtext', 
    region=region
)

# Get the model artifact from S3
model_data = f"{S3_LOCATION}/model_artifacts/blazingtext-2023-12-13-13-50-26-062/output/model.tar.gz"

# Get the s3 path for the batch transform data
batch_transform_output_path = S3_LOCATION + '/batch_transform_output'

# Define a model object
model = Model(
    image_uri=image_uri, 
    model_data=model_data, 
    role=role
)

# Define a transformer object, using a single instance ml.m4.xlarge. Specify an output path to your s3 bucket. 

transformer = model.transformer(
    instance_count=1, 
    instance_type='ml.m4.xlarge', 
    output_path=batch_transform_output_path
)

# Call the transform method. Set content_type='application/jsonlines', split_type='Line'

transformer.transform(
    data=batch_transform_input_path, 
    data_type='S3Prefix',
    content_type='application/jsonlines', 
    split_type='Line'
)

transformer.wait()


sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/carlos/Library/Application Support/sagemaker/config.yaml


INFO:sagemaker:Creating transform job with name: blazingtext-2023-12-13-19-02-04-162


...............................Arguments: serve
[12/13/2023 19:07:41 INFO 140107733665600] Finding and loading model
[12/13/2023 19:07:41 INFO 140107733665600] Trying to load model from /opt/ml/model/model.bin
[12/13/2023 19:07:42 INFO 140107733665600] Number of server workers: 4
[2023-12-13 19:07:42 +0000] [1] [INFO] Starting gunicorn 19.7.1
[2023-12-13 19:07:42 +0000] [1] [INFO] Listening at: http://0.0.0.0:8080 (1)
[2023-12-13 19:07:42 +0000] [1] [INFO] Using worker: sync
  return io.open(fd, *args, **kwargs)
[2023-12-13 19:07:42 +0000] [33] [INFO] Booting worker with pid: 33
[2023-12-13 19:07:42 +0000] [34] [INFO] Booting worker with pid: 34
Arguments: serve
[12/13/2023 19:07:41 INFO 140107733665600] Finding and loading model
[12/13/2023 19:07:41 INFO 140107733665600] Trying to load model from /opt/ml/model/model.bin
[12/13/2023 19:07:42 INFO 140107733665600] Number of server workers: 4
[2023-12-13 19:07:42 +0000] [1] [INFO] Starting gunicorn 19.7.1
[2023-12-13 19:07:42 +0000] [1] 