In [3]:
import numpy as np
import librosa
import soundfile as sf
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
import joblib
import os
import matplotlib.pyplot as plt
import pyaudio
import wave
import threading
import time
import queue
from datetime import datetime
import matplotlib.animation as animation
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import tkinter as tk
from tkinter import ttk
from tkinter import Scale, HORIZONTAL

class RealtimeSpeechEmotionRecognizer:
    def __init__(self, model_path=None, scaler_path=None):
        """
        Initialize the Real-time Speech Emotion Recognition model
        
        Parameters:
        -----------
        model_path : str, optional
            Path to a saved model file to load (default: None)
        scaler_path : str, optional
            Path to a saved scaler file to load (default: None)
        """
        self.emotions = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']
        self.emotion_colors = {
            'angry': 'red',
            'disgust': 'brown',
            'fear': 'purple',
            'happy': 'green',
            'neutral': 'gray',
            'sad': 'blue',
            'surprise': 'orange'
        }
        
        # Audio recording parameters
        self.format = pyaudio.paFloat32
        self.channels = 1
        self.rate = 16000  # sample rate
        self.chunk = 512  # smaller chunks for faster response
        self.record_seconds = 1.5  # shorter window for quicker response
        
        # Sensitivity parameters
        self.sensitivity = 1.2  # Amplification factor (higher = more sensitive)
        self.threshold = 0.01  # Lower threshold for sound detection (lower = more sensitive)
        self.smoothing_factor = 0.3  # Lower value means less smoothing (0-1)
        
        # Recording state
        self.is_recording = False
        self.audio_queue = queue.Queue()
        self.emotion_history = []
        self.current_emotion = "neutral"
        self.current_probabilities = {emotion: 0.0 for emotion in self.emotions}
        self.previous_probabilities = {emotion: 0.0 for emotion in self.emotions}
        
        # Initialize PyAudio
        self.audio = pyaudio.PyAudio()
        
        # Create or load the model
        if model_path and os.path.exists(model_path):
            self._load_model(model_path, scaler_path)
        else:
            self.model = MLPClassifier(
                hidden_layer_sizes=(256, 128, 64),
                activation='relu',
                solver='adam',
                alpha=0.0001,
                batch_size=256,
                learning_rate='adaptive',
                max_iter=300,
                early_stopping=True,
                verbose=True
            )
            self.scaler = StandardScaler()
            print("New model created. You need to train it before use.")
    
    def set_sensitivity(self, value):
        """Set the sensitivity amplification factor"""
        self.sensitivity = value
        print(f"Sensitivity set to: {value}")
    
    def set_threshold(self, value):
        """Set the audio detection threshold"""
        self.threshold = value
        print(f"Threshold set to: {value}")
    
    def set_smoothing(self, value):
        """Set the smoothing factor for emotion transitions"""
        self.smoothing_factor = value
        print(f"Smoothing set to: {value}")
        
    def extract_features(self, audio_data, sample_rate=16000, n_mfcc=13, n_mels=40, n_fft=1024, hop_length=256):
        """
        Extract acoustic features from audio data with increased sensitivity
        
        Parameters:
        -----------
        audio_data : np.ndarray
            Audio data
        sample_rate : int
            Sample rate of the audio
        n_mfcc : int
            Number of MFCCs to extract
        n_mels : int
            Number of Mel bands
        n_fft : int
            FFT window size (smaller for better time resolution)
        hop_length : int
            Number of samples between frames (smaller for more features)
            
        Returns:
        --------
        np.ndarray
            Feature vector
        """
        try:
            # Amplify the signal to increase sensitivity
            audio_data = audio_data * self.sensitivity
            
            # Check if audio level is above threshold
            if np.abs(audio_data).mean() < self.threshold:
                return None
            
            # Trim silent parts with higher top_db (less aggressive trimming)
            audio_data, _ = librosa.effects.trim(audio_data, top_db=15)
            
            # Extract features
            # MFCCs with more coefficients for better detail
            mfccs = librosa.feature.mfcc(y=audio_data, sr=sample_rate, n_mfcc=n_mfcc+2, n_fft=n_fft, hop_length=hop_length)
            mfccs_mean = np.mean(mfccs.T, axis=0)
            mfccs_std = np.std(mfccs.T, axis=0)
            mfccs_delta = librosa.feature.delta(mfccs)  # Add delta features for better sensitivity
            mfccs_delta_mean = np.mean(mfccs_delta.T, axis=0)
            
            # Mel spectrogram with more bands
            mel = librosa.feature.melspectrogram(y=audio_data, sr=sample_rate, n_mels=n_mels+10, n_fft=n_fft, hop_length=hop_length)
            mel_mean = np.mean(librosa.power_to_db(mel).T, axis=0)
            mel_std = np.std(librosa.power_to_db(mel).T, axis=0)
            
            # Root Mean Square Energy with smaller frame size
            rmse = librosa.feature.rms(y=audio_data, hop_length=hop_length)[0]
            rmse_mean = np.mean(rmse)
            rmse_std = np.std(rmse)
            
            # Zero Crossing Rate
            zcr = librosa.feature.zero_crossing_rate(audio_data, hop_length=hop_length)[0]
            zcr_mean = np.mean(zcr)
            zcr_std = np.std(zcr)
            
            # Spectral centroid
            centroid = librosa.feature.spectral_centroid(y=audio_data, sr=sample_rate, n_fft=n_fft, hop_length=hop_length)[0]
            centroid_mean = np.mean(centroid)
            centroid_std = np.std(centroid)
            
            # Spectral contrast with more bands
            contrast = librosa.feature.spectral_contrast(y=audio_data, sr=sample_rate, n_fft=n_fft, hop_length=hop_length)
            contrast_mean = np.mean(contrast.T, axis=0)
            contrast_std = np.std(contrast.T, axis=0)
            
            # Chroma features with more pitch classes
            chroma = librosa.feature.chroma_stft(y=audio_data, sr=sample_rate, n_fft=n_fft, hop_length=hop_length)
            chroma_mean = np.mean(chroma.T, axis=0)
            chroma_std = np.std(chroma.T, axis=0)
            
            # Spectral roll-off (additional feature)
            rolloff = librosa.feature.spectral_rolloff(y=audio_data, sr=sample_rate, n_fft=n_fft, hop_length=hop_length)[0]
            rolloff_mean = np.mean(rolloff)
            rolloff_std = np.std(rolloff)
            
            # Concatenate all features
            features = np.concatenate([
                mfccs_mean, mfccs_std, mfccs_delta_mean,
                mel_mean, mel_std,
                [rmse_mean, rmse_std],
                [zcr_mean, zcr_std],
                [centroid_mean, centroid_std],
                contrast_mean, contrast_std,
                chroma_mean, chroma_std,
                [rolloff_mean, rolloff_std]
            ])
            
            return features
            
        except Exception as e:
            print(f"Error extracting features: {str(e)}")
            return None
    
    def save_model(self, model_path, scaler_path):
        """
        Save the trained model and scaler
        
        Parameters:
        -----------
        model_path : str
            Path to save the model
        scaler_path : str
            Path to save the scaler
        """
        joblib.dump(self.model, model_path)
        
        if self.scaler:
            joblib.dump(self.scaler, scaler_path)
        
        print(f"Model saved to {model_path} and scaler saved to {scaler_path}")
    
    def _load_model(self, model_path, scaler_path=None):
        """
        Load a trained model and scaler
        
        Parameters:
        -----------
        model_path : str
            Path to the saved model
        scaler_path : str, optional
            Path to the saved scaler (default: None)
        """
        try:
            self.model = joblib.load(model_path)
            print(f"Model loaded from {model_path}")
            
            if scaler_path and os.path.exists(scaler_path):
                self.scaler = joblib.load(scaler_path)
                print(f"Scaler loaded from {scaler_path}")
            else:
                self.scaler = StandardScaler()
                print("No scaler found. Using a new StandardScaler.")
        except Exception as e:
            print(f"Error loading model: {str(e)}")
            self.model = MLPClassifier(
                hidden_layer_sizes=(256, 128, 64),
                activation='relu',
                solver='adam'
            )
            self.scaler = StandardScaler()
    
    def predict_emotion(self, audio_data):
        """
        Predict emotion from audio data with smoothing
        
        Parameters:
        -----------
        audio_data : np.ndarray
            Audio data
            
        Returns:
        --------
        tuple
            (predicted_emotion, probabilities_dict)
        """
        features = self.extract_features(audio_data)
        
        if features is None:
            # If no features were extracted (silent audio)
            # Return previous emotion with decreased confidence
            decayed_probs = {
                emotion: prob * 0.8 for emotion, prob in self.current_probabilities.items()
            }
            # Find emotion with highest probability
            max_emotion = max(decayed_probs, key=decayed_probs.get)
            return max_emotion, decayed_probs
        
        try:
            # Scale features
            features = self.scaler.transform(features.reshape(1, -1))
            
            # Predict
            prediction = self.model.predict(features)[0]
            raw_probabilities = self.model.predict_proba(features)[0]
            
            # Create raw probabilities dictionary
            raw_probs = {emotion: 0.0 for emotion in self.emotions}
            for emotion, prob in zip(self.model.classes_, raw_probabilities):
                if emotion in raw_probs:
                    raw_probs[emotion] = prob
            
            # Apply sensitivity amplification to the probabilities
            # This increases the contrast between probabilities
            amplified_probs = {}
            for emotion, prob in raw_probs.items():
                # Non-linear amplification that preserves the 0-1 range
                amplified_probs[emotion] = 1 - (1 - prob) ** self.sensitivity
            
            # Apply smoothing between current and previous emotions
            smoothed_probs = {}
            for emotion in self.emotions:
                prev_prob = self.current_probabilities.get(emotion, 0.0)
                new_prob = amplified_probs.get(emotion, 0.0)
                # Apply smoothing factor (lower = less smoothing, more responsive)
                smoothed_probs[emotion] = (self.smoothing_factor * prev_prob) + ((1 - self.smoothing_factor) * new_prob)
            
            # Find emotion with highest probability
            max_emotion = max(smoothed_probs, key=smoothed_probs.get)
            
            return max_emotion, smoothed_probs
        except Exception as e:
            print(f"Prediction error: {str(e)}")
            return "neutral", {emotion: 0.0 for emotion in self.emotions}
    
    def audio_callback(self, in_data, frame_count, time_info, status):
        """
        Callback function for PyAudio
        
        Parameters:
        -----------
        in_data : bytes
            Input audio data
        frame_count : int
            Number of frames
        time_info : dict
            Timing information
        status : int
            Status flag
            
        Returns:
        --------
        tuple
            (in_data, pyaudio.paContinue)
        """
        audio_data = np.frombuffer(in_data, dtype=np.float32)
        self.audio_queue.put(audio_data)
        return (in_data, pyaudio.paContinue)
    
    def process_audio(self):
        """
        Process audio from the queue and predict emotions
        """
        buffer = np.array([])
        buffer_duration = 0  # in seconds
        
        while self.is_recording:
            if not self.audio_queue.empty():
                audio_chunk = self.audio_queue.get()
                
                # Add chunk to buffer
                buffer = np.append(buffer, audio_chunk)
                chunk_duration = len(audio_chunk) / self.rate
                buffer_duration += chunk_duration
                
                # If buffer is long enough, process it
                if buffer_duration >= self.record_seconds:
                    # Predict emotion
                    emotion, probabilities = self.predict_emotion(buffer)
                    
                    # Update current emotion and probabilities
                    self.previous_probabilities = self.current_probabilities.copy()
                    self.current_emotion = emotion
                    self.current_probabilities = probabilities
                    self.emotion_history.append((emotion, probabilities))
                    
                    print(f"Detected emotion: {emotion} - {probabilities[emotion]:.2f}")
                    
                    # Reset buffer (with 75% overlap for smoother transitions)
                    overlap_samples = int(self.rate * self.record_seconds * 0.75)
                    if len(buffer) > overlap_samples:
                        buffer = buffer[-overlap_samples:]
                        buffer_duration = overlap_samples / self.rate
                    else:
                        buffer = np.array([])
                        buffer_duration = 0
            
            time.sleep(0.01)  # Small delay to prevent CPU overuse
    
    def start_recording(self):
        """
        Start recording audio from microphone
        """
        if self.is_recording:
            print("Already recording")
            return
        
        self.is_recording = True
        
        # Start stream
        self.stream = self.audio.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            frames_per_buffer=self.chunk,
            stream_callback=self.audio_callback
        )
        
        # Start processing thread
        self.process_thread = threading.Thread(target=self.process_audio)
        self.process_thread.daemon = True
        self.process_thread.start()
        
        print("Recording started")
    
    def stop_recording(self):
        """
        Stop recording audio
        """
        if not self.is_recording:
            print("Not recording")
            return
        
        self.is_recording = False
        
        # Stop stream
        self.stream.stop_stream()
        self.stream.close()
        
        # Clear queue
        while not self.audio_queue.empty():
            self.audio_queue.get()
        
        print("Recording stopped")
    
    def close(self):
        """
        Clean up resources
        """
        if self.is_recording:
            self.stop_recording()
        
        self.audio.terminate()
        print("Resources released")


