# Cyanite Audio Analyzer SageMaker Inference Demo

This notebook demonstrates how to deploy and call the Cyanite Audio Analyzer marketplace algorithm using the SageMaker Python SDK.

Here, we only cover a basic usage scenario. For more information on advanced functionality, please refer to the [SageMaker Python SDK documentation](https://sagemaker.readthedocs.io/en/stable/index.html) and the [Amazon SageMaker API Reference](https://docs.aws.amazon.com/sagemaker/latest/APIReference/Welcome.html).

__Please Note__: To follow this notebook, you must be subscribed to the Cyanite Audio Analyzer on the AWS Marketplace.

Contents:
- [Real-time Inference](#rtinference)
- [Asynchronous Inference](#asyncinference)
- [Batch Transformation Jobs](#batchtransform)


## 1. Imports

In [None]:
# install or upgrade to the latest SageMaker SDK
!pip install -U sagemaker

In [2]:
import json
import pathlib
from datetime import datetime

import sagemaker
from sagemaker import ModelPackage

## 2. Prerequisites

Set up the necessary variables.

***Important***: 

- Please replace `model_arn` with the ARN of your Audio Analyzer algorithm subscription. 

- Replace `s3_bucket` with your S3 bucket name and `base_prefix` with the prefix under which the outputs of the asynchronous inference and batch transformation jobs should be stored. Make sure that your `role` has read/write permissions for the chosen bucket.

Optionally, you can point `audio_filepath` to your own directory of audio files. Supported file formats are _mp3_, _wav_, _m4a_, _mp4_ and _mpga_.

You can also choose a different `base_name` or set the IAM `role` manually.
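
The `base_name` is later combined with a job-type suffix and a 19-character timestamp, so a custom value has to leave room for SageMaker's naming limits. The sketch below assumes the rule documented for endpoint and transform job names (at most 63 characters; letters, digits and hyphens; starting and ending with a letter or digit). `is_valid_resource_name` and `max_base_name_length` are hypothetical helpers for illustration, not part of the SageMaker SDK.

```python
import re

# Assumed SageMaker naming rule for endpoints and transform jobs:
# letters/digits, optionally separated by hyphens, at most 63 characters.
NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")
MAX_NAME_LENGTH = 63


def is_valid_resource_name(name: str) -> bool:
    """Check `name` against the assumed naming rule."""
    return len(name) <= MAX_NAME_LENGTH and NAME_PATTERN.fullmatch(name) is not None


def max_base_name_length(suffix: str = "-transformer-", timestamp_len: int = 19) -> int:
    """Longest base_name that still fits once suffix and timestamp are appended.
    19 is the length of a "%d-%m-%Y-%H-%M-%S" timestamp."""
    return MAX_NAME_LENGTH - len(suffix) - timestamp_len


print(is_valid_resource_name("cyanite-analyzer-demo-transformer-10-11-2022-21-09-28"))  # True
print(is_valid_resource_name("cyanite_analyzer"))  # False (underscore)
print(max_base_name_length())  # 31
```

Under this assumption, `base_name` can be at most 31 characters long when the `-transformer-` suffix is used, and the generated names can be checked with `is_valid_resource_name` once they are defined.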


In [3]:
# Replace this with the ARN of your Audio Analyzer algorithm subscription
model_arn = "your-arn-here"

# base name for endpoint deployment
base_name = "cyanite-analyzer-demo"

# bucket for storing async and transformation results
s3_bucket = "your-bucket-here"

# key prefix on the bucket under which async and transformation results are stored
base_prefix = "your-prefix-here"

# Path to audio files
audio_filepath = "example-audios"

# supported file formats
file_formats = ["mp3", "wav", "m4a", "mp4", "mpga"]

# Replace this with a valid IAM role
role = sagemaker.get_execution_role()

# SageMaker session
sess = sagemaker.Session()

In [4]:
# define job/endpoint names
now = datetime.now()
timestamp = now.strftime("%d-%m-%Y-%H-%M-%S")

rt_endpoint_name = f"{base_name}-rt-{timestamp}"
async_endpoint_name = f"{base_name}-async-{timestamp}"
transform_job_name = f"{base_name}-transformer-{timestamp}"

# gather available audio files
available_files = [
    path
    for ext in file_formats
    for path in pathlib.Path(audio_filepath).rglob(f"*.{ext}")
]
print(f"Available audio files:\n{[x.as_posix() for x in available_files]}")

Available audio files:
['example-audios/podcast-jingle-01-by-musikhalde-from-filmmusic-io.mp3']
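
On POSIX systems, `pathlib.Path.rglob` matches patterns case-sensitively, so a file named `TRACK.MP3` would not be picked up by the cell above. A minimal sketch of a case-insensitive alternative, using a hypothetical helper `find_audio_files` that compares lower-cased file suffixes instead of glob patterns:

```python
import pathlib
from typing import Iterable, List


def find_audio_files(directory: str, formats: Iterable[str]) -> List[pathlib.Path]:
    """Recursively collect files below `directory` whose extension is one of
    `formats`, ignoring case (so .mp3 and .MP3 both match)."""
    wanted = {f".{ext.lower()}" for ext in formats}
    return sorted(
        path
        for path in pathlib.Path(directory).rglob("*")
        if path.is_file() and path.suffix.lower() in wanted
    )
```

In the notebook, `available_files = find_audio_files(audio_filepath, file_formats)` would replace the list comprehension above.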


## Prepare SageMaker Model

First, we instantiate a SageMaker model from the model package ARN you provided.

This model object is the starting point for all three deployment types.

In [6]:
# create SageMaker model from marketplace model package
model = ModelPackage(
    role=role,
    model_package_arn=model_arn,
    sagemaker_session=sess,
)
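
AWS Marketplace subscriptions typically list a separate model package ARN per AWS Region, and, as far as we know, the package has to be deployed from a session in that same Region. The sketch below splits an ARN of the assumed form `arn:<partition>:sagemaker:<region>:<account-id>:model-package/<name>` so the Region can be checked up front; `parse_model_package_arn` is a hypothetical helper and the example ARN is made up.

```python
def parse_model_package_arn(arn: str) -> dict:
    """Split a model package ARN of the assumed form
    arn:<partition>:sagemaker:<region>:<account-id>:model-package/<name>
    into its parts. Raises ValueError for anything else."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "sagemaker":
        raise ValueError(f"not a SageMaker ARN: {arn!r}")
    resource_type, _, name = parts[5].partition("/")
    if resource_type != "model-package" or not name:
        raise ValueError(f"not a model package ARN: {arn!r}")
    return {
        "partition": parts[1],
        "region": parts[3],
        "account": parts[4],
        "name": name,
    }


# Illustrative, made-up ARN; use your own subscription's ARN instead.
example = parse_model_package_arn(
    "arn:aws:sagemaker:us-east-1:123456789012:model-package/example-analyzer"
)
print(example["region"])  # us-east-1
```

Comparing `parse_model_package_arn(model_arn)["region"]` with `sess.boto_region_name` before calling `deploy` would surface a Region mismatch early.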

<a name="rtinference"></a>
## 3. Real-time Inference

In the real-time inference scenario, our algorithm is deployed on an EC2 instance. 
We then send the audio data directly to the deployed endpoint in the body of a REST request. 
The endpoint processes the request and returns the analysis results in its response. 

***Important:***
- Real-time inference requests cannot exceed a payload size of 6 MB.
- Inference requests time out after 60 seconds. Depending on the chosen EC2 instance and the length of the audio, processing time might exceed that limit. In this case, use asynchronous inference or a batch transformation job instead.

### Deploy Endpoint

In [7]:
print(f"Deploying: {rt_endpoint_name}")

model.deploy(
    instance_type="ml.m5.xlarge",
    initial_instance_count=1,
    endpoint_name=rt_endpoint_name,
)

Deploying: cyanite-analyzer-demo-rt-10-11-2022-21-09-28
---------!

### Call Endpoint

In [8]:
# create predictor instance for endpoint
predictor = sagemaker.predictor.Predictor(
    rt_endpoint_name,
    sagemaker_session=sess,
    deserializer=sagemaker.deserializers.JSONDeserializer(),
)

In [9]:
# Path to store analysis results
results_filepath = "results"

# create results filepath
pathlib.Path(results_filepath).mkdir(parents=True, exist_ok=True)

for path in available_files:

    print(f"Processing: {path.name}")

    # load audio as binary
    with open(path, "rb") as f:
        raw_audio_data = f.read()

    # call endpoint
    out_json = predictor.predict(data=raw_audio_data)

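    # store result under the same relative path, with a .json extension
    rel_path = path.relative_to(audio_filepath).with_suffix(".json")
    out_path = pathlib.Path(results_filepath) / rel_path
    out_path.parent.mkdir(parents=True, exist_ok=True)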
    with open(out_path, "w") as f:
        f.write(json.dumps(out_json))

    print(f"Output stored at {out_path}\n")

Processing: podcast-jingle-01-by-musikhalde-from-filmmusic-io.mp3
Output stored at results/podcast-jingle-01-by-musikhalde-from-filmmusic-io.json



### Optional: Display Results

In [None]:
import IPython.display as ipd

# display latest processing result
with open(out_path) as f:
    latest_result = json.load(f)
ipd.JSON(latest_result)

### Delete Endpoint

In [11]:
# after usage, we can delete the endpoint
predictor.delete_endpoint()

<a name="asyncinference"></a>
## 4. Asynchronous Inference

In the asynchronous inference scenario, our algorithm is again deployed on an EC2 instance. 
Instead of sending the audio in the request body, we upload the input data to S3 for intermediate storage and pass its location to the endpoint. 
SageMaker places the request in an internal queue and invokes the model once the endpoint is available. 
The results are also written to S3, from where we can fetch them.

***Important:***
- Asynchronous inference requests can be up to 500 MB in size
- Requests will time out after 15 minutes

### Deploy Endpoint

In [12]:
# set up s3 paths
async_input_prefix = f"{base_prefix}/async_inputs"
async_output_prefix = f"{base_prefix}/async_outputs"

# create async config
# here we define where on S3 the results will be stored
async_config = sagemaker.async_inference.async_inference_config.AsyncInferenceConfig(
    output_path=f"s3://{s3_bucket}/{async_output_prefix}"
)

In [13]:
print(f"Deploying: {async_endpoint_name}")

model.deploy(
    instance_type="ml.m5.xlarge",
    initial_instance_count=1,
    async_inference_config=async_config,
    endpoint_name=async_endpoint_name,
)

Deploying: cyanite-analyzer-demo-async-10-11-2022-21-09-28
---------!

### Call Endpoint

In [14]:
# get predictor instance for an asynchronous endpoint
async_predictor = sagemaker.predictor_async.AsyncPredictor(
    sagemaker.predictor.Predictor(
        async_endpoint_name,
        sagemaker_session=sess,
        deserializer=sagemaker.deserializers.JSONDeserializer(),
    ),
)

In [15]:
# local path to store analysis results
results_filepath = "results_async"

# create results filepath
pathlib.Path(results_filepath).mkdir(parents=True, exist_ok=True)

# process data
for path in available_files:

    print(f"Processing: {path.name}")

    # load audio as binary
    with open(path, "rb") as f:
        raw_audio_data = f.read()

    # call endpoint
    async_response = async_predictor.predict_async(
        data=raw_audio_data,
        input_path=f"s3://{s3_bucket}/{async_input_prefix}/{path.name}",
    )

    print("Waiting for results to be available..")
    out_json = async_response.get_result(
        waiter_config=sagemaker.async_inference.waiter_config.WaiterConfig(
            max_attempts=20, delay=5
        )
    )

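    # store result under the same relative path, with a .json extension
    rel_path = path.relative_to(audio_filepath).with_suffix(".json")
    out_path = pathlib.Path(results_filepath) / rel_path
    out_path.parent.mkdir(parents=True, exist_ok=True)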
    with open(out_path, "w") as f:
        f.write(json.dumps(out_json))

    print(f"Output stored at {out_path}\n")

Processing: podcast-jingle-01-by-musikhalde-from-filmmusic-io.mp3
Waiting for results to be available..
Output stored at results_async/podcast-jingle-01-by-musikhalde-from-filmmusic-io.json
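
The `WaiterConfig(max_attempts=20, delay=5)` used above polls for at most 20 × 5 = 100 seconds, well below the 15-minute request timeout. For longer audio, `get_result` may therefore stop waiting (and, in the SDK versions we are aware of, raise an error) while the request is still being processed; the queued request itself keeps running and its result still lands in S3. A minimal sketch for sizing the waiter to the timeout, with hypothetical helper names:

```python
import math


def max_wait_seconds(max_attempts: int, delay_s: float) -> float:
    """Upper bound on how long the waiter polls before giving up."""
    return max_attempts * delay_s


def waiter_attempts_for(timeout_s: float, delay_s: float) -> int:
    """Smallest max_attempts such that max_attempts * delay_s >= timeout_s."""
    if delay_s <= 0:
        raise ValueError("delay_s must be positive")
    return math.ceil(timeout_s / delay_s)


# WaiterConfig(max_attempts=20, delay=5) as used above
print(max_wait_seconds(20, 5))  # 100
# attempts needed to cover the 15-minute request timeout at a 5 s delay
print(waiter_attempts_for(15 * 60, 5))  # 180
```

With these numbers, `WaiterConfig(max_attempts=waiter_attempts_for(15 * 60, 5), delay=5)` would keep polling for the full 15 minutes.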



### Optional: Display Results

In [None]:
import IPython.display as ipd

# display latest processing result
with open(out_path) as f:
    latest_result = json.load(f)
ipd.JSON(latest_result)

### Delete Endpoint

In [17]:
# after usage, we can delete the endpoint
async_predictor.delete_endpoint()

<a name="batchtransform"></a>
## 5. Batch Transformation Jobs

Batch transformation jobs are a good fit when we have a one-time batch of data to process. 
A batch transformation job deploys our algorithm on an EC2 instance, processes our predefined data batch, and shuts down afterwards. 

To start a transformation job, we need to provide an S3 location containing all the data to be processed. 
Alternatively, we can provide the transformer with a manifest file that points to the locations of the data to be processed (not covered here).

Results will again be stored on S3, from where we can fetch them.

***Important:***
- Each file cannot exceed 100 MB
- There is no timeout limit for request processing
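
The limits listed for the three inference types can be combined into a rough decision rule. The sketch below encodes the figures stated in this notebook (6 MB and 60 s for real-time, 500 MB and 15 minutes for asynchronous, 100 MB per file for batch). Interpreting MB as 1024 × 1024 bytes and passing in an estimated processing time are assumptions, and `choose_inference_type` is a hypothetical helper; check the current AWS documentation before relying on these numbers.

```python
from typing import Optional

MB = 1024 * 1024  # assumption: limits interpreted as mebibytes

# Limits as stated in this notebook (assumed; verify against AWS docs).
REALTIME_MAX_BYTES = 6 * MB
REALTIME_TIMEOUT_S = 60
ASYNC_MAX_BYTES = 500 * MB
ASYNC_TIMEOUT_S = 15 * 60
BATCH_MAX_BYTES = 100 * MB


def choose_inference_type(
    size_bytes: int, expected_seconds: float, one_off_batch: bool = False
) -> Optional[str]:
    """Return "realtime", "async" or "batch", or None if no option fits."""
    # one-time batches go to a transformation job whenever the file fits
    if one_off_batch and size_bytes <= BATCH_MAX_BYTES:
        return "batch"
    if size_bytes <= REALTIME_MAX_BYTES and expected_seconds < REALTIME_TIMEOUT_S:
        return "realtime"
    if size_bytes <= ASYNC_MAX_BYTES and expected_seconds < ASYNC_TIMEOUT_S:
        return "async"
    # batch transformation has no request timeout, only the per-file size limit
    if size_bytes <= BATCH_MAX_BYTES:
        return "batch"
    return None
```

For example, a 3 MB file expected to take two minutes would be routed to `"async"`, because it fits the real-time payload limit but not its 60-second timeout.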


### Upload Data

In [18]:
# set up s3 paths
batch_input_prefix = f"{base_prefix}/batch_inputs"
batch_output_prefix = f"{base_prefix}/batch_outputs"

# instantiate uploader
s3_uploader = sagemaker.s3.S3Uploader()

# upload audios to be processed
for path in available_files:
    s3_uploader.upload(
        local_path=path.as_posix(),
        desired_s3_uri=f"s3://{s3_bucket}/{batch_input_prefix}",
    )

### Set Up Transformer

In [19]:
# initialize transformer
batch_transformer = model.transformer(
    instance_type="ml.m5.xlarge",
    instance_count=1,
    strategy="SingleRecord",
    max_concurrent_transforms=1,
    max_payload=100,  # maximum payload size per request, in MB
    output_path=f"s3://{s3_bucket}/{batch_output_prefix}",
)

### Run Transform Job

In [20]:
print(f"Starting transform job: {transform_job_name}")

batch_transformer.transform(
    job_name=transform_job_name,
    data=f"s3://{s3_bucket}/{batch_input_prefix}",
    data_type="S3Prefix",
    split_type=None,
    content_type=None,
    compression_type=None,
)

Starting transform job: cyanite-analyzer-demo-transformer-10-11-2022-21-09-28
........................................Starting the inference server with 1 worker(s).
Internal worker timeout set to 900s.
[2022-11-10 21:25:21 +0000] [10] [INFO] Starting gunicorn 20.1.0
[2022-11-10 21:25:21 +0000] [10] [INFO] Listening at: unix:/tmp/gunicorn.sock (10)
[2022-11-10 21:25:21 +0000] [10] [INFO] Using worker: sync
[2022-11-10 21:25:21 +0000] [13] [INFO] Booting worker with pid: 13
2022-11-10 21:25:32.275342: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: UNKNOWN ERROR (34)
2022-11-10 21:25:32.275390: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (45f116645c2d): /proc/driver/nvidia/version does not exist
2022-11-10 21:25:32.275679: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized w

In [21]:
# local path to store analysis results
results_filepath = "results_transform"

# instantiate downloader
s3_downloader = sagemaker.s3.S3Downloader()

s3_downloader.download(
    s3_uri=f"s3://{s3_bucket}/{batch_output_prefix}",
    local_path=results_filepath,
)

### Optional: Display Results

In [None]:
import IPython.display as ipd

# please note that batch transformation outputs use the file ending .out
transform_results = sorted(pathlib.Path(results_filepath).rglob("*.out"))

# display first result
with open(transform_results[0]) as f:
    first_result = json.load(f)
ipd.JSON(first_result)
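
Since the downloaded batch outputs carry the `.out` ending, it can be convenient to give them a `.json` extension so they line up with the real-time and asynchronous results. A small sketch, assuming each `.out` file holds the JSON analysis result (as the display cell above implies); `out_files_to_json` is a hypothetical helper:

```python
import pathlib
from typing import List


def out_files_to_json(results_dir: str) -> List[pathlib.Path]:
    """Rename every *.out file below results_dir to *.json, e.g.
    track.mp3.out -> track.mp3.json, and return the new paths."""
    renamed = []
    # sorted() materializes the file list before anything is renamed
    for out_file in sorted(pathlib.Path(results_dir).rglob("*.out")):
        # replace only the final ".out" suffix; the audio extension is kept
        # so that results for track.mp3 and track.wav do not overwrite each other
        target = out_file.with_suffix(".json")
        out_file.rename(target)
        renamed.append(target)
    return renamed
```

Calling `out_files_to_json(results_filepath)` after the download renames the files in place.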

## 6. Tips
- Choose the `instance_type` according to your requirements. For the best runtime, choose a GPU-accelerated instance. See the [Amazon SageMaker pricing page](https://aws.amazon.com/sagemaker/pricing/) for the available instance types.
- You can scale the number of instances used for your endpoints (`initial_instance_count`) and transformation jobs (`instance_count`).
