# How to use the OpenAI Realtime API with websockets

## Architecture

1. Thread-safe audio queue
2. Audio recording thread --> saves captured audio to the thread-safe audio queue
3. Asynchronously run two tasks
    1. Send messages to websocket: send audio data from the audio queue to the websocket connection
    2. Receive messages from websocket: receive transcription events from the websocket connection

In [None]:
import pyaudio
import queue
import threading
from dotenv import load_dotenv
import websockets
import os
import json
import asyncio
import base64

# Load variables from a .env file (python-dotenv) into the process environment.
load_dotenv()

# Azure OpenAI connection settings. Each value is None when the corresponding
# environment variable is not set.
azure_api_key = os.environ.get("AZURE_OPENAI_GPT4O_API_KEY")
azure_endpoint = os.environ.get("AZURE_OPENAI_GPT4O_ENDPOINT")
azure_deployment = os.environ.get("AZURE_OPENAI_GPT4O_DEPLOYMENT_ID")

## Audio Capture

In [None]:
# Audio Queue
# Shared buffer between the recording thread and the asyncio sender:
# audio_capture() puts raw microphone chunks here and send_audio() takes them
# off. queue.Queue is thread-safe, and it is created without a maxsize, so it
# grows without bound if nothing is consuming it.
AUDIO_QUEUE = queue.Queue()

In [None]:
def audio_capture(stop_event):
    """Capture audio from the microphone and add it to AUDIO_QUEUE.

    Intended to run on its own thread. Reads fixed-size chunks from the
    default input device until ``stop_event`` is set, putting each chunk
    (raw bytes as returned by the stream) on ``AUDIO_QUEUE``.

    Args:
        stop_event: A ``threading.Event``; setting it ends the capture loop.

    Raises:
        Whatever ``PyAudio.open`` raises when the input stream cannot be
        opened. The PortAudio instance is still released in that case.
    """
    frames_per_chunk = 1024  # Number of frames per buffer / per read

    p = pyaudio.PyAudio()
    try:
        # Opening the stream inside the try guarantees p.terminate() runs
        # even when no usable input device is available.
        stream = p.open(
            format=pyaudio.paInt16,  # 16-bit PCM (pcm16)
            channels=1,              # Mono audio
            rate=24000,              # 24kHz as recommended by OpenAI
            input=True,
            frames_per_buffer=frames_per_chunk,
        )
        try:
            print("🎙️ Recording started...")
            while not stop_event.is_set():
                data = stream.read(
                    num_frames=frames_per_chunk, exception_on_overflow=False
                )
                # read() blocks, so re-check the flag and drop the chunk that
                # was captured while the stop was being requested.
                if stop_event.is_set():
                    break
                AUDIO_QUEUE.put(data)
        finally:
            stream.stop_stream()
            stream.close()
            print("🎙️ Recording stopped")
    finally:
        p.terminate()

## Config to start Transcription

In [None]:
# Authentication header sent with the websocket handshake.
headers = {"api-key": azure_api_key}
# azure_endpoint is interpolated directly after "wss://", so it is expected to
# be a bare host name without a scheme.
# NOTE(review): confirm the AZURE_OPENAI_GPT4O_ENDPOINT value has no "https://" prefix.
ws_url = f"wss://{azure_endpoint}/openai/realtime?intent=transcription&deployment={azure_deployment}&api-version=2024-10-01-preview"

# Session-update message sent once, right after connecting, to configure the
# transcription session.
config = {
    "type": "transcription_session.update",
    "session": {
        # Must match what audio_capture() records (16-bit PCM).
        "input_audio_format": "pcm16",
        "input_audio_transcription": {"model": "gpt-4o-transcribe"},
        # Server-side voice activity detection decides where utterances
        # start and stop.
        "turn_detection": {
            "type": "server_vad",
            "threshold": 0.5,
            "prefix_padding_ms": 300,
            "silence_duration_ms": 500,
        },
        # Noise reduction is enabled, using the "near_field" setting.
        "input_audio_noise_reduction": {
            "type": "near_field"
        }
    },
}


