In [1]:
# Cài đặt ffmpeg trên MacOS bằng homebrew
!brew install -q ffmpeg

In [2]:
!pip install -q numpy sounddevice webrtcvad soundfile

In [3]:
!pip install -q --upgrade pip

In [4]:
!pip install -q setuptools

In [5]:
!pip install -q openai-whisper

In [None]:
#1. Cấu trúc file và dependencies:
# realtime_vad.py
import os
import queue
import tempfile
import threading
import time
import wave
from collections import deque
from dataclasses import dataclass
from typing import List, Optional

import numpy as np
import sounddevice as sd
import soundfile as sf
import webrtcvad
import whisper

@dataclass()
class SpeechSegment:
    """One contiguous stretch of detected speech and, later, its transcript."""

    samples: np.ndarray   # audio of the segment (int16 PCM when produced by RealtimeVAD)
    start_time: float     # segment start, in seconds since processing began
    end_time: float       # segment end, in seconds since processing began
    confidence: float     # detection confidence (RealtimeVAD sets a fixed value)
    transcript: str = ""  # filled in by the transcription step; empty until then

In [7]:
class RealtimeVAD:
    """Split a live microphone stream into speech segments using WebRTC VAD.

    sounddevice delivers float32 audio to ``_audio_callback``. The audio is
    converted to int16, cut into ``chunk_size``-sample frames, and each frame is
    classified by ``webrtcvad``. A segment opens on the first speech frame and is
    prefixed with up to ``padding_duration_ms`` of the audio that preceded it. It
    closes once non-speech has lasted longer than ``silence_duration_ms``; that
    trailing silence stays in the segment as post-speech padding. Finished
    segments are put on ``self.speech_segments`` and handled by
    ``_process_speech_segment`` on the thread that runs ``start``.
    """

    # Frame lengths (ms) and sample rates (Hz) accepted by webrtcvad's is_speech().
    VALID_CHUNK_DURATIONS_MS = (10, 20, 30)
    VALID_SAMPLE_RATES = (8000, 16000, 32000, 48000)

    def __init__(self,
                 sample_rate: int = 16000,
                 chunk_duration_ms: int = 30,
                 padding_duration_ms: int = 300,
                 silence_duration_ms: int = 500,
                 vad_sensitivity: int = 3):
        """
        Args:
            sample_rate: Input sample rate in Hz. webrtcvad accepts 8000, 16000,
                32000 or 48000. Keep 16000 when segments are sent to Whisper.
            chunk_duration_ms: VAD frame length. webrtcvad accepts 10, 20 or 30 ms.
            padding_duration_ms: Amount of audio from before speech onset that is
                prepended to each segment.
            silence_duration_ms: Non-speech must last longer than this to close a segment.
            vad_sensitivity: webrtcvad aggressiveness mode (0-3). Higher values
                filter out non-speech more aggressively, so fewer frames count as speech.

        Raises:
            ValueError: if sample_rate or chunk_duration_ms is not supported by webrtcvad.
        """
        if sample_rate not in self.VALID_SAMPLE_RATES:
            raise ValueError(f"sample_rate must be one of {self.VALID_SAMPLE_RATES}, got {sample_rate}")
        if chunk_duration_ms not in self.VALID_CHUNK_DURATIONS_MS:
            raise ValueError(
                f"chunk_duration_ms must be one of {self.VALID_CHUNK_DURATIONS_MS}, got {chunk_duration_ms}")

        self.sample_rate = sample_rate
        self.chunk_duration_ms = chunk_duration_ms
        self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)
        self.padding_duration_ms = padding_duration_ms
        self.silence_duration_ms = silence_duration_ms

        self.vad = webrtcvad.Vad()
        self.vad.set_mode(vad_sensitivity)

        # Raw int16 samples not yet cut into VAD frames (2 seconds of headroom).
        self.audio_buffer = deque(maxlen=2 * sample_rate)
        # int16 frames (numpy arrays) of the segment currently being built.
        self.current_speech = []
        self.speech_segments = queue.Queue()

        # Pre-roll: the most recent non-speech frames. They are prepended when speech
        # starts, because audio_buffer only holds samples that come *after* the
        # frame being processed.
        self._padding_samples = max(0, int(padding_duration_ms * sample_rate / 1000))
        self._preroll = deque(maxlen=-(-self._padding_samples // self.chunk_size))  # ceil division

        # Segmentation state
        self.in_speech = False
        self.silence_start = None
        self.speech_start = None
        self.processed_samples = 0

        # Full-session recording as int16 sample values; written to WAV by callers.
        self.recorded_samples = []

        # Optional queue that receives every int16 block (set by a higher-level class).
        self.audio_write_queue = None

    def _audio_callback(self, indata, frames, time, status):
        """Callback for sounddevice's InputStream.

        Args:
            indata: float32 array of shape (frames, channels) from sounddevice.
            frames: number of frames in ``indata`` (unused).
            time: sounddevice timing info (unused). This parameter shadows the
                ``time`` module inside this method only.
            status: sounddevice status flags; truthy on overflow/underflow.
        """
        if status:
            print(f"Status: {status}")

        # Keep channel 0 and convert float [-1, 1] to int16. Clip first: samples
        # slightly outside [-1, 1] would otherwise wrap around in the int16 cast.
        audio_chunk = (np.clip(indata[:, 0], -1.0, 1.0) * 32767).astype(np.int16)

        self.audio_buffer.extend(audio_chunk)

        if getattr(self, 'audio_write_queue', None) is not None:
            try:
                # Queue a copy so later mutation cannot affect the queued data.
                self.audio_write_queue.put(audio_chunk.copy(), block=False)
            except Exception:
                # Best effort: never block or raise on the audio thread.
                pass

        # Cut and classify every complete VAD frame.
        while len(self.audio_buffer) >= self.chunk_size:
            chunk = np.fromiter((self.audio_buffer.popleft() for _ in range(self.chunk_size)),
                                dtype=np.int16, count=self.chunk_size)
            self._process_chunk(chunk)

    def _process_chunk(self, chunk: np.ndarray):
        """Classify one VAD frame and advance the segmentation state machine.

        Args:
            chunk: int16 array of exactly ``self.chunk_size`` samples.
        """
        try:
            self.recorded_samples.extend(chunk.tolist())
        except Exception as exc:
            # Keep VAD running even if the session recording cannot grow, but say so once.
            if not getattr(self, '_recording_error_reported', False):
                print(f"Warning: could not append to session recording ({exc!r}); saved audio will be incomplete.")
                self._recording_error_reported = True

        is_speech = self.vad.is_speech(chunk.tobytes(), self.sample_rate)
        current_time = self.processed_samples / self.sample_rate

        if is_speech:
            if not self.in_speech:
                # Speech onset: start the segment with the buffered pre-roll.
                self.in_speech = True
                preroll = self._take_preroll()
                if preroll.size:
                    self.current_speech.append(preroll)
                self.speech_start = max(0.0, current_time - preroll.size / self.sample_rate)
            # Any speech frame cancels a pending end-of-segment countdown, so
            # separate short pauses do not add up.
            self.silence_start = None
        elif self.in_speech:
            if self.silence_start is None:
                self.silence_start = current_time
            if (current_time - self.silence_start) > self.silence_duration_ms / 1000:
                self._finalize_speech_segment()

        if self.in_speech:
            self.current_speech.append(chunk)
        else:
            self._preroll.append(chunk)

        self.processed_samples += len(chunk)

    def _take_preroll(self) -> np.ndarray:
        """Return (and clear) up to ``padding_duration_ms`` of audio that preceded the current frame."""
        if not self._preroll or self._padding_samples <= 0:
            self._preroll.clear()
            return np.zeros(0, dtype=np.int16)
        samples = np.concatenate(list(self._preroll))[-self._padding_samples:]
        self._preroll.clear()
        return samples

    def _finalize_speech_segment(self):
        """Put the segment being built on ``self.speech_segments`` and reset state.

        The trailing silence collected while waiting for ``silence_duration_ms``
        is already part of the segment and serves as post-speech padding.
        """
        if self.current_speech:
            segment = SpeechSegment(
                samples=np.concatenate(self.current_speech).astype(np.int16, copy=False),
                start_time=self.speech_start,
                end_time=self.processed_samples / self.sample_rate,
                confidence=0.9,  # fixed: is_speech only yields a yes/no decision here
            )
            self.speech_segments.put(segment)

        self.current_speech = []
        self.in_speech = False
        self.speech_start = None
        self.silence_start = None

    def flush(self):
        """Close the in-progress speech segment, if any, so it is not lost when recording stops."""
        if self.in_speech:
            self._finalize_speech_segment()

    def start(self, duration: Optional[float] = None):
        """Record from the default input device and handle speech segments as they close.

        Args:
            duration: Seconds to record. None records until Ctrl+C.
        """
        try:
            with sd.InputStream(channels=1,
                                samplerate=self.sample_rate,
                                dtype=np.float32,
                                callback=self._audio_callback,
                                blocksize=self.chunk_size):
                print("Started VAD recording (Press Ctrl+C to stop)")

                start_time = time.time()
                while duration is None or (time.time() - start_time) <= duration:
                    try:
                        segment = self.speech_segments.get(timeout=0.1)
                    except queue.Empty:
                        continue
                    self._process_speech_segment(segment)
        except KeyboardInterrupt:
            print("\nRecording stopped.")

        # The stream is closed, so no new audio arrives. Emit the half-finished
        # segment, then handle everything still queued.
        self.flush()
        while True:
            try:
                segment = self.speech_segments.get_nowait()
            except queue.Empty:
                break
            self._process_speech_segment(segment)

    def _process_speech_segment(self, segment: "SpeechSegment"):
        """Handle one detected speech segment. Subclasses override this (e.g. to transcribe)."""
        duration = segment.end_time - segment.start_time
        print(f"Speech detected: {duration:.2f}s ({segment.start_time:.2f}s - {segment.end_time:.2f}s)")


In [8]:
class TranscriptionWorker:
    """Transcribe speech segments with Whisper on a background daemon thread.

    Submit segments with ``add_segment``. Each one comes back, with
    ``transcript`` filled in, from ``get_transcribed_segment``. Every submitted
    segment is returned exactly once: if Whisper fails, the error is printed and
    the segment is returned with an empty transcript. Callers that count
    outstanding segments therefore never wait forever.
    """

    # Sample rate Whisper expects for in-memory audio arrays.
    WHISPER_SAMPLE_RATE = 16000

    def __init__(self, model_name: str = "base", sample_rate: int = 16000):
        """
        Args:
            model_name: Whisper model to load (passed to ``whisper.load_model``).
            sample_rate: Sample rate (Hz) of the ``samples`` arrays that will be submitted.
        """
        self.model = whisper.load_model(model_name)
        self.sample_rate = sample_rate
        self.queue = queue.Queue()    # segments waiting for transcription
        self.results = queue.Queue()  # transcribed (or failed) segments
        self.running = True
        self.thread = threading.Thread(target=self._process_loop, daemon=True)
        self.thread.start()

    def _process_loop(self):
        """Worker-thread body: transcribe queued segments until ``stop()`` clears ``running``."""
        while self.running:
            try:
                segment = self.queue.get(timeout=1)  # wake up periodically to re-check `running`
            except queue.Empty:
                continue
            self._transcribe_segment(segment)
            self.results.put(segment)

    def _transcribe_segment(self, segment):
        """Set ``segment.transcript`` in place. On any error, log it and leave the transcript empty."""
        try:
            segment.transcript = self._run_whisper(segment.samples)
        except Exception as e:
            print(f"Transcription error: {e}")
            segment.transcript = ""

    def _run_whisper(self, samples) -> str:
        """Run Whisper (FP32) on one segment's samples and return the stripped text."""
        sample_rate = getattr(self, "sample_rate", self.WHISPER_SAMPLE_RATE)
        if sample_rate == self.WHISPER_SAMPLE_RATE:
            # Whisper accepts a float32 16 kHz array directly, which avoids a temp
            # file and an ffmpeg decode per segment. int16 PCM is scaled to [-1, 1)
            # the same way whisper.audio.load_audio scales decoded audio.
            audio = np.asarray(samples)
            if np.issubdtype(audio.dtype, np.integer):
                audio = audio.astype(np.float32) / np.float32(32768.0)
            else:
                audio = audio.astype(np.float32)
            result = self.model.transcribe(audio, fp16=False)
        else:
            # Other rates: write a WAV to the system temp dir and let Whisper's
            # ffmpeg-based loader resample it to 16 kHz.
            fd, temp_path = tempfile.mkstemp(suffix=".wav")
            os.close(fd)
            try:
                sf.write(temp_path, samples, sample_rate)
                result = self.model.transcribe(temp_path, fp16=False)
            finally:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        return result["text"].strip()

    def add_segment(self, segment: "SpeechSegment"):
        """Queue a segment for transcription."""
        self.queue.put(segment)

    def get_transcribed_segment(self, timeout: float = 0.1) -> Optional["SpeechSegment"]:
        """Return the next finished segment, or None if none arrives within ``timeout`` seconds."""
        try:
            return self.results.get(timeout=timeout)
        except queue.Empty:
            return None

    def stop(self):
        """Ask the worker loop to exit and wait for the thread to finish its current segment."""
        self.running = False
        if self.thread.is_alive():
            self.thread.join()

In [9]:
# Example với realtime transcription
class TranscribingVAD(RealtimeVAD):
    """RealtimeVAD that transcribes every detected speech segment with Whisper.

    Segments are handed to a background ``TranscriptionWorker``. Finished
    transcripts are printed and appended to ``transcript_file`` as
    ``[HH:MM:SS.mmm - HH:MM:SS.mmm] text``, where the timestamps are offsets from
    the start of the recording (the same timeline as the saved WAV). The whole
    session audio is written to ``audio_file`` once recording stops.
    """

    def __init__(self, *args, transcript_file="transcript.txt", audio_file="recording.wav",
                 max_meeting_duration: Optional[float] = None, model_name: str = "turbo", **kwargs):
        """
        Args:
            *args, **kwargs: forwarded to RealtimeVAD.
            transcript_file: path of the transcript text file (overwritten here with a header).
            audio_file: path of the WAV file the full session is saved to.
            max_meeting_duration: optional hard cap on recording time, in seconds.
            model_name: Whisper model loaded by the TranscriptionWorker.
        """
        super().__init__(*args, **kwargs)
        self.transcriber = TranscriptionWorker(model_name=model_name)
        self.transcript_file = transcript_file
        self.audio_file = audio_file
        self.pending_transcripts = 0  # segments submitted to the worker but not yet returned
        self.max_meeting_duration = max_meeting_duration

        with open(self.transcript_file, 'w', encoding='utf-8') as f:
            f.write("=== Transcript File ===\n")
            f.write(f"Created: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
            if self.max_meeting_duration:
                f.write(f"Max meeting duration (s): {self.max_meeting_duration}\n")
            f.write("Timestamps: offset from start of recording (HH:MM:SS.mmm)\n")
            f.write("====================\n\n")

    @staticmethod
    def _format_offset(seconds: float) -> str:
        """Format an offset in seconds as ``HH:MM:SS.mmm`` (negative values clamp to zero)."""
        total_ms = int(round(max(0.0, seconds) * 1000))
        hours, rem = divmod(total_ms, 3_600_000)
        minutes, rem = divmod(rem, 60_000)
        secs, millis = divmod(rem, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"

    def _save_transcript(self, segment: SpeechSegment):
        """Append one transcript line, stamped with the segment's offsets into the recording."""
        start = self._format_offset(segment.start_time)
        end = self._format_offset(segment.end_time)
        with open(self.transcript_file, 'a', encoding='utf-8') as f:
            f.write(f"[{start} - {end}] {segment.transcript}\n")

    def _handle_transcript(self, result):
        """Print and persist one finished transcript, then mark it as no longer pending."""
        print(f"Transcript [{result.start_time:.2f}s - {result.end_time:.2f}s]: {result.transcript}")
        self._save_transcript(result)
        self.pending_transcripts -= 1

    def _collect_transcripts(self):
        """Handle every finished transcript that is available right now, without blocking."""
        while True:
            result = self.transcriber.get_transcribed_segment(timeout=0.0)
            if result is None:
                return
            self._handle_transcript(result)

    def _process_speech_segment(self, segment):
        """Print the detection, submit the segment for async transcription, and collect finished ones."""
        super()._process_speech_segment(segment)
        self.transcriber.add_segment(segment)
        self.pending_transcripts += 1
        self._collect_transcripts()

    def _write_audio_file(self):
        """Write ``self.recorded_samples`` to ``self.audio_file`` as 16-bit PCM WAV.

        soundfile is tried first. If it fails, the stdlib ``wave`` module is used.
        Failures are printed, not raised.
        """
        print(f"Attempting to write audio file to: {os.path.abspath(self.audio_file)}")
        recorded = getattr(self, 'recorded_samples', None)
        if recorded is None or len(recorded) == 0:
            print("No recorded samples to write.")
            return
        samples_np = np.asarray(recorded, dtype=np.int16)
        print(f"Number of samples to write: {len(samples_np)} (duration={len(samples_np)/self.sample_rate:.2f}s)")
        try:
            sf.write(self.audio_file, samples_np, self.sample_rate, subtype='PCM_16')
            print(f"Saved recording to {self.audio_file}")
            return
        except Exception as e:
            print(f"Failed to write audio file with soundfile: {e}")
        try:
            with wave.open(self.audio_file, 'wb') as wf:
                wf.setnchannels(1)
                wf.setsampwidth(2)  # 2 bytes per int16 sample
                wf.setframerate(self.sample_rate)
                wf.writeframes(samples_np.tobytes())
            print(f"Saved recording to {self.audio_file} using wave fallback")
        except Exception as e2:
            print(f"Fallback wave write failed: {e2}")

    def _record_until_limit(self, duration: Optional[float]):
        """Handle speech segments and finished transcripts until a time limit is reached.

        Returns immediately when either ``duration`` or ``max_meeting_duration``
        (measured from the call) has elapsed. Loops forever if both are None.
        """
        start_time = time.time()
        requested_end = start_time + duration if duration is not None else None
        max_end = (start_time + self.max_meeting_duration
                   if self.max_meeting_duration is not None else None)

        while True:
            now = time.time()
            if requested_end is not None and now >= requested_end:
                print("Reached requested duration, stopping recording loop.")
                return
            if max_end is not None and now >= max_end:
                print("Reached maximum meeting duration, stopping recording loop.")
                return

            try:
                segment = self.speech_segments.get(timeout=0.1)
            except queue.Empty:
                pass
            else:
                self._process_speech_segment(segment)
            # Poll on every iteration so transcripts show up as soon as they are
            # ready, not only when the next speech segment is detected.
            self._collect_transcripts()

    def _flush_speech_segments(self):
        """After capture stops: close the in-progress segment and submit everything still queued."""
        if self.in_speech:
            self._finalize_speech_segment()
        while True:
            try:
                segment = self.speech_segments.get_nowait()
            except queue.Empty:
                return
            self._process_speech_segment(segment)

    def _wait_for_pending_transcripts(self) -> bool:
        """Block until every submitted segment has been returned by the worker.

        Returns:
            True if all transcripts were handled. False if the wait was abandoned
            because the worker thread is no longer running or the user pressed Ctrl+C.
        """
        try:
            while self.pending_transcripts > 0:
                result = self.transcriber.get_transcribed_segment(timeout=0.1)
                if result is not None:
                    self._handle_transcript(result)
                elif not self.transcriber.thread.is_alive():
                    print(f"Transcription worker is not running; "
                          f"{self.pending_transcripts} segment(s) were not transcribed.")
                    return False
        except KeyboardInterrupt:
            print(f"\nStopped waiting; {self.pending_transcripts} transcription(s) still pending.")
            return False
        return True

    def start(self, duration: Optional[float] = None):
        """Record and transcribe, then save the session audio and finish outstanding transcripts.

        Args:
            duration: requested recording time in seconds. If ``max_meeting_duration``
                was set in the constructor, recording stops no later than that. If
                both are None, recording continues until Ctrl+C.
        """
        interrupted = False
        try:
            with sd.InputStream(channels=1,
                                samplerate=self.sample_rate,
                                dtype=np.float32,
                                callback=self._audio_callback,
                                blocksize=self.chunk_size):
                print(f"Started VAD recording (Press Ctrl+C to stop)")
                self._record_until_limit(duration)
        except KeyboardInterrupt:
            interrupted = True

        # The input stream is closed here, so no audio is captured (and no new
        # segments are detected) while waiting for Whisper.
        if interrupted:
            print("\nRecording stopped by user, waiting for pending transcriptions...")
        else:
            print("\nRecording finished, waiting for pending transcriptions...")

        self._flush_speech_segments()
        # Save the audio before the potentially long transcription wait, so it
        # survives even if that wait is cut short.
        self._write_audio_file()
        if self._wait_for_pending_transcripts():
            print(f"All transcriptions completed. Results saved to {self.transcript_file}")
        else:
            print(f"Partial transcript saved to {self.transcript_file}")

    def __del__(self):
        # Stop the worker thread when this object is garbage-collected.
        if hasattr(self, 'transcriber'):
            self.transcriber.stop()


In [None]:
# Create the VAD + Whisper transcription pipeline and record one session.
RECORDING_SECONDS = 60  # length of the recording session, in seconds

vad = TranscribingVAD(
    sample_rate=16000,
    chunk_duration_ms=30,
    padding_duration_ms=300,
    silence_duration_ms=500,
    vad_sensitivity=3,
    transcript_file="meeting_transcript.txt",  # transcript output file
    audio_file="meeting_recording.wav",        # full-session recording
)

try:
    vad.start(duration=RECORDING_SECONDS)
finally:
    # Stop the Whisper worker thread explicitly instead of relying on __del__.
    vad.transcriber.stop()