In [1]:
import sys
import os
import numpy as np
import tensorflow as tf
import librosa
import sounddevice as sd
import queue
import threading
from datetime import datetime
from IPython.display import clear_output

# Add Helpers to path
sys.path.append(os.path.abspath('Model'))
import model_utils as mu

# --- CONFIGURATION ---

In [2]:
SR = mu.SR                  # 22050
DURATION = mu.DURATION      # 1.0
N_MELS = mu.N_MELS          # 128
FMAX = mu.FMAX              # 8000
HOP_LENGTH = mu.HOP_LENGTH  # 512

# App parameters
UPDATE_INTERVAL = 0.5       # How often to run prediction (in seconds)
BUFFER_SIZE = int(SR * DURATION) # Samples to keep in memory (1 seconds)
MODEL_PATH = "Model/results/model_Baseline_Adam.keras"

# --- 1. LOAD MODEL ---

In [3]:
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"Model not found at {MODEL_PATH}. Please run training first!")

print(f"Loading model from: {MODEL_PATH}...")
model = tf.keras.models.load_model(MODEL_PATH)

# Warmup prediction
dummy_input = np.zeros((1, 128, 130, 1))
_ = model.predict(dummy_input, verbose=0)
print("Model loaded and ready!")

Loading model from: Model/results/model_Baseline_Adam.keras...
Model loaded and ready!


# --- 2. RECORDING FUNCTION ---
We use a thread-safe queue or a simple ring buffer.
Since we need a rolling window of numpy data, a manual RingBuffer is best.

In [4]:
class AudioRingBuffer:
    def __init__(self, size):
        self.size = size
        self.buffer = np.zeros(size, dtype=np.float32)
        self.lock = threading.Lock()

    def extend(self, new_data):
        """Adds new data to the end, pushing old data out."""
        with self.lock:
            # Flatten just in case
            new_data = new_data.flatten()
            n = len(new_data)

            if n >= self.size:
                # If new data is bigger than buffer, just replace it
                self.buffer = new_data[-self.size:]
            else:
                # Shift buffer left by n
                self.buffer = np.roll(self.buffer, -n)
                # Overwrite the end with new data
                self.buffer[-n:] = new_data

    def get(self):
        """Returns the current buffer content."""
        with self.lock:
            return self.buffer.copy()

# Initialize global buffer
audio_buffer = AudioRingBuffer(BUFFER_SIZE)

# --- 3. INPUT STREAM CALLBACK ---

In [5]:
def callback(indata, frames, time, status):
    """This function is called by sounddevice for every audio block."""
    if status:
        print(status, file=sys.stderr)
    # Add incoming audio to our rolling buffer
    audio_buffer.extend(indata)

# --- 4. PREDICTION LOOP ---

In [6]:
def run_continuous_monitoring():
    print("\n" + "="*50)
    print("   CONTINUOUS VOICE SECURITY SYSTEM - ACTIVE   ")
    print("="*50)
    print(f"Listening... (Press 'Interrupt' or Stop button to quit)")

    # Start the audio stream in the background
    # channels=1 (Mono), blocksize can be auto or fixed (e.g. 2048)
    stream = sd.InputStream(
        samplerate=SR,
        channels=1,
        callback=callback,
        blocksize=int(SR * UPDATE_INTERVAL) # Pass chunks of UPDATE_INTERVAL size
    )

    with stream:
        try:
            while True:
                # 1. Get latest audio window (e.g., last 3 seconds)
                raw_audio = audio_buffer.get()

                # Check if buffer is actually filled (not silence at start)
                # Simple check: if max amplitude is very low, it might be silence/initialization
                if np.max(np.abs(raw_audio)) < 0.005:
                    label = "Silence / Initializing..."
                    confidence = 0.0
                    bar = "..."
                else:
                    # 2. Preprocess
                    # Normalize & Trim using your utils
                    processed_audio = mu.preprocess_audio(raw_audio, sr=SR)

                    # 3. Spectrogram
                    mel_spec = librosa.feature.melspectrogram(
                        y=processed_audio, sr=SR, n_mels=N_MELS, fmax=FMAX, hop_length=HOP_LENGTH
                    )
                    mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)

                    # 4. Reshape (1, 128, 130, 1)
                    # Note: We enforce the exact shape the model expects
                    target_width = 130
                    if mel_spec_db.shape[1] < target_width:
                        # Pad width if too short
                        mel_spec_db = np.pad(mel_spec_db, ((0,0), (0, target_width - mel_spec_db.shape[1])))
                    else:
                        # Crop width if too long
                        mel_spec_db = mel_spec_db[:, :target_width]

                    model_input = mel_spec_db.reshape(1, N_MELS, target_width, 1)

                    # 5. Predict
                    prediction_prob = model.predict(model_input, verbose=0)[0][0]

                    # 6. Display Logic
                    if prediction_prob > 0.5:
                        label = "ACCEPTED (ACCESS GRANTED)"
                        confidence = prediction_prob
                        color = "\033[92m" # Green
                    else:
                        label = "REJECTED (ACCESS DENIED) "
                        confidence = 1 - prediction_prob
                        color = "\033[91m" # Red

                    reset = "\033[0m"

                    # Create a visual confidence bar
                    bar_len = 20
                    filled = int(confidence * bar_len)
                    bar = "[" + "="*filled + " "*(bar_len-filled) + "]"

                    # Clear previous output to create animation effect
                    clear_output(wait=True)

                    print("="*50)
                    print(f"üì° STATUS: MONITORING MICROPHONE")
                    print(f"‚è∞ {datetime.now().strftime('%H:%M:%S.%f')[:-4]}")
                    print("-" * 50)
                    print(f"{color}RESULT: {label} {reset}")
                    print(f"CONFIDENCE: {confidence:.1%} {bar}")
                    print("-" * 50)

                # Wait for next update
                sd.sleep(int(UPDATE_INTERVAL * 1000))

        except KeyboardInterrupt:
            print("\nüõë Monitoring stopped by user.")

# --- 5. START ---
 Run the continuous monitor

In [None]:
run_continuous_monitoring()


   CONTINUOUS VOICE SECURITY SYSTEM - ACTIVE   
Listening... (Press 'Interrupt' or Stop button to quit)