## Send Audio and Receive Message tasks

In [None]:
async def send_audio(websocket, stop_event):
    """Forward captured audio chunks from AUDIO_QUEUE to the websocket.

    Polls ``AUDIO_QUEUE`` roughly every 10 ms. Each chunk is base64-encoded
    and sent as an ``input_audio_buffer.append`` JSON message. Returns when
    ``stop_event`` is set or the connection closes.

    Args:
        websocket: Open websocket connection exposing an awaitable ``send``.
        stop_event: A ``threading.Event``; setting it ends the loop.

    Raises:
        asyncio.CancelledError: Re-raised after logging so that the task is
            correctly reported as cancelled.
    """
    try:
        while not stop_event.is_set():
            try:
                # Non-blocking: a blocking get() here would stall the whole
                # event loop if the queue were ever drained in between.
                audio_data = AUDIO_QUEUE.get_nowait()
            except queue.Empty:
                pass
            else:
                # Encode audio data as base64
                encoded_data = base64.b64encode(audio_data).decode("utf-8")

                # Create audio buffer message
                message = {
                    "type": "input_audio_buffer.append",
                    "audio": encoded_data,
                }
                await websocket.send(json.dumps(message))
            await asyncio.sleep(0.01)  # Small delay to prevent busy waiting
    except websockets.ConnectionClosed:
        print("send_audio: WebSocket connection closed")
    except asyncio.CancelledError:
        print("send_audio: Task cancelled")
        # Propagate so task.cancelled() is True for whoever cancelled us.
        raise

In [None]:
async def receive_messages(websocket, stop_event):
    """Receive messages from the WebSocket server"""
    try:
        while not stop_event.is_set():
            try:
                message = await websocket.recv()
                data = json.loads(message)
                if "type" in data and data["type"] == "input_audio_buffer.speech_started":
                    print("🎤 Speech Detected")
                elif "type" in data and data["type"] == "input_audio_buffer.speech_stopped":
                    print("🔇Speech Stopped")
                elif "type" in data and data["type"] == "conversation.item.input_audio_transcription.completed":
                    # Transcription utterance completed
                    transcript_raw = data.get("transcript", "")
                    transcript_json = json.loads(transcript_raw)
                    transcript = transcript_json.get("text", "")
                    print(f'\n📝 Azure Completed Transcript: "{transcript}"', flush=True)
                else:
                    pass
                    # Implement other message types as needed
            except websockets.ConnectionClosed:
                print("Connection closed")
                break
    except asyncio.CancelledError as e:
        print(f"receive_messages: Task cancelled")
    except websockets.ConnectionClosed as e:
        print(f"receive_messages: WebSocket connection closed")

## Start transcription

In [None]:
# Clear audio queue
while not AUDIO_QUEUE.empty():
    AUDIO_QUEUE.get()
    
    
stop_event = threading.Event()
# This will run the audio capture in a separate thread
audio_thread = threading.Thread(target=audio_capture, args=(stop_event,))
audio_thread.daemon = True
audio_thread.start()
    
async with websockets.connect(
    ws_url, additional_headers=headers
) as websocket:
    try:
        print("🔗 WebSocket connection established")
        print("Speak into the microphone...")

        # Setup the transcription session
        await websocket.send(json.dumps(config))
        
        # Create tasks for sending audio and receiving messages
        send_task = asyncio.create_task(send_audio(websocket, stop_event))
        receive_task = asyncio.create_task(receive_messages(websocket, stop_event))
        
        # Wait until any one the task finishes
        try:
            done, pending = await asyncio.wait(
                {send_task, receive_task}, return_when=asyncio.FIRST_COMPLETED
            )
        except asyncio.CancelledError:
            print("🛑 Stopping...")
        
    finally:
        if not stop_event.is_set():
            stop_event.set()
        if audio_thread.is_alive():
            audio_thread.join(timeout=1)
        
