# Voice Recognition Security System
Interface for voice-based access control using ML classification.

In [None]:
import os
import torch
import torch.nn as nn
import librosa
import numpy as np
from PIL import Image
from IPython.display import Audio, display
from torchvision import transforms
import tkinter as tk
from tkinter import filedialog
import sounddevice as sd
import scipy.io.wavfile as wav
import time
from datetime import datetime
from resample_audio_and_clear_of_noise import re_sample_audio, is_valid_wav_file
from torchvision.models import resnet18
from silence_removal import process_audio_file
from create_spectrogram import process_audio_file as spectrogram_process
from df.enhance import enhance, init_df, load_audio, save_audio
from ipywidgets import Button, Output, HBox, VBox, Label

# Constants
# Labels of the enrolled speakers. initialize_model sizes the classifier head
# as len(LOCATORS_SPEAKERS_LIST) + 1, and process_file reports any predicted
# index beyond this list as "Unauthorized".
LOCATORS_SPEAKERS_LIST = ["f1", "f7", "f8", "m3", "m6", "m8"]
# Run inference on the GPU when CUDA is available, otherwise on the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# File holding the state dict that initialize_model loads.
MODEL_PATH = 'trained_model9.pth'
SAMPLE_RATE = 48000  # Sample rate (Hz) used by record_audio for capture and for the saved WAV

# Initialize DeepFilter for noise reduction
# The first two return values are passed to delete_noise_for_file as its
# `model` and `df_state` arguments; the third is unused here.
model_df, df_state, _ = init_df()

# Transform for spectrogram processing
# Resizes the image to 224x224, converts it to a tensor and normalizes each
# channel. NOTE(review): the mean/std values match the ImageNet statistics
# commonly used with torchvision models - confirm they match training.
spec_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [None]:
def initialize_model():
    """Build the speaker classifier and restore its trained weights.

    A ResNet18 is created and its final fully connected layer is replaced so
    that it has one output per entry in LOCATORS_SPEAKERS_LIST plus one
    additional output. The state dict stored at MODEL_PATH is then loaded
    into the network.

    Returns:
        The network, moved to DEVICE and switched to evaluation mode.
    """
    num_outputs = len(LOCATORS_SPEAKERS_LIST) + 1

    network = resnet18()
    network.fc = nn.Linear(network.fc.in_features, num_outputs)

    # map_location lets weights saved on a GPU load on a CPU-only machine.
    weights = torch.load(MODEL_PATH, map_location=DEVICE)
    network.load_state_dict(weights)

    # Module.to() and Module.eval() both return the module itself.
    return network.to(DEVICE).eval()

# Initialize the classification model
# Loaded once when this cell runs; process_file passes this instance to
# classify_segment_from_path for every segment.
classification_model = initialize_model()

In [None]:
def delete_noise_for_file(audio_path, model, df_state):
    """Denoise a single WAV file and write the result next to the input.

    The file is loaded at the sample rate reported by ``df_state.sr()``,
    passed through ``enhance`` and saved under the same name with
    ``_enhanced`` inserted before the file extension.

    Args:
        audio_path: Path of the WAV file to denoise.
        model: Noise-reduction model handed to ``enhance``.
        df_state: State object handed to ``enhance``; it also supplies the
            sample rate through ``df_state.sr()``.

    Returns:
        The path of the enhanced file, or ``None`` when the input is not a
        valid WAV file or any processing step raises (the error is printed).
    """
    try:
        if not is_valid_wav_file(audio_path):
            print(f"Skipping invalid WAV file: {audio_path}")
            return None

        audio, _ = load_audio(audio_path, sr=df_state.sr())
        enhanced = enhance(model, df_state, audio)

        # Split off the real extension instead of str.replace('.wav', ...):
        # replace() rewrites every '.wav' in the path (directory names
        # included) and leaves names such as "X.WAV" unchanged, which would
        # make the output overwrite the input file.
        stem, ext = os.path.splitext(audio_path)
        enhanced_audio_path = f"{stem}_enhanced{ext or '.wav'}"
        save_audio(enhanced_audio_path, enhanced, df_state.sr())

        print(f"Processed: {audio_path}")
        return enhanced_audio_path
    except Exception as e:
        # Best-effort step: report the failure and signal it with None so the
        # caller can decide how to proceed.
        print(f"Error processing {audio_path}: {e}")
        return None

