# OpenAI Voice Assistant Notebook

- Run the installation cell once per environment.
- Update the `.env` file with your `OPENAI_API_KEY`.
- Each turn: run the last cell, press Enter to start speaking, wait for the response.
- Type `r` then Enter to reset conversation memory, `q` to quit the loop.
- Ensure your microphone is connected and not muted.

In [1]:
%pip install --quiet openai sounddevice simpleaudio python-dotenv numpy scipy

[33m  DEPRECATION: Building 'simpleaudio' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'simpleaudio'. Discussion can be found at https://github.com/pypa/pip/issues/6334[0m[33m
  NOTE: The current PATH contains path(s) starting with `~`, which may not be expanded by all applications.[0m[33m
[0mNote: you may need to restart the kernel to use updated packages.


In [2]:
import os
from dotenv import load_dotenv
from openai import OpenAI

load_dotenv()

SYSTEM_PROMPT = os.getenv("SYTEM_PROMPT", "You are an ai voice assistance")
API_KEY = os.getenv("OPENAI_API_KEY")

if not API_KEY:
    raise ValueError("Set OPENAI_API_KEY in your .env file before proceeding.")

client = OpenAI(api_key=API_KEY)
print("Client configured. System prompt loaded.")

Client configured. System prompt loaded.


In [3]:
from IPython.display import Javascript, display

display(Javascript("""
(async () => {
  try {
    await navigator.mediaDevices.getUserMedia({ audio: true });
    console.log('Microphone access granted.');
  } catch (err) {
    console.error('Microphone access denied:', err);
  }
})();
"""))

<IPython.core.display.Javascript object>

In [None]:
import base64
import io
import threading
import time
import wave
from typing import List, Dict, Optional

import numpy as np
import simpleaudio as sa
import sounddevice as sd

class AudioPlayer:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._play_obj: Optional[sa.PlayObject] = None

    def play_wav(self, wav_bytes: bytes) -> None:
        with self._lock:
            if self._play_obj is not None:
                self._play_obj.stop()
                self._play_obj = None
            with wave.open(io.BytesIO(wav_bytes), 'rb') as wf:
                frames = wf.readframes(wf.getnframes())
                channels = wf.getnchannels()
                sample_width = wf.getsampwidth()
                sample_rate = wf.getframerate()
            self._play_obj = sa.play_buffer(frames, channels, sample_width, sample_rate)

    def is_playing(self) -> bool:
        with self._lock:
            return self._play_obj is not None and self._play_obj.is_playing()

    def wait_finish(self) -> None:
        """Wait for current playback to complete."""
        with self._lock:
            play_obj = self._play_obj
        if play_obj is not None:
            play_obj.wait_done()

    def stop(self) -> None:
        with self._lock:
            if self._play_obj is not None:
                self._play_obj.stop()
                self._play_obj = None

def record_until_silence(
    sample_rate: int = 16000,
    threshold: float = 0.01,
    silence_duration: float = 1.0,
    max_seconds: float = 30.0,
    chunk_size: int = 1024
) -> Optional[np.ndarray]:
    buffer: List[np.ndarray] = []
    speaking = False
    silence_start: Optional[float] = None
    start_time = time.time()
    with sd.InputStream(samplerate=sample_rate, channels=1, dtype='float32') as stream:
        while True:
            data, _ = stream.read(chunk_size)
            rms = float(np.sqrt(np.mean(np.square(data))))
            now = time.time()
            if rms > threshold:
                if not speaking:
                    print('Speech detected...')
                speaking = True
                silence_start = None
                buffer.append(data.copy())
            else:
                if speaking:
                    buffer.append(data.copy())
                    if silence_start is None:
                        silence_start = now
                    elif now - silence_start >= silence_duration:
                        break
            if now - start_time >= max_seconds:
                break
    if not buffer:
        return None
    return np.concatenate(buffer, axis=0)

def numpy_to_wav_bytes(audio: np.ndarray, sample_rate: int) -> bytes:
    audio = np.clip(audio, -1.0, 1.0)
    int_audio = (audio * 32767).astype(np.int16)
    with io.BytesIO() as output:
        with wave.open(output, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(sample_rate)
            wf.writeframes(int_audio.tobytes())
        return output.getvalue()

class VoiceAssistantSession:
    def __init__(
        self,
        client: OpenAI,
        system_prompt: str,
        sample_rate: int = 16000,
        threshold: float = 0.01,
        silence_duration: float = 1.0,
        max_seconds: float = 30.0
    ) -> None:
        self.client = client
        self.system_prompt = system_prompt
        self.sample_rate = sample_rate
        self.threshold = threshold
        self.silence_duration = silence_duration
        self.max_seconds = max_seconds
        self.history: List[Dict[str, List[Dict[str, str]]]] = []
        self.player = AudioPlayer()

    def stop_playback(self) -> None:
        self.player.stop()

    def reset_history(self) -> None:
        self.history.clear()
        print('Conversation history cleared.')

    def _build_messages(self) -> List[Dict[str, List[Dict[str, str]]]]:
        return [
            {
                'role': 'system',
                'content': [{ 'type': 'text', 'text': self.system_prompt }]
            }
        ] + self.history

    def record_user(self) -> Optional[str]:
        self.stop_playback()
        print('Listening... start speaking, stay within microphone range.')
        audio = record_until_silence(
            sample_rate=self.sample_rate,
            threshold=self.threshold,
            silence_duration=self.silence_duration,
            max_seconds=self.max_seconds
        )
        if audio is None:
            print('No speech detected. Try again.')
            return None
        wav_bytes = numpy_to_wav_bytes(audio, self.sample_rate)
        transcription = self.client.audio.transcriptions.create(
            model='whisper-1',
            file=('user.wav', wav_bytes, 'audio/wav')
        )
        user_text = transcription.text.strip()
        if not user_text:
            print('Transcription failed to capture speech.')
            return None
        print(f'You: {user_text}')
        self.history.append({
            'role': 'user',
            'content': [{ 'type': 'text', 'text': user_text }]
        })
        return user_text

    def respond(self) -> None:
        response = self.client.chat.completions.create(
            model='gpt-4o-audio-preview',
            modalities=['text', 'audio'],
            audio={'voice': 'alloy', 'format': 'wav'},
            messages=[msg for msg in self._build_messages()]
        )
        message = response.choices[0].message
        assistant_text = message.content or ''
        audio_bytes = None
        if hasattr(message, 'audio') and message.audio:
            audio_bytes = base64.b64decode(message.audio.data)
        if assistant_text:
            print(f'Assistant: {assistant_text}')
            self.history.append({
                'role': 'assistant',
                'content': [{ 'type': 'text', 'text': assistant_text }]
            })
        if audio_bytes:
            self.player.play_wav(audio_bytes)
            # Wait for audio to finish playing
            self.player.wait_finish()

    def turn(self) -> None:
        if self.record_user() is None:
            return
        self.respond()


In [5]:
assistant_session = VoiceAssistantSession(
    client=client,
    system_prompt=SYSTEM_PROMPT,
    sample_rate=16000,
    threshold=0.01,
    silence_duration=1.0,
    max_seconds=30.0
)
print('Voice assistant session ready.')

Voice assistant session ready.


In [1]:
while True:
    command = input("Press Enter to speak (q to quit, r to reset history): ")
    if command.lower().strip() == 'q':
        assistant_session.stop_playback()
        print('Session ended.')
        break
    if command.lower().strip() == 'r':
        assistant_session.reset_history()
        continue
    assistant_session.stop_playback()
    assistant_session.turn()

Press Enter to speak (q to quit, r to reset history):  q


NameError: name 'assistant_session' is not defined