In [72]:
import os

import numpy as np

import librosa

import librosa.display

import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

from sklearn.metrics import classification_report

from sklearn.preprocessing import StandardScaler

from sklearn.utils.class_weight import compute_class_weight

import tensorflow as tf

from tensorflow.keras.utils import to_categorical

from tensorflow.keras.models import Model

from tensorflow.keras.layers import Input, Dense, RNN, Layer

from tensorflow.keras.optimizers import Adam

from tensorflow.keras.regularizers import l2

# tf.keras.backend.clear_session()

In [73]:

from sklearn.preprocessing import StandardScaler

from sklearn.utils.class_weight import compute_class_weight

import tensorflow as tf

from tensorflow.keras.utils import to_categorical

from tensorflow.keras.models import Model

from tensorflow.keras.layers import Input, Dense, RNN, Layer

from tensorflow.keras.optimizers import Adam

from tensorflow.keras.regularizers import l2

# tf.keras.backend.clear_session()

In [74]:
def clipped_relu(x):
    """Apply ReLU to ``x`` with the output capped at 20.0."""
    upper_bound = 20.0
    return tf.keras.activations.relu(x, max_value=upper_bound)



def clipped_sigmoid(x):
    """Return ``sigmoid(x)`` multiplied by 10.0.

    Despite the name nothing is clipped: the sigmoid output is only rescaled
    by a constant factor of 10.
    """
    scale = 10.0
    squashed = tf.keras.activations.sigmoid(x)
    return squashed * scale

In [75]:
import tensorflow.keras as keras
@keras.utils.register_keras_serializable(package="Custom", name="LTCCell")
class LTCCell(Layer):
    """Custom LTC recurrent cell, usable as the ``cell`` of a Keras ``RNN``.

    Each step integrates the hidden state with ``ode_unfolds`` explicit Euler
    sub-steps, then applies a clipped ReLU and two learned per-unit gains.

    Args:
        units: Size of the hidden state (also exposed as ``state_size``).
        ode_unfolds: Number of Euler sub-steps performed per time step.
        l2_reg: L2 regularization factor applied to the weight matrix ``W``.
        **kwargs: Forwarded to ``Layer.__init__``.
    """

    def __init__(self, units, ode_unfolds=6, l2_reg=0.001, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.ode_unfolds = ode_unfolds
        self.state_size = units
        self.l2_reg = l2_reg  # L2 regularization parameter

    def build(self, input_shape):
        """Create the trainable weights; the last axis of ``input_shape`` is the feature size."""
        self.input_dim = input_shape[-1]

        # NOTE: creation order and names are kept stable on purpose so weights
        # saved by earlier versions of this cell still map onto it.

        # Joint input/recurrent projection, the only L2-regularized weight.
        self.W = self.add_weight(
            shape=(self.input_dim + self.units, self.units),
            initializer='glorot_uniform',
            regularizer=l2(self.l2_reg),
            name='W'
        )
        self.bias = self.add_weight(
            shape=(self.units,),
            initializer='zeros',
            name='bias'
        )
        # Raw per-unit time constant; passed through softplus in call().
        self.tau = self.add_weight(
            shape=(self.units,),
            initializer='ones',
            name='tau'
        )

        # Additional parameters for the LTC model: per-unit output gains,
        # both squashed through clipped_sigmoid in call().
        self.C = self.add_weight(
            shape=(self.units,),
            initializer='ones',
            name='C'
        )
        self.G = self.add_weight(
            shape=(self.units,),
            initializer='ones',
            name='G'
        )

        super().build(input_shape)

    def call(self, inputs, states):
        """Advance the cell by one time step.

        Args:
            inputs: Input for the current step, concatenated with the state
                along axis 1.
            states: Sequence whose first element is the previous hidden state.

        Returns:
            Tuple ``(output, [new_state])`` where both entries are the same tensor.
        """
        state = states[0]
        dt = 0.01  # Time step

        # The target and time constant do not change inside the solver loop,
        # so compute them once.
        # NOTE(review): the recurrent input is the state at the *start* of the
        # step and is not refreshed between sub-steps. Kept unchanged so models
        # already trained with this cell behave the same.
        joint_input = tf.concat([inputs, state], axis=1)
        target = tf.nn.tanh(tf.matmul(joint_input, self.W) + self.bias)
        time_constant = tf.nn.softplus(self.tau)

        # Explicit Euler integration of dh/dt = (target - h) / time_constant.
        for _ in range(self.ode_unfolds):
            dh = (-state + target) / time_constant
            state = state + dt * dh

        # Apply custom activation functions
        state = clipped_relu(state)
        state = state * clipped_sigmoid(self.C)
        state = state / clipped_sigmoid(self.G)

        return state, [state]

    def get_config(self):
        """Return the constructor arguments needed to rebuild this cell.

        All three hyper-parameters are serialized; previously only ``units``
        was stored, so a reloaded model silently fell back to the default
        ``ode_unfolds`` and ``l2_reg``. Configs saved without those keys still
        load because the constructor provides defaults.
        """
        config = super().get_config()
        config.update({
            "units": self.units,
            "ode_unfolds": self.ode_unfolds,
            "l2_reg": self.l2_reg,
        })
        return config

In [77]:
import pyaudio
import numpy as np
import librosa
import tensorflow as tf 
import tensorflow.keras as keras

# Restore the trained emotion classifier from disk. LTCCell is passed through
# custom_objects so Keras can resolve the custom recurrent cell by name.
model = keras.models.load_model(
    'emotion_model_63.h5',
    custom_objects={"LTCCell": LTCCell},
)

# Preprocessing function 
def extract_features_from_audio(audio, sample_rate, expected_time_steps=911, n_mfcc=13):
    """Turn a waveform into a fixed-length MFCC feature matrix.

    MFCCs plus their first- and second-order deltas are stacked and the time
    axis is zero-padded or truncated to ``expected_time_steps``.

    Args:
        audio: Audio samples passed to librosa as ``y``.
        sample_rate: Sampling rate of ``audio`` in Hz.
        expected_time_steps: Number of frames in the returned matrix
            (default 911, the length this notebook's model is fed).
        n_mfcc: Number of MFCC coefficients to compute (default 13).

    Returns:
        Array of shape ``(expected_time_steps, 3 * n_mfcc)``.
    """
    mfcc = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=n_mfcc)
    mfcc_delta = librosa.feature.delta(mfcc)
    mfcc_delta2 = librosa.feature.delta(mfcc, order=2)

    # Stack the three blocks along the coefficient axis, then put time first.
    features = np.concatenate((mfcc, mfcc_delta, mfcc_delta2), axis=0).T

    missing_steps = expected_time_steps - features.shape[0]
    if missing_steps > 0:
        # Too short: append zero frames at the end.
        features = np.pad(features, ((0, missing_steps), (0, 0)), mode='constant')
    else:
        # Long enough: keep only the leading frames.
        features = features[:expected_time_steps, :]

    return features

# Audio stream setup
def start_audio_stream(rate=16000, frames_per_buffer=1024):
    """Open a mono float32 input stream on the default audio device.

    Args:
        rate: Sampling rate in Hz (default 16000).
        frames_per_buffer: Frames per buffer requested from PyAudio (default 1024).

    Returns:
        Tuple ``(p, stream)``: the ``PyAudio`` instance and the opened stream.
        The caller is responsible for stopping/closing the stream and calling
        ``p.terminate()``.

    Raises:
        Whatever ``PyAudio.open`` raises; the PyAudio instance is terminated
        first so it is not leaked.
    """
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paFloat32,
                        channels=1,
                        rate=rate,
                        input=True,
                        frames_per_buffer=frames_per_buffer)
    except Exception:
        # Opening failed (e.g. no input device): release PortAudio before propagating.
        p.terminate()
        raise
    return p, stream

# Real-time emotion detection
def real_time_emotion_detection():
    """Continuously capture microphone audio and print the predicted emotion.

    Audio is read in 1024-frame chunks and grouped into 3-second segments.
    Once two segments are available, the most recent two (6 seconds) are
    concatenated, converted to features and classified with the module-level
    ``model``; after that a new prediction is printed every 3 seconds.
    Runs until interrupted (``KeyboardInterrupt``), then releases the audio
    device.
    """
    sample_rate = 16000      # must match the rate start_audio_stream() opens the device at
    chunk_frames = 1024      # frames pulled from the stream per read
    segment_seconds = 3
    buffer_size = segment_seconds * sample_rate  # samples per segment
    max_sessions = 2         # number of segments joined for one prediction

    # Map index to emotion label (customize this based on your model's labels)
    emotion_labels = ["angry", "happy", "sad", "neutral"]  # Replace with your labels

    # Start audio stream
    p, stream = start_audio_stream()
    print("Listening for real-time audio...")

    # Samples received but not yet assigned to a segment. Kept as a NumPy
    # array rather than a per-sample Python list to keep the loop cheap.
    pending = np.empty(0, dtype=np.float32)

    # Most recent `max_sessions` segments, oldest first.
    sliding_window = []

    try:
        while True:
            # Capture audio; the stream delivers float32 samples, so decode as such.
            raw = stream.read(chunk_frames, exception_on_overflow=False)
            chunk = np.frombuffer(raw, dtype=np.float32)
            pending = np.concatenate((pending, chunk))

            # Wait until a full segment has accumulated.
            if pending.size < buffer_size:
                continue

            # Split off exactly one segment; keep the overshoot for the next one.
            audio_segment = pending[:buffer_size].copy()
            pending = pending[buffer_size:]

            # Keep only the latest `max_sessions` segments.
            sliding_window.append(audio_segment)
            if len(sliding_window) > max_sessions:
                sliding_window.pop(0)  # Remove the oldest segment

            if len(sliding_window) == max_sessions:
                combined_audio = np.concatenate(sliding_window)

                # NOTE(review): with librosa's default hop length a 6 s window
                # presumably yields far fewer than 911 frames, so most of the
                # model input would be zero padding. Confirm this matches how
                # the model was trained.
                features = extract_features_from_audio(combined_audio, sample_rate=sample_rate)

                # Add the batch dimension expected by model.predict.
                features = np.expand_dims(features, axis=0)

                # Predict emotion
                prediction = model.predict(features, verbose=0)
                emotion_index = np.argmax(prediction, axis=1)[0]
                detected_emotion = emotion_labels[emotion_index]

                # Display result
                print(f"Detected Emotion: {detected_emotion}")

    except KeyboardInterrupt:
        print("Stopped listening.")
    finally:
        # Clean up; always release PortAudio even if stopping the stream fails.
        try:
            stream.stop_stream()
            stream.close()
        finally:
            p.terminate()

# Entry point: start the blocking capture/predict loop. It runs until a
# KeyboardInterrupt is raised (Ctrl+C, or interrupting the kernel).
# In a notebook kernel __name__ is "__main__", so executing this cell starts it.
if __name__ == "__main__":
    real_time_emotion_detection()