---
output-file: transcription.html
title: Transcription
---

In [None]:
#| default_exp live

## Live Transcription

This module implements a real-time speech-to-text system that: <br>
- Listens to microphone input continuously <br>
- Uses **Silero VAD** (Voice Activity Detection) to detect when you are speaking <br>
- Automatically segments audio into utterances based on natural pauses <br>
- Transcribes each utterance using **Faster-Whisper** (optimized Whisper implementation) <br>
- Streams transcribed text chunks as they become available <br>

The system is optimized for interactive applications that need responsive, chunked speech transcription with minimal latency.

### Architecture Overview

The implementation uses a multi-threaded, asynchronous architecture:

1. **PyAudio Thread**: Captures raw audio from the microphone <br>
2. **AsyncIO Queue**: Bridge between the audio callback and async processing, filled from the audio thread via `loop.call_soon_threadsafe` <br>
3. **Processing Loop**: Analyzes audio with VAD and triggers transcription <br>
4. **Worker Threads**: Run Whisper transcription without blocking the main loop <br>

This design keeps the audio callback short, so capture is not stalled by VAD or transcription work and the pipeline stays responsive.

## Imports and Device Selection

The module imports only essential dependencies: <br>
- **numpy**: Audio data manipulation and buffer management <br>
- **torch**: Required for Silero VAD model <br>
- **pyaudio**: Cross-platform audio I/O <br>
- **faster_whisper**: Optimized Whisper implementation with CTranslate2 backend <br>
- **asyncio**: Non-blocking audio processing <br>
- **logging**: Debug and status output <br>

### Device Selection Strategy

The `get_device()` helper automatically selects the best available compute device:

1. **CPU (forced)**: If `force_cpu=True` <br>
2. **CUDA**: NVIDIA GPU acceleration (if available) <br>
3. **MPS**: Apple Silicon Metal Performance Shaders (if available) <br>
4. **CPU (fallback)**: Default fallback <br>

For Whisper models, device selection impacts speed significantly: <br>
- CPU: Slower but works everywhere <br>
- CUDA: 5-10x faster on compatible NVIDIA GPUs <br>
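- MPS: Selected by `get_device()` on Apple Silicon Macs; note that CTranslate2 (the Faster-Whisper backend) accepts `"cpu"` and `"cuda"` but not `"mps"`, so Whisper itself may need to run on CPU there <br>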
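
Because Faster-Whisper runs on CTranslate2, a torch-style device name may need to be translated before it is handed to `WhisperModel`. The following is a minimal sketch of such a mapping, assuming CTranslate2 only accepts `"cpu"` and `"cuda"` as concrete device names; `whisper_device_config` is a hypothetical helper and not part of this module.

```python
def whisper_device_config(torch_device: str) -> tuple[str, str]:
    """Map a torch-style device name to (device, compute_type) for WhisperModel.

    Hypothetical helper for illustration. Assumes CTranslate2 only accepts
    "cpu" and "cuda", so anything else (for example "mps") falls back to CPU.
    """
    if torch_device == "cuda":
        return "cuda", "float16"  # half precision on NVIDIA GPUs
    return "cpu", "int8"          # quantized inference on CPU


for name in ("cuda", "mps", "cpu"):
    print(name, "->", whisper_device_config(name))
```

The compute types mirror the ones the class uses (`int8` on CPU, `float16` otherwise); other combinations may work better on specific hardware.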

In [None]:
#| export
#| include: false
import logging
import asyncio
from typing import Optional, Callable

import numpy as np
import pyaudio
import torch
from faster_whisper import WhisperModel

def get_device(force_cpu: bool = False) -> str:
    """Pick best available device."""
    if force_cpu:
        return "cpu"
    if torch.cuda.is_available():
        return "cuda"
    if torch.backends.mps.is_available():
        try:
            torch.mps.empty_cache()
        except Exception:
            pass
        return "mps"
    return "cpu"

def load_silero_vad():
    """Load Silero VAD model from torch hub."""
    try:
        model, _utils = torch.hub.load(
            repo_or_dir='snakers4/silero-vad',
            model='silero_vad',
            force_reload=False,
            onnx=False
        )
        return model
    except Exception as e:
        logging.warning(f"Failed to load Silero VAD: {e}")
        return None

## Silero VAD - Voice Activity Detection

The `load_silero_vad()` function loads the **Silero VAD** model from PyTorch Hub.

### What is VAD?

Voice Activity Detection is a technique for identifying segments of audio that contain human speech. Silero VAD is a lightweight neural network that outputs a probability score (0.0 to 1.0) indicating the likelihood that a given audio chunk contains speech.

### Why VAD Matters

Without VAD, a transcription system would: <br>
- Send every audio chunk to Whisper (expensive and slow) <br>
- Transcribe silence and background noise <br>
- Have difficulty segmenting continuous speech into utterances <br>

With VAD, the system: <br>
- Only transcribes when speech is detected <br>
- Automatically segments based on natural pauses <br>
- Reduces compute cost and improves responsiveness <br>

### Implementation Details

The Silero VAD model: <br>
- Accepts 16 kHz audio tensors <br>
- Returns a single float between 0.0 and 1.0 <br>
- Runs extremely fast (< 1ms per chunk on CPU) <br>
- Requires no internet connection after initial download <br>

If loading fails (offline, network issues), the function returns `None` and the transcriber raises a runtime error since VAD is essential for operation.
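
To make the role of the probability score concrete, here is a small sketch of how a threshold turns per-chunk scores into speech / non-speech decisions. The probabilities are made-up illustrative values, not real Silero output, and `classify_chunks` is a hypothetical name.

```python
def classify_chunks(probs: list[float], threshold: float = 0.5) -> list[bool]:
    """Turn per-chunk speech probabilities into speech / non-speech flags."""
    return [p > threshold for p in probs]


# Illustrative probabilities only (not produced by a real model)
probs = [0.02, 0.10, 0.55, 0.93, 0.97, 0.61, 0.08]

balanced = classify_chunks(probs, threshold=0.5)
strict = classify_chunks(probs, threshold=0.8)

print(balanced)  # four chunks count as speech
print(strict)    # only the two most confident chunks count as speech
```

A stricter threshold drops the borderline chunks at the start and end of the utterance, which is the trade-off to keep in mind when tuning `vad_threshold`.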

## LiveTranscriber Class

The `LiveTranscriber` class captures live audio from your microphone, detects when you are speaking using **Silero VAD**, and transcribes each spoken sentence using **Faster-Whisper** once you pause.

It runs asynchronously, making it ideal for real-time interfaces like TUIs or voice assistants.

### Parameters

- **model_id** (`str`): Whisper model to use <br>
  - Options: `"tiny"`, `"base"`, `"small"`, `"medium"`, `"large-v3"` <br>
  - Trade-off: Speed vs. accuracy (tiny is fastest, large is most accurate) <br>
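  - Default: `"openai/whisper-base"`; Faster-Whisper normally expects a size name such as `"base"` or a CTranslate2-converted model, so passing `model_id="base"` explicitly is the safer choice <br>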

- **language** (`str`): ISO language code for transcription <br>
  - Default: `"en"` (English) <br>
  - Whisper supports close to 100 languages (see Whisper documentation) <br>

- **force_cpu** (`bool`): Force CPU usage even if GPU is available <br>
  - Useful for systems where GPU is reserved for other tasks <br>
  - Default: `False` <br>

- **on_transcript** (`Callable`): Callback function called with each transcribed text chunk <br>
  - Signature: `def callback(text: str) -> None` or `async def callback(text: str) -> None` <br>
  - Called once per utterance (speech segment followed by silence) <br>

- **vad_threshold** (`float`): Silero confidence threshold (0.0–1.0) <br>
  - Higher values require more confident speech detection <br>
  - Lower values are more sensitive but may catch background noise <br>
  - Default: `0.5` (balanced) <br>

- **min_speech_duration_ms** (`int`): Minimum length of speech to count as valid <br>
  - Filters out very brief sounds that might be noise <br>
  - Default: `250` ms <br>

- **min_silence_duration_ms** (`int`): How long silence must last before triggering transcription <br>
  - Controls utterance segmentation <br>
  - Lower values create more frequent, shorter transcriptions <br>
  - Higher values wait longer before processing <br>
  - Default: `500` ms <br>
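
The two duration parameters are converted to sample counts internally, and audio arrives in 512-sample chunks, so the effective thresholds are rounded up to whole chunks. A short worked sketch of that arithmetic, using the defaults above (`ms_to_samples` and `chunks_needed` are illustrative names, not part of the class):

```python
import math

SAMPLE_RATE = 16_000   # Hz, the rate the module records at
CHUNK_SAMPLES = 512    # frames_per_buffer passed to PyAudio


def ms_to_samples(duration_ms: int, sample_rate: int = SAMPLE_RATE) -> int:
    """Same conversion the class applies to its duration parameters."""
    return int(sample_rate * duration_ms / 1000)


def chunks_needed(duration_ms: int) -> int:
    """Whole chunks required before a sample counter reaches the threshold."""
    return math.ceil(ms_to_samples(duration_ms) / CHUNK_SAMPLES)


chunk_ms = CHUNK_SAMPLES * 1000 / SAMPLE_RATE   # 32.0 ms per chunk
min_speech = ms_to_samples(250)                 # 4000 samples
min_silence = ms_to_samples(500)                # 8000 samples

print(chunk_ms, min_speech, min_silence)
print(chunks_needed(250), chunks_needed(500))   # 8 and 16 chunks
```

With the defaults, an utterance therefore needs at least 8 speech chunks (256 ms) to be kept, and it is finalized after 16 consecutive silent chunks (512 ms).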

### Key Methods

#### start() - Begin Transcription
Starts the microphone capture and transcription loop. Runs asynchronously until `stop()` is called.

```python
await transcriber.start()
```

#### stop(transcribe_remaining=True) - Stop Transcription
Gracefully stops audio processing. If `transcribe_remaining=True`, processes any buffered speech before exiting.

```python
await transcriber.stop()
```

#### process_audio() - Internal Processing Loop
Continuously consumes audio chunks from the queue, runs VAD, and triggers transcription. Automatically called by `start()`.

#### _transcribe_chunk(audio_data) - Transcription
Uses Whisper to transcribe one complete utterance. Runs in a worker thread to avoid blocking.

#### _detect_speech_silero(audio_chunk) - VAD Analysis
Returns `True` if Silero VAD detects speech in the current chunk.

### How It Works

1. **Audio Capture**: PyAudio streams 512-sample chunks (32 ms at 16 kHz) from the microphone <br>
2. **Thread Safety**: Audio callback pushes chunks to an AsyncIO queue via `call_soon_threadsafe` <br>
3. **VAD Processing**: Each chunk is analyzed by Silero VAD <br>
4. **Speech Buffering**: Speech chunks are accumulated in a list <br>
5. **Silence Detection**: When silence persists for `min_silence_duration_ms`, buffered speech is sent to Whisper <br>
6. **Transcription**: Whisper runs in a worker thread and calls `on_transcript` with the result <br>
7. **State Reset**: Buffer is cleared and the system is ready for the next utterance <br>
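
Step 6 can be sketched in isolation: a blocking transcription call runs in a worker thread, and the result is handed to either a sync or an async callback. This is a simplified sketch, not the class itself; `slow_transcribe` is a hypothetical stand-in for the Whisper call, and `inspect.iscoroutinefunction` is used here in place of the `asyncio` variant.

```python
import asyncio
import inspect
import time


def slow_transcribe(audio: list[float]) -> str:
    """Stand-in for a blocking Whisper call (hypothetical)."""
    time.sleep(0.05)
    return f"{len(audio)} samples transcribed"


async def dispatch(audio: list[float], on_transcript) -> None:
    # Run the blocking work in a worker thread so the event loop stays free
    text = await asyncio.to_thread(slow_transcribe, audio)
    if inspect.iscoroutinefunction(on_transcript):
        await on_transcript(text)                      # async callback
    else:
        await asyncio.to_thread(on_transcript, text)   # sync callback


results = []


def sync_cb(text: str) -> None:
    results.append(("sync", text))


async def async_cb(text: str) -> None:
    results.append(("async", text))


async def main() -> None:
    await asyncio.gather(
        dispatch([0.0] * 512, sync_cb),
        dispatch([0.0] * 1024, async_cb),
    )


asyncio.run(main())  # inside a notebook, use `await main()` instead
print(sorted(results))
```

Running the sync callback in a thread as well means a slow callback (for example one that writes to disk) does not stall audio processing.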

### State Machine

```
IDLE (waiting for speech)
  ↓ (speech detected)
RECORDING (buffering speech chunks)
  ↓ (silence detected for min_silence_duration_ms)
TRANSCRIBING (Whisper processes buffered audio)
  ↓ (transcription complete)
IDLE (ready for next utterance)
```
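
The segmentation logic behind these states can be sketched without any audio at all, by feeding it one speech / silence flag per chunk. This is a simplified model of the processing loop under the default settings; `segment_utterances` is a hypothetical name and returns chunk positions rather than audio.

```python
def segment_utterances(flags, chunk=512, min_speech=4000, min_silence=8000):
    """Return (start_chunk_index, n_speech_chunks) for each finalized utterance."""
    utterances = []
    active = False
    start = n_chunks = speech_samples = silence = 0

    for i, is_speech in enumerate(flags):
        if is_speech:
            if not active:  # IDLE -> RECORDING
                active, start, n_chunks, speech_samples = True, i, 0, 0
            n_chunks += 1
            speech_samples += chunk
            silence = 0     # any speech resets the silence timer
            continue
        if not active:
            continue        # still idle
        silence += chunk
        if silence < min_silence:
            continue        # pause is too short to end the utterance
        if speech_samples >= min_speech:
            utterances.append((start, n_chunks))  # would go to Whisper
        active = False      # back to IDLE; short bursts are discarded
        speech_samples = silence = 0
    return utterances


flags = [False] * 3 + [True] * 10 + [False] * 16 + [True] * 2 + [False] * 16 + [True] * 9
print(segment_utterances(flags))  # only the 10-chunk utterance is finalized
```

In this run the 2-chunk burst is dropped as too short, and the trailing 9 chunks stay buffered because no closing silence arrives, which is the case `stop(transcribe_remaining=True)` is meant to cover.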

### Thread Safety

The implementation carefully manages threading: <br>
- **PyAudio thread**: Runs the audio callback <br>
- **AsyncIO thread**: Runs the processing loop <br>
- **Worker threads**: Run Whisper transcription <br>

Communication between threads uses `asyncio.Queue` and `loop.call_soon_threadsafe()` to ensure thread safety.
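
As a rough illustration of that hand-off, the sketch below uses a plain `threading.Thread` as a stand-in for the PyAudio callback thread and pushes items into an `asyncio.Queue` through `loop.call_soon_threadsafe`. The `None` sentinel is an assumption made for this example; the class itself relies on its `is_running` flag.

```python
import asyncio
import threading


def producer(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, n_chunks: int) -> None:
    """Stand-in for the audio thread: never touches the queue directly."""
    for i in range(n_chunks):
        # Schedule put_nowait to run on the event loop's own thread
        loop.call_soon_threadsafe(queue.put_nowait, i)
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: no more data


async def consume() -> list[int]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    thread = threading.Thread(target=producer, args=(loop, queue, 5))
    thread.start()

    received = []
    while True:
        item = await queue.get()  # suspends without busy-waiting
        if item is None:
            break
        received.append(item)
    thread.join()
    return received


received = asyncio.run(consume())  # inside a notebook, use `await consume()`
print(received)
```

`asyncio.Queue` is not thread-safe on its own, which is why the producer schedules `put_nowait` on the loop instead of calling it from the foreign thread.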

In [None]:
#| export
#| include: false

class LiveTranscriber:
    """
    Improved LiveTranscriber:
      - uses asyncio.Queue for clean async consumption
      - pushes audio from audio thread with loop.call_soon_threadsafe
      - collects chunks in a list (no repeated np.append)
      - does NOT append silence chunks into the sent buffer
      - dispatches transcription to a thread and safely calls callbacks
      - provides stop() for clean shutdown
    """

    def __init__(
        self,
        model_id: str = "openai/whisper-base",
        language: str = "en",
        force_cpu: bool = False,
        on_transcript: Optional[Callable[[str], None]] = None,
        vad_threshold: float = 0.5,
        min_speech_duration_ms: int = 250,
        min_silence_duration_ms: int = 500,
    ):
        self.logger = logging.getLogger(__name__)
        self.on_transcript = on_transcript

        self.model_id = model_id
        self.language = language

        self.sample_rate = 16000

        # ASR model
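        self.device = get_device(force_cpu=force_cpu)
        # CTranslate2 (the Faster-Whisper backend) accepts "cpu" and "cuda" only,
        # so fall back to CPU when get_device() reports another device such as "mps"
        self.whisper_device = self.device if self.device in ("cpu", "cuda") else "cpu"
        self.transcribe_model = WhisperModel(
            self.model_id,
            device=self.whisper_device,
            compute_type="float16" if self.whisper_device == "cuda" else "int8",
        )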

        # VAD
        self.vad_threshold = vad_threshold
        self.silero_model = load_silero_vad()
        if self.silero_model is None:
            raise RuntimeError("Silero VAD failed to load. Cannot continue.")

        # thresholds in samples
        self.min_speech_samples = int(self.sample_rate * min_speech_duration_ms / 1000)
        self.min_silence_samples = int(self.sample_rate * min_silence_duration_ms / 1000)

        # async queue and runtime vars (set in start())
        self.async_queue: Optional[asyncio.Queue] = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None

        # state used by process_audio
        self.is_running = False
        self.is_speech_active = False
        self.speech_chunks: list[np.ndarray] = []  # collect speech chunks here (no np.append)
        self.speech_samples = 0  # total samples currently in speech_chunks
        self.silence_counter = 0  # in samples

        self._pyaudio = None
        self._stream = None

        self.logger.info(
            f"LiveTranscriber init (model={model_id}, device={self.device}, rate={self.sample_rate})"
        )

    # -------------------------
    # VAD & transcription helpers
    # -------------------------
    def _detect_speech_silero(self, audio_chunk: np.ndarray) -> bool:
        """Return True if Silero considers this chunk speech."""
        try:
            audio_tensor = torch.from_numpy(audio_chunk).float()
            prob = self.silero_model(audio_tensor, self.sample_rate).item()
            return prob > self.vad_threshold
        except Exception as e:
            self.logger.warning(f"Silero VAD error: {e}")
            return False

    def _transcribe_chunk(self, audio_data: np.ndarray) -> str:
        """Blocking transcription call. Run in thread via asyncio.to_thread."""
        segments, _info = self.transcribe_model.transcribe(
            audio_data,
            language=self.language,
            beam_size=1,
            condition_on_previous_text=False,
            vad_filter=False,
        )
        return " ".join(s.text.strip() for s in segments).strip()

    async def _run_transcribe_and_callback(self, buffer_copy: np.ndarray) -> None:
        """Run transcription in a worker thread and call user callback safely."""
        try:
            text = await asyncio.to_thread(self._transcribe_chunk, buffer_copy)
        except Exception as e:
            self.logger.exception(f"Transcription failed: {e}")
            return

        if not text:
            return

        if not self.on_transcript:
            return

        try:
            if asyncio.iscoroutinefunction(self.on_transcript):
                await self.on_transcript(text)
            else:
                await asyncio.to_thread(self.on_transcript, text)
        except Exception as e:
            self.logger.exception(f"on_transcript callback failed: {e}")

    # -------------------------
    # PyAudio callback -> async queue
    # -------------------------
    def audio_callback(self, in_data, frame_count, time_info, status):
        """Runs in PyAudio native thread. Put chunk into asyncio.Queue via loop.call_soon_threadsafe."""
        if status:
            self.logger.debug(f"Audio callback status: {status}")
        audio = np.frombuffer(in_data, dtype=np.int16).astype(np.float32) / 32768.0

        # the audio stream starts only after start() sets self.loop and self.async_queue
        if self.loop and self.async_queue:
            # safe: put_nowait from the audio thread via call_soon_threadsafe
            self.loop.call_soon_threadsafe(self.async_queue.put_nowait, audio)
        else:
            # fallback: drop audio if not ready
            self.logger.debug("Dropping audio: loop or async_queue not ready")

        return in_data, pyaudio.paContinue

    # -------------------------
    # Main async processing loop
    # -------------------------
    async def process_audio(self):
        """Consume audio chunks from self.async_queue and run VAD/chunking with minimal nesting."""
        assert self.async_queue is not None

        while self.is_running:
            # await next chunk (clean, no busy-sleep)
            try:
                chunk: np.ndarray = await self.async_queue.get()
            except asyncio.CancelledError:
                break

            is_speech = self._detect_speech_silero(chunk)

            if is_speech:
                # start or extend a speech buffer (we only append speech chunks)
                if not self.is_speech_active:
                    # You just STARTED talking
                    self.is_speech_active = True
                    self.speech_chunks = [chunk.copy()]     # Start new buffer
                    self.speech_samples = len(chunk)        # Count samples
                    self.silence_counter = 0                # Reset silence timer
                else:
                    # You're STILL talking
                    self.speech_chunks.append(chunk)        # Add to buffer
                    self.speech_samples += len(chunk)       # Count more samples
                    self.silence_counter = 0                # Reset silence timer
                continue    # Skip to next chunk

            # chunk is silence
            if not self.is_speech_active:
                # You're still quiet, nothing to do
                continue    # Skip to next chunk

            # we were in speech; got a silence chunk -> count it
            self.silence_counter += len(chunk)

            if self.silence_counter < self.min_silence_samples:
                # not enough silence yet to finalize
                continue

            # enough silence observed -> finalize this speech segment
            if self.speech_samples >= self.min_speech_samples:

                # 1. Combine all speech chunks into one audio buffer
                if len(self.speech_chunks) > 1:
                    buffer_copy = np.concatenate(self.speech_chunks)  # Multiple chunks
                else:
                    buffer_copy = self.speech_chunks[0].copy()        # Single chunk
                
                # 2. IMMEDIATELY reset state (so we can capture new speech)
                self.is_speech_active = False
                self.speech_chunks = []
                self.speech_samples = 0
                self.silence_counter = 0

                # 3. Send to Whisper (in background, don't wait for it)
                asyncio.create_task(self._run_transcribe_and_callback(buffer_copy))
            else:
                # Speech was shorter than min_speech_duration_ms, so discard it
                self.is_speech_active = False
                self.speech_chunks = []
                self.speech_samples = 0
                self.silence_counter = 0

    # -------------------------
    # Start / stop helpers
    # -------------------------
    async def start(self):
        """Start audio stream and processing loop. Returns when process_audio finishes (stop() called)."""
        if self.is_running:
            return

        self.loop = asyncio.get_running_loop()
        self.async_queue = asyncio.Queue()
        self.is_running = True

        # start pyaudio stream
        self._pyaudio = pyaudio.PyAudio()
        self._stream = self._pyaudio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=512,
            stream_callback=self.audio_callback,
        )
        self._stream.start_stream()

        try:
            await self.process_audio()
        finally:
            # cleanup
            try:
                self._stream.stop_stream()
                self._stream.close()
            except Exception:
                pass
            try:
                self._pyaudio.terminate()
            except Exception:
                pass
            self.is_running = False
            self.loop = None
            self.async_queue = None

    async def stop(self, transcribe_remaining: bool = True):
        """Signal the processing loop to stop and optionally transcribe remaining speech."""
        self.is_running = False
        
        # If we have buffered speech and want to transcribe it
        if transcribe_remaining and self.is_speech_active and len(self.speech_chunks) > 0:
            # Combine speech chunks
            if len(self.speech_chunks) > 1:
                buffer_copy = np.concatenate(self.speech_chunks)
            else:
                buffer_copy = self.speech_chunks[0].copy()
            
            # Clear state
            self.is_speech_active = False
            self.speech_chunks = []
            self.speech_samples = 0
            self.silence_counter = 0
            
            # Transcribe immediately (wait for completion)
            await self._run_transcribe_and_callback(buffer_copy)
        
        # Wait for process_audio to exit
        await asyncio.sleep(0.05)


In [None]:
#| eval: false
import asyncio

all_chunks = []

def handle_transcript_chunk(text: str):
    """Callback called whenever a transcription chunk is ready."""
    if text.strip():
        print(f"\n[TRANSCRIBED] {text}")
        all_chunks.append(text)

async def test_live_transcription(duration_seconds: int = 10):
    all_chunks.clear()
    print("🎤 Speak in short sentences; pauses will trigger transcription.")
    transcriber = LiveTranscriber(
        model_id="base",          # keep whatever model id you use
        language="en",
        on_transcript=handle_transcript_chunk,
        vad_threshold=0.5,
        min_speech_duration_ms=250,
        min_silence_duration_ms=500,
    )

    # start the transcriber in the background
    start_task = asyncio.create_task(transcriber.start())

    try:
        # run for given duration (you can interrupt with Ctrl+C)
        await asyncio.sleep(duration_seconds)
    except KeyboardInterrupt:
        print("Interrupted by user.")
    finally:
        # ask the transcriber to stop and wait for it to finish
        await transcriber.stop()

        # wait for the start() task to exit cleanly
        try:
            await start_task
        except Exception as e:
            # if any error bubbled up from start/process_audio, show it
            print(f"Transcriber task ended with exception: {e}")

    print("\n📝 Full transcript:")
    for i, t in enumerate(all_chunks, 1):
        print(f"{i}. {t}")

# run it
await test_live_transcription(10)

