## 1. Install Dependencies

In [10]:
# Uncomment to install dependencies
# !pip install torch transformers pyaudio numpy chunkformer

## 2. Import Libraries and Configuration

In [11]:
import time
import threading
import numpy as np
import pyaudio
import wave
import tempfile
import os
from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from chunkformer import ChunkFormerModel
from IPython.display import display, HTML, clear_output
import ipywidgets as widgets

In [12]:
@dataclass
class Config:
    """Configuration for the real-time ASR + Classification system."""
    # Model paths
    ASR_MODEL_PATH: str = "khanhld/chunkformer-ctc-large-vie"
    CLASSIFIER_MODEL_PATH: str = "model/checkpoint-25000"
    
    # Audio settings
    SAMPLE_RATE: int = 16000
    CHANNELS: int = 1
    CHUNK_SIZE: int = 1024
    INPUT_DEVICE_INDEX: Optional[int] = None  # None uses default device
    
    # Recording settings
    MAX_RECORDING_DURATION: float = 30.0  # Max seconds before forced transcription
    SILENCE_DURATION: float = 2.0  # Seconds of silence to trigger transcription
    
    # Noise calibration settings (more robust)
    NOISE_CALIBRATION_DURATION: float = 3.0  # Seconds to record for calibration
    NOISE_PERCENTILE: float = 95.0  # Use 95th percentile for robust estimation
    NOISE_MULTIPLIER: float = 2.5  # Multiplier above noise floor
    MIN_SILENCE_THRESHOLD: float = 0.005  # Minimum threshold
    MAX_SILENCE_THRESHOLD: float = 0.1  # Maximum threshold to prevent issues
    CALIBRATION_SEGMENTS: int = 5  # Number of segments for robust estimation

config = Config()
print("Configuration loaded.")

Configuration loaded.


## 3. Load Models

In [13]:
# Determine device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load ASR model (ChunkFormer)
print(f"Loading ASR model: {config.ASR_MODEL_PATH}...")
asr_model = ChunkFormerModel.from_pretrained(config.ASR_MODEL_PATH)
if device.type == "cuda":
    asr_model = asr_model.cuda()
print("ASR model loaded.")

# Load Classification model (mDeBERTa)
print(f"Loading classifier model: {config.CLASSIFIER_MODEL_PATH}...")
classifier_tokenizer = AutoTokenizer.from_pretrained(config.CLASSIFIER_MODEL_PATH)
classifier_model = AutoModelForSequenceClassification.from_pretrained(config.CLASSIFIER_MODEL_PATH)
classifier_model = classifier_model.to(device)
classifier_model.eval()
print("Classifier model loaded.")

print(f"\n‚úÖ All models loaded successfully on {device}")

Using device: cuda
Loading ASR model: khanhld/chunkformer-ctc-large-vie...


Fetching 10 files:   0%|          | 0/10 [00:00<?, ?it/s]

ASR model loaded.
Loading classifier model: model/checkpoint-25000...


The tokenizer you are loading from 'model/checkpoint-25000' with an incorrect regex pattern: https://huggingface.co/mistralai/Mistral-Small-3.1-24B-Instruct-2503/discussions/84#69121093e8b480e709447d5e. This will lead to incorrect tokenization. You should set the `fix_mistral_regex=True` flag when loading this tokenizer to fix this issue.


Classifier model loaded.

‚úÖ All models loaded successfully on cuda


## 4. Robust Noise Calibration System

