# Interrupt Handling Logic ‚Äî Colab Demo
This notebook implements the interruption-handling logic required by the assignment:
- Distinguish *soft acknowledgements* (e.g., "yeah", "okay") from *interrupt commands* (e.g., "stop", "wait").
- While the agent is speaking: ignore soft-acks; interrupt on commands or mixed sentences (e.g., "yeah wait").
- While the agent is silent: accept user utterances as normal input.
- Simulates VAD + STT timing and demonstrates behavior with example scenarios.

Run the Python cells below to see the behavior and sample logs.


In [1]:
import re, time, threading
from typing import Optional, Callable, Dict

class STTResult:
    def __init__(self, text: str, vad_id: Optional[str] = None, confidence: Optional[float] = None):
        self.text = text
        self.vad_id = vad_id
        self.confidence = confidence

class InterruptHandler:
    """
    Logic layer for interruption handling:
    - Soft-ack ignore when speaking
    - Accept input when silent
    - Mixed-sentence or command -> interrupt
    - Does NOT modify VAD kernel; sits in event loop
    """

    def __init__(self,
                 soft_ack_list=None,
                 interrupt_keywords=None,
                 stt_wait_ms: int = 120,
                 on_interrupt=None,
                 on_ignore_while_speaking=None,
                 on_accept_while_silent=None,
                 logger=lambda m: print(m)):

        if soft_ack_list is None:
            soft_ack_list = ["yeah","ok","okay","hmm","uh-huh","right","yep","mm","mhm"]
        if interrupt_keywords is None:
            interrupt_keywords = ["stop","wait","no","pause","hold","cancel","cut"]

        self.soft_ack_set = set([s.lower().strip() for s in soft_ack_list])
        self.interrupt_set = set([s.lower().strip() for s in interrupt_keywords])
        self.stt_wait_ms = max(50, stt_wait_ms)
        self.agent_speaking = False

        self.on_interrupt = on_interrupt
        self.on_ignore_while_speaking = on_ignore_while_speaking
        self.on_accept_while_silent = on_accept_while_silent
        self.logger = logger

        self._pending_timers: Dict[str, threading.Timer] = {}
        self._stt_map: Dict[str, STTResult] = {}
        self._lock = threading.Lock()

    def set_agent_speaking(self, speaking: bool):
        self.agent_speaking = speaking
        self.logger(f"[STATE] agent_speaking={speaking}")

    def on_vad(self, vad_id: str):
        self.logger(f"[VAD] Received vad_id={vad_id}")
        with self._lock:
            if vad_id in self._pending_timers:
                self._pending_timers[vad_id].cancel()

            t = threading.Timer(self.stt_wait_ms/1000.0, lambda: self._evaluate(vad_id))
            self._pending_timers[vad_id] = t
            t.start()

    def on_stt(self, stt: STTResult):
        vad_id = stt.vad_id or f"vad_{int(time.time()*1000)}"
        self.logger(f"[STT] vad_id={vad_id}, text='{stt.text}'")

        with self._lock:
            self._stt_map[vad_id] = stt

            if vad_id not in self._pending_timers:
                threading.Thread(target=lambda: self._evaluate(vad_id)).start()
                return

            if self.agent_speaking and self._contains_interrupt(stt.text):
                self._pending_timers[vad_id].cancel()
                del self._pending_timers[vad_id]
                threading.Thread(target=lambda: self._evaluate(vad_id)).start()

    def _evaluate(self, vad_id):
        with self._lock:
            if vad_id in self._pending_timers:
                self._pending_timers[vad_id].cancel()
                del self._pending_timers[vad_id]

            stt = self._stt_map.get(vad_id, None)

        if not stt or not stt.text.strip():
            if self.agent_speaking:
                self.logger("[DECISION] IGNORE (no STT, agent speaking)")
                if self.on_ignore_while_speaking: self.on_ignore_while_speaking(None)
            else:
                self.logger("[DECISION] IGNORE (no STT, agent silent)")
            return

        txt = stt.text.lower().strip()

        if self.agent_speaking:
            if self._contains_interrupt(txt):
                self.logger("[DECISION] INTERRUPT (command detected)")
                if self.on_interrupt: self.on_interrupt(stt)
                return

            if self._is_only_soft(txt):
                self.logger("[DECISION] IGNORE (soft ack)")
                if self.on_ignore_while_speaking: self.on_ignore_while_speaking(stt)
                return

            if self._contains_non_soft(txt):
                self.logger("[DECISION] INTERRUPT (mixed sentence)")
                if self.on_interrupt: self.on_interrupt(stt)
                return

            self.logger("[DECISION] IGNORE (fallback)")
            if self.on_ignore_while_speaking: self.on_ignore_while_speaking(stt)
            return

        self.logger("[DECISION] ACCEPT (agent silent)")
        if self.on_accept_while_silent: self.on_accept_while_silent(stt)

    def _tokenize(self, txt):
        return [t for t in re.split(r"[^a-z0-9\-]+", txt) if t]

    def _is_only_soft(self, txt):
        return all(t in self.soft_ack_set for t in self._tokenize(txt))

    def _contains_interrupt(self, txt):
        return any(t in self.interrupt_set for t in self._tokenize(txt))

    def _contains_non_soft(self, txt):
        return any(t not in self.soft_ack_set for t in self._tokenize(txt))