class EmotionRecognitionApp:
    def __init__(self, root, model_path=None, scaler_path=None):
        """
        GUI application for real-time emotion recognition
        
        Parameters:
        -----------
        root : tk.Tk
            Root window
        model_path : str, optional
            Path to a saved model file (default: None)
        scaler_path : str, optional
            Path to a saved scaler file (default: None)
        """
        self.root = root
        self.root.title("Real-time Speech Emotion Recognition")
        self.root.geometry("800x700")  # Increased height for controls
        
        # Initialize recognizer
        self.recognizer = RealtimeSpeechEmotionRecognizer(model_path, scaler_path)
        
        # Create GUI
        self.create_widgets()
        
        # Animation update interval (ms)
        self.update_interval = 50  # Faster updates for more responsiveness
        
        # Set up animation
        self.ani = animation.FuncAnimation(
            self.fig, self.update_plot, interval=self.update_interval)
        
        # When window is closed
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
    
    def create_widgets(self):
        """
        Create GUI widgets
        """
        # Main frame
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)
        
        # Control frame
        control_frame = ttk.Frame(main_frame, padding="5")
        control_frame.pack(fill=tk.X)
        
        # Start/Stop button
        self.recording_state = tk.BooleanVar(value=False)
        self.btn_record = ttk.Button(control_frame, text="Start Recording", command=self.toggle_recording)
        self.btn_record.pack(side=tk.LEFT, padx=5)
        
        # Status label
        self.lbl_status = ttk.Label(control_frame, text="Not recording")
        self.lbl_status.pack(side=tk.LEFT, padx=10)
        
        # Current emotion label
        self.lbl_emotion = ttk.Label(control_frame, text="Emotion: --", font=("Arial", 14))
        self.lbl_emotion.pack(side=tk.RIGHT, padx=10)
        
        # Sensitivity controls frame
        sensitivity_frame = ttk.LabelFrame(main_frame, text="Sensitivity Controls", padding="10")
        sensitivity_frame.pack(fill=tk.X, pady=5)
        
        # Sensitivity slider
        ttk.Label(sensitivity_frame, text="Sensitivity:").grid(row=0, column=0, sticky=tk.W, padx=5)
        self.sensitivity_slider = Scale(sensitivity_frame, from_=1.0, to=3.0, resolution=0.1, 
                                  orient=HORIZONTAL, length=300, command=self.update_sensitivity)
        self.sensitivity_slider.set(1.2)
        self.sensitivity_slider.grid(row=0, column=1, padx=5, pady=5)
        
        # Threshold slider
        ttk.Label(sensitivity_frame, text="Detection Threshold:").grid(row=1, column=0, sticky=tk.W, padx=5)
        self.threshold_slider = Scale(sensitivity_frame, from_=0.001, to=0.05, resolution=0.001, 
                                orient=HORIZONTAL, length=300, command=self.update_threshold)
        self.threshold_slider.set(0.01)
        self.threshold_slider.grid(row=1, column=1, padx=5, pady=5)
        
        # Smoothing slider
        ttk.Label(sensitivity_frame, text="Smoothing:").grid(row=2, column=0, sticky=tk.W, padx=5)
        self.smoothing_slider = Scale(sensitivity_frame, from_=0.0, to=0.9, resolution=0.05, 
                                orient=HORIZONTAL, length=300, command=self.update_smoothing)
        self.smoothing_slider.set(0.3)
        self.smoothing_slider.grid(row=2, column=1, padx=5, pady=5)
        
        # Matplotlib figure
        self.fig = Figure(figsize=(8, 6), dpi=100)
        
        # Create two subplots
        self.ax1 = self.fig.add_subplot(211)  # Audio waveform
        self.ax2 = self.fig.add_subplot(212)  # Emotion probabilities
        
        # Create canvas
        self.canvas = FigureCanvasTkAgg(self.fig, master=main_frame)
        self.canvas.draw()
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
        
        # Initialize plots
        self.waveform_line, = self.ax1.plot([], [], 'b-', linewidth=0.5)
        self.ax1.set_title('Audio Waveform')
        self.ax1.set_xlabel('Time (s)')
        self.ax1.set_ylabel('Amplitude')
        self.ax1.set_ylim(-1, 1)
        self.ax1.set_xlim(0, self.recognizer.record_seconds)
        
        self.ax2.set_title('Emotion Probabilities')
        self.ax2.set_xlabel('Emotion')
        self.ax2.set_ylabel('Probability')
        self.ax2.set_ylim(0, 1)
        
        self.bars = self.ax2.bar(self.recognizer.emotions, 
                                 [0] * len(self.recognizer.emotions),
                                 color=[self.recognizer.emotion_colors.get(e, 'blue') for e in self.recognizer.emotions])
        
        self.fig.tight_layout()
    
    def update_sensitivity(self, value):
        """
        Update sensitivity parameter
        
        Parameters:
        -----------
        value : str
            New sensitivity value
        """
        self.recognizer.set_sensitivity(float(value))
    
    def update_threshold(self, value):
        """
        Update threshold parameter
        
        Parameters:
        -----------
        value : str
            New threshold value
        """
        self.recognizer.set_threshold(float(value))
    
    def update_smoothing(self, value):
        """
        Update smoothing parameter
        
        Parameters:
        -----------
        value : str
            New smoothing value
        """
        self.recognizer.set_smoothing(float(value))
    
    def toggle_recording(self):
        """
        Toggle recording state
        """
        if self.recording_state.get():
            # Stop recording
            self.recognizer.stop_recording()
            self.recording_state.set(False)
            self.btn_record.config(text="Start Recording")
            self.lbl_status.config(text="Not recording")
        else:
            # Start recording
            self.recognizer.start_recording()
            self.recording_state.set(True)
            self.btn_record.config(text="Stop Recording")
            self.lbl_status.config(text="Recording...")
    
    def update_plot(self, frame):
        """
        Update plot with current audio and emotion data
        
        Parameters:
        -----------
        frame : int
            Animation frame number
        """
        # Get the latest data from the queue for waveform display
        audio_data = []
        queue_copy = list(self.recognizer.audio_queue.queue)
        
        if queue_copy:
            audio_data = np.concatenate(queue_copy)
            
            # Ensure we only show the latest data that fits our window
            max_samples = int(self.recognizer.rate * self.recognizer.record_seconds)
            if len(audio_data) > max_samples:
                audio_data = audio_data[-max_samples:]
            
            # Update time axis
            time_axis = np.linspace(0, len(audio_data) / self.recognizer.rate, len(audio_data))
            self.ax1.set_xlim(0, time_axis[-1] if len(time_axis) > 0 else self.recognizer.record_seconds)
            
            # Update waveform plot
            self.waveform_line.set_data(time_axis, audio_data)
        
        # Update emotion probabilities
        for i, bar in enumerate(self.bars):
            emotion = self.recognizer.emotions[i]
            probability = self.recognizer.current_probabilities.get(emotion, 0)
            bar.set_height(probability)
            bar.set_color(self.recognizer.emotion_colors.get(emotion, 'blue'))
        
        # Update current emotion label with confidence
        top_emotion = self.recognizer.current_emotion
        top_probability = self.recognizer.current_probabilities.get(top_emotion, 0)
        self.lbl_emotion.config(
            text=f"Emotion: {top_emotion.upper()} ({top_probability:.2f})",
            foreground=self.recognizer.emotion_colors.get(top_emotion, 'black')
        )
        
        # Return the artists that were modified
        return [self.waveform_line] + list(self.bars) + [self.lbl_emotion]
    
    def on_closing(self):
        """
        Handle window closing event
        """
        if self.recording_state.get():
            self.recognizer.stop_recording()
        
        self.recognizer.close()
        self.root.destroy()