In [14]:
class RobustNoiseCalibrator:
    """
    Robust noise calibration using multiple statistical methods:
    - Percentile-based estimation (resistant to outliers)
    - Segmented analysis for consistency
    - Adaptive thresholding with bounds
    """
    
    def __init__(self, config: Config):
        self.config = config
        self.silence_threshold = config.MIN_SILENCE_THRESHOLD
        self.noise_stats = {}
        
    def list_audio_devices(self) -> List[Dict]:
        """List available audio input devices."""
        p = pyaudio.PyAudio()
        devices = []
        try:
            info = p.get_host_api_info_by_index(0)
            num_devices = info.get('deviceCount', 0)
            
            print("Available Audio Input Devices:")
            for i in range(num_devices):
                dev_info = p.get_device_info_by_host_api_device_index(0, i)
                if dev_info.get('maxInputChannels', 0) > 0:
                    name = dev_info.get('name', 'Unknown')
                    devices.append({'index': i, 'name': name})
                    print(f"  [{i}] {name}")
        except Exception as e:
            print(f"Error listing devices: {e}")
        finally:
            p.terminate()
        return devices
    
    def _compute_rms(self, audio: np.ndarray) -> float:
        """Compute RMS of audio signal."""
        return float(np.sqrt(np.mean(audio ** 2)))
    
    def _compute_percentile_amplitude(self, audio: np.ndarray, percentile: float) -> float:
        """Compute percentile of absolute amplitude."""
        return float(np.percentile(np.abs(audio), percentile))
    
    def calibrate(self, show_progress: bool = True) -> Dict:
        """
        Perform robust noise calibration.
        
        Returns:
            Dict with calibration statistics and computed threshold
        """
        duration = self.config.NOISE_CALIBRATION_DURATION
        num_segments = self.config.CALIBRATION_SEGMENTS
        
        if show_progress:
            print(f"Noise Calibration - Stay silent for {duration:.1f}s...")
        
        p = pyaudio.PyAudio()
        stream = p.open(
            format=pyaudio.paInt16,
            channels=self.config.CHANNELS,
            rate=self.config.SAMPLE_RATE,
            input=True,
            input_device_index=self.config.INPUT_DEVICE_INDEX,
            frames_per_buffer=self.config.CHUNK_SIZE
        )
        
        # Collect audio in segments for robust analysis
        segment_duration = duration / num_segments
        chunks_per_segment = int(self.config.SAMPLE_RATE * segment_duration / self.config.CHUNK_SIZE)
        
        all_audio = []
        segment_rms_values = []
        segment_percentiles = []
        
        for seg in range(num_segments):
            segment_frames = []
            for _ in range(chunks_per_segment):
                data = stream.read(self.config.CHUNK_SIZE, exception_on_overflow=False)
                chunk = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
                segment_frames.append(chunk)
            
            segment_audio = np.concatenate(segment_frames)
            all_audio.append(segment_audio)
            
            # Compute segment statistics
            seg_rms = self._compute_rms(segment_audio)
            seg_percentile = self._compute_percentile_amplitude(segment_audio, self.config.NOISE_PERCENTILE)
            segment_rms_values.append(seg_rms)
            segment_percentiles.append(seg_percentile)
            
            if show_progress:
                progress = int((seg + 1) / num_segments * 100)
                print(f"\r  Progress: {progress}%", end="")
        
        stream.stop_stream()
        stream.close()
        p.terminate()
        
        if show_progress:
            print()
        
        # Combine all audio for overall statistics
        full_audio = np.concatenate(all_audio)
        
        # Compute robust statistics
        overall_rms = self._compute_rms(full_audio)
        overall_percentile = self._compute_percentile_amplitude(full_audio, self.config.NOISE_PERCENTILE)
        
        # Use median of segment values for robustness against transient noises
        median_rms = float(np.median(segment_rms_values))
        median_percentile = float(np.median(segment_percentiles))
        
        # Check for anomalies (segments with very different values)
        rms_std = float(np.std(segment_rms_values))
        is_stable = rms_std < median_rms * 0.5  # Less than 50% variation
        
        # Compute threshold using the more robust metric
        base_noise = max(median_percentile, median_rms)
        computed_threshold = base_noise * self.config.NOISE_MULTIPLIER
        
        # Clamp to valid range
        final_threshold = np.clip(
            computed_threshold,
            self.config.MIN_SILENCE_THRESHOLD,
            self.config.MAX_SILENCE_THRESHOLD
        )
        
        self.silence_threshold = float(final_threshold)
        
        # Store statistics
        self.noise_stats = {
            'overall_rms': overall_rms,
            'overall_percentile': overall_percentile,
            'median_rms': median_rms,
            'median_percentile': median_percentile,
            'rms_std': rms_std,
            'is_stable': is_stable,
            'computed_threshold': computed_threshold,
            'final_threshold': final_threshold,
            'segment_rms_values': segment_rms_values
        }
        
        # Print results
        if show_progress:
            status = "OK" if is_stable else "Unstable"
            print(f"Calibration complete [{status}]")
            print(f"  Noise RMS: {median_rms:.5f}, Threshold: {final_threshold:.5f}")
            
            if overall_rms < 0.0001:
                print("  Warning: Very low noise - check if mic is muted")
        
        return self.noise_stats
    
    def get_threshold(self) -> float:
        """Get the current silence threshold."""
        return self.silence_threshold