def classify_segment_from_path(spectrogram_path, model):
    """Run the classifier on one spectrogram image stored on disk.

    The image is opened as RGB, passed through ``spec_transform``, given a
    leading batch dimension and moved to DEVICE before being fed to ``model``
    with gradient tracking disabled.

    Args:
        spectrogram_path: Path of the spectrogram image file.
        model: Classifier called on the prepared image tensor.

    Returns:
        A ``(pred_idx, confidence)`` tuple: the index of the most probable
        class and its softmax probability expressed as a percentage. If any
        step raises, the error is printed and ``(None, None)`` is returned.
    """
    try:
        image = Image.open(spectrogram_path).convert('RGB')
        batch = spec_transform(image).unsqueeze(0).to(DEVICE)

        with torch.no_grad():
            logits = model(batch)
            class_probs = torch.softmax(logits, dim=1)

        best_idx = class_probs.argmax(dim=1).item()
        confidence_pct = class_probs[0, best_idx].item() * 100
        return best_idx, confidence_pct

    except Exception as e:
        print(f"Error during classification: {e}")
        return None, None

In [None]:
def process_file(file_path):
    """Run an audio file through the complete access-control pipeline.

    The file is resampled, denoised, split into segments, and each segment is
    rendered to a spectrogram and classified. Access is granted when more
    than half of the successfully classified segments are attributed to a
    speaker in LOCATORS_SPEAKERS_LIST. Progress, per-segment results and the
    final decision are printed, and the original audio is displayed for
    playback when at least one segment was classified.

    Args:
        file_path: Path of the audio file to analyse.

    Returns:
        None. Any exception raised by a pipeline step is caught and printed.
    """
    # Class indices below this bound map onto LOCATORS_SPEAKERS_LIST; any
    # other index is reported as "Unauthorized". Derived from the list so it
    # cannot drift from a hard-coded count.
    num_authorized_classes = len(LOCATORS_SPEAKERS_LIST)

    try:
        print("Starting audio processing...")

        # Resample the audio
        print("Resampling audio...")
        resampled_path = re_sample_audio(file_path)

        # Remove noise
        print("Removing noise...")
        enhanced_path = delete_noise_for_file(resampled_path, model_df, df_state)
        if enhanced_path is None:
            # delete_noise_for_file has already printed the reason; stop here
            # rather than handing None to the segment splitter.
            print("Noise removal failed; cannot continue processing")
            return

        # Split into segments
        print("Splitting into segments...")
        audio_paths = process_audio_file(enhanced_path)

        # Process each segment
        authorized_count = 0
        total_segments = 0

        print("\nAnalyzing segments:")
        for audio_path in audio_paths:
            spectrogram_path = spectrogram_process(audio_path, "temp")
            predicted_class, confidence = classify_segment_from_path(spectrogram_path, classification_model)

            # Segments whose classification failed are left out of the vote.
            if predicted_class is None:
                continue

            total_segments += 1
            is_authorized = predicted_class < num_authorized_classes
            speaker = LOCATORS_SPEAKERS_LIST[predicted_class] if is_authorized else "Unauthorized"
            print(f"Segment {total_segments}: {speaker} (Confidence: {confidence:.2f}%)")

            if is_authorized:
                authorized_count += 1

        # Make final decision
        if total_segments > 0:
            print("\nAccess Decision:")
            # Strict majority: an even split is denied.
            if authorized_count > total_segments / 2:
                print("✅ ACCESS GRANTED")
            else:
                print("❌ ACCESS DENIED")
            print(f"Authorized segments: {authorized_count}/{total_segments}")

            # Play the original audio
            display(Audio(file_path))
        else:
            print("No valid segments found for analysis")

    except Exception as e:
        print(f"Error in processing: {str(e)}")

In [None]:
# Recording variables
# Flag shared by record_audio (sets it and polls it) and stop_recording
# (clears it).
recording = False