In [2]:
def cb_interrupt(stt):
    print(">>> ACTION: INTERRUPT ‚Äî stop speaking. Text:", repr(stt.text if stt else ""))

def cb_ignore(stt):
    print(">>> ACTION: IGNORE during agent speech. Text:", repr(stt.text if stt else ""))

def cb_accept(stt):
    print(">>> ACTION: ACCEPT user input (agent silent). Text:", repr(stt.text if stt else ""))

handler = InterruptHandler(
    stt_wait_ms=120,
    on_interrupt=cb_interrupt,
    on_ignore_while_speaking=cb_ignore,
    on_accept_while_silent=cb_accept,
    logger=lambda m: print(m)
)

print("Handler initialized.")


Handler initialized.


In [4]:
from IPython.display import Javascript, HTML, display
from google.colab import output
from base64 import b64decode
import os

def _save_audio_named(fname_b64):
    fname, b64_audio = fname_b64
    data = b64_audio.split(',')[1]
    with open(fname, "wb") as f:
        f.write(b64decode(data))
    print(f"[Colab] saved {fname}")

output.register_callback("notebook.save_audio_named", _save_audio_named)

js = """
(async () => {
  function recorder(label, fname) {
    const box = document.createElement('div');
    box.style="padding:10px;margin:5px;border:1px solid #ccc";

    const rec = document.createElement('button'); rec.innerText = "Record " + label;
    const stop = document.createElement('button'); stop.innerText = "Stop"; stop.disabled = true;

    box.appendChild(rec); box.appendChild(stop);

    let mediaRecorder; let chunks=[];

    rec.onclick = async () => {
      const stream = await navigator.mediaDevices.getUserMedia({audio:true});
      chunks=[];
      mediaRecorder = new MediaRecorder(stream);
      mediaRecorder.ondataavailable = e => chunks.push(e.data);
      mediaRecorder.onstop = () => {
        const blob = new Blob(chunks, {type: 'audio/webm'});
        const reader = new FileReader();
        reader.onloadend = () => {
          google.colab.kernel.invokeFunction(
            'notebook.save_audio_named',
            [[fname, reader.result]],
            {}
          );
        };
        reader.readAsDataURL(blob);
      };
      mediaRecorder.start();
      rec.disabled=true; stop.disabled=false;
    };

    stop.onclick = () => {
      if (mediaRecorder && mediaRecorder.state !== 'inactive') mediaRecorder.stop();
      rec.disabled=false; stop.disabled=true;
    };

    return box;
  }

  const root=document.createElement('div');
  root.appendChild(recorder("Clip 1 (say 'yeah' while agent speaking)", "clip1.wav"));
  root.appendChild(recorder("Clip 2 (say 'yeah' while agent silent)", "clip2.wav"));
  root.appendChild(recorder("Clip 3 (say 'stop')", "clip3.wav"));
  document.body.appendChild(root);
})();
"""

display(HTML("<h3>üé§ Record your three clips below</h3>"))
display(Javascript(js))


<IPython.core.display.Javascript object>

[Colab] saved clip1.wav
[Colab] saved clip2.wav
[Colab] saved clip3.wav


In [5]:
import subprocess

clips = ["clip1.wav","clip2.wav","clip3.wav"]

