# Use Sonic 3 with AWS Jumpstart

This sample notebook shows you how to deploy Sonic 3 Models from Cartesia AI as an endpoint on Amazon SageMaker.

Note: This is a reference notebook and it cannot run unless you make changes suggested in the notebook.

## Prerequisites

Note: Open this notebook from an Amazon SageMaker Notebook Instance or Amazon SageMaker Studio.

Ensure that the IAM role you use has the AmazonSageMakerFullAccess policy attached.

To deploy the ML model successfully using the steps in this notebook, ensure that either:

Your IAM role has the following three permissions and you have authority to make AWS Marketplace subscriptions in the AWS account used:
- aws-marketplace:ViewSubscriptions
- aws-marketplace:Unsubscribe
- aws-marketplace:Subscribe

Or your AWS account has a subscription to the [Sonic 3 Sagemaker Listing](https://aws.amazon.com/marketplace/pp/prodview-w2bmik3jypagm). If so, skip the Subscribe to the model package step.

## Subscribe to the model package

To subscribe to the Sonic 3 Sagemaker Model Package:

- Open the [Sonic 3 Sagemaker Model Package listing page](https://aws.amazon.com/marketplace/pp/prodview-w2bmik3jypagm)
- On the AWS Marketplace listing, click on the Continue to subscribe button.
- On the Subscribe to this software page, review and click on "Accept Offer" if you and your organization accept the EULA, pricing, and support terms.

## Create an endpoint

In [None]:
import sagemaker
from sagemaker import ModelPackage
import boto3
import asyncio
import base64
import json
import time
import wave
from typing import Generator, Iterable, List

# Name of the SageMaker endpoint that is created by the deploy cell and invoked by the inference cell.
ENDPOINT_NAME = "sonic-3"
# IAM role ARN handed to ModelPackage for deployment; replace the placeholder before running.
ROLE = "<input the arn of the role you want to deploy the endpoint with>"

In [None]:
# Model package ARN to deploy, keyed by the AWS region it is published in.
MODEL_PACKAGE_MAP = {
    "us-east-2": 'arn:aws:sagemaker:us-east-2:570011132906:model-package/Sonic3-Sagemaker-V5'
}

# Region of the default boto3 session (None when no region is configured).
region = boto3.Session().region_name
if region not in MODEL_PACKAGE_MAP:
    # Raise a real exception: `raise ("...")` raises a str, which Python rejects
    # with "TypeError: exceptions must derive from BaseException" and hides the
    # intended message.
    raise ValueError(
        f"UNSUPPORTED REGION: {region!r}. "
        f"Supported regions: {sorted(MODEL_PACKAGE_MAP)}"
    )
package_arn = MODEL_PACKAGE_MAP[region]

In [None]:
# SageMaker session object that is passed to ModelPackage when the model is created below.
sagemaker_session = sagemaker.Session()

In [None]:
# Verify the active account: ask STS for the caller identity of the current
# credentials and print its account ID so you can confirm it is the intended one.
sts_client = boto3.client("sts")
account_id = sts_client.get_caller_identity()["Account"]
print(f"Active account ID: {account_id}")

Create a deployable ModelPackage. For Sonic 3, deploy onto an ml.g6e.xlarge instance. Specify it as instance_type below.

Note: Be sure to request a service quota increase for `ml.g6e.xlarge for endpoint usage` under the `Amazon SageMaker` service before deploying.

In [None]:
# Build a deployable model from the model package ARN resolved for this region.
model = ModelPackage(
    role=ROLE,
    model_package_arn=package_arn,
    sagemaker_session=sagemaker_session,
)

# Deploy a single instance behind the endpoint named by ENDPOINT_NAME.
instance_type = "ml.g6e.xlarge"
deployed_model = model.deploy(
    initial_instance_count=1,
    instance_type=instance_type,
    endpoint_name=ENDPOINT_NAME,
)

You should now be able to see the endpoint being created on the Amazon SageMaker AI > Endpoints page of the AWS console.

## Run an inference request

Let's feed some text to the model to get an inference response stream.

In [None]:
# Runtime client used below to invoke the endpoint with response streaming,
# created for the same region the model package was resolved for.
sagemaker_runtime = boto3.client("sagemaker-runtime", region_name=region)

In [None]:
def events_from_aws_stream(
    eventstream: Iterable[dict]
) -> Generator[dict, None, None]:
    """
    Convert a SageMaker event stream (InvokeEndpointWithResponseStream) into JSON events.

    The payload is treated as newline-delimited JSON: every non-blank line is
    parsed as one JSON document. Payload parts are reassembled at the byte
    level, so an event may be split across several ``PayloadPart`` entries
    (even in the middle of a multi-byte UTF-8 character) and one
    ``PayloadPart`` may carry several events.

    Args:
        eventstream: Iterable of event dicts. Entries with a ``PayloadPart``
            key contribute ``event["PayloadPart"]["Bytes"]`` to the payload.

    Yields:
        One decoded JSON object per newline-terminated line, plus a final
        object for any non-blank data left after the last newline.

    Raises:
        RuntimeError: On a ``ModelStreamError`` or ``InternalStreamFailure`` event.
        json.JSONDecodeError: If a complete line is not valid JSON.
        UnicodeDecodeError: If a complete line is not valid UTF-8.
    """
    pending = b""
    for event in eventstream:
        if "PayloadPart" in event:
            pending += event["PayloadPart"]["Bytes"]
            # Everything before the last newline is complete; the remainder
            # (possibly empty) waits for the next payload part.
            *complete_lines, pending = pending.split(b"\n")
            for line in complete_lines:
                if line.strip():  # tolerate blank separator lines
                    yield json.loads(line.decode("utf-8"))
        elif "ModelStreamError" in event:
            err = event["ModelStreamError"]
            raise RuntimeError(
                f"ModelStreamError: {err.get('ErrorCode')}: {err.get('Message')}"
            )
        elif "InternalStreamFailure" in event:
            raise RuntimeError("InternalStreamFailure from SageMaker")
        else:
            # Unknown event type; ignore
            continue

    # Do not silently drop a final event that was not newline-terminated.
    if pending.strip():
        yield json.loads(pending.decode("utf-8"))


async def get_tts_chunks_async():
    """Stream a TTS request to the endpoint and collect the audio chunk payloads.

    Sends a fixed sample request to ``ENDPOINT_NAME`` through
    ``invoke_endpoint_with_response_stream``, prints latency metrics and any
    timestamp events, and gathers the ``data`` field of every ``"chunk"``
    event.

    The request and the reading of the response stream are both blocking
    calls, so the whole consumption loop runs in a worker thread via
    ``asyncio.to_thread``; the event loop stays free while audio is streamed.

    Returns:
        list: The ``data`` values of the received ``"chunk"`` events, in
        arrival order (possibly partial or empty if a ``ModelError`` was
        reported).
    """
    def sync_stream():
        """Invoke the response-streaming endpoint and return a generator of
        decoded events from the AWS event stream."""
        body_str = json.dumps(
            {
                "context_id": "0",
                "transcript": """
In contemporary hyperdimensional differential-topological metamathematics,
<speed ratio="1.5"/><emotion value="mysterious" /><volume ratio="4.0"/>
the quasi-isomorphic ultratranscendentalization of pseudoholomorphic manifolds
necessitates an pistemologically recontextualized framework for analyzing
infinitesimally perturbative tensorial eigen-decompositions.
                """,
                "language": "en",
                "output_format": {
                    "container": "raw",
                    "sample_rate": 44100,
                    "encoding": "pcm"
                },
                "voice": {
                    "mode": "id",
                    "id": "bf0a246a-8642-498a-9950-80c35e9276b5"
                },
                "add_timestamps": True,
                "add_phoneme_timestamps": True
            }
        )

        request_start_time = time.perf_counter()
        response = sagemaker_runtime.invoke_endpoint_with_response_stream(
            EndpointName=ENDPOINT_NAME,
            Body=body_str,
            ContentType="application/json",
            Accept="text/event-stream",
        )
        print(
            f"[METRIC] InvokeEndpointWithResponseStream request time: {time.perf_counter() - request_start_time:.3f}s"
        )
        print(response)

        event_stream = response.get("Body")
        return events_from_aws_stream(event_stream)

    audio_chunks = []
    start_time = time.perf_counter()
    first_chunk_time = None

    def consume_events():
        """Issue the request and drain the event stream (blocking)."""
        nonlocal first_chunk_time

        # Process response events and extract audio chunks. Iterating the
        # generator performs blocking reads, which is why this whole function
        # (not just the initial invoke) is executed in a background thread.
        for chunk in sync_stream():
            event_type = chunk["type"]
            if event_type == "chunk":
                if first_chunk_time is None:
                    first_chunk_time = time.perf_counter()
                    ttfa = first_chunk_time - start_time
                    print(f"[METRIC] Time to first audio: {ttfa:.3f}s")

                audio_chunks.append(chunk["data"])
            elif event_type == "timestamps":
                print(json.dumps(chunk["word_timestamps"]))
            elif event_type == "phoneme_timestamps":
                print(json.dumps(chunk["phoneme_timestamps"]))
            elif event_type == "done":
                print("[LOG] Stream finished.")
            elif event_type == "error":
                print(f"[ERROR] {chunk['data']}")

    try:
        # Exceptions raised inside the worker thread propagate through the await.
        await asyncio.to_thread(consume_events)
    except sagemaker_runtime.exceptions.ModelError as e:
        print(e.response['Message'])
        print(e.response["OriginalStatusCode"])

    # Calculate full stream time
    total_time = time.perf_counter() - start_time
    print(f"[METRIC] Total TTS stream time: {total_time:.3f}s")

    return audio_chunks

# Top-level await: runs as-is where the interpreter allows it (e.g. a Jupyter/IPython cell);
# in a plain script, wrap the call with asyncio.run(...) instead.
chunks = await get_tts_chunks_async()

Now we can save the audio chunks to a local WAV file.

In [None]:
def save_audio_chunks_to_wav(
    chunks: List[str],
    output_file: str = "output.wav",
    sample_rate: int = 44100,
    channels: int = 1,
    sample_width: int = 2,
) -> "tuple[int, float]":
    """Decode base64 audio chunks and save them as a single WAV file.

    Args:
        chunks: Base64-encoded pieces of raw PCM audio, in playback order.
        output_file: Path of the WAV file to write.
        sample_rate: Sampling rate in Hz written to the WAV header.
        channels: Number of audio channels (default 1, mono).
        sample_width: Bytes per sample (default 2, i.e. 16-bit).

    Returns:
        ``(file_size, duration)`` where ``file_size`` is the number of PCM
        bytes written (the WAV header is not counted) and ``duration`` is the
        audio length in seconds.
    """
    # Join once instead of growing a buffer chunk by chunk.
    combined_audio = b"".join(base64.b64decode(chunk_data) for chunk_data in chunks)

    with wave.open(output_file, "wb") as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(sample_width)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(combined_audio)

    file_size = len(combined_audio)
    # One second of audio occupies sample_rate * channels * sample_width bytes.
    duration = file_size / (sample_rate * channels * sample_width)
    print(f"[LOG] Saved WAV: {output_file} ({file_size} bytes, {duration:.2f}s)")
    return file_size, duration

# Write the collected audio chunks to output.wav (relative path, so in the current working directory).
save_audio_chunks_to_wav(chunks, output_file="output.wav")