# Notebook widgets
output = Output()
status_label = Label(value='Ready to record or upload file')
upload_button = Button(description='Upload and Process WAV File')
record_button = Button(description='Record Audio')
# The stop button starts disabled; record_audio enables it while recording.
stop_button = Button(description='Stop Recording', disabled=True)

def record_audio():
    """Record from the microphone until stopped, then process the recording.

    Captures single-channel audio at SAMPLE_RATE through a sounddevice input
    stream for as long as the module-level ``recording`` flag is true
    (``stop_recording`` clears it). The captured chunks are concatenated,
    written to a timestamped WAV file in the current directory and passed to
    ``process_file``.

    The call blocks until recording stops and processing finishes, which is
    why ``start_recording`` runs it on a separate thread. The buttons, the
    status label and the ``recording`` flag are always restored on exit, even
    if the audio device or the pipeline raises.
    """
    global recording
    recording = True

    # Generate unique filename based on timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f'recording_{timestamp}.wav'

    # Update UI
    record_button.disabled = True
    stop_button.disabled = False
    status_label.value = 'Recording... Press Stop when done'

    audio_data = []

    # The third parameter is named time_info (not `time`) so it does not
    # shadow the time module; sounddevice passes the arguments positionally.
    def callback(indata, frames, time_info, status):
        if recording:
            # Copy because the buffer passed to the callback is only valid
            # for the duration of the call.
            audio_data.append(indata.copy())

    try:
        # Start recording; poll the flag until stop_recording clears it.
        with sd.InputStream(samplerate=SAMPLE_RATE, channels=1, callback=callback):
            while recording:
                time.sleep(0.1)

        # Capture is over, so the stop button has nothing left to do.
        stop_button.disabled = True

        # Combine all audio chunks and save
        if audio_data:
            status_label.value = 'Processing recording...'
            audio = np.concatenate(audio_data)
            wav.write(filename, SAMPLE_RATE, audio)
            print(f"Recording saved as {filename}")
            process_file(filename)
    finally:
        # Reset UI and state no matter how we got here; otherwise a failure
        # would leave the record button permanently disabled.
        recording = False
        record_button.disabled = False
        stop_button.disabled = True
        status_label.value = 'Ready to record or upload file'

def stop_recording(b):
    """Stop the current recording.

    Registered as the click handler of the stop button. Clears the
    module-level ``recording`` flag, which ends the polling loop in
    ``record_audio``.

    Args:
        b: Argument supplied by the button's click callback; unused.
    """
    global recording
    recording = False
    
def start_recording(b):
    """Launch ``record_audio`` on a new thread.

    Registered as the click handler of the record button. ``record_audio``
    blocks until the recording is stopped, so it is run on its own thread
    instead of inside the click callback.

    Args:
        b: Argument supplied by the button's click callback; unused.
    """
    from threading import Thread

    Thread(target=record_audio).start()

def on_button_click(b):
    """Handle a click on the upload button.

    Clears the output area, opens a file chooser restricted to WAV files
    and, when a file is selected, runs it through ``process_file``. Everything
    printed or displayed during processing is captured by the ``output``
    widget.

    Args:
        b: Argument supplied by the button's click callback; unused.
    """
    with output:
        output.clear_output()
        # A Tk root is needed to host the dialog; it is hidden so only the
        # file chooser itself appears.
        root = tk.Tk()
        try:
            root.withdraw()
            file_path = filedialog.askopenfilename(
                title='Select WAV File',
                filetypes=[('WAV files', '*.wav')]
            )
        finally:
            # A new root is created on every click; destroy it so hidden Tk
            # instances do not accumulate for the life of the kernel.
            root.destroy()

        # askopenfilename returns an empty value when the dialog is cancelled.
        if file_path:
            process_file(file_path)

# Connect button handlers
# Each handler receives one argument from the click callback.
upload_button.on_click(on_button_click)
record_button.on_click(start_recording)
stop_button.on_click(stop_recording)

# Create layout
# One row with the three buttons, the status label underneath, and the output
# area (which on_button_click writes into) displayed after the controls.
buttons = HBox([upload_button, record_button, stop_button])
controls = VBox([buttons, status_label])
display(controls, output)