for fname in clips:
    if not os.path.exists(fname):
        print(fname, "not recorded yet.")
        continue
    out = fname.replace(".wav","_conv.wav")

    cmd = ["ffmpeg","-y","-i",fname,"-ar","16000","-ac","1","-acodec","pcm_s16le",out]
    print("Converting", fname, "‚Üí", out)
    subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print("Done.")


Converting clip1.wav ‚Üí clip1_conv.wav
Done.
Converting clip2.wav ‚Üí clip2_conv.wav
Done.
Converting clip3.wav ‚Üí clip3_conv.wav
Done.


In [6]:
!pip install --quiet SpeechRecognition

import speech_recognition as sr
from IPython.display import Audio, display
import os, time, re

r = sr.Recognizer()
log_lines = []

for fname in ["clip1_conv.wav","clip2_conv.wav","clip3_conv.wav"]:
    if not os.path.exists(fname):
        print(fname, "is missing.")
        continue

    print("\n---- Processing", fname)
    display(Audio(fname))

    with sr.AudioFile(fname) as source:
        audio = r.record(source)

    try:
        text = r.recognize_google(audio)
        print("[STT] ‚Üí", text)
    except:
        text = ""
        print("[STT] Could not understand.")

    vad_id = f"vad_{fname}_{int(time.time()*1000)}"

    if "clip1" in fname:
        handler.set_agent_speaking(True)
    elif "clip2" in fname:
        handler.set_agent_speaking(False)
    elif "clip3" in fname:
        handler.set_agent_speaking(True)

    handler.on_vad(vad_id)
    time.sleep(0.05)
    handler.on_stt(STTResult(text=text, vad_id=vad_id))

    decision = "UNKNOWN"
    t = text.lower().strip()
    tokens = re.split(r"[^a-z0-9\-]+", t)

    if handler.agent_speaking:
        if any(tok in handler.interrupt_set for tok in tokens):
            decision="INTERRUPT"
        elif all(tok in handler.soft_ack_set for tok in tokens if tok):
            decision="IGNORE"
        else:
            decision="INTERRUPT"
    else:
        decision="ACCEPT"

    line = f"{time.strftime('%Y-%m-%d %H:%M:%S')} | {fname} | speaking={handler.agent_speaking} | stt='{text}' | decision={decision}"
    print("LOG:", line)
    log_lines.append(line)

with open("log.txt","w") as f:
    f.write("\n".join(log_lines))

print("\nSaved log.txt:")
print("\n".join(log_lines))


[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m32.9/32.9 MB[0m [31m47.3 MB/s[0m eta [36m0:00:00[0m
[?25h
---- Processing clip1_conv.wav


[STT] ‚Üí yeah
[STATE] agent_speaking=True
[VAD] Received vad_id=vad_clip1_conv.wav_1764780736634
[STT] vad_id=vad_clip1_conv.wav_1764780736634, text='yeah'
LOG: 2025-12-03 16:52:16 | clip1_conv.wav | speaking=True | stt='yeah' | decision=IGNORE

---- Processing clip2_conv.wav


[DECISION] IGNORE (soft ack)
>>> ACTION: IGNORE during agent speech. Text: 'yeah'
[STT] ‚Üí yeah
[STATE] agent_speaking=False
[VAD] Received vad_id=vad_clip2_conv.wav_1764780737206
[STT] vad_id=vad_clip2_conv.wav_1764780737206, text='yeah'
LOG: 2025-12-03 16:52:17 | clip2_conv.wav | speaking=False | stt='yeah' | decision=ACCEPT

---- Processing clip3_conv.wav


[DECISION] ACCEPT (agent silent)
>>> ACTION: ACCEPT user input (agent silent). Text: 'yeah'
[STT] ‚Üí yeah
[STATE] agent_speaking=True
[VAD] Received vad_id=vad_clip3_conv.wav_1764780737779
[STT] vad_id=vad_clip3_conv.wav_1764780737779, text='yeah'
LOG: 2025-12-03 16:52:17 | clip3_conv.wav | speaking=True | stt='yeah' | decision=IGNORE

Saved log.txt:
2025-12-03 16:52:16 | clip1_conv.wav | speaking=True | stt='yeah' | decision=IGNORE
2025-12-03 16:52:17 | clip2_conv.wav | speaking=False | stt='yeah' | decision=ACCEPT
2025-12-03 16:52:17 | clip3_conv.wav | speaking=True | stt='yeah' | decision=IGNORE