def download_pretrained_model():
    """
    Function to download or create a pre-trained model
    
    This is a placeholder - in a real implementation, you would:
    1. Check if a model exists locally
    2. If not, download from a repository or create a dummy model
    
    Returns:
    --------
    tuple
        (model_path, scaler_path)
    """
    # For demonstration, we'll just create paths to where models would be stored
    model_dir = os.path.join(os.path.expanduser("~"), ".ser_models")
    os.makedirs(model_dir, exist_ok=True)
    
    model_path = os.path.join(model_dir, "ser_model.pkl")
    scaler_path = os.path.join(model_dir, "ser_scaler.pkl")
    
    if not os.path.exists(model_path):
        print("Pre-trained model not found. In a real application, you would download it here.")
        print("For this demo, you'll need to train your own model or provide a pre-trained one.")
    
    return model_path, scaler_path


def main():
    """
    Main function to run the application
    """
    # Get model paths
    model_path, scaler_path = download_pretrained_model()
    
    # Check if model exists
    if not os.path.exists(model_path):
        print("No model found. You'll need to provide a pre-trained model.")
        print("For demonstration purposes, a dummy model would be created here.")
        # In a real app, you would train a model or provide instructions to download one
    
    # Create GUI
    root = tk.Tk()
    app = EmotionRecognitionApp(root, model_path, scaler_path)
    root.mainloop()


if __name__ == "__main__":
    main()

Pre-trained model not found. In a real application, you would download it here.
For this demo, you'll need to train your own model or provide a pre-trained one.
No model found. You'll need to provide a pre-trained model.
For demonstration purposes, a dummy model would be created here.
New model created. You need to train it before use.


  self.ani = animation.FuncAnimation(


Smoothing set to: 0.3
Threshold set to: 0.01
Sensitivity set to: 1.2
Recording started
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Detected emotion: angry - 0.00
Prediction error: This StandardScaler instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.
Detected emotion: neutral - 0.00
Prediction error: This StandardScaler instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.
Detected emotion: neutral - 0.00
Prediction error: This StandardScaler instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.
Detected emotion: neutral - 0.00
Prediction error: This StandardScaler instance is not fitted yet. Call 'fi