## 5. Text Classification Function

In [15]:
def classify_text(text: str) -> Dict:
    """
    Classify text using the mDeBERTa hate speech classifier.
    
    Args:
        text: Vietnamese text to classify
        
    Returns:
        Dict with label, confidence, and probabilities
    """
    if not text or not text.strip():
        return {
            "text": text,
            "predicted_label": "unknown",
            "confidence": 0.0,
            "probabilities": {}
        }
    
    # Tokenize
    inputs = classifier_tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=512,
        padding=True
    )
    inputs = {k: v.to(device) for k, v in inputs.items()}
    
    # Inference
    with torch.no_grad():
        outputs = classifier_model(**inputs)
        probabilities = torch.softmax(outputs.logits, dim=-1)
    
    # Get prediction
    predicted_class = torch.argmax(probabilities, dim=-1).item()
    confidence = probabilities[0][predicted_class].item()
    
    # Map labels
    id2label = classifier_model.config.id2label
    def get_label(idx):
        return id2label.get(idx, id2label.get(str(idx), f"label_{idx}"))
    
    return {
        "text": text,
        "predicted_label": get_label(predicted_class),
        "confidence": confidence,
        "probabilities": {
            get_label(i): prob.item()
            for i, prob in enumerate(probabilities[0])
        }
    }

## 6. Real-time ASR + Classification Pipeline

