In [None]:
import cv2
import numpy as np
import math
import time
import speech_recognition as sr
from gtts import gTTS
import os
import sounddevice as sd
import soundfile as sf
import threading
import subprocess
import asyncio
import tempfile
import nest_asyncio
from concurrent.futures import ThreadPoolExecutor
import uuid
import pygame
from cvzone.HandTrackingModule import HandDetector
from cvzone.ClassificationModule import Classifier
from keras.models import load_model
from tensorflow.keras.preprocessing.image import img_to_array
import traceback



# Initialize components
# Shared speech recognizer; listen() uses it for both capture and transcription.
recognizer = sr.Recognizer()
# True while input is read from the keyboard by handle_command_mode()
# instead of from the microphone by listen().
in_cmd_mode = False
# Set to True by listen() once one of the wake words has been heard.
assistant_active = False
# Substrings that activate the assistant when found in a transcribed utterance.
wake_words = ["luna", "danna"]
# Memo of generate_response() results, keyed by the exact prompt string.
response_cache = {}
# Scratch directory for the audio files written by speak(); emptied and
# removed in main()'s cleanup.
temp_dir = tempfile.mkdtemp()
# Thread pool used by run_in_executor() to run blocking calls off the event loop.
executor = ThreadPoolExecutor()

# Initialize hand detection
# Open capture device 0 and request a 640x480 frame size.
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
# Only the first detected hand is used by process_frame().
hand_detector = HandDetector(maxHands=1)
# NOTE(review): both paths below are absolute and specific to one machine.
hand_classifier = Classifier(r"C:\Users\ihebm\Desktop\CleanEmotion\models\mod1\signG4.h5", r"C:\Users\ihebm\Desktop\CleanEmotion\models\mod1\labels.txt")
# Display names looked up by the class index that
# hand_classifier.getPrediction() returns in process_frame().
# NOTE(review): presumably must match the order in labels.txt - confirm.
hand_labels = ["Hello", "Bye", "Okay", "Thanks"]
# Padding, in pixels, added around the hand bounding box before cropping.
offset = 20
# Side length, in pixels, of the square white canvas fed to the hand classifier.
imgSize = 300

# Initialize emotion detection
# Both model files are given as relative paths.
# NOTE(review): nothing here checks that the cascade file was actually loaded.
face_classifier = cv2.CascadeClassifier(r'haarcascade_frontalface_default.xml')
emotion_classifier = load_model(r'final_emotion_modelG1.h5')
# Names looked up by the argmax of the emotion model's output in process_frame().
emotion_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad', 'Surprise']

# Timestamps (time.time()) marking when the current dominant emotion / hand
# sign was first seen or last responded to; maintained by process_frame().
emotion_start_time = time.time()
hand_sign_start_time = time.time()
# Most recent dominant emotion and hand sign label seen by process_frame().
last_dominant_emotion = None
last_hand_sign = None

# Initialize pygame for audio
# NOTE(review): no other pygame call is visible in this file; speak() plays
# audio through sounddevice. Confirm this initialization is still needed.
pygame.mixer.init()

async def run_in_executor(func, *args):
    """Run a blocking callable on the shared thread pool and await its result.

    Args:
        func: Callable to invoke on a worker thread of ``executor``.
        *args: Positional arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args)`` returns.
    """
    pending = asyncio.get_event_loop().run_in_executor(executor, func, *args)
    return await pending

async def listen():
    """Loop forever, turning microphone audio into wake-word or command events.

    While ``in_cmd_mode`` is set the microphone is left alone. Otherwise one
    utterance is captured and transcribed per iteration:

    * before activation, an utterance containing a wake word activates the
      assistant and triggers greet();
    * after activation, "command"/"cmd" switches to keyboard command mode and
      anything else is forwarded to process_voice_command().
    """
    global in_cmd_mode, assistant_active
    while True:
        if in_cmd_mode:
            # Keyboard mode owns the conversation; just yield to other tasks.
            await asyncio.sleep(0.1)
            continue

        with sr.Microphone() as mic:
            if assistant_active:
                print("Listening for commands...")
            else:
                print("Listening for wake word...")
            captured = await run_in_executor(recognizer.listen, mic)
            try:
                heard = await run_in_executor(recognizer.recognize_google, captured)
                heard = heard.lower()
                print(f"Command received: {heard}")
                if assistant_active:
                    if heard in ("command", "cmd"):
                        in_cmd_mode = True
                        print("Switched to command mode")
                    else:
                        await process_voice_command(heard)
                elif any(word in heard for word in wake_words):
                    assistant_active = True
                    await greet()
            except sr.UnknownValueError:
                print("Sorry, I did not understand that.")
            except sr.RequestError:
                print("Sorry, there was an error with the request.")
        await asyncio.sleep(0.1)

async def greet():
    """Print the fixed greeting and then speak it aloud via speak()."""
    message = "Greeting, Eheb. How can I assist you, my friend?"
    print(message)
    await speak(message)


async def generate_response(prompt):
    """Return the Mistral model's reply to ``prompt`` using the ``ollama`` CLI.

    Successful replies are memoized in ``response_cache`` under the exact
    prompt string, so a repeated prompt does not start the CLI again.

    The CLI is run with blocking ``subprocess.run`` on the shared thread pool
    rather than ``asyncio.create_subprocess_exec``. The run captured with this
    notebook shows ``create_subprocess_exec`` failing inside
    ``loop.subprocess_exec`` with an empty-message exception, which is what an
    event loop without subprocess support raises; a worker thread does not
    depend on the loop implementation.

    Args:
        prompt: Text passed to ``ollama run mistral`` as a single argument.

    Returns:
        The model's reply with surrounding whitespace stripped, or a
        human-readable error sentence if the CLI could not be run or exited
        with a non-zero status. Failures are never cached.
    """
    if prompt in response_cache:
        return response_cache[prompt]

    def _run_ollama():
        # Argument list (no shell), so the prompt is never shell-interpreted.
        return subprocess.run(
            ['ollama', 'run', 'mistral', prompt],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

    try:
        print(f"Attempting to run Ollama with prompt: {prompt}")
        completed = await run_in_executor(_run_ollama)

        # errors="replace" keeps a stray undecodable byte from turning a
        # successful reply into a UnicodeDecodeError.
        stdout_text = completed.stdout.decode(errors="replace")
        stderr_text = completed.stderr.decode(errors="replace")

        print(f"Ollama process return code: {completed.returncode}")
        print(f"Stdout: {stdout_text}")
        print(f"Stderr: {stderr_text}")

        if completed.returncode != 0:
            error_msg = stderr_text or "Unknown error occurred"
            print(f"Ollama error: {error_msg}")
            return f"I encountered an error while processing your request: {error_msg}"

        response = stdout_text.strip()
        response_cache[prompt] = response
        return response
    except Exception as e:
        # Top-level boundary for the assistant: report the failure in the
        # reply instead of letting it kill the calling task.
        print(f"Exception in generate_response: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        return f"I'm sorry, but I encountered an error: {str(e)}"


async def speak(text):
    """Synthesize ``text`` with gTTS and play it through sounddevice.

    The speech is saved as an MP3 in ``temp_dir``, converted by ffmpeg to a
    mono 44.1 kHz 16-bit WAV, loaded with soundfile and played; the coroutine
    returns once playback has finished. All blocking steps run on the shared
    thread pool.

    Both temporary files are removed afterwards whether or not every step
    succeeded. A file that was never created is skipped, so a failed
    conversion no longer raises ``FileNotFoundError`` from the cleanup and
    hides the original error.

    Args:
        text: The sentence to speak.
    """
    print("Assistant:", text)
    filename = f"response_{uuid.uuid4()}.mp3"
    mp3_path = os.path.join(temp_dir, filename)
    wav_path = os.path.join(temp_dir, f"{filename}.wav")

    # "-y" is a global option and therefore goes before the input/output
    # files; placed after the output file ffmpeg treats it as trailing.
    ffmpeg_command = [
        "ffmpeg",
        "-y",
        "-loglevel", "error",
        "-i", mp3_path,
        "-acodec", "pcm_s16le",
        "-ar", "44100",
        "-ac", "1",
        wav_path,
    ]

    try:
        tts = gTTS(text=text, lang='en', tld='co.uk')
        # Inside the try so a partially written MP3 is still cleaned up.
        await run_in_executor(tts.save, mp3_path)
        await run_in_executor(lambda: subprocess.run(ffmpeg_command, check=True, capture_output=True))
        data, samplerate = await run_in_executor(sf.read, wav_path)
        await run_in_executor(sd.play, data, samplerate)
        await run_in_executor(sd.wait)
    except subprocess.CalledProcessError as e:
        # stderr is bytes because of capture_output=True; decode for display.
        stderr_text = e.stderr.decode(errors="replace") if e.stderr else ""
        print(f"Error during audio conversion: {stderr_text}")
    finally:
        for path in (mp3_path, wav_path):
            try:
                os.remove(path)
            except FileNotFoundError:
                # The step that would have created this file did not get that far.
                pass

async def process_voice_command(command):
    """Get a reply for ``command`` from generate_response() and speak it.

    Args:
        command: The user's request, spoken or typed.
    """
    reply = await generate_response(command)
    if not reply:
        print("No response text generated.")
        return
    print(f"Processing voice command: {command}")
    await speak(reply)

async def handle_command_mode():
    """Loop forever, serving typed commands while ``in_cmd_mode`` is set.

    Each typed line is handled like a voice command, except "ex"
    (case-insensitive), which switches back to voice mode with the assistant
    left active.
    """
    global in_cmd_mode, assistant_active
    while True:
        if not in_cmd_mode:
            await asyncio.sleep(0.1)
            continue

        # input() blocks, so it is run on the thread pool.
        typed = await run_in_executor(input, "Command mode active. Type 'ex' to switch back to voice mode: ")
        if typed.lower() != "ex":
            await process_voice_command(typed)
        else:
            in_cmd_mode = False
            assistant_active = True
            print("Switched back to listening mode")
        await asyncio.sleep(0.1)

def draw_progress_bar(img, emotion, value, x, y, width=200, height=20):
    """Draw a labelled horizontal bar onto ``img`` in place.

    Args:
        img: Image to draw on.
        emotion: Label shown inside the bar.
        value: Fill fraction; the filled part is ``int(width * value)`` pixels wide.
        x, y: Top-left corner of the bar.
        width, height: Size of the bar outline in pixels.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    thickness = 1
    white = (255, 255, 255)

    right = x + width
    bottom = y + height

    # Outline first, then the solid fill on top of it.
    cv2.rectangle(img, (x, y), (right, bottom), white, 1)
    cv2.rectangle(img, (x, y), (x + int(width * value), bottom), (0, 255, 0), -1)

    caption = f"{emotion}: {value:.2f}"
    (caption_w, caption_h), _ = cv2.getTextSize(caption, font, font_scale, thickness)
    # Centre the caption horizontally and vertically inside the outline.
    origin = (
        x + (width - caption_w) // 2,
        bottom - (height - caption_h) // 2,
    )
    cv2.putText(img, caption, origin, font, font_scale, white, thickness)

async def respond_to_emotion(emotion):
    """Speak the canned sentence for ``emotion``.

    Args:
        emotion: An emotion label; labels without an entry below get the
            generic offer of help.
    """
    fallback = "How can I assist you today?"
    canned = {
        'Happy': "Good to see you happy!",
        'Sad': "Feeling down? Here's a joke for you: Why don't scientists trust atoms? Because they make up everything!",
        'Angry': "Take a deep breath. Let's stay calm.",
        'Surprise': "Wow! That's surprising!",
        'Fear': "Don't worry, everything will be alright.",
        'Disgust': "That doesn't look pleasant. Let's focus on something nice.",
    }
    await speak(canned.get(emotion, fallback))

async def respond_to_hand_sign(sign):
    """Speak the canned sentence for a recognized hand ``sign``.

    Args:
        sign: A hand-sign label; labels without an entry below get a generic
            acknowledgement.
    """
    fallback = "I see your hand sign."
    canned = {
        "Hello": "Hello, Ehab! How are you?",
        "Bye": "Goodbye, Ehab! Have a great day!",
        "Okay": "Glad to see everything is okay!",
        "Thanks": "You're welcome, Ehab!",
    }
    await speak(canned.get(sign, fallback))

async def process_frame(frame):
    """Annotate one camera frame with hand-sign and emotion results.

    Hand signs: the first detected hand is cropped with ``offset`` pixels of
    padding, pasted centred onto an ``imgSize`` x ``imgSize`` white canvas
    and classified. When the same sign has been seen for more than 5 seconds,
    respond_to_hand_sign() is scheduled and the timer restarts.

    Emotions: each face found by the Haar cascade is resized to 48x48
    grayscale and classified. Per-emotion bars and the top label are drawn.
    When the same dominant emotion has lasted more than 10 seconds,
    respond_to_emotion() is scheduled and the timer restarts.

    The crop bounds are clamped to the frame. Without that, a hand close to
    the top or left edge produced negative slice starts, which NumPy
    interprets as counting from the far end, giving an empty or wrong crop.

    Args:
        frame: Image as returned by ``cap.read()``. It is also handed to the
            hand detector as-is.

    Returns:
        A copy of ``frame`` (taken before hand detection) with the
        annotations drawn on it.
    """
    global last_dominant_emotion, last_hand_sign, emotion_start_time, hand_sign_start_time

    imgOutput = frame.copy()

    # Hand sign detection
    hands, _ = hand_detector.findHands(frame)
    if hands:
        hand = hands[0]
        x, y, w, h = hand['bbox']
        imgWhite = np.ones((imgSize, imgSize, 3), np.uint8) * 255
        # Clamp to >= 0 so a padded box that pokes past the top/left edge is
        # not read as a from-the-end index. Ends past the bottom/right edge
        # are already truncated by NumPy slicing.
        crop_top = max(0, y - offset)
        crop_left = max(0, x - offset)
        crop_bottom = max(0, y + h + offset)
        crop_right = max(0, x + w + offset)
        imgCrop = frame[crop_top:crop_bottom, crop_left:crop_right]
        # w/h guard: a degenerate box would divide by zero below.
        if imgCrop.size > 0 and w > 0 and h > 0:
            aspectRatio = h / w
            if aspectRatio > 1:
                # Taller than wide: fill the height, centre horizontally.
                k = imgSize / h
                wCal = math.ceil(k * w)
                imgResize = cv2.resize(imgCrop, (wCal, imgSize))
                wGap = math.ceil((imgSize - wCal) / 2)
                imgWhite[:, wGap:wCal + wGap] = imgResize
            else:
                # Wider than tall: fill the width, centre vertically.
                k = imgSize / w
                hCal = math.ceil(k * h)
                imgResize = cv2.resize(imgCrop, (imgSize, hCal))
                hGap = math.ceil((imgSize - hCal) / 2)
                imgWhite[hGap:hCal + hGap, :] = imgResize

            prediction, index = hand_classifier.getPrediction(imgWhite, draw=False)
            sign = hand_labels[index]
            cv2.rectangle(imgOutput, (x-offset, y-offset-70), (x-offset+400, y-offset+60-50), (0, 255, 0), cv2.FILLED)
            cv2.putText(imgOutput, sign, (x, y-30), cv2.FONT_HERSHEY_COMPLEX, 2, (0, 0, 0), 2)
            cv2.rectangle(imgOutput, (x-offset, y-offset), (x + w + offset, y + h + offset), (0, 255, 0), 4)

            if sign != last_hand_sign:
                hand_sign_start_time = time.time()
                last_hand_sign = sign
            elif time.time() - hand_sign_start_time > 5:
                # Fire-and-forget so speaking does not stall frame processing.
                asyncio.create_task(respond_to_hand_sign(sign))
                hand_sign_start_time = time.time()

    # Emotion detection
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_classifier.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

    for (x,y,w,h) in faces:
        cv2.rectangle(imgOutput, (x,y), (x+w,y+h), (0,255,255), 2)
        roi_gray = gray[y:y+h, x:x+w]
        roi_gray = cv2.resize(roi_gray, (48,48), interpolation=cv2.INTER_AREA)

        # Skip classification for an all-black region.
        if np.sum([roi_gray]) != 0:
            roi = roi_gray.astype('float') / 255.0
            roi = img_to_array(roi)
            roi = np.expand_dims(roi, axis=0)
            prediction = emotion_classifier.predict(roi)[0]
            label = emotion_labels[prediction.argmax()]
            label_position = (x, y)
            cv2.putText(imgOutput, label, label_position, cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)

            for i, (emotion, value) in enumerate(zip(emotion_labels, prediction)):
                draw_progress_bar(imgOutput, emotion, value, 10, 50 + i * 25)

            dominant_emotion = label
            if dominant_emotion != last_dominant_emotion:
                emotion_start_time = time.time()
                last_dominant_emotion = dominant_emotion
            elif time.time() - emotion_start_time > 10:
                asyncio.create_task(respond_to_emotion(dominant_emotion))
                emotion_start_time = time.time()

            # Status dot in the bottom-left corner; grows and takes a
            # per-emotion colour once the emotion has lasted over 5 seconds.
            circle_x = 30
            circle_y = imgOutput.shape[0] - 30
            circle_color = (255, 255, 255)
            circle_radius = 10
            emotion_duration = time.time() - emotion_start_time
            if emotion_duration > 5:
                circle_radius = 15
                if dominant_emotion == 'Happy':
                    circle_color = (0, 255, 0)
                elif dominant_emotion == 'Sad':
                    circle_color = (0, 165, 255)
                elif dominant_emotion in ['Fear', 'Angry']:
                    circle_color = (0, 0, 255)
                elif dominant_emotion in ['Surprise', 'Disgust']:
                    circle_color = (0, 255, 255)
            # NOTE(review): this branch cannot currently run. The timer is
            # reset above as soon as it passes 10 seconds, so
            # emotion_duration never reaches 60. Left as-is on purpose;
            # decide whether the label is wanted before giving it its own timer.
            if emotion_duration > 60 and dominant_emotion in ['Sad', 'Neutral']:
                circle_color = (255, 0, 0)
                cv2.putText(imgOutput, "Autism risk", (circle_x - 30, circle_y + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
            cv2.circle(imgOutput, (circle_x, circle_y), circle_radius, circle_color, -1)
        else:
            # NOTE(review): shown when a detected face region is entirely
            # black, not when no face was detected - confirm the intent.
            cv2.putText(imgOutput, 'No Faces', (30,80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)

    return imgOutput

async def main_loop():
    """Capture, process and display camera frames until 'q' is pressed.

    The loop also ends if a frame cannot be read. Frame capture runs on the
    thread pool because it blocks. The window calls (``cv2.imshow`` /
    ``cv2.waitKey``) are made directly on the event-loop thread: dispatching
    them to the pool could run them on different worker threads, and a
    HighGUI window is only serviced by ``waitKey`` calls made on the thread
    that owns it.
    """
    while True:
        ret, frame = await run_in_executor(cap.read)
        if not ret:
            print("Failed to capture image")
            break

        output_frame = await process_frame(frame)
        cv2.imshow('Hand Sign and Emotion Detector', output_frame)

        # waitKey(1) also pumps the window's events; mask to the low byte
        # before comparing with the key code.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        # Give the listen/command tasks a chance to run between frames.
        await asyncio.sleep(0.01)

async def main():
    """Run the listener, command-mode and camera tasks, then clean up.

    listen() and handle_command_mode() loop forever, so waiting for all three
    tasks would never finish after the camera loop ends. Instead this waits
    for the first task to finish, whether normally or with an exception,
    cancels the others and re-raises that task's exception, if any.

    Cleanup always runs: the camera and windows are released, the files in
    ``temp_dir`` and the directory itself are removed on a best-effort basis,
    and the thread pool is shut down without waiting for worker threads that
    may still be blocked on the microphone or on ``input()``.
    """
    tasks = [
        asyncio.create_task(listen()),
        asyncio.create_task(handle_command_mode()),
        asyncio.create_task(main_loop()),
    ]

    try:
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        # Let the cancellations settle; their CancelledError is expected.
        await asyncio.gather(*pending, return_exceptions=True)
        for task in done:
            # Re-raises the task's exception, if it ended with one.
            task.result()
    finally:
        cap.release()
        cv2.destroyAllWindows()

        for file in os.listdir(temp_dir):
            try:
                os.remove(os.path.join(temp_dir, file))
            except Exception as e:
                print(f"Error removing file {file}: {e}")
        try:
            os.rmdir(temp_dir)
        except OSError as e:
            # Not empty (a removal above failed) or already gone; do not let
            # cleanup mask the real error.
            print(f"Error removing temp directory {temp_dir}: {e}")

        executor.shutdown(wait=False)

if __name__ == "__main__":
    # Patch asyncio to allow nested event loops, so asyncio.run() also works
    # when a loop is already running (as in a notebook kernel).
    nest_asyncio.apply()
    asyncio.run(main())


pygame 2.6.0 (SDL 2.28.4, Python 3.8.19)
Hello from the pygame community. https://www.pygame.org/contribute.html
Listening for wake word...
Command received: luna
Greeting, Eheb. How can I assist you, my friend?
Assistant: Greeting, Eheb. How can I assist you, my friend?
Listening for commands...
Assistant: How can I assist you today?
Command received: how can i assist you today
Attempting to run Ollama with prompt: how can i assist you today
Exception in generate_response: 
Traceback: Traceback (most recent call last):
  File "C:\Users\ihebm\AppData\Local\Temp\ipykernel_17200\3602961186.py", line 100, in generate_response
    process = await asyncio.create_subprocess_exec(
  File "C:\Users\ihebm\anaconda3\envs\emotionG7\lib\asyncio\subprocess.py", line 236, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
  File "C:\Users\ihebm\anaconda3\envs\emotionG7\lib\asyncio\base_events.py", line 1630, in subprocess_exec
    transport = await self._make_subprocess_