# In [None]:
import cv2
import numpy as np
import os
import librosa
import pyaudio
import wave
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import load_model
import tkinter as tk
from PIL import Image, ImageTk
import warnings
import threading
import queue

# Suppress DeprecationWarning messages for the rest of the process.
warnings.filterwarnings("ignore", category=DeprecationWarning)

# --- Speech pipeline settings ---
MFCC_LENGTH = 20      # passed as n_mfcc when extracting MFCC features
MAX_PAD_LEN = 100     # MFCC matrices are padded/truncated to this many columns
AUDIO_DURATION = 2    # length of each recorded clip (seconds of audio at RATE)
RATE = 44100          # microphone sample rate handed to PyAudio and the WAV header
CHUNK = 1024          # frames read from the input stream per call

# --- Facial pipeline settings ---
IMAGE_SIZE = (48, 48)  # frames are resized to this before prediction
NUM_CLASSES = 7        # not referenced in the visible code
NUM_CHANNELS = 1       # not referenced in the visible code

# Both models are loaded once at import time from files in the working directory.
speech_model = load_model('speech_emotion_model.h5')
facial_expression_model = load_model('facial_emotion_model.h5')

# Class index -> label for the speech model.
emotion_labels = dict(enumerate(
    ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']
))
# Class index -> label for the facial model (note the different ordering
# and capitalisation compared with the speech labels).
expression_labels = dict(enumerate(
    ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
))

# Hand-off channel from the recording thread to the GUI loop, and the flag
# the GUI sets to ask the recording thread to stop.
audio_queue = queue.Queue()
stop_recording = False

def record_audio():
    """Record microphone clips in a loop until ``stop_recording`` becomes true.

    Each iteration opens a mono 16-bit input stream, reads roughly
    ``AUDIO_DURATION`` seconds of audio in ``CHUNK``-frame reads, writes the
    clip to ``temp_audio.wav`` and puts that file name on ``audio_queue``.
    A clip interrupted by ``stop_recording`` is discarded.

    Any exception ends the loop: the error is printed and the string
    ``'error'`` is put on ``audio_queue`` so the consumer can report it.
    The audio stream and the PyAudio instance are always released, even
    when opening or reading the stream fails.
    """
    try:
        while not stop_recording:
            p = pyaudio.PyAudio()
            try:
                stream = p.open(format=pyaudio.paInt16, channels=1, rate=RATE,
                                input=True, frames_per_buffer=CHUNK)
                try:
                    frames = []
                    print("Recording...")
                    for _ in range(int(RATE / CHUNK * AUDIO_DURATION)):
                        # Checked per chunk so a stop request is honoured quickly.
                        if stop_recording:
                            break
                        frames.append(stream.read(CHUNK))
                    print("Finished recording.")
                finally:
                    stream.stop_stream()
                    stream.close()
                # Query the sample width while the PyAudio instance is still live.
                sample_width = p.get_sample_size(pyaudio.paInt16)
            finally:
                p.terminate()

            if not stop_recording:
                # The context manager closes the file even if a write fails.
                with wave.open('temp_audio.wav', 'wb') as wf:
                    wf.setnchannels(1)
                    wf.setsampwidth(sample_width)
                    wf.setframerate(RATE)
                    wf.writeframes(b''.join(frames))

                audio_queue.put('temp_audio.wav')
    except Exception as e:
        print("Error in record_audio:", e)
        audio_queue.put('error')

def predict_facial_expression(image):
    """Classify the expression shown in a BGR image.

    The image is converted to grayscale, resized to ``IMAGE_SIZE``, given
    batch and channel axes, scaled to the 0..1 range and passed to
    ``facial_expression_model``.

    Returns the label from ``expression_labels`` for the highest-scoring
    class, ``"Unknown"`` if that class index has no label, or ``"Error"``
    if anything in the pipeline raises (the exception is printed).
    """
    try:
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        shrunk = cv2.resize(grayscale, IMAGE_SIZE)
        # Add a trailing channel axis and a leading batch axis in one step.
        batch = shrunk[np.newaxis, ..., np.newaxis]
        batch = batch.astype('float32') / 255.0

        scores = facial_expression_model.predict(batch)
        best = np.argmax(scores)
        return expression_labels[best] if best in expression_labels else "Unknown"
    except Exception as e:
        print("Error in predict_facial_expression:", e)
        return "Error"

def combine_emotions(facial_emotion, speech_emotion):
    """Merge a facial emotion and a speech emotion into a single label.

    Both arguments are expected to be lower-case emotion names. The result
    is whichever of the two is more dominant in the fixed ordering
    angry > disgust > fear > sad > surprise > happy > neutral.
    If either argument is not one of those seven names, ``'unknown'`` is
    returned.
    """
    # Most dominant first; the position in this tuple is the emotion's rank.
    dominance_order = (
        'angry', 'disgust', 'fear', 'sad', 'surprise', 'happy', 'neutral',
    )
    rank_of = {name: rank for rank, name in enumerate(dominance_order)}

    if facial_emotion not in rank_of or speech_emotion not in rank_of:
        return 'unknown'

    winner = min(rank_of[facial_emotion], rank_of[speech_emotion])
    return dominance_order[winner]

def update_gui():
    """Build the Tkinter window and run the capture/predict/display loop.

    Opens the default camera (index 0), creates labels for the video frame,
    the facial expression, the speech emotion and the combined emotion, and
    then enters ``root.mainloop()``. Every 10 ms the ``update`` callback
    grabs a frame, classifies it with ``predict_facial_expression``, and,
    if the recording thread has queued a clip, classifies the clip with
    ``speech_model`` and refreshes the speech and combined labels.

    Closing the window or pressing "Quit" sets ``stop_recording``, waits for
    the global ``audio_thread`` to finish, releases the camera and destroys
    the window. This function blocks until the window is closed.
    """
    cap = cv2.VideoCapture(0)

    root = tk.Tk()
    root.title("Emotion Recognition")

    frame_label = tk.Label(root, text="Facial Expression: ")
    frame_label.pack()
    frame_image_label = tk.Label(root)
    frame_image_label.pack()

    speech_label = tk.Label(root, text="Speech Emotion: ")
    speech_label.pack()

    expression_label = tk.Label(root, text="")
    expression_label.pack()

    speech_emotion_label = tk.Label(root, text="")
    speech_emotion_label.pack()

    combined_emotion_label = tk.Label(root, text="")
    combined_emotion_label.pack()

    def shutdown():
        """Stop the recorder, release the camera and tear down the window."""
        global stop_recording
        stop_recording = True

        # Wait for the recording thread to notice the flag and exit.
        audio_thread.join()

        cap.release()
        cv2.destroyAllWindows()
        root.quit()
        root.destroy()

    # The button and the window-manager close event share one handler so
    # both paths perform the same cleanup.
    quit_application = shutdown
    on_closing = shutdown

    quit_button = tk.Button(root, text="Quit", command=quit_application)
    quit_button.pack()

    root.protocol("WM_DELETE_WINDOW", on_closing)

    def classify_speech(audio_file):
        """Return the speech-emotion label predicted for a recorded clip."""
        signal, sr = librosa.load(audio_file, duration=AUDIO_DURATION, sr=None)
        mfcc = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=MFCC_LENGTH)
        # Force a fixed number of columns: zero-pad short clips, truncate long ones.
        missing = MAX_PAD_LEN - mfcc.shape[1]
        if missing > 0:
            mfcc = np.pad(mfcc, ((0, 0), (0, missing)), mode='constant')
        else:
            mfcc = mfcc[:, :MAX_PAD_LEN]
        prediction = speech_model.predict(np.expand_dims(mfcc, axis=0))
        return emotion_labels.get(int(np.argmax(prediction)), "Unknown")

    def update():
        """Process one camera frame and, if available, one recorded clip."""
        ret, frame = cap.read()
        if not ret:
            root.after(10, update)
            return

        facial_expression = predict_facial_expression(frame)
        expression_label.config(text="Facial Expression: {}".format(facial_expression))

        try:
            audio_file = audio_queue.get_nowait()
        except queue.Empty:
            audio_file = None

        if audio_file == 'error':
            speech_emotion_label.config(text="Speech Emotion: Error")
        elif audio_file is not None:
            # A failure here must not escape: an exception raised inside this
            # callback would skip the root.after() call below and the video
            # loop would never run again.
            try:
                emotion_label = classify_speech(audio_file)
            except Exception as e:
                print("Error in speech emotion prediction:", e)
                speech_emotion_label.config(text="Speech Emotion: Error")
            else:
                speech_emotion_label.config(text="Speech Emotion: {}".format(emotion_label))

                combined_emotion = combine_emotions(facial_expression.lower(), emotion_label.lower())
                combined_emotion_label.config(text="Combined Emotion: {}".format(combined_emotion))

        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame = Image.fromarray(frame)
        imgtk = ImageTk.PhotoImage(image=frame)
        # Keep a reference on the widget so the image is not garbage-collected.
        frame_image_label.imgtk = imgtk
        frame_image_label.config(image=imgtk)

        root.after(10, update)  # Update every 10 milliseconds

    root.after(10, update)  # Start the update loop
    root.mainloop()



def main():
    """Start the background recorder and run the GUI until it is closed.

    The recording thread is stored in the module-level ``audio_thread`` so
    the GUI's shutdown handlers can join it. The GUI runs on the calling
    thread rather than a secondary one; GUI toolkits are restricted to the
    main thread on some platforms (NOTE(review): notably macOS; confirm
    for the target platform).

    When the GUI returns or fails to start, ``stop_recording`` is set and
    the recorder is joined, so the process cannot be left recording forever
    with no window to stop it.
    """
    global audio_thread, stop_recording
    audio_thread = threading.Thread(target=record_audio)
    audio_thread.start()

    try:
        update_gui()
    finally:
        # Also reached when the GUI raised before its own shutdown handler
        # ran; joining an already-finished thread is harmless.
        stop_recording = True
        audio_thread.join()


if __name__ == "__main__":
    main()


# Sample output captured from a notebook run:
# Recording...
# Finished recording.
# Recording...