In [16]:
class RealtimeASRClassifierWidget:
    """
    Real-time Vietnamese Speech Recognition with Hate Speech Classification.
    Uses ipywidgets to display interactive controls and results.
    """
    
    def __init__(self, config: Config, calibrator: RobustNoiseCalibrator):
        self.config = config
        self.calibrator = calibrator
        
        # State
        self.audio_buffer = np.array([], dtype=np.float32)
        self.recording = False
        self.paused = False
        self.speech_active = False
        self.last_speech_time = time.time()
        self.lock = threading.Lock()
        self._start_time = None
        
        # Results storage
        self.results: List[Dict] = []
        self._results_html = ""
        
        # Build UI widgets
        self._build_ui()
        
    def _build_ui(self):
        """Build the widget UI."""
        # Status indicator
        self.status_label = widgets.HTML(
            value="<b style='color: gray;'>Ready</b>",
            layout=widgets.Layout(margin='5px 0')
        )
        
        # Timer display
        self.timer_label = widgets.HTML(
            value="<b>Time: 00:00</b>",
            layout=widgets.Layout(margin='5px 0')
        )
        
        # Recording indicator
        self.recording_indicator = widgets.HTML(
            value="",
            layout=widgets.Layout(margin='5px 0')
        )
        
        # Results counter
        self.results_counter = widgets.HTML(
            value="<b>Results: 0</b>",
            layout=widgets.Layout(margin='5px 0')
        )
        
        # Control buttons
        self.pause_button = widgets.Button(
            description='Pause',
            button_style='warning',
            layout=widgets.Layout(width='100px'),
            disabled=True
        )
        self.pause_button.on_click(self._on_pause_click)
        
        self.stop_button = widgets.Button(
            description='Stop',
            button_style='danger',
            layout=widgets.Layout(width='100px'),
            disabled=True
        )
        self.stop_button.on_click(self._on_stop_click)
        
        self.start_button = widgets.Button(
            description='Start',
            button_style='success',
            layout=widgets.Layout(width='100px')
        )
        self.start_button.on_click(self._on_start_click)
        
        self.clear_button = widgets.Button(
            description='Clear',
            button_style='',
            layout=widgets.Layout(width='100px')
        )
        self.clear_button.on_click(self._on_clear_click)
        
        # Results display area - use HTML widget instead of Output for thread safety
        self.results_html = widgets.HTML(
            value="",
            layout=widgets.Layout(
                height='400px',
                overflow_y='auto',
                border='1px solid #ccc',
                padding='10px',
                margin='10px 0'
            )
        )
        
        # Header section
        header = widgets.HBox([
            self.status_label,
            widgets.HTML(value=" | "),
            self.timer_label,
            widgets.HTML(value=" | "),
            self.results_counter,
            widgets.HTML(value=" "),
            self.recording_indicator
        ])
        
        # Button row
        button_row = widgets.HBox([
            self.start_button,
            self.pause_button,
            self.stop_button,
            self.clear_button
        ], layout=widgets.Layout(margin='10px 0'))
        
        # Main container
        self.main_widget = widgets.VBox([
            widgets.HTML(value="<h3>Real-time ASR + Classification</h3>"),
            header,
            button_row,
            widgets.HTML(value="<b>Results:</b>"),
            self.results_html
        ], layout=widgets.Layout(padding='10px', border='2px solid #333', border_radius='10px'))
    
    def _on_start_click(self, b):
        self.start()
    
    def _on_pause_click(self, b):
        if self.paused:
            self.resume()
        else:
            self.pause()
    
    def _on_stop_click(self, b):
        self.stop()
    
    def _on_clear_click(self, b):
        self.results = []
        self._results_html = ""
        self.results_html.value = ""
        self._update_results_counter()
        
    def _update_status(self, status: str, color: str = "gray"):
        self.status_label.value = f"<b style='color: {color};'>{status}</b>"
    
    def _update_timer(self, elapsed: float):
        mins, secs = divmod(int(elapsed), 60)
        self.timer_label.value = f"<b>Time: {mins:02d}:{secs:02d}</b>"
    
    def _update_results_counter(self):
        self.results_counter.value = f"<b>Results: {len(self.results)}</b>"
    
    def _update_recording_indicator(self, is_recording: bool):
        if is_recording:
            self.recording_indicator.value = "<span style='color: red;'>Recording...</span>"
        else:
            self.recording_indicator.value = ""
    
    def _render_result(self, result: Dict) -> str:
        """Render a single result as HTML string."""
        label = result['classification']['predicted_label']
        confidence = result['classification']['confidence']
        text = result['text']
        timestamp = result['timestamp']
        
        if label == 'hate':
            bg_color = "#ffdddd"
            label_color = "red"
        else:
            bg_color = "#ddffdd"
            label_color = "green"
        
        probs = result['classification']['probabilities']
        prob_str = " | ".join([f"{k}: {v:.1%}" for k, v in probs.items()])
        
        return f"""
        <div style='background: {bg_color}; padding: 8px; margin: 4px 0; border-radius: 6px; border-left: 3px solid {label_color};'>
            <div style='color: #666; font-size: 11px;'>@{timestamp:.1f}s</div>
            <div style='font-size: 14px; margin: 4px 0;'>{text}</div>
            <div style='font-size: 12px;'>
                <span style='color: {label_color}; font-weight: bold;'>{label.upper()}</span> ({confidence:.1%})
                <span style='color: #888; font-size: 11px;'> - {prob_str}</span>
            </div>
        </div>
        """
    
    def _update_results_display(self):
        """Update the results HTML widget with all results."""
        self.results_html.value = self._results_html
    
    def _add_message(self, msg: str, color: str = "gray"):
        """Add a status message to the results."""
        self._results_html += f"<div style='color: {color}; margin: 4px 0;'>{msg}</div>"
        self._update_results_display()
    
    def _transcribe_audio(self, audio: np.ndarray) -> str:
        """Transcribe audio array to text using ChunkFormer."""
        temp_filename = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                temp_filename = f.name
                with wave.open(temp_filename, 'wb') as wf:
                    wf.setnchannels(self.config.CHANNELS)
                    wf.setsampwidth(2)
                    wf.setframerate(self.config.SAMPLE_RATE)
                    audio_int16 = (audio * 32767).astype(np.int16)
                    wf.writeframes(audio_int16.tobytes())
            
            text = asr_model.endless_decode(
                audio_path=temp_filename,
                chunk_size=64,
                left_context_size=128,
                right_context_size=128,
                total_batch_duration=14400,
                return_timestamps=False
            )
            return str(text).strip()
            
        except Exception as e:
            self._add_message(f"Transcription error: {e}", "red")
            return ""
        finally:
            if temp_filename and os.path.exists(temp_filename):
                os.remove(temp_filename)
    
    def _record_thread(self):
        """Background thread for audio recording."""
        p = pyaudio.PyAudio()
        stream = p.open(
            format=pyaudio.paInt16,
            channels=self.config.CHANNELS,
            rate=self.config.SAMPLE_RATE,
            input=True,
            input_device_index=self.config.INPUT_DEVICE_INDEX,
            frames_per_buffer=self.config.CHUNK_SIZE
        )
        
        threshold = self.calibrator.get_threshold()
        
        while self.recording:
            if self.paused:
                time.sleep(0.1)
                continue
                
            try:
                data = stream.read(self.config.CHUNK_SIZE, exception_on_overflow=False)
                audio_chunk = np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
                
                rms = np.sqrt(np.mean(audio_chunk ** 2))
                
                if rms > threshold:
                    self.last_speech_time = time.time()
                    if not self.speech_active:
                        self.speech_active = True
                        self._update_recording_indicator(True)
                
                if self.speech_active:
                    with self.lock:
                        self.audio_buffer = np.concatenate((self.audio_buffer, audio_chunk))
                    
                    if time.time() - self.last_speech_time > self.config.SILENCE_DURATION:
                        self.speech_active = False
                        self._update_recording_indicator(False)
                        
            except Exception as e:
                self._add_message(f"Recording error: {e}", "red")
                break
        
        stream.stop_stream()
        stream.close()
        p.terminate()
    
    def _process_thread(self):
        """Background thread for transcription and classification."""
        while self.recording:
            if self.paused:
                time.sleep(0.1)
                continue
                
            # Update timer
            if self._start_time:
                elapsed = time.time() - self._start_time
                self._update_timer(elapsed)
            
            with self.lock:
                buffer_duration = len(self.audio_buffer) / self.config.SAMPLE_RATE
            
            should_process = False
            
            if buffer_duration >= self.config.MAX_RECORDING_DURATION:
                should_process = True
            elif not self.speech_active and buffer_duration > 0.5:
                should_process = True
            
            if should_process:
                with self.lock:
                    audio_to_process = self.audio_buffer.copy()
                    self.audio_buffer = np.array([], dtype=np.float32)
                
                if len(audio_to_process) > 0:
                    self._update_status("Transcribing...", "blue")
                    
                    transcribed_text = self._transcribe_audio(audio_to_process)
                    
                    if transcribed_text:
                        self._update_status("Classifying...", "purple")
                        
                        classification = classify_text(transcribed_text)
                        
                        elapsed = time.time() - self._start_time
                        result = {
                            "timestamp": elapsed,
                            "text": transcribed_text,
                            "classification": classification
                        }
                        self.results.append(result)
                        
                        # Add result to HTML and update display
                        self._results_html += self._render_result(result)
                        self._update_results_display()
                        self._update_results_counter()
                    
                    self._update_status("Listening...", "green")
            
            time.sleep(0.1)
    
    def display(self):
        """Display the widget UI."""
        display(self.main_widget)
    
    def start(self):
        """Start recording."""
        if self.recording:
            return
            
        self.recording = True
        self.paused = False
        self.results = []
        self._results_html = ""
        self._start_time = time.time()
        self.audio_buffer = np.array([], dtype=np.float32)
        
        # Update UI
        self._update_status("Listening...", "green")
        self.start_button.disabled = True
        self.pause_button.disabled = False
        self.stop_button.disabled = False
        
        # Start threads
        self.record_thread = threading.Thread(target=self._record_thread, daemon=True)
        self.process_thread = threading.Thread(target=self._process_thread, daemon=True)
        self.record_thread.start()
        self.process_thread.start()
        
        self._add_message("Recording started...", "green")
    
    def pause(self):
        """Pause recording."""
        self.paused = True
        self._update_status("Paused", "orange")
        self._update_recording_indicator(False)
        self.pause_button.description = "Resume"
        self.pause_button.button_style = "success"
        self._add_message("Paused", "orange")
    
    def resume(self):
        """Resume recording."""
        self.paused = False
        self._update_status("Listening...", "green")
        self.pause_button.description = "Pause"
        self.pause_button.button_style = "warning"
        self._add_message("Resumed", "green")
    
    def stop(self) -> List[Dict]:
        """Stop recording and return results."""
        self.recording = False
        self._update_status("Stopped", "gray")
        self._update_recording_indicator(False)
        
        # Update UI
        self.start_button.disabled = False
        self.pause_button.disabled = True
        self.pause_button.description = "Pause"
        self.pause_button.button_style = "warning"
        self.stop_button.disabled = True
        
        if hasattr(self, 'record_thread'):
            self.record_thread.join(timeout=2.0)
        if hasattr(self, 'process_thread'):
            self.process_thread.join(timeout=2.0)
        
        hate_count = sum(1 for r in self.results if r['classification']['predicted_label'] == 'hate')
        clean_count = len(self.results) - hate_count
        self._results_html += f"""
            <div style='background: #f0f0f0; padding: 8px; margin-top: 8px; border-radius: 6px;'>
                <b>Session Complete</b> - Total: {len(self.results)} | Clean: {clean_count} | Hate: {hate_count}
            </div>
        """
        self._update_results_display()
        
        return self.results
    
    def get_results(self) -> List[Dict]:
        """Get all results."""
        return self.results.copy()

## 7. Initialize and Calibrate

In [17]:
# Initialize calibrator
calibrator = RobustNoiseCalibrator(config)

# List available devices
devices = calibrator.list_audio_devices()

# Set device index if needed (uncomment and modify)
# config.INPUT_DEVICE_INDEX = 0  # Change to your desired device index

Available Audio Input Devices:
  [0] Loopback: PCM (hw:0,0)
  [1] Loopback: PCM (hw:0,1)
  [6] sof-hda-dsp: - (hw:2,0)
  [10] sof-hda-dsp: - (hw:2,6)
  [11] sof-hda-dsp: - (hw:2,7)
  [13] sysdefault
  [15] surround21
  [21] lavrate
  [22] samplerate
  [23] speexrate
  [24] pipewire
  [25] pulse
  [26] speex
  [27] upmix
  [28] vdownmix
  [30] default


ALSA lib pcm.c:2722:(snd_pcm_open_noupdate) [error.pcm] Unknown PCM cards.pcm.rear
ALSA lib pcm.c:2722:(snd_pcm_open_noupdate) [error.pcm] Unknown PCM cards.pcm.center_lfe
ALSA lib pcm.c:2722:(snd_pcm_open_noupdate) [error.pcm] Unknown PCM cards.pcm.side
ALSA lib confmisc.c:1377:(snd_func_refer) [error.core] Unable to find definition 'cards.0.pcm.iec958.0:CARD=0,AES0=4,AES1=130,AES2=0,AES3=2'
ALSA lib conf.c:5207:(_snd_config_evaluate) [error.core] function snd_func_refer returned error: No such file or directory
ALSA lib conf.c:5730:(snd_config_expand) [error.core] Evaluate error: No such file or directory
ALSA lib pcm.c:2722:(snd_pcm_open_noupdate) [error.pcm] Unknown PCM iec958
ALSA lib confmisc.c:1377:(snd_func_refer) [error.core] Unable to find definition 'cards.0.pcm.iec958.0:CARD=0,AES0=4,AES1=130,AES2=0,AES3=2'
ALSA lib conf.c:5207:(_snd_config_evaluate) [error.core] function snd_func_refer returned error: No such file or directory
ALSA lib conf.c:5730:(snd_config_expand) [erro

In [18]:
# Perform noise calibration
noise_stats = calibrator.calibrate(show_progress=True)

ALSA lib pcm.c:2722:(snd_pcm_open_noupdate) [error.pcm] Unknown PCM cards.pcm.rear
ALSA lib pcm.c:2722:(snd_pcm_open_noupdate) [error.pcm] Unknown PCM cards.pcm.center_lfe
ALSA lib pcm.c:2722:(snd_pcm_open_noupdate) [error.pcm] Unknown PCM cards.pcm.side
ALSA lib confmisc.c:1377:(snd_func_refer) [error.core] Unable to find definition 'cards.0.pcm.iec958.0:CARD=0,AES0=4,AES1=130,AES2=0,AES3=2'
ALSA lib conf.c:5207:(_snd_config_evaluate) [error.core] function snd_func_refer returned error: No such file or directory
ALSA lib conf.c:5730:(snd_config_expand) [error.core] Evaluate error: No such file or directory
ALSA lib pcm.c:2722:(snd_pcm_open_noupdate) [error.pcm] Unknown PCM iec958
ALSA lib confmisc.c:1377:(snd_func_refer) [error.core] Unable to find definition 'cards.0.pcm.iec958.0:CARD=0,AES0=4,AES1=130,AES2=0,AES3=2'
ALSA lib conf.c:5207:(_snd_config_evaluate) [error.core] function snd_func_refer returned error: No such file or directory
ALSA lib conf.c:5730:(snd_config_expand) [erro

Noise Calibration - Stay silent for 3.0s...
  Progress: 100%
Calibration complete [OK]
  Noise RMS: 0.13669, Threshold: 0.10000


In [19]:
# Optional: Recalibrate if the environment changes
# noise_stats = calibrator.calibrate(show_progress=True)

## 8. Run Real-time ASR + Classification

Run the cell below to display the interactive widget with:
- **‚ñ∂Ô∏è Start** - Begin continuous recording
- **‚è∏Ô∏è Pause / ‚ñ∂Ô∏è Resume** - Temporarily pause/resume recording  
- **‚èπÔ∏è Stop** - Stop and show session summary
- **üóëÔ∏è Clear** - Clear all results

The system will automatically:
1. Detect when you start/stop speaking
2. Transcribe speech segments (max 30s each)
3. Classify each segment and display the result

In [20]:
# Create the pipeline with widget UI
pipeline = RealtimeASRClassifierWidget(config, calibrator)

# Display the interactive widget
# Use the Start/Pause/Stop buttons to control recording
# Results will appear in the scrollable area below
pipeline.display()

VBox(children=(HTML(value='<h3>Real-time ASR + Classification</h3>'), HBox(children=(HTML(value="<b style='col‚Ä¶

## 10. Test Classification on Custom Text

In [23]:
# Test the classifier on sample texts
test_texts = [
    "Xin ch√†o, h√¥m nay tr·ªùi ƒë·∫πp qu√°",
    "C·∫£m ∆°n b·∫°n r·∫•t nhi·ªÅu",
    "T√¥i r·∫•t vui ƒë∆∞·ª£c g·∫∑p b·∫°n",
    "M√†y ngu"
]

print("Testing Text Classification")
print("-" * 50)
for text in test_texts:
    result = classify_text(text)
    label = result['predicted_label']
    confidence = result['confidence']
    
    print(f"Text: {text}")
    print(f"  -> {label} ({confidence:.1%})\n")

Testing Text Classification
--------------------------------------------------
Text: Xin ch√†o, h√¥m nay tr·ªùi ƒë·∫πp qu√°
  -> clean (100.0%)

Text: C·∫£m ∆°n b·∫°n r·∫•t nhi·ªÅu
  -> clean (100.0%)

Text: T√¥i r·∫•t vui ƒë∆∞·ª£c g·∫∑p b·∫°n
  -> clean (100.0%)

Text: M√†y ngu
  -> hate (99.9%)



In [22]:
# Interactive text classification
def classify_interactive(text):
    if text.strip():
        result = classify_text(text)
        label = result['predicted_label']
        confidence = result['confidence']
        probs = result['probabilities']
        
        print(f"{label.upper()} ({confidence:.1%})")
        print(f"Probs: {' | '.join([f'{k}: {v:.1%}' for k, v in probs.items()])}")

text_input = widgets.Text(
    placeholder='Enter Vietnamese text to classify...',
    description='Text:',
    layout=widgets.Layout(width='80%')
)

classify_button = widgets.Button(description='Classify', button_style='primary')
output = widgets.Output()

def on_button_click(b):
    with output:
        clear_output()
        classify_interactive(text_input.value)

classify_button.on_click(on_button_click)
display(widgets.VBox([text_input, classify_button, output]))

VBox(children=(Text(value='', description='Text:', layout=Layout(width='80%'), placeholder='Enter Vietnamese t‚Ä¶