# Deploy Cochl.Sense Model Package from AWS Marketplace

[Cochl.Sense](https://cochl.ai) is an AI-powered audio recognition API that analyzes audio and detects various sound events such as music, speech, sirens, alarms, and more.

This notebook demonstrates how to deploy and use the Cochl.Sense model from AWS Marketplace using Amazon SageMaker.

## Contents

1. [Subscribe to the model package](#1.-Subscribe-to-the-model-package)
2. [Set up the environment](#2.-Set-up-the-environment)
3. [Real-time Inference](#3.-Real-time-Inference)
4. [Asynchronous Inference](#4.-Asynchronous-Inference)
5. [Batch Transform](#5.-Batch-Transform)
6. [Clean up](#6.-Clean-up)

## Usage Instructions

You can run this notebook one cell at a time by pressing Shift+Enter.

## 1. Subscribe to the model package

To subscribe to the model package:

1. Open the model package listing page: **Cochl.Sense**
2. On the AWS Marketplace listing, click **Continue to Subscribe**.
3. On the **Subscribe to this software** page, review the EULA, pricing, and support terms, and click **Accept Offer** if you and your organization agree with them.
4. Click **Continue to configuration** and choose a **region**. The page then displays a **Product Arn**. This is the model package ARN that you need to specify when creating a deployable model.

Copy the ARN corresponding to your region and specify it in the following cell.

In [None]:
# Specify the model package ARN from AWS Marketplace
model_package_arn = "<Customer to specify Model package ARN corresponding to their AWS region>"
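Because the ARN is region-specific, pasting an ARN from a different region than the one your session runs in is an easy mistake to make. The following is a minimal sketch of a check you could run before deploying. `model_package_region` is a hypothetical helper (it is not part of Boto3 or the SageMaker SDK), it assumes the usual `arn:<partition>:sagemaker:<region>:<account-id>:model-package/<name>` layout, and the ARN in the example is a made-up placeholder rather than the real listing ARN.

```python
def model_package_region(arn: str) -> str:
    """Return the region embedded in a SageMaker model package ARN.

    Hypothetical helper for illustration. Assumes the layout
    arn:<partition>:sagemaker:<region>:<account-id>:model-package/<name>.
    """
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "sagemaker":
        raise ValueError(f"Not a SageMaker ARN: {arn!r}")
    if not parts[5].startswith("model-package/"):
        raise ValueError(f"Not a model package ARN: {arn!r}")
    return parts[3]


# Placeholder ARN for illustration only (not the real Cochl.Sense ARN).
example_arn = "arn:aws:sagemaker:us-east-1:123456789012:model-package/example-package"
print(model_package_region(example_arn))
```

In this notebook, you could compare `model_package_region(model_package_arn)` with `boto3.Session().region_name` and stop early if the two differ.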

## 2. Set up the environment

Import the required packages and set up the SageMaker session.

In [None]:
import boto3
import json
import time
import sagemaker
from sagemaker import ModelPackage, get_execution_role

In [None]:
# Initialize SageMaker session
sagemaker_session = sagemaker.Session()
role = get_execution_role()
region = boto3.Session().region_name
bucket = sagemaker_session.default_bucket()

print(f"SageMaker Role: {role}")
print(f"Region: {region}")
print(f"Default Bucket: {bucket}")

### Create a deployable model from the model package

In [None]:
model_name = f"cochl-sense-model-{int(time.time())}"

model = ModelPackage(
    role=role,
    model_package_arn=model_package_arn,
    sagemaker_session=sagemaker_session,
    name=model_name
)

print(f"Model name: {model_name}")

## 3. Real-time Inference

Real-time inference is suitable for low-latency, interactive use cases. The maximum payload size is **25 MB** with a processing time limit of **60 seconds**.

### Deploy the endpoint

In [None]:
# Deploy a real-time endpoint
realtime_endpoint_name = f"cochl-sense-realtime-{int(time.time())}"
instance_type = "ml.g4dn.xlarge"  # GPU instance for faster inference

predictor = model.deploy(
    initial_instance_count=1,
    instance_type=instance_type,
    endpoint_name=realtime_endpoint_name
)

print(f"Endpoint deployed: {realtime_endpoint_name}")

### Prepare input data

The input must be raw audio binary data. Supported formats:
- `audio/mp3`
- `audio/wav`
- `audio/ogg`
- `application/octet-stream` (auto-detection)
- `audio/x-raw; rate={sample_rate}; format={sample_format}; channels={num_channels}` (for raw PCM)
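As a rough sketch, the content type can be derived from the file name instead of being hard-coded. Both helpers below are hypothetical and only cover the formats listed above. Unknown extensions fall back to `application/octet-stream`. The `s16le` value in the raw PCM example is a placeholder: the accepted `format` names are not stated here, so check the Cochl documentation before relying on it.

```python
from pathlib import Path

# Extension-to-ContentType mapping, limited to the formats listed above.
CONTENT_TYPES = {
    ".mp3": "audio/mp3",
    ".wav": "audio/wav",
    ".ogg": "audio/ogg",
}


def content_type_for(path: str) -> str:
    """Pick a ContentType from the file extension, else fall back to auto-detection."""
    return CONTENT_TYPES.get(Path(path).suffix.lower(), "application/octet-stream")


def raw_pcm_content_type(sample_rate: int, sample_format: str, num_channels: int) -> str:
    """Fill in the audio/x-raw template for headerless PCM data."""
    return f"audio/x-raw; rate={sample_rate}; format={sample_format}; channels={num_channels}"


print(content_type_for("data/sample.mp3"))
print(content_type_for("data/unknown_clip"))
# "s16le" is an assumed placeholder value, not a documented format name.
print(raw_pcm_content_type(22050, "s16le", 1))
```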

In [None]:
# Load sample audio file
audio_file_path = "data/sample.mp3"
content_type = "audio/mp3"

with open(audio_file_path, "rb") as f:
    audio_data = f.read()

print(f"Loaded audio file: {audio_file_path}")
print(f"File size: {len(audio_data)} bytes")

### Invoke the endpoint

In [None]:
# Invoke the endpoint using boto3 runtime client
runtime_client = boto3.client("sagemaker-runtime")

response = runtime_client.invoke_endpoint(
    EndpointName=realtime_endpoint_name,
    ContentType=content_type,
    Body=audio_data
)

result = json.loads(response["Body"].read().decode("utf-8"))
print(json.dumps(result, indent=2))

### Invoke with sensitivity control (optional)

You can adjust the detection sensitivity using `CustomAttributes`:

- `X-Default-Sensitivity`: Default sensitivity for all tags. Integer in the range [-2, 2]. Default: 0
- `X-Tags-Sensitivity`: Per-tag sensitivity adjustment, passed as a JSON string that maps tag names to integers in the range [-2, 2]
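Since out-of-range values are easy to pass by accident, it can help to validate them before the request is sent. The sketch below uses a hypothetical helper, `build_custom_attributes`, that produces the same `key=value,key=value` layout used in the next cell. The 1024-character check reflects the length limit that SageMaker documents for `CustomAttributes`; treat it as an assumption to verify against the current API reference.

```python
import json


def _check_sensitivity(value, label):
    """Reject anything that is not an integer in [-2, 2] (bools are excluded)."""
    if isinstance(value, bool) or not isinstance(value, int) or not -2 <= value <= 2:
        raise ValueError(f"{label} must be an integer in [-2, 2], got {value!r}")


def build_custom_attributes(default_sensitivity=0, tags_sensitivity=None):
    """Build a CustomAttributes string from validated sensitivity settings."""
    _check_sensitivity(default_sensitivity, "X-Default-Sensitivity")
    parts = [f"X-Default-Sensitivity={default_sensitivity}"]
    if tags_sensitivity:
        for tag, value in tags_sensitivity.items():
            _check_sensitivity(value, f"sensitivity for tag {tag!r}")
        parts.append(f"X-Tags-Sensitivity={json.dumps(tags_sensitivity)}")
    attributes = ",".join(parts)
    # Assumed limit: SageMaker caps CustomAttributes at 1024 characters.
    if len(attributes) > 1024:
        raise ValueError("CustomAttributes exceeds 1024 characters")
    return attributes


print(build_custom_attributes(1, {"Siren": 2, "Laughter": -2}))
```

The returned string can be passed directly as the `CustomAttributes` argument of `invoke_endpoint`.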

In [None]:
# Invoke with custom sensitivity settings
tags_sensitivity = json.dumps({"Siren": 2, "Laughter": -2})
custom_attributes = f"X-Default-Sensitivity=1,X-Tags-Sensitivity={tags_sensitivity}"

response = runtime_client.invoke_endpoint(
    EndpointName=realtime_endpoint_name,
    ContentType=content_type,
    CustomAttributes=custom_attributes,
    Body=audio_data
)

result = json.loads(response["Body"].read().decode("utf-8"))
print(json.dumps(result, indent=2))

### Delete the real-time endpoint

Delete the endpoint to avoid incurring charges when not in use.

In [None]:
# Delete the real-time endpoint
sagemaker_client = boto3.client("sagemaker")
sagemaker_client.delete_endpoint(EndpointName=realtime_endpoint_name)
print(f"Endpoint deleted: {realtime_endpoint_name}")

## 4. Asynchronous Inference

Asynchronous inference is suitable for large files or when you don't need immediate results. The maximum payload size is **1 GB** with a processing time limit of **60 minutes**.
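The payload limits quoted in sections 3 and 4 suggest a simple size-based choice between the two endpoint types. The sketch below is illustrative only: `suggest_inference_mode` is a hypothetical helper, and it interprets MB and GB as decimal units (the more conservative reading), which is an assumption. It looks at payload size alone and ignores the processing time limits.

```python
# Assumption: limits interpreted in decimal units (1 MB = 10**6 bytes).
REALTIME_LIMIT_BYTES = 25 * 10**6   # 25 MB real-time payload limit
ASYNC_LIMIT_BYTES = 10**9           # 1 GB asynchronous payload limit


def suggest_inference_mode(size_bytes: int) -> str:
    """Suggest an inference mode based only on the payload size."""
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative")
    if size_bytes <= REALTIME_LIMIT_BYTES:
        return "real-time"
    if size_bytes <= ASYNC_LIMIT_BYTES:
        return "asynchronous"
    return "too large"


for size in (3_000_000, 200_000_000, 2_000_000_000):
    print(size, suggest_inference_mode(size))
```

With the audio loaded earlier, `suggest_inference_mode(len(audio_data))` would indicate whether the file fits within the real-time limit.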

### Create an asynchronous inference endpoint

In [None]:
from sagemaker.async_inference import AsyncInferenceConfig

async_endpoint_name = f"cochl-sense-async-{int(time.time())}"
async_output_path = f"s3://{bucket}/cochl-sense/async-output/"

# Create async inference config
async_config = AsyncInferenceConfig(
    output_path=async_output_path,
    max_concurrent_invocations_per_instance=4
)

# Deploy async endpoint
async_predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.g4dn.xlarge",
    endpoint_name=async_endpoint_name,
    async_inference_config=async_config
)

print(f"Async endpoint deployed: {async_endpoint_name}")

### Upload input file to S3

In [None]:
# Upload audio file to S3
s3_client = boto3.client("s3")
s3_input_key = "cochl-sense/async-input/sample.mp3"
s3_input_location = f"s3://{bucket}/{s3_input_key}"

s3_client.upload_file(audio_file_path, bucket, s3_input_key)
print(f"Uploaded to: {s3_input_location}")

### Invoke the async endpoint

In [None]:
# Invoke async endpoint
response = runtime_client.invoke_endpoint_async(
    EndpointName=async_endpoint_name,
    InputLocation=s3_input_location,
    ContentType="audio/mp3"
)

output_location = response["OutputLocation"]
print(f"Output will be stored at: {output_location}")

### Wait for and retrieve the result

In [None]:
# Poll for the result
import urllib.parse

parsed_url = urllib.parse.urlparse(output_location)
output_bucket = parsed_url.netloc
output_key = parsed_url.path.lstrip("/")

max_retries = 60
retry_interval = 5

for i in range(max_retries):
    try:
        obj = s3_client.get_object(Bucket=output_bucket, Key=output_key)
        result = json.loads(obj["Body"].read().decode("utf-8"))
        print("Inference completed!")
        print(json.dumps(result, indent=2))
        break
    except s3_client.exceptions.NoSuchKey:
        print(f"Waiting for result... ({i+1}/{max_retries})")
        time.sleep(retry_interval)
else:
    print("Timeout waiting for async inference result.")
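Note that the loop above only looks for the output object, so a failed inference is indistinguishable from a slow one until the timeout expires. Recent versions of the `invoke_endpoint_async` response may also include a `FailureLocation`; reading it with `response.get("FailureLocation")` is the safe option in case your SDK version does not return it. The following is a sketch of a polling loop that watches both locations. `wait_for_async_result` and the `object_exists` callable are hypothetical names, and the demonstration uses a fake in-memory bucket so it runs without AWS access.

```python
import time


def wait_for_async_result(output_uri, failure_uri, object_exists,
                          max_retries=60, retry_interval=5, sleep=time.sleep):
    """Poll until the output or the failure object appears.

    object_exists is a caller-supplied callable that takes an S3 URI and
    returns True once the object exists. Returns "completed", "failed",
    or "timeout".
    """
    for _ in range(max_retries):
        if object_exists(output_uri):
            return "completed"
        if failure_uri and object_exists(failure_uri):
            return "failed"
        sleep(retry_interval)
    return "timeout"


# Offline demonstration: the failure object shows up during the third polling round.
fake_bucket = set()
polls = []


def fake_exists(uri):
    polls.append(uri)
    if len(polls) >= 5:
        fake_bucket.add("s3://bucket/failure/err.out")
    return uri in fake_bucket


status = wait_for_async_result(
    "s3://bucket/output/result.out",
    "s3://bucket/failure/err.out",
    fake_exists,
    retry_interval=0,
    sleep=lambda seconds: None,
)
print(status)
```

In the notebook, `object_exists` could wrap `s3_client.head_object` and treat a 404 `ClientError` as "not there yet".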

### Delete the async endpoint

In [None]:
# Delete the async endpoint
sagemaker_client.delete_endpoint(EndpointName=async_endpoint_name)
print(f"Async endpoint deleted: {async_endpoint_name}")

## 5. Batch Transform

Batch Transform is suitable for running inference on large datasets stored in S3.

For datasets with mixed audio formats (mp3, wav, ogg), use `application/octet-stream` as the ContentType to enable auto-detection.

### Upload batch input data to S3

In [None]:
# Upload sample files for batch transform
batch_input_prefix = "cochl-sense/batch-input/"
batch_output_prefix = "cochl-sense/batch-output/"

batch_input_path = f"s3://{bucket}/{batch_input_prefix}"
batch_output_path = f"s3://{bucket}/{batch_output_prefix}"

# Upload sample file
s3_client.upload_file(audio_file_path, bucket, f"{batch_input_prefix}sample.mp3")
print(f"Batch input path: {batch_input_path}")
print(f"Batch output path: {batch_output_path}")

### Create and run a batch transform job

In [None]:
transform_job_name = f"cochl-sense-batch-{int(time.time())}"

# Create batch transform job
transformer = model.transformer(
    instance_count=1,
    instance_type="ml.g4dn.xlarge",
    output_path=batch_output_path,
    max_payload=100,  # Max payload size in MB
    max_concurrent_transforms=1,
)

# Start the transform job
transformer.transform(
    data=batch_input_path,
    data_type="S3Prefix",
    content_type="application/octet-stream",  # Auto-detection for mixed formats
    split_type="None",
    job_name=transform_job_name,
    wait=False
)

print(f"Batch transform job started: {transform_job_name}")

### Monitor the batch transform job

In [None]:
# Wait for the transform job to complete
print("Waiting for batch transform job to complete...")

waiter = sagemaker_client.get_waiter("transform_job_completed_or_stopped")
waiter.wait(TransformJobName=transform_job_name)

# Check job status
response = sagemaker_client.describe_transform_job(TransformJobName=transform_job_name)
status = response["TransformJobStatus"]
print(f"Transform job status: {status}")

if status == "Completed":
    print(f"Output available at: {batch_output_path}")

### Retrieve batch transform results

In [None]:
# List and display batch transform output files
response = s3_client.list_objects_v2(Bucket=bucket, Prefix=batch_output_prefix)

if "Contents" in response:
    for obj in response["Contents"]:
        key = obj["Key"]
        print(f"\n=== {key} ===")
        output_obj = s3_client.get_object(Bucket=bucket, Key=key)
        result = json.loads(output_obj["Body"].read().decode("utf-8"))
        print(json.dumps(result, indent=2))
else:
    print("No output files found.")
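To read the result for one specific input instead of listing the whole prefix, you can compute the expected output key. The sketch below assumes the usual SageMaker Batch Transform naming, where each output object keeps the input object's name with `.out` appended; `expected_output_key` is a hypothetical helper, and the behavior for nested input folders should be verified against your own job output.

```python
def expected_output_key(input_key: str, input_prefix: str, output_prefix: str) -> str:
    """Map a batch input key to the key where its result is expected.

    Assumes Batch Transform appends ".out" to the input object's name.
    """
    if not input_key.startswith(input_prefix):
        raise ValueError(f"{input_key!r} is not under {input_prefix!r}")
    relative_name = input_key[len(input_prefix):]
    return f"{output_prefix}{relative_name}.out"


print(expected_output_key(
    "cochl-sense/batch-input/sample.mp3",
    "cochl-sense/batch-input/",
    "cochl-sense/batch-output/",
))
```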

## 6. Clean up

Delete the model and clean up resources to avoid incurring unnecessary charges.

In [None]:
# Delete the model
sagemaker_client.delete_model(ModelName=model_name)
print(f"Model deleted: {model_name}")

### Optional: Clean up S3 data

In [None]:
# Uncomment the following lines to delete S3 data

# # Delete async input and output
# s3_client.delete_object(Bucket=bucket, Key=s3_input_key)
# s3_client.delete_object(Bucket=output_bucket, Key=output_key)

# # Delete batch input/output
# for prefix in [batch_input_prefix, batch_output_prefix]:
#     response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
#     if "Contents" in response:
#         for obj in response["Contents"]:
#             s3_client.delete_object(Bucket=bucket, Key=obj["Key"])

# print("S3 data cleaned up.")

### Unsubscribe from the model package

If you no longer need the model, you can unsubscribe from the AWS Marketplace:

1. Navigate to the **Machine Learning** tab on the [Your Software subscriptions page](https://aws.amazon.com/marketplace/ai/library?productType=ml&ref_=mlmp_gitdemo_ind498).
2. Locate the listing whose subscription you want to cancel, and then click **Cancel Subscription**.

## Output Format Reference

The inference result is returned in JSON format:

```json
{
  "metadata": {
    "content_type": "audio/mp3",
    "length_sec": 3.012,
    "size_byte": 12345,
    "name": "testfile.mp3"
  },
  "data": [
    {
      "tags": [
        {
          "name": "Drum",
          "probability": 0.51982635
        }
      ],
      "start_time": 0,
      "end_time": 2
    }
  ]
}
```
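A result in this shape can be flattened into one row per detected tag. The sketch below parses the sample above and keeps only tags at or above a probability threshold. `detected_tags` and the default threshold of 0.5 are illustrative choices, not part of the Cochl.Sense API, and `start_time` and `end_time` are assumed to be in seconds.

```python
import json

# The sample result shown above.
sample_result = json.loads("""
{
  "metadata": {
    "content_type": "audio/mp3",
    "length_sec": 3.012,
    "size_byte": 12345,
    "name": "testfile.mp3"
  },
  "data": [
    {
      "tags": [{"name": "Drum", "probability": 0.51982635}],
      "start_time": 0,
      "end_time": 2
    }
  ]
}
""")


def detected_tags(result, min_probability=0.5):
    """Return (start_time, end_time, name, probability) for each tag above the threshold."""
    rows = []
    for window in result.get("data", []):
        for tag in window.get("tags", []):
            if tag["probability"] >= min_probability:
                rows.append(
                    (window["start_time"], window["end_time"], tag["name"], tag["probability"])
                )
    return rows


for start, end, name, probability in detected_tags(sample_result):
    print(f"{start}-{end}: {name} ({probability:.2f})")
```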

For available tags, see: https://docs.cochl.ai/sense/home/soundtags/