# APIM ❤️ OpenAI

## Azure OpenAI Realtime Audio lab
![flow](../../images/realtime-audio.gif)

Playground to try the APIM integration with the [Azure OpenAI Realtime API](https://learn.microsoft.com/en-us/azure/ai-services/openai/realtime-audio-reference) for text and audio.

### Result
![result](result.png)

### Prerequisites

- [Python 3.12 or later version](https://www.python.org/) installed
- [VS Code](https://code.visualstudio.com/) installed with the [Jupyter notebook extension](https://marketplace.visualstudio.com/items?itemName=ms-toolsai.jupyter) enabled
- [Python environment](https://code.visualstudio.com/docs/python/environments#_creating-environments) with the [requirements.txt](../../requirements.txt) or run `pip install -r requirements.txt` in your terminal
- [An Azure Subscription](https://azure.microsoft.com/free/) with [Contributor](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles/privileged#contributor) + [RBAC Administrator](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles/privileged#role-based-access-control-administrator) or [Owner](https://learn.microsoft.com/en-us/azure/role-based-access-control/built-in-roles/privileged#owner) roles
- [Azure CLI](https://learn.microsoft.com/cli/azure/install-azure-cli) installed and [Signed into your Azure subscription](https://learn.microsoft.com/cli/azure/authenticate-azure-cli-interactively)

▶️ Click `Run All` to execute all steps sequentially, or execute them `Step by Step`...


<a id='0'></a>
### 0️⃣ Initialize notebook variables

- Resources will be suffixed by a unique string based on your subscription id.
- Adjust the location parameters according your preferences and on the [product availability by Azure region.](https://azure.microsoft.com/explore/global-infrastructure/products-by-region/?cdn=disable&products=cognitive-services,api-management)
- Adjust the OpenAI model and version according the [availability by region.](https://learn.microsoft.com/azure/ai-services/openai/concepts/models) 

In [None]:
import os, sys, json
sys.path.insert(1, '../../shared')  # add the shared directory to the Python path
import utils

deployment_name = os.path.basename(os.path.dirname(globals()['__vsc_ipynb_file__']))
resource_group_name = f"lab-{deployment_name}" # change the name to match your naming style
resource_group_location = "eastus2"

aiservices_config = [{"name": "foundry1", "location": "eastus2"}]

models_config = [{"name": "gpt-realtime", "publisher": "OpenAI", "version": "2025-08-28", "sku": "GlobalStandard", "capacity": 10}]

apim_sku = 'Basicv2'
apim_subscriptions_config = [{"name": "subscription1", "displayName": "Subscription 1"}]

inference_api_path = 'inference'  # path to the inference API in the APIM service
inference_api_type = "websocket"  
inference_api_version = "2024-10-01-preview"
foundry_project_name = deployment_name

utils.print_ok('Notebook initialized')

<a id='1'></a>
### 1️⃣ Verify the Azure CLI and the connected Azure subscription

The following commands ensure that you have the latest version of the Azure CLI and that the Azure CLI is connected to your Azure subscription.

In [None]:
output = utils.run("az account show", "Retrieved az account", "Failed to get the current az account")

if output.success and output.json_data:
    current_user = output.json_data['user']['name']
    tenant_id = output.json_data['tenantId']
    subscription_id = output.json_data['id']

    utils.print_info(f"Current user: {current_user}")
    utils.print_info(f"Tenant ID: {tenant_id}")
    utils.print_info(f"Subscription ID: {subscription_id}")

<a id='2'></a>
### 2️⃣ Create deployment using 🦾 Bicep

This lab uses [Bicep](https://learn.microsoft.com/azure/azure-resource-manager/bicep/overview?tabs=bicep) to declarative define all the resources that will be deployed in the specified resource group. Change the parameters or the [main.bicep](main.bicep) directly to try different configurations.

`openAIModelCapacity` is set intentionally low to `6` (6k tokens per minute) to trigger the retry logic in the load balancer (transparent to the user) as well as the priority failover from priority 1 to 2.

In [None]:
# Create the resource group if doesn't exist
utils.create_resource_group(resource_group_name, resource_group_location)

# Define the Bicep parameters
bicep_parameters = {
    "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#",
    "contentVersion": "1.0.0.0",
    "parameters": {
        "apimSku": { "value": apim_sku },
        "aiServicesConfig": { "value": aiservices_config },
        "modelsConfig": { "value": models_config },
        "apimSubscriptionsConfig": { "value": apim_subscriptions_config },
        "inferenceAPIPath": { "value": inference_api_path },
        "inferenceAPIType": { "value": inference_api_type },
        "foundryProjectName": { "value": foundry_project_name }
    }
}

# Write the parameters to the params.json file
with open('params.json', 'w') as bicep_parameters_file:
    bicep_parameters_file.write(json.dumps(bicep_parameters))

# Run the deployment
output = utils.run(f"az deployment group create --name {deployment_name} --resource-group {resource_group_name} --template-file main.bicep --parameters params.json",
    f"Deployment '{deployment_name}' succeeded", f"Deployment '{deployment_name}' failed")

<a id='3'></a>
### 3️⃣ Get the deployment outputs

Retrieve the required outputs from the Bicep deployment.

In [None]:
# Obtain all of the outputs from the deployment
output = utils.run(f"az deployment group show --name {deployment_name} -g {resource_group_name}", f"Retrieved deployment: {deployment_name}", f"Failed to retrieve deployment: {deployment_name}")

if output.success and output.json_data:
    log_analytics_id = utils.get_deployment_output(output, 'logAnalyticsWorkspaceId', 'Log Analytics Id')
    apim_service_id = utils.get_deployment_output(output, 'apimServiceId', 'APIM Service Id')
    apim_resource_gateway_url = utils.get_deployment_output(output, 'apimResourceGatewayURL', 'APIM API Gateway URL')
    apim_subscriptions = json.loads(utils.get_deployment_output(output, 'apimSubscriptions').replace("\'", "\""))
    for subscription in apim_subscriptions:
        subscription_name = subscription['name']
        subscription_key = subscription['key']
        utils.print_info(f"Subscription Name: {subscription_name}")
        utils.print_info(f"Subscription Key: ****{subscription_key[-4:]}")
    api_key = apim_subscriptions[0].get("key") # default api key to the first subscription key


<a id='4'></a>
### 4 Install Python library requirements

In [None]:
%pip install -r requirements.txt

<a id='text'></a>
### 🧪 Test the Realtime API using just text

👉 Based on this [sample](https://github.com/openai/openai-python/blob/main/examples/realtime/azure_realtime.py)

In [None]:
from azure.identity.aio import DefaultAzureCredential
from openai import AsyncAzureOpenAI
import asyncio, nest_asyncio
nest_asyncio.apply()

async def main() -> None:
    credential = DefaultAzureCredential()
    client = AsyncAzureOpenAI(
            azure_endpoint=f"{apim_resource_gateway_url}/{inference_api_path}",
            api_key=api_key,
            api_version=inference_api_version)
    async with client.realtime.connect(model=models_config[0]['name']) as connection:
        await connection.session.update(session={"modalities": ["text"]})  # type: ignore
        await connection.conversation.item.create(
            item={
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": "What is the capital of Portugal?"}],
            }
        )
        await connection.response.create()
        async for event in connection:
            if event.type == "response.text.delta":
                print(event.delta, flush=True, end="")
            elif event.type == "response.text.done":
                print()
            elif event.type == "response.done":
                break
    await connection.close()

if __name__ == "__main__":
    asyncio.run(main())


<a id='fastrtc'></a>
### 🧪 Test the Realtime API using FastRTC + Gradio

⚡ FastRTC is an elegant realtime library communication library to enable you to easily and quickly build RTC application both using websockets and WebRTC.

Please ensure you have run the pip command succefully to install all required packages

In [None]:
import asyncio, base64, json, openai
import gradio as gr
import numpy as np
from pathlib import Path
from openai import AsyncAzureOpenAI
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, StreamingResponse
from fastrtc import (
    AdditionalOutputs,
    AsyncStreamHandler,
    Stream,
    wait_for_item,
    UIArgs
)
from gradio.utils import get_space

SAMPLE_RATE = 24000

AZURE_OPENAI_API_ENDPOINT = f"{apim_resource_gateway_url}/{inference_api_path}"
AZURE_OPENAI_API_KEY = api_key
AZURE_OPENAI_API_VERSION = inference_api_version
AZURE_OPENAI_DEPLOYMENT_NAME = models_config[0]['name']
SESSION_CONFIG={
    "input_audio_transcription": {
      "model": "whisper-1"
    },
    "turn_detection": {
      "threshold": 0.4,
      "silence_duration_ms": 600,
      "type": "server_vad"
    },
    "instructions": "Your name is Amy. You're a helpful agent who responds initially with a clam British accent, but also can speak in any language as the user chooses to. Always start the conversation with a cheery hello",
    "voice": "shimmer",
    "modalities": ["text", "audio"] ## required to solicit the initial welcome message
    }

def on_open(ws):
    print("Connected to server.")

def on_message(ws, message):
    data = json.loads(message)
    print("Received event:", json.dumps(data, indent=2))

class OpenAIHandler(AsyncStreamHandler):
    def __init__(self) -> None:
        super().__init__(
            expected_layout="mono",
            output_sample_rate=SAMPLE_RATE,
            output_frame_size=480,  # In this example we choose 480 samples per frame.
            input_sample_rate=SAMPLE_RATE,
        )
        self.connection = None
        self.output_queue = asyncio.Queue()

    def copy(self):
        return OpenAIHandler()

    async def welcome(self):
        await self.connection.conversation.item.create( # type: ignore
            item={
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": "what's your name?"}],
            }
        )
        await self.connection.response.create() # type: ignore

    async def start_up(self):
        """
        Establish a persistent realtime connection to the Azure OpenAI backend.
        The connection is configured for server‐side Voice Activity Detection.
        """
        self.client = openai.AsyncAzureOpenAI(
            azure_endpoint=AZURE_OPENAI_API_ENDPOINT,
            azure_deployment=AZURE_OPENAI_DEPLOYMENT_NAME,
            api_key=AZURE_OPENAI_API_KEY,
            api_version=AZURE_OPENAI_API_VERSION,
        )
        # When using Azure OpenAI realtime (beta), set the model/deployment identifier
        async with self.client.realtime.connect(
            model=AZURE_OPENAI_DEPLOYMENT_NAME  # Replace with your deployed realtime model id on Azure OpenAI.
        ) as conn:
            # Configure the session to use server-based voice activity detection (VAD)
            await conn.session.update(session=SESSION_CONFIG) # type: ignore
            self.connection = conn

            # Uncomment the following line to send a welcome message to the assistant.
            # await self.welcome()

            async for event in self.connection:
                # Handle interruptions
                if event.type == "input_audio_buffer.speech_started":
                    self.clear_queue()
                if event.type == "conversation.item.input_audio_transcription.completed":
                    # This event signals that an input audio transcription is completed.
                    await self.output_queue.put(AdditionalOutputs(event))
                if event.type == "response.audio_transcript.done":
                    # This event signals that a response audio transcription is completed.
                    await self.output_queue.put(AdditionalOutputs(event))
                if event.type == "response.audio.delta":
                    # For incremental audio output events, decode the delta.
                    await self.output_queue.put(
                        (
                            self.output_sample_rate,
                            np.frombuffer(base64.b64decode(event.delta), dtype=np.int16).reshape(1, -1),
                        ),
                    )

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """
        Receives an audio frame from the stream and sends it into the realtime API.
        The audio data is encoded as Base64 before appending to the connection's input.
        """
        if not self.connection:
            return
        _, array = frame
        array = array.squeeze()
        # Encode audio as Base64 string
        audio_message = base64.b64encode(array.tobytes()).decode("utf-8")
        await self.connection.input_audio_buffer.append(audio=audio_message)  # type: ignore

    async def emit(self) -> tuple[int, np.ndarray] | AdditionalOutputs | None:
        """
        Waits for and returns the next output from the output queue.
        The output may be an audio chunk or an additional output such as transcription.
        """
        return await wait_for_item(self.output_queue)

    async def shutdown(self) -> None: # type: ignore
        if self.connection:
            await self.connection.close()
            self.connection = None

def update_chatbot(chatbot: list[dict], content):
    """
    Append the completed transcription to the chatbot messages.
    """
    if content.type == "conversation.item.input_audio_transcription.completed":
        chatbot.append({"role": "user", "content": content.transcript})
    elif content.type == "response.audio_transcript.done":
        chatbot.append({"role": "assistant", "content": content.transcript})
    return chatbot


# Create the Gradio Chatbot component for displaying conversation messages.
ui_args: UIArgs = UIArgs(
    title="APIM ❤️ OpenAI - Contoso Assistant 🤖",
)
chatbot = gr.Chatbot(type="messages")
latest_message = gr.Textbox(type="text", visible=True)

# Instantiate the Stream object that uses the OpenAIHandler.
stream = Stream(
    OpenAIHandler(),
    mode="send-receive",
    modality="audio",
    additional_inputs=[chatbot],
    additional_outputs=[chatbot],
    additional_outputs_handler=update_chatbot,
    ui_args=ui_args,
)

if __name__ == "__main__":
    stream.ui.launch(server_port=7990)


<a id='kql'></a>
### 🔍 Display model usage


In [None]:
import pandas as pd

query = "model_usage"

output = utils.run(f"az monitor log-analytics query -w {log_analytics_id} --analytics-query \"{query}\"", "Retrieved log analytics query output", "Failed to retrieve log analytics query output") 
if output.success and output.json_data:
    table = output.json_data
    display(pd.DataFrame(table))


<a id='clean'></a>
### 🗑️ Clean up resources

When you're finished with the lab, you should remove all your deployed resources from Azure to avoid extra charges and keep your Azure subscription uncluttered.
Use the [clean-up-resources notebook](clean-up-resources.ipynb) for that.