In [14]:
import os
import json
import base64
import asyncio
import wave
from datetime import datetime
from typing import Optional, Callable, Dict, Any

import pyaudio
import websockets
from dotenv import load_dotenv

load_dotenv()

True

In [15]:
def list_audio_input_devices() -> None:
    """
    Print every audio device that can capture input, one ``index: name`` per line.

    A device is listed when it reports at least one input channel. The printed
    index is the value to pass as ``device_index`` when opening a stream.
    """
    p = pyaudio.PyAudio()
    try:
        print("\nAvailable audio input devices:")
        for i in range(p.get_device_count()):
            dev = p.get_device_info_by_index(i)
            if dev["maxInputChannels"] > 0:
                print(f"{i}: {dev['name']}")
    finally:
        # Release the audio backend even if a device query fails part-way.
        p.terminate()


def choose_default_audio_device() -> int:
    """
    Pick the microphone to record from.

    When exactly one input device exists it is returned without prompting.
    Otherwise the input devices are listed and the user is asked for an index;
    an empty, unparsable, or unknown answer falls back to the first input
    device.

    Returns:
        Index of the chosen input device.

    Raises:
        RuntimeError: If no device with input channels is available.
    """
    p = pyaudio.PyAudio()
    try:
        mic_indices = [
            i
            for i in range(p.get_device_count())
            if p.get_device_info_by_index(i)["maxInputChannels"] > 0
        ]
    finally:
        p.terminate()

    if not mic_indices:
        raise RuntimeError("No microphone devices found.")

    default_idx = mic_indices[0]
    if len(mic_indices) == 1:
        print(f"Auto-selecting only available input device: {default_idx}")
        return default_idx

    list_audio_input_devices()
    try:
        answer = input(f"Select audio input device index [{default_idx}]: ")
        idx = int(answer or default_idx)
    except Exception:
        # Deliberately broad: input() can fail in non-interactive kernels and
        # the answer may not be a number; either way use the default.
        idx = default_idx

    # Only accept indices that really are input devices; an output-only or
    # non-existent index would fail later when the stream is opened.
    if idx not in mic_indices:
        print(f"Device index {idx} is not an input device; using {default_idx}.")
        idx = default_idx
    return idx

In [22]:
class AudioRecorder:
    """
    Microphone recorder built on a PyAudio callback stream.

    Each captured chunk is stored twice: appended to ``frames`` (so the whole
    recording can be written with ``save_wav``) and pushed onto ``audio_queue``
    (so an asyncio consumer can stream it, e.g. to a speech-to-text service).
    """

    def __init__(
        self,
        rate: int,
        channels: int,
        format_: int,
        chunk: int,
        device_index: Optional[int] = None,
    ):
        """
        Args:
            rate: Sample rate in Hz passed to PyAudio.
            channels: Number of input channels.
            format_: PyAudio sample format constant (e.g. ``pyaudio.paInt16``).
            chunk: Frames per buffer delivered to the stream callback.
            device_index: Input device index; when ``None`` the device is
                chosen by ``choose_default_audio_device()``.
        """
        self.rate = rate
        self.channels = channels
        self.format = format_
        self.chunk = chunk
        self.device_index = (
            device_index if device_index is not None else choose_default_audio_device()
        )
        self.p = pyaudio.PyAudio()
        self.stream = None
        self.frames = []
        self.audio_queue: asyncio.Queue[bytes] = asyncio.Queue()
        # Loop that owns audio_queue; the stream callback hands chunks over to
        # it with call_soon_threadsafe.
        self._loop = asyncio.get_event_loop()
        self._running = False
        # Guards against terminating the PyAudio instance more than once.
        self._terminated = False

    def start(self) -> None:
        """
        Open the input stream and begin capturing into ``frames`` and
        ``audio_queue``.
        """

        def callback(in_data, frame_count, time_info, status):
            self.frames.append(in_data)
            # asyncio.Queue is not thread-safe, so schedule the put on the loop.
            self._loop.call_soon_threadsafe(self.audio_queue.put_nowait, in_data)
            return (None, pyaudio.paContinue)

        self.stream = self.p.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            input_device_index=self.device_index,
            frames_per_buffer=self.chunk,
            stream_callback=callback,
        )
        self._running = True
        self.stream.start_stream()

    def stop(self) -> None:
        """
        Stop and close the stream and release the PyAudio instance.

        Safe to call more than once: the stream is closed and PyAudio is
        terminated only on the first call.
        """
        self._running = False
        # Detach the stream first so a repeated stop() never touches a closed one.
        stream, self.stream = self.stream, None
        try:
            if stream is not None:
                stream.stop_stream()
                stream.close()
        finally:
            if not self._terminated:
                self._terminated = True
                self.p.terminate()

    def save_wav(self, filename: str) -> None:
        """
        Write everything captured so far to ``filename`` as a .wav file.

        Args:
            filename: Destination path; an existing file is overwritten.
        """
        # The context manager closes the file even if writing fails.
        with wave.open(filename, "wb") as wf:
            wf.setnchannels(self.channels)
            wf.setsampwidth(self.p.get_sample_size(self.format))
            wf.setframerate(self.rate)
            wf.writeframes(b"".join(self.frames))
        print(f"🎙️ Audio saved to {filename}")

In [23]:
class TranscriptionClient:
    """
    Async WebSocket client for an Azure OpenAI realtime transcription session.

    Use it as an async context manager: entering opens the socket, exiting
    cancels the worker tasks and closes it. Audio is supplied through an
    ``asyncio.Queue`` of raw byte chunks, so any producer can feed it.
    """

    def __init__(
        self,
        url: str,
        headers: dict,
        session_config: Dict[str, Any],
        on_delta: Optional[Callable[[str], None]] = None,
        on_transcript: Optional[Callable[[str], None]] = None,
    ):
        """
        Args:
            url: WebSocket URL of the realtime endpoint.
            headers: HTTP headers sent with the handshake (e.g. the API key).
            session_config: Value sent as ``session`` in the
                ``transcription_session.update`` message.
            on_delta: Called with each partial transcript fragment.
            on_transcript: Called with each completed transcript.
        """
        self.url = url
        self.headers = headers
        self.session_config = session_config
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self._on_delta = on_delta
        self._on_transcript = on_transcript
        self._running = False
        self._send_task = None
        self._recv_task = None

    async def __aenter__(self):
        # The header keyword differs between websockets releases: try
        # ``additional_headers`` first and fall back to ``extra_headers`` when
        # the installed version rejects it with a TypeError.
        try:
            self.ws = await websockets.connect(
                self.url, additional_headers=self.headers
            )
        except TypeError:
            self.ws = await websockets.connect(self.url, extra_headers=self.headers)
        self._running = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._running = False
        tasks = [t for t in (self._send_task, self._recv_task) if t is not None]
        try:
            # Stop the workers before closing so they never use a closed
            # socket, and wait until their cancellation has been processed.
            for task in tasks:
                task.cancel()
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            if self.ws:
                await self.ws.close()

    async def send_json(self, data: dict) -> None:
        """Serialize ``data`` to JSON and send it; a no-op when not connected."""
        if self.ws:
            await self.ws.send(json.dumps(data))

    async def send_audio_chunk(self, audio_data: bytes) -> None:
        """Send one raw audio chunk as a base64 ``input_audio_buffer.append``."""
        audio_base64 = base64.b64encode(audio_data).decode("utf-8")
        await self.send_json(
            {"type": "input_audio_buffer.append", "audio": audio_base64}
        )

    async def start_session(self, rate: int, channels: int) -> None:
        """
        Send the session configuration followed by an ``audio_start`` message
        describing the PCM stream.

        Args:
            rate: Sample rate in Hz reported to the service.
            channels: Channel count reported to the service.
        """
        await self.send_json(
            {
                "type": "transcription_session.update",
                "session": self.session_config,
            }
        )
        await self.send_json(
            {
                "type": "audio_start",
                "data": {"encoding": "pcm", "sample_rate": rate, "channels": channels},
            }
        )

    def _dispatch_event(self, data: dict) -> None:
        """Route one decoded server event to the matching callback."""
        event_type = data.get("type", "")
        if event_type == "conversation.item.input_audio_transcription.delta":
            delta = data.get("delta", "")
            if delta and self._on_delta:
                self._on_delta(delta)
        elif event_type == "conversation.item.input_audio_transcription.completed":
            transcript = data.get("transcript", "")
            if transcript and self._on_transcript:
                self._on_transcript(transcript)
        elif event_type == "conversation.item.created":
            item = data.get("item", "")
            if isinstance(item, dict) and "content" in item and item["content"]:
                # Only the first content part is inspected for a transcript.
                text = item["content"][0].get("transcript")
                if text and self._on_transcript:
                    self._on_transcript(text)
            elif item and self._on_transcript:
                self._on_transcript(str(item))

    async def receive_loop(self) -> None:
        """
        Read server messages until the socket stops yielding them.

        A message that cannot be decoded, or a callback that raises, is
        reported and skipped so that one bad event does not end the session.
        """
        async for message in self.ws:
            try:
                self._dispatch_event(json.loads(message))
            except Exception as e:
                print("❌ Error parsing message:", e)

    async def run(self, audio_chunk_iter: asyncio.Queue, rate: int, channels: int):
        """
        Configure the session, then send queued audio and receive results
        concurrently until either direction finishes.

        Args:
            audio_chunk_iter: Queue of raw audio chunks; a ``None`` item ends
                the sending side.
            rate: Sample rate in Hz of the queued audio.
            channels: Channel count of the queued audio.
        """
        await self.start_session(rate, channels)
        self._send_task = asyncio.create_task(self._send_audio_loop(audio_chunk_iter))
        self._recv_task = asyncio.create_task(self.receive_loop())
        tasks = (self._send_task, self._recv_task)
        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Also reached when run() itself is cancelled: stop whichever
            # worker is still active and wait for it to finish.
            for task in tasks:
                if not task.done():
                    task.cancel()
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                # CancelledError is a BaseException, so only real failures
                # are reported here instead of being silently dropped.
                if isinstance(result, Exception):
                    print("❌ Transcription task failed:", result)

    async def _send_audio_loop(self, audio_queue: asyncio.Queue):
        """Forward chunks from ``audio_queue`` until ``None`` or cancellation."""
        while self._running:
            audio_data = await audio_queue.get()
            if audio_data is None:
                break
            await self.send_audio_chunk(audio_data)

In [24]:
class AudioTranscriber:
    """
    High-level orchestrator for microphone recording and realtime transcription.

    ``record`` captures audio only; ``transcribe`` streams audio (from the
    microphone or from a caller-supplied queue) to the transcription service.
    """

    def __init__(
        self,
        url: str,
        headers: dict,
        rate: int,
        channels: int,
        format_: int,
        chunk: int,
        device_index: Optional[int] = None,
    ):
        """
        Args:
            url: WebSocket URL of the realtime transcription endpoint.
            headers: Handshake headers (e.g. the API key).
            rate: Sample rate in Hz used for recording and reported to the service.
            channels: Number of audio channels.
            format_: PyAudio sample format constant.
            chunk: Frames per buffer for the recorder.
            device_index: Input device index, or ``None`` to let the recorder choose.
        """
        self.url = url
        self.headers = headers
        self.rate = rate
        self.channels = channels
        self.format = format_
        self.chunk = chunk
        self.device_index = device_index

    def _new_recorder(self) -> "AudioRecorder":
        """Build a recorder configured with this transcriber's audio settings."""
        return AudioRecorder(
            rate=self.rate,
            channels=self.channels,
            format_=self.format,
            chunk=self.chunk,
            device_index=self.device_index,
        )

    async def record(
        self, duration: Optional[float] = None, output_file: Optional[str] = None
    ) -> AudioRecorder:
        """
        Record from the microphone and return the stopped recorder.

        Args:
            duration: Seconds to record; when falsy, records until interrupted
                or cancelled.
            output_file: If given, the recording is saved there as a .wav file.

        Returns:
            The ``AudioRecorder`` holding the captured frames.
        """
        recorder = self._new_recorder()
        recorder.start()
        print(
            f"Recording{' for ' + str(duration) + ' seconds' if duration else ' (Ctrl+C to stop)'}..."
        )
        try:
            if duration:
                await asyncio.sleep(duration)
            else:
                while True:
                    await asyncio.sleep(0.5)
        except (KeyboardInterrupt, asyncio.CancelledError):
            # An interrupt is the normal way to end an open-ended recording.
            pass
        finally:
            recorder.stop()
            if output_file:
                recorder.save_wav(output_file)
        return recorder

    async def transcribe(
        self,
        audio_queue: Optional[asyncio.Queue] = None,
        model: str = "gpt-4o-transcribe",
        prompt: Optional[str] = "Respond in English.",
        language: Optional[str] = None,
        noise_reduction: str = "near_field",
        vad_type: str = "server_vad",
        vad_config: Optional[dict] = None,
        on_delta: Optional[Callable[[str], None]] = None,
        on_transcript: Optional[Callable[[str], None]] = None,
    ):
        """
        Run one transcription session.

        Args:
            audio_queue: Queue of raw audio chunks to send. When ``None`` a live
                recorder is started and its capture is saved to a timestamped
                .wav file when the session ends.
            model: Transcription model name.
            prompt: Prompt passed to the transcription model.
            language: Optional language hint; omitted from the config when falsy.
            noise_reduction: Value for ``input_audio_noise_reduction.type``.
            vad_type: Turn detection type; a falsy value sends ``None`` for
                ``turn_detection``.
            vad_config: Extra turn detection settings merged over ``vad_type``;
                ignored when ``vad_type`` is falsy.
            on_delta: Called with each partial transcript fragment.
            on_transcript: Called with each completed transcript.
        """
        recorder = None
        if audio_queue is None:
            recorder = self._new_recorder()
            try:
                recorder.start()
            except Exception:
                # Opening the stream failed: release the audio backend.
                recorder.stop()
                raise
            audio_queue = recorder.audio_queue

        # Build turn_detection in one step so vad_config is never applied to
        # None when turn detection is disabled.
        turn_detection = {"type": vad_type, **(vad_config or {})} if vad_type else None
        session_config = {
            "input_audio_format": "pcm16",
            "input_audio_transcription": {
                "model": model,
                "prompt": prompt,
            },
            "input_audio_noise_reduction": {"type": noise_reduction},
            "turn_detection": turn_detection,
        }
        if language:
            session_config["input_audio_transcription"]["language"] = language

        try:
            async with TranscriptionClient(
                self.url, self.headers, session_config, on_delta, on_transcript
            ) as client:
                try:
                    await client.run(audio_queue, self.rate, self.channels)
                except asyncio.CancelledError:
                    print("Transcription cancelled.")
        finally:
            # Outside the ``async with`` so the microphone is released even
            # when the connection could not be opened at all.
            if recorder:
                recorder.stop()
                filename = f"microphone_capture_{datetime.now():%Y%m%d_%H%M%S}.wav"
                recorder.save_wav(filename)

In [25]:
async def main():
    """
    Run a live microphone transcription session against Azure OpenAI.

    Reads ``AZURE_OPENAI_STT_TTS_KEY`` and ``AZURE_OPENAI_STT_TTS_ENDPOINT``
    from the environment (after loading ``.env``), selects an input device,
    and prints partial and final transcripts until interrupted.

    Raises:
        RuntimeError: If the API key or the endpoint is not set.
    """
    load_dotenv()
    OPENAI_API_KEY = os.environ.get("AZURE_OPENAI_STT_TTS_KEY")
    AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_STT_TTS_ENDPOINT")
    # Both values are required; the endpoint is used to build the URL below.
    if not OPENAI_API_KEY or not AZURE_OPENAI_ENDPOINT:
        raise RuntimeError("❌ API key or endpoint missing in environment.")

    # Swap only the scheme prefix and drop a trailing slash so the path is
    # appended cleanly.
    ws_base = AZURE_OPENAI_ENDPOINT.replace("https://", "wss://", 1).rstrip("/")
    url = f"{ws_base}/openai/realtime?api-version=2025-04-01-preview&intent=transcription"
    headers = {"api-key": OPENAI_API_KEY}
    RATE = 24000
    CHANNELS = 1
    FORMAT = pyaudio.paInt16
    CHUNK = 1024

    device_index = choose_default_audio_device()

    transcriber = AudioTranscriber(
        url=url,
        headers=headers,
        rate=RATE,
        channels=CHANNELS,
        format_=FORMAT,
        chunk=CHUNK,
        device_index=device_index,
    )

    def print_delta(delta: str):
        print(delta, end=" ", flush=True)

    def print_transcript(transcript: str):
        print(f"\n✅ Transcript: {transcript}")

    print(">>> Starting real-time transcription session. Ctrl+C to stop.")
    try:
        await transcriber.transcribe(
            model="gpt-4o-transcribe",
            prompt="Respond in English. This is a medical environment.",
            noise_reduction="near_field",
            vad_type="server_vad",
            vad_config={
                "threshold": 0.5,
                "prefix_padding_ms": 300,
                "silence_duration_ms": 200,
            },
            on_delta=print_delta,
            on_transcript=print_transcript,
        )
    except (KeyboardInterrupt, asyncio.CancelledError):
        print("\n🛑 Interrupted by user. Exiting...")

In [26]:
# Run the demo. Top-level ``await`` relies on the notebook kernel already
# running an event loop.
await main()


Available audio input devices:
0: Microsoft Sound Mapper - Input
1: Surface Stereo Microphones (Sur
2: Echo Cancelling Speakerphone (M
3: Microphone (Lumina Camera - Raw
8: Primary Sound Capture Driver
9: Surface Stereo Microphones (Surface High Definition Audio)
10: Echo Cancelling Speakerphone (Microsoft Audio Dock)
11: Microphone (Lumina Camera - Raw)
19: Echo Cancelling Speakerphone (Microsoft Audio Dock)
20: Microphone (Lumina Camera - Raw)
21: Surface Stereo Microphones (Surface High Definition Audio)
24: Headset (@System32\drivers\bthhfenum.sys,#2;%1 Hands-Free%0
;(Shiva’s AirPods Pro #2))
26: Microphone (Dell USB Audio)
28: Headset (@System32\drivers\bthhfenum.sys,#2;%1 Hands-Free%0
;(Shiva’s AirPods Pro #2 - Find My))
31: PC Speaker (Realtek HD Audio 2nd output with SST)
34: PC Speaker (Realtek HD Audio output with SST)
35: Microphone Array (Realtek HD Audio Mic input)
36: Headset Microphone (Headset Microphone)
38: Headset (@System32\drivers\bthhfenum.sys,#2;%1 Hands-Free%0


## ACS (Azure Communication Services) audio relay

In [None]:
import os
import asyncio
import json
import logging
from typing import Any, Dict, Optional
import websockets
from base64 import b64decode, b64encode
from dotenv import load_dotenv

# Logging setup: INFO-level root configuration plus a named logger used by the
# relay functions in this cell.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("acs-openai-relay")


# Message transformation between ACS media-streaming messages and OpenAI realtime events
def transform_acs_to_openai_format(
    msg_data: dict, model: str = "gpt-4o-transcribe"
) -> Optional[dict]:
    """
    Translate one ACS media-streaming message into an OpenAI realtime event.

    ``AudioData`` becomes an ``input_audio_buffer.append`` event carrying the
    payload unchanged. ``AudioMetadata`` becomes the
    ``transcription_session.update`` event that configures the session for
    ``model``. Any other kind yields ``None``.
    """
    kind = msg_data.get("kind")

    if kind == "AudioData":
        audio_payload = msg_data["audioData"]["data"]
        return {"type": "input_audio_buffer.append", "audio": audio_payload}

    if kind != "AudioMetadata":
        return None

    transcription_settings = {"model": model, "prompt": "Respond in English."}
    vad_settings = {
        "type": "server_vad",
        "threshold": 0.7,
        "prefix_padding_ms": 300,
        "silence_duration_ms": 500,
    }
    session = {
        "input_audio_format": "pcm16",
        "input_audio_transcription": transcription_settings,
        "input_audio_noise_reduction": {"type": "near_field"},
        "turn_detection": vad_settings,
    }
    return {"type": "transcription_session.update", "session": session}


def transform_openai_to_acs_format(msg_data: dict) -> Optional[dict]:
    """
    Translate an OpenAI transcription event into a text message for ACS.

    Delta events map to ``TranscriptionDelta`` and completed events to
    ``TranscriptionCompleted``; every other event type yields ``None``.
    Only text is produced here: an audio reply would need the synthesized
    audio base64-encoded and sent as "AudioData" instead.
    """
    event_type = msg_data.get("type")

    # (OpenAI event type, ACS kind, field holding the text). Extend as needed.
    routes = (
        (
            "conversation.item.input_audio_transcription.delta",
            "TranscriptionDelta",
            "delta",
        ),
        (
            "conversation.item.input_audio_transcription.completed",
            "TranscriptionCompleted",
            "transcript",
        ),
    )
    for wanted_type, acs_kind, text_field in routes:
        if event_type == wanted_type:
            return {"kind": acs_kind, "text": msg_data.get(text_field)}
    return None


async def relay_acs_to_openai(acs_ws, openai_ws, model: str):
    """
    Forward messages from the ACS socket to the OpenAI realtime socket.

    Each incoming text message is parsed as JSON, mapped with
    ``transform_acs_to_openai_format`` and, when the mapping is not ``None``,
    sent to ``openai_ws``. The relay logs the error and returns on the first
    failure, including a closed connection.

    Args:
        acs_ws: Source socket. Either a FastAPI/Starlette-style WebSocket
            (``receive_text``) or a ``websockets`` connection (``recv``).
        openai_ws: Destination ``websockets`` connection.
        model: Transcription model name placed in the session configuration.
    """
    try:
        # main_acs_openai_relay passes a ``websockets`` connection, which has
        # no ``receive_text``; accept both socket flavours.
        receive = getattr(acs_ws, "receive_text", None) or acs_ws.recv
        while True:
            msg = await receive()
            data = json.loads(msg)
            mapped = transform_acs_to_openai_format(data, model=model)
            if mapped:
                await openai_ws.send(json.dumps(mapped))
    except Exception as e:
        logger.error(f"ACS → OpenAI relay error: {e}")


async def relay_openai_to_acs(openai_ws, acs_ws):
    """
    Forward transcription events from the OpenAI socket to the ACS socket.

    Each message is parsed as JSON, mapped with
    ``transform_openai_to_acs_format`` and, when the mapping is not ``None``,
    sent to ``acs_ws`` as JSON text. The relay logs the error and returns on
    the first failure, including a closed connection.

    Args:
        openai_ws: Source ``websockets`` connection.
        acs_ws: Destination socket. Either a FastAPI/Starlette-style WebSocket
            (``send_text``) or a ``websockets`` connection (``send``).
    """
    try:
        # main_acs_openai_relay passes a ``websockets`` connection, which has
        # no ``send_text``; accept both socket flavours.
        send_text = getattr(acs_ws, "send_text", None) or acs_ws.send
        while True:
            msg = await openai_ws.recv()
            data = json.loads(msg)
            mapped = transform_openai_to_acs_format(data)
            if mapped:
                await send_text(json.dumps(mapped))
    except Exception as e:
        logger.error(f"OpenAI → ACS relay error: {e}")


async def main_acs_openai_relay(
    acs_ws_url: str,
    openai_ws_url: str,
    openai_api_key: str,
    model: str = "gpt-4o-transcribe",
):
    """
    Open a socket to ACS and one to OpenAI Realtime STT and relay between them.

    Audio flows ACS → OpenAI and transcription events flow OpenAI → ACS. The
    session ends as soon as either relay stops; the other relay is then
    cancelled and both sockets are closed.

    Args:
        acs_ws_url: WebSocket URL of the ACS side (a test endpoint in this demo).
        openai_ws_url: WebSocket URL of the Azure OpenAI realtime endpoint.
        openai_api_key: Key sent in the ``api-key`` handshake header.
        model: Transcription model name.
    """

    async def _connect_with_headers(url: str, headers: dict):
        # The header keyword differs between websockets releases: try
        # ``additional_headers`` first, fall back to ``extra_headers`` when the
        # installed version rejects it with a TypeError.
        try:
            return await websockets.connect(url, additional_headers=headers)
        except TypeError:
            return await websockets.connect(url, extra_headers=headers)

    # Connect to ACS as a client WebSocket (simulate incoming call, or use FastAPI's ws param in handler)
    async with websockets.connect(acs_ws_url) as acs_ws:
        logger.info(f"Connected to ACS WebSocket at {acs_ws_url}")

        # Connect to Azure OpenAI Realtime STT
        openai_ws = await _connect_with_headers(
            openai_ws_url, {"api-key": openai_api_key}
        )
        try:
            logger.info(f"Connected to OpenAI Realtime WebSocket at {openai_ws_url}")

            # Run both relays in parallel
            relays = [
                asyncio.create_task(relay_acs_to_openai(acs_ws, openai_ws, model)),
                asyncio.create_task(relay_openai_to_acs(openai_ws, acs_ws)),
            ]
            try:
                await asyncio.wait(relays, return_when=asyncio.FIRST_COMPLETED)
            finally:
                # Cancel whichever relay is still running and wait until the
                # cancellation has actually been processed.
                for task in relays:
                    task.cancel()
                await asyncio.gather(*relays, return_exceptions=True)
        finally:
            await openai_ws.close()
        logger.info("Relay session closed.")


# ----------- ENV & EXAMPLE USAGE -----------
if __name__ == "__main__":
    load_dotenv()
    # Normally ACS WS comes from a FastAPI endpoint, but for demo:
    ACS_WS_URL = os.getenv(
        "ACS_TEST_WEBSOCKET_URL"
    )  # ws://localhost:9000/acs-test (simulate)
    AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_STT_TTS_ENDPOINT")
    OPENAI_API_KEY = os.getenv("AZURE_OPENAI_STT_TTS_KEY")

    model = "gpt-4o-transcribe"  # or "gpt-4o-mini-transcribe"

    # You would use this relay as part of your /realtime-acs FastAPI websocket handler,
    # passing the FastAPI websocket as `acs_ws` and launching the relay to OpenAI.
    # For demo, both URLs should be testable WebSocket echo endpoints.

    # Validate first: building the URL from a missing endpoint would fail with
    # an AttributeError before this message could be shown.
    if not ACS_WS_URL or not OPENAI_API_KEY or not AZURE_OPENAI_ENDPOINT:
        print(
            "Set ACS_TEST_WEBSOCKET_URL, AZURE_OPENAI_STT_TTS_ENDPOINT, and AZURE_OPENAI_STT_TTS_KEY in your .env file!"
        )
    else:
        # Azure OpenAI real-time WebSocket URL
        openai_ws_url = f"{AZURE_OPENAI_ENDPOINT.replace('https', 'wss')}/openai/realtime?api-version=2025-04-01-preview&intent=transcription"

        # NOTE(review): asyncio.run() raises when an event loop is already
        # running, as in a notebook kernel — confirm how this cell is meant to
        # be executed.
        asyncio.run(
            main_acs_openai_relay(
                acs_ws_url=ACS_WS_URL,
                openai_ws_url=openai_ws_url,
                openai_api_key=OPENAI_API_KEY,
                model=model,
            )
        )

In [None]:
# If not already installed, install these
!pip install azure-communication-callautomation python-dotenv websockets aiohttp openai

In [1]:
import os
import asyncio
import json
import logging
from dotenv import load_dotenv
import websockets
from base64 import b64decode, b64encode
from azure.communication.callautomation import (
    CallAutomationClient,
    CallInvite,
    PhoneNumberIdentifier,
    MediaStreamingOptions,
    MediaStreamingTransportType,
    MediaStreamingContentType,
    MediaStreamingAudioChannelType,
    AudioFormat,
)

## Load Environment Variables

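Put your secrets in a `.env` file in the notebook’s directory. The next cell reads these variables: `ACS_CONNECTION_STRING`, `ACS_SOURCE_PHONE_NUMBER`, `ACS_TARGET_PHONE_NUMBER`, `AZURE_OPENAI_STT_TTS_KEY`, `AZURE_OPENAI_STT_TTS_ENDPOINT`, and `BASE_URL`.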

In [16]:
# Load the .env file, then read each setting; a missing variable yields None.
load_dotenv()

# Azure Communication Services: connection string and the two phone numbers.
ACS_CONNECTION_STRING = os.environ.get("ACS_CONNECTION_STRING")
ACS_SOURCE_PHONE_NUMBER = os.environ.get("ACS_SOURCE_PHONE_NUMBER")
ACS_TARGET_PHONE_NUMBER = os.environ.get("ACS_TARGET_PHONE_NUMBER")

# Azure OpenAI speech endpoint and key.
AZURE_OPENAI_STT_TTS_KEY = os.environ.get("AZURE_OPENAI_STT_TTS_KEY")
AZURE_OPENAI_STT_TTS_ENDPOINT = os.environ.get("AZURE_OPENAI_STT_TTS_ENDPOINT")

# Public base URL used to build callback and WebSocket URLs.
BASE_URL = os.environ.get("BASE_URL")

In [4]:
import asyncio
import websockets
import json
from base64 import b64decode
import wave

# Accumulates the decoded PCM chunks received from ACS so that a later cell can
# write them to a .wav file. Module-level state: acs_to_openai_relay appends to
# it and nothing in this cell clears it.
all_pcm_bytes = []


async def acs_ws_handler(websocket, path=None):
    """
    Handle one incoming ACS media-streaming connection.

    Opens a socket to Azure OpenAI Realtime STT, sends the session
    configuration, then relays ACS audio to OpenAI while printing the
    transcripts that come back. Returns as soon as either relay stops; the
    other relay is then cancelled and the OpenAI socket is closed.

    Args:
        websocket: The accepted ACS connection.
        path: Request path. Optional because some ``websockets`` versions call
            the handler with the connection only.
    """
    print(f"ACS: WebSocket connection accepted from {websocket.remote_address}")

    # Connect to Azure OpenAI Realtime STT
    ws_base = AZURE_OPENAI_STT_TTS_ENDPOINT.replace("https://", "wss://", 1).rstrip("/")
    openai_ws_url = f"{ws_base}/openai/realtime?api-version=2025-04-01-preview&intent=transcription"
    openai_headers = {"api-key": AZURE_OPENAI_STT_TTS_KEY}
    # The header keyword differs between websockets releases: try
    # ``additional_headers`` first, fall back to ``extra_headers`` on TypeError.
    try:
        openai_ws = await websockets.connect(
            openai_ws_url, additional_headers=openai_headers
        )
    except TypeError:
        openai_ws = await websockets.connect(
            openai_ws_url, extra_headers=openai_headers
        )

    try:
        print("Connected to Azure OpenAI Realtime STT API")

        # Send session config
        session_config = {
            "type": "transcription_session.update",
            "session": {
                "input_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "gpt-4o-transcribe",
                    "prompt": "Respond in English.",
                },
                "input_audio_noise_reduction": {"type": "near_field"},
                "turn_detection": {"type": "server_vad"},
            },
        }
        await openai_ws.send(json.dumps(session_config))

        # Send audio_start
        # NOTE(review): 16000 Hz is declared here; confirm which sample rate
        # the realtime API expects for "pcm16" input and whether the ACS audio
        # needs resampling.
        await openai_ws.send(
            json.dumps(
                {
                    "type": "audio_start",
                    "data": {"encoding": "pcm", "sample_rate": 16000, "channels": 1},
                }
            )
        )

        # Start relays
        relays = [
            asyncio.create_task(acs_to_openai_relay(websocket, openai_ws)),
            asyncio.create_task(openai_to_transcript_relay(openai_ws)),
        ]
        try:
            # Waiting for both relays would keep this handler (and the OpenAI
            # connection) alive after the call has ended, so stop on the first.
            await asyncio.wait(relays, return_when=asyncio.FIRST_COMPLETED)
        finally:
            for task in relays:
                task.cancel()
            await asyncio.gather(*relays, return_exceptions=True)
    finally:
        await openai_ws.close()


async def acs_to_openai_relay(acs_ws, openai_ws):
    """
    Forward ACS audio messages to the OpenAI socket and keep a local copy.

    For every ``AudioData`` message the base64 payload is decoded and appended
    to ``all_pcm_bytes``, then passed on unchanged in an
    ``input_audio_buffer.append`` event. Other message kinds are ignored. The
    first error (including a closed socket) is printed and ends the relay.
    """
    while True:
        try:
            frame = json.loads(await acs_ws.recv())
            if frame.get("kind") != "AudioData":
                continue
            encoded_audio = frame["audioData"]["data"]
            all_pcm_bytes.append(b64decode(encoded_audio))
            append_event = {
                "type": "input_audio_buffer.append",
                "audio": encoded_audio,
            }
            await openai_ws.send(json.dumps(append_event))
        except Exception as relay_error:
            print(f"ACS->OpenAI relay error: {relay_error}")
            break


async def openai_to_transcript_relay(openai_ws):
    """
    Print transcription events received from the OpenAI socket.

    Non-empty delta fragments are printed inline prefixed with "Δ"; every
    completed event prints its transcript on a new line. The first error
    (including a closed socket) is printed and ends the relay.
    """
    delta_event = "conversation.item.input_audio_transcription.delta"
    completed_event = "conversation.item.input_audio_transcription.completed"

    while True:
        try:
            event = json.loads(await openai_ws.recv())
            event_type = event.get("type")
            if event_type == delta_event:
                fragment = event.get("delta", "")
                if fragment:
                    print("Δ", fragment, end=" ", flush=True)
            elif event_type == completed_event:
                print("\n✅ Final transcript:", event.get("transcript", ""))
        except Exception as relay_error:
            print(f"OpenAI transcript relay error: {relay_error}")
            break

In [5]:
BASE_URL

'https://xkrcv9t3-8010.use.devtunnels.ms'

In [6]:
# Port the local ACS WebSocket server listens on.
port = 8765
# Public URL handed to ACS as the media-streaming transport.
# NOTE(review): the host name is hard-coded to one specific dev tunnel and
# duplicates the host in BASE_URL — confirm it is reachable on this port and
# consider deriving it from BASE_URL.
ACS_WEBSOCKET_URL = f"wss://xkrcv9t3-8010.use.devtunnels.ms:{port}/"

In [7]:
import nest_asyncio

nest_asyncio.apply()  # So asyncio works in Jupyter

port = 8765  # Pick an open port

# Show the URL you need to put into your ACS streaming configuration
print(f"Your ACS WebSocket server is ws://YOUR_PUBLIC_HOSTNAME:{port}/")

start_server = websockets.serve(acs_ws_handler, "0.0.0.0", port)
await start_server
print("WebSocket server running. Ready for ACS audio!")

Your ACS WebSocket server is ws://YOUR_PUBLIC_HOSTNAME:8765/
WebSocket server running. Ready for ACS audio!


In [11]:
ACS_WEBSOCKET_URL

'wss://xkrcv9t3-8010.use.devtunnels.ms:8765/'

In [12]:
from azure.communication.callautomation import (
    CallAutomationClient,
    PhoneNumberIdentifier,
    MediaStreamingOptions,
    MediaStreamingTransportType,
    MediaStreamingContentType,
    MediaStreamingAudioChannelType,
    AudioFormat,
)


def initiate_acs_call(
    target_number: str,
    callback_url: str = "https://yourapp/callback",
    transport_url=None,
):
    """
    Place a phone call via ACS and stream its audio to a WebSocket.

    Args:
        target_number: Number to call (E.164 format, e.g. +12225551234).
        callback_url: URL that receives ACS call events. The default is a
            placeholder; pass your own endpoint to receive events.
        transport_url: WebSocket URL the call audio is streamed to; defaults
            to ``ACS_WEBSOCKET_URL``.

    Returns:
        The call connection ID reported by ACS.

    Raises:
        ValueError: If ``target_number`` is empty or ``None``.
    """
    if not target_number:
        # Fail early with a clear message instead of inside the SDK, e.g. when
        # the phone number environment variable is not set.
        raise ValueError("target_number is required (E.164 format).")

    client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING)
    source = PhoneNumberIdentifier(ACS_SOURCE_PHONE_NUMBER)
    target = PhoneNumberIdentifier(target_number)
    media_streaming = MediaStreamingOptions(
        transport_url=transport_url or ACS_WEBSOCKET_URL,
        transport_type=MediaStreamingTransportType.WEBSOCKET,
        content_type=MediaStreamingContentType.AUDIO,
        audio_channel_type=MediaStreamingAudioChannelType.UNMIXED,
        start_media_streaming=True,
        enable_bidirectional=False,
        audio_format=AudioFormat.PCM16_K_MONO,
    )
    print(f"Calling {target_number} from {ACS_SOURCE_PHONE_NUMBER}...")
    response = client.create_call(
        target_participant=target,
        callback_url=callback_url,
        media_streaming=media_streaming,
        source_caller_id_number=source,
    )
    print(f"Call initiated. Call Connection ID: {response.call_connection_id}")
    return response.call_connection_id

In [13]:
# The number to call is read from ACS_TARGET_PHONE_NUMBER in the environment
# (E.164 format, e.g., +12225551234); os.getenv returns None when it is unset.
target_phone = os.getenv("ACS_TARGET_PHONE_NUMBER")
call_id = initiate_acs_call(target_phone)
print(f"Call is being placed. Connection ID: {call_id}")

Calling +18165019907 from +18332397320...
Call initiated. Call Connection ID: 27005880-557a-4133-858e-9713ca702a55
Call is being placed. Connection ID: 27005880-557a-4133-858e-9713ca702a55


In [14]:
# Write every PCM chunk collected in all_pcm_bytes to a single .wav file.
output_filename = "acs_recording.wav"
sample_rate = 16000
channels = 1

with wave.open(output_filename, "wb") as wf:
    wf.setnchannels(channels)
    wf.setsampwidth(2)  # bytes per sample
    wf.setframerate(sample_rate)
    wf.writeframes(b"".join(all_pcm_bytes))

print(f"Saved ACS call audio to {output_filename}")

Saved ACS call audio to acs_recording.wav


In [10]:
from azure.communication.callautomation import (
    CallAutomationClient,
    PhoneNumberIdentifier,
    MediaStreamingOptions,
    MediaStreamingTransportType,
    MediaStreamingContentType,
    MediaStreamingAudioChannelType,
    AudioFormat,
)

# Build the media-streaming WebSocket URL from BASE_URL: https:// becomes
# wss:// and the path below is appended.
acs_ws_path = "realtime-acs"
acs_ws_url = f"{BASE_URL.replace('https://', 'wss://').rstrip('/')}/{acs_ws_path}"

# Same streaming options as initiate_acs_call, except bidirectional streaming
# is enabled.
media_streaming_configuration = MediaStreamingOptions(
    transport_url=acs_ws_url,
    transport_type=MediaStreamingTransportType.WEBSOCKET,
    content_type=MediaStreamingContentType.AUDIO,
    audio_channel_type=MediaStreamingAudioChannelType.UNMIXED,
    start_media_streaming=True,
    enable_bidirectional=True,
    audio_format=AudioFormat.PCM16_K_MONO,
)

client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING)
target = PhoneNumberIdentifier(ACS_TARGET_PHONE_NUMBER)
source = PhoneNumberIdentifier(ACS_SOURCE_PHONE_NUMBER)

# Running this cell places a call immediately.
response = client.create_call(
    target_participant=target,
    callback_url=f"{BASE_URL}/api/acs/callback",
    media_streaming=media_streaming_configuration,
    source_caller_id_number=source,
)

call_connection_id = response.call_connection_id
print("📞 Call initiated! Call Connection ID:", call_connection_id)

📞 Call initiated! Call Connection ID: 23005880-323c-448b-b1dc-bdbf76720c6b


In [6]:
# Realtime transcription endpoint: the configured endpoint with 'https'
# replaced by 'wss', plus the realtime path and query string.
openai_ws_url = f"{AZURE_OPENAI_STT_TTS_ENDPOINT.replace('https', 'wss')}/openai/realtime?api-version=2025-04-01-preview&intent=transcription"
# The API key is sent as a handshake header.
openai_headers = {"api-key": AZURE_OPENAI_STT_TTS_KEY}

In [7]:
def transform_acs_to_openai_format(
    msg_data: dict, model: str = "gpt-4o-transcribe"
) -> dict:
    """
    Translate one ACS media-streaming message into an OpenAI realtime event.

    ``AudioData`` becomes an ``input_audio_buffer.append`` event carrying the
    payload unchanged. ``AudioMetadata`` becomes the
    ``transcription_session.update`` event that configures the session for
    ``model``. Any other kind yields ``None``, despite the ``dict`` return
    annotation.
    """
    kind = msg_data.get("kind")

    if kind == "AudioData":
        audio_payload = msg_data["audioData"]["data"]
        return {"type": "input_audio_buffer.append", "audio": audio_payload}

    if kind != "AudioMetadata":
        return None

    transcription_settings = {"model": model, "prompt": "Respond in English."}
    vad_settings = {
        "type": "server_vad",
        "threshold": 0.7,
        "prefix_padding_ms": 300,
        "silence_duration_ms": 500,
    }
    session = {
        "input_audio_format": "pcm16",
        "input_audio_transcription": transcription_settings,
        "input_audio_noise_reduction": {"type": "near_field"},
        "turn_detection": vad_settings,
    }
    return {"type": "transcription_session.update", "session": session}


def transform_openai_to_acs_format(msg_data: dict) -> dict:
    """
    Translate an OpenAI transcription event into a text message for ACS.

    Delta events map to ``TranscriptionDelta`` and completed events to
    ``TranscriptionCompleted``. Every other event type yields ``None``,
    despite the ``dict`` return annotation.
    """
    event_type = msg_data.get("type")

    # (OpenAI event type, ACS kind, field holding the text)
    routes = (
        (
            "conversation.item.input_audio_transcription.delta",
            "TranscriptionDelta",
            "delta",
        ),
        (
            "conversation.item.input_audio_transcription.completed",
            "TranscriptionCompleted",
            "transcript",
        ),
    )
    for wanted_type, acs_kind, text_field in routes:
        if event_type == wanted_type:
            return {"kind": acs_kind, "text": msg_data.get(text_field)}
    return None

In [8]:
async def relay_acs_to_openai(acs_ws, openai_ws, model: str):
    """
    Forward messages from the ACS socket to the OpenAI realtime socket.

    Each incoming message is parsed as JSON, mapped with
    ``transform_acs_to_openai_format`` and, when the mapping is not ``None``,
    sent to ``openai_ws``. The first error (including a closed socket) is
    printed and ends the relay.

    Args:
        acs_ws: Source socket. Either a FastAPI/Starlette-style WebSocket
            (``receive_text``) or a ``websockets`` connection (``recv``).
        openai_ws: Destination ``websockets`` connection.
        model: Transcription model name placed in the session configuration.
    """
    try:
        # run_relay passes a ``websockets`` connection, which has no
        # ``receive_text``; accept both socket flavours.
        receive = getattr(acs_ws, "receive_text", None) or acs_ws.recv
        while True:
            msg = await receive()
            data = json.loads(msg)
            mapped = transform_acs_to_openai_format(data, model=model)
            if mapped:
                await openai_ws.send(json.dumps(mapped))
    except Exception as e:
        print("ACS → OpenAI relay error:", e)


async def relay_openai_to_acs(openai_ws, acs_ws):
    """
    Forward transcription events from the OpenAI socket to the ACS socket.

    Each message is parsed as JSON, mapped with
    ``transform_openai_to_acs_format`` and, when the mapping is not ``None``,
    sent to ``acs_ws`` as JSON text. The first error (including a closed
    socket) is printed and ends the relay.

    Args:
        openai_ws: Source ``websockets`` connection.
        acs_ws: Destination socket. Either a FastAPI/Starlette-style WebSocket
            (``send_text``) or a ``websockets`` connection (``send``).
    """
    try:
        # run_relay passes a ``websockets`` connection, which has no
        # ``send_text``; accept both socket flavours.
        send_text = getattr(acs_ws, "send_text", None) or acs_ws.send
        while True:
            msg = await openai_ws.recv()
            data = json.loads(msg)
            mapped = transform_openai_to_acs_format(data)
            if mapped:
                await send_text(json.dumps(mapped))
    except Exception as e:
        print("OpenAI → ACS relay error:", e)

In [9]:
async def run_relay(
    acs_ws_url, openai_ws_url, openai_headers, model="gpt-4o-transcribe"
):
    """
    Connect to the ACS and OpenAI WebSockets and relay between them.

    Returns as soon as either relay stops; the other relay is then cancelled
    and both sockets are closed.

    Args:
        acs_ws_url: WebSocket URL of the ACS side.
        openai_ws_url: WebSocket URL of the Azure OpenAI realtime endpoint.
        openai_headers: Handshake headers for the OpenAI socket (the API key).
        model: Transcription model name.
    """
    async with websockets.connect(acs_ws_url) as acs_ws:
        # The header keyword differs between websockets releases: try
        # ``additional_headers`` first, fall back to ``extra_headers`` when
        # the installed version rejects it with a TypeError.
        try:
            openai_ws = await websockets.connect(
                openai_ws_url, additional_headers=openai_headers
            )
        except TypeError:
            openai_ws = await websockets.connect(
                openai_ws_url, extra_headers=openai_headers
            )
        try:
            print(
                "✅ Both WebSocket connections established. Relaying audio in real-time..."
            )
            relays = [
                asyncio.create_task(relay_acs_to_openai(acs_ws, openai_ws, model)),
                asyncio.create_task(relay_openai_to_acs(openai_ws, acs_ws)),
            ]
            try:
                # Waiting for both relays would block for as long as the
                # surviving side stays open, so stop on the first to finish.
                await asyncio.wait(relays, return_when=asyncio.FIRST_COMPLETED)
            finally:
                for task in relays:
                    task.cancel()
                await asyncio.gather(*relays, return_exceptions=True)
        finally:
            await openai_ws.close()


# Start the relay using the URLs and headers defined in the cells above.
# NOTE(review): the saved output of this cell is an HTTP 502 handshake
# rejection — confirm the target endpoints are reachable before re-running.
await run_relay(
    acs_ws_url=acs_ws_url,
    openai_ws_url=openai_ws_url,
    openai_headers=openai_headers,
    model="gpt-4o-transcribe",  # or "gpt-4o-mini-transcribe"
)

InvalidStatus: server rejected WebSocket connection: HTTP 502

In [None]:
!pip install azure-cognitiveservices-speech

In [None]:
import azure.cognitiveservices.speech as speechsdk


def synthesize_text_to_pcm_bytes(
    text: str,
    subscription_key: str,
    region: str,
    voice: str = "en-US-JennyNeural",
    sample_rate: int = 16000,
) -> bytes:
    """
    Synthesize text to raw 16-bit mono PCM bytes using the Azure Speech SDK.

    This call blocks until synthesis has finished.

    Args:
        text: Text to speak.
        subscription_key: Key of the Speech resource.
        region: Region of the Speech resource.
        voice: Name of the synthesis voice.
        sample_rate: Output sample rate in Hz; one of 8000, 16000, 24000, 48000.

    Returns:
        The synthesized audio as headerless PCM bytes.

    Raises:
        ValueError: If ``sample_rate`` has no matching raw PCM output format.
        RuntimeError: If synthesis does not complete successfully.
    """
    # Resolved by name so that only the requested format has to exist in the
    # installed SDK version.
    format_names = {
        8000: "Raw8Khz16BitMonoPcm",
        16000: "Raw16Khz16BitMonoPcm",
        24000: "Raw24Khz16BitMonoPcm",
        48000: "Raw48Khz16BitMonoPcm",
    }
    if sample_rate not in format_names:
        raise ValueError(
            f"Unsupported sample_rate {sample_rate}; expected one of {sorted(format_names)}"
        )

    speech_config = speechsdk.SpeechConfig(subscription=subscription_key, region=region)
    speech_config.speech_synthesis_voice_name = voice
    speech_config.set_speech_synthesis_output_format(
        getattr(speechsdk.SpeechSynthesisOutputFormat, format_names[sample_rate])
    )

    # audio_config=None keeps the audio in memory (no speaker playback, no
    # output stream); the bytes are read from the result below.
    synthesizer = speechsdk.SpeechSynthesizer(
        speech_config=speech_config, audio_config=None
    )

    result = synthesizer.speak_text_async(text).get()
    if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
        return result.audio_data

    detail = ""
    if result.reason == speechsdk.ResultReason.Canceled:
        cancellation = result.cancellation_details
        detail = f" ({cancellation.reason}: {cancellation.error_details})"
    raise RuntimeError(f"TTS failed: {result.reason}{detail}")

In [None]:
import os

# Key used for the Speech synthesis calls; reuses AZURE_OPENAI_STT_TTS_KEY, which
# is defined outside this cell.
# NOTE(review): confirm this key is valid for the Speech resource being used.
TTS_SUBSCRIPTION_KEY = AZURE_OPENAI_STT_TTS_KEY

# Region for the Speech synthesis calls: taken from the AZURE_OPENAI_REGION
# environment variable, falling back to "eastus" when it is unset or empty.
TTS_REGION = os.environ.get("AZURE_OPENAI_REGION") or "eastus"


def _pcm_frames(pcm_bytes: bytes, frame_size: int):
    """Yield consecutive ``frame_size``-byte frames, zero-padding the last one."""
    for start in range(0, len(pcm_bytes), frame_size):
        yield pcm_bytes[start : start + frame_size].ljust(frame_size, b"\x00")


async def relay_openai_to_acs_with_tts(openai_ws, acs_ws) -> None:
    """
    Relay OpenAI transcription events to ACS and speak every final transcript.

    Each message read from ``openai_ws`` is parsed as JSON and converted with
    ``transform_openai_to_acs_format``; truthy results are sent to ``acs_ws`` as
    JSON. When the result is a ``TranscriptionCompleted`` event with non-empty
    text, the text is synthesized with ``synthesize_text_to_pcm_bytes`` and sent
    to ``acs_ws`` as a sequence of ``AudioData`` messages, one 20 ms frame at a
    time.

    Args:
        openai_ws: Source socket exposing an awaitable ``recv()``.
        acs_ws: Destination socket. Either a Starlette/FastAPI-style socket
            exposing ``send_text()`` or a ``websockets`` connection exposing
            ``send()``.

    Returns:
        None. Any exception (including a closed connection) is printed and ends
        the relay instead of propagating to the caller.
    """
    sample_rate = 16000
    frame_seconds = 0.02
    # 16-bit mono PCM is 2 bytes per sample: 20 ms @ 16 kHz = 640 bytes.
    frame_size = int(sample_rate * frame_seconds) * 2

    try:
        # run_relay_with_tts() hands in a `websockets` client connection, which
        # has no send_text(); fall back to send() for that case.
        send_to_acs = getattr(acs_ws, "send_text", None) or acs_ws.send
        loop = asyncio.get_running_loop()

        while True:
            msg = await openai_ws.recv()
            data = json.loads(msg)
            mapped = transform_openai_to_acs_format(data)
            if not mapped:
                continue

            # Send transcript as text to ACS (could trigger TTS client-side)
            await send_to_acs(json.dumps(mapped))

            # Only a final transcript with text is spoken.
            if mapped["kind"] != "TranscriptionCompleted" or not mapped.get("text"):
                continue

            text = mapped["text"]
            print(f"🔊 TTS Synthesizing: {text}")
            # The Speech SDK call blocks; run it in a worker thread so the
            # ACS → OpenAI direction keeps flowing during synthesis.
            pcm_bytes = await loop.run_in_executor(
                None,
                lambda: synthesize_text_to_pcm_bytes(
                    text=text,
                    subscription_key=TTS_SUBSCRIPTION_KEY,
                    region=TTS_REGION,
                    voice="en-US-JennyNeural",  # or your preferred voice
                    sample_rate=sample_rate,
                ),
            )

            for frame in _pcm_frames(pcm_bytes, frame_size):
                audio_payload = {
                    "kind": "AudioData",
                    "audioData": {"data": base64.b64encode(frame).decode("ascii")},
                    "stopAudio": None,
                }
                await send_to_acs(json.dumps(audio_payload))
                await asyncio.sleep(frame_seconds)  # pace frames at playback speed
    except Exception as e:
        print("OpenAI → ACS relay error:", e)

In [None]:
async def run_relay_with_tts(
    acs_ws_url, openai_ws_url, openai_headers, model="gpt-4o-transcribe"
):
    """
    Open both WebSocket connections and relay audio in, transcripts and TTS out.

    Args:
        acs_ws_url: URL of the ACS WebSocket endpoint to connect to.
        openai_ws_url: URL of the OpenAI WebSocket endpoint to connect to.
        openai_headers: Extra HTTP headers sent with the OpenAI handshake.
        model: Model name forwarded to ``relay_acs_to_openai``.

    Returns:
        None, as soon as either relay direction stops. The remaining direction
        is cancelled and both connections are closed on exit.

    Raises:
        Whatever ``websockets.connect`` raises when a handshake fails, and any
        exception that escaped a relay task.
    """
    import inspect

    # The new asyncio client in `websockets` names the handshake-header keyword
    # `additional_headers`; the legacy client calls it `extra_headers`. Pick the
    # one the installed version actually accepts.
    connect_params = inspect.signature(websockets.connect).parameters
    header_kwarg = (
        "additional_headers"
        if "additional_headers" in connect_params
        else "extra_headers"
    )

    async with websockets.connect(acs_ws_url) as acs_ws, websockets.connect(
        openai_ws_url, **{header_kwarg: openai_headers}
    ) as openai_ws:
        print(
            "✅ Both WebSocket connections established. Relaying audio & TTS in real-time..."
        )
        tasks = [
            asyncio.create_task(relay_acs_to_openai(acs_ws, openai_ws, model)),
            asyncio.create_task(relay_openai_to_acs_with_tts(openai_ws, acs_ws)),
        ]
        try:
            # Once one direction has ended the session is over; waiting for both
            # would block on the surviving recv() indefinitely.
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # cancel() is a no-op on finished tasks; this also cleans up when
            # this coroutine itself is cancelled (e.g. kernel interrupt).
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
        for task in done:
            task.result()  # surface anything a relay did not handle itself


# Launch the relay with TTS playback. acs_ws_url, openai_ws_url and openai_headers
# are not defined in this cell, so the cell that assigns them has to be run first.
await run_relay_with_tts(
    acs_ws_url=acs_ws_url,
    openai_ws_url=openai_ws_url,
    openai_headers=openai_headers,
    model="gpt-4o-transcribe